Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Release/0.2.0 #9

Merged
merged 13 commits into from
Dec 2, 2021
20 changes: 20 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,26 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Added
### Changed
### Deprecated
### Removed
### Fixed
### Security

## [0.2.0]

### Added
- The `history_json` global attribute is now populated on merged output files
### Changed
### Deprecated
### Removed
### Fixed
- Fixed bug where VariableInfo equality failed when _FillValue is np.nan
### Security

## [0.1.0]

### Added
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,4 @@ There are comprehensive unit tests for Concise. The tests can be run as follows:
```shell script
poetry run pytest tests/
```

78 changes: 76 additions & 2 deletions podaac/merger/preprocess_worker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
"""Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes"""

import json
import os
import queue
from copy import deepcopy
from datetime import datetime, timezone
from multiprocessing import Manager, Process
import queue

import importlib_metadata
import netCDF4 as nc
import numpy as np

Expand Down Expand Up @@ -71,6 +76,54 @@ def merge_metadata(merged_metadata, subset_metadata):
merged_attrs[attr_name] = False # mark as inconsistent


def construct_history(input_files):
    """
    Build the history JSON record describing this concatenation operation
    https://wiki.earthdata.nasa.gov/display/TRT/In-File+Provenance+Metadata+-+TRT-42

    Parameters
    ----------
    input_files : list
        List of input files

    Returns
    -------
    dict
        History JSON constructed for this concat operation
    """
    # Record only the file names (not full paths) as provenance inputs
    return {
        "date_time": datetime.now(tz=timezone.utc).isoformat(),
        "derived_from": [os.path.basename(path) for path in input_files],
        "program": 'concise',
        # Version of the installed podaac-concise distribution
        "version": importlib_metadata.distribution('podaac-concise').version,
        "parameters": f'input_files={input_files}',
        "program_ref": "https://cmr.earthdata.nasa.gov:443/search/concepts/S2153799015-POCLOUD",
        "$schema": "https://harmony.earthdata.nasa.gov/schemas/history/0.1.0/history-v0.1.0.json"
    }


def retrieve_history(dataset):
    """
    Fetch and decode the history_json global attribute from a NetCDF dataset

    Parameters
    ----------
    dataset : netCDF4.Dataset
        NetCDF Dataset representing a single granule

    Returns
    -------
    dict
        Decoded history_json attribute, or an empty list when the
        attribute is absent
    """
    # Guard clause inverted: only decode when the attribute exists
    if 'history_json' in dataset.ncattrs():
        return json.loads(dataset.getncattr('history_json'))
    return []


def _run_single_core(file_list):
"""
Run the granule preprocessing in the current thread/single-core mode
Expand All @@ -90,14 +143,22 @@ def _run_single_core(file_list):
max_dims = {}
var_metadata = {}
group_metadata = {}
history_json = []

for file in file_list:
with nc.Dataset(file, 'r') as dataset:
dataset.set_auto_maskandscale(False)
process_groups(dataset, group_list, max_dims, group_metadata, var_metadata, var_info)
history_json.extend(retrieve_history(dataset))

group_list.sort() # Ensure insertion order doesn't matter between granules

history_json.append(construct_history(file_list))
group_metadata[group_list[0]]['history_json'] = json.dumps(
history_json,
default=str
)

return {
'group_list': group_list,
'max_dims': max_dims,
Expand Down Expand Up @@ -156,6 +217,7 @@ def _run_multi_core(file_list, process_count):
max_dims = {}
var_metadata = {}
group_metadata = {}
history_json = []

for result in results:
# The following data should be consistent between granules and
Expand All @@ -176,6 +238,15 @@ def _run_multi_core(file_list, process_count):
merge_metadata(var_metadata, result['var_metadata'])
merge_metadata(group_metadata, result['group_metadata'])

# Merge history_json entries from input files
history_json.extend(result['history_json'])

history_json.append(construct_history(file_list))
group_metadata[group_list[0]]['history_json'] = json.dumps(
history_json,
default=str
)

return {
'group_list': group_list,
'max_dims': max_dims,
Expand Down Expand Up @@ -207,6 +278,7 @@ def _run_worker(in_queue, results):
var_info = {}
var_metadata = {}
group_metadata = {}
history_json = []

while not in_queue.empty():
try:
Expand All @@ -218,6 +290,7 @@ def _run_worker(in_queue, results):
with nc.Dataset(file, 'r') as dataset:
dataset.set_auto_maskandscale(False)
process_groups(dataset, group_list, max_dims, group_metadata, var_metadata, var_info)
history_json.extend(retrieve_history(dataset))

group_list.sort() # Ensure insertion order doesn't matter between granules

Expand All @@ -227,7 +300,8 @@ def _run_worker(in_queue, results):
'max_dims': max_dims,
'var_info': var_info,
'var_metadata': var_metadata,
'group_metadata': group_metadata
'group_metadata': group_metadata,
'history_json': history_json
})


Expand Down
6 changes: 5 additions & 1 deletion podaac/merger/variable_info.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
"""Wrapper used to manage variable metadata"""
import numpy as np


class VariableInfo:
Expand Down Expand Up @@ -46,6 +47,9 @@ def __eq__(self, other):
self.dim_order == other.dim_order and
self.datatype == other.datatype and
self.name == other.name and
self.fill_value == other.fill_value and
(
self.fill_value == other.fill_value or
np.array_equal(self.fill_value, other.fill_value, equal_nan=True)
) and
self.group_path == other.group_path
)
Loading