Skip to content

Commit

Permalink
feature/issue-6: History metadata (#7)
Browse files Browse the repository at this point in the history
* history_json attribute is updated

* New test data

* Updated changelog

* Fixed linting warnings
  • Loading branch information
skorper authored Nov 22, 2021
1 parent d935497 commit 62ac5f4
Show file tree
Hide file tree
Showing 7 changed files with 163 additions and 23 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]

### Added
- history_json attribute is populated
### Changed
### Deprecated
### Removed
Expand Down
78 changes: 76 additions & 2 deletions podaac/merger/preprocess_worker.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
"""Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes"""

import json
import os
import queue
from copy import deepcopy
from datetime import datetime, timezone
from multiprocessing import Manager, Process
import queue

import importlib_metadata
import netCDF4 as nc
import numpy as np

Expand Down Expand Up @@ -71,6 +76,54 @@ def merge_metadata(merged_metadata, subset_metadata):
merged_attrs[attr_name] = False # mark as inconsistent


def construct_history(input_files):
    """
    Build the history_json record describing this concatenation operation.
    https://wiki.earthdata.nasa.gov/display/TRT/In-File+Provenance+Metadata+-+TRT-42

    Parameters
    ----------
    input_files : list
        List of input files merged by this operation

    Returns
    -------
    dict
        History JSON entry constructed for this concat operation
    """
    # Record only the granule basenames in derived_from; full paths go in parameters
    granule_names = [os.path.basename(path) for path in input_files]
    return {
        "date_time": datetime.now(tz=timezone.utc).isoformat(),
        "derived_from": granule_names,
        "program": 'concise',
        "version": importlib_metadata.distribution('podaac-concise').version,
        "parameters": f'input_files={input_files}',
        "program_ref": "https://cmr.earthdata.nasa.gov:443/search/concepts/S2153799015-POCLOUD",
        "$schema": "https://harmony.earthdata.nasa.gov/schemas/history/0.1.0/history-v0.1.0.json"
    }


def retrieve_history(dataset):
    """
    Retrieve and decode the history_json attribute from a NetCDF dataset.

    Parameters
    ----------
    dataset : netCDF4.Dataset
        NetCDF Dataset representing a single granule

    Returns
    -------
    dict
        Parsed history_json field; empty list when the attribute is absent
    """
    # Granules without provenance metadata contribute nothing to the merged history
    if 'history_json' in dataset.ncattrs():
        return json.loads(dataset.getncattr('history_json'))
    return []


def _run_single_core(file_list):
"""
Run the granule preprocessing in the current thread/single-core mode
Expand All @@ -90,14 +143,22 @@ def _run_single_core(file_list):
max_dims = {}
var_metadata = {}
group_metadata = {}
history_json = []

for file in file_list:
with nc.Dataset(file, 'r') as dataset:
dataset.set_auto_maskandscale(False)
process_groups(dataset, group_list, max_dims, group_metadata, var_metadata, var_info)
history_json.extend(retrieve_history(dataset))

group_list.sort() # Ensure insertion order doesn't matter between granules

history_json.append(construct_history(file_list))
group_metadata[group_list[0]]['history_json'] = json.dumps(
history_json,
default=str
)

return {
'group_list': group_list,
'max_dims': max_dims,
Expand Down Expand Up @@ -156,6 +217,7 @@ def _run_multi_core(file_list, process_count):
max_dims = {}
var_metadata = {}
group_metadata = {}
history_json = []

for result in results:
# The following data should be consistent between granules and
Expand All @@ -176,6 +238,15 @@ def _run_multi_core(file_list, process_count):
merge_metadata(var_metadata, result['var_metadata'])
merge_metadata(group_metadata, result['group_metadata'])

# Merge history_json entries from input files
history_json.extend(result['history_json'])

history_json.append(construct_history(file_list))
group_metadata[group_list[0]]['history_json'] = json.dumps(
history_json,
default=str
)

return {
'group_list': group_list,
'max_dims': max_dims,
Expand Down Expand Up @@ -207,6 +278,7 @@ def _run_worker(in_queue, results):
var_info = {}
var_metadata = {}
group_metadata = {}
history_json = []

while not in_queue.empty():
try:
Expand All @@ -218,6 +290,7 @@ def _run_worker(in_queue, results):
with nc.Dataset(file, 'r') as dataset:
dataset.set_auto_maskandscale(False)
process_groups(dataset, group_list, max_dims, group_metadata, var_metadata, var_info)
history_json.extend(retrieve_history(dataset))

group_list.sort() # Ensure insertion order doesn't matter between granules

Expand All @@ -227,7 +300,8 @@ def _run_worker(in_queue, results):
'max_dims': max_dims,
'var_info': var_info,
'var_metadata': var_metadata,
'group_metadata': group_metadata
'group_metadata': group_metadata,
'history_json': history_json
})


Expand Down
45 changes: 24 additions & 21 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ python = "^3.9"
netCDF4 = "^1.5.6"
numpy = "^1.20.3"
harmony-service-lib = "^1.0.9"
importlib-metadata = "^4.8.1"

[tool.poetry.dev-dependencies]
pytest = "^6.1"
Expand Down
Binary file not shown.
Binary file not shown.
61 changes: 61 additions & 0 deletions tests/test_merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import netCDF4 as nc
import numpy as np
import pytest
import json
import os
import importlib_metadata

from podaac.merger import merge

Expand Down Expand Up @@ -169,3 +172,61 @@ def test_compare_java_single(self):

def test_compare_java_multi(self):
self.run_java_verification('python_merge_multi.nc')

def test_history(self):
    """Verify history_json provenance is written in both single- and multi-core modes."""
    input_files = list(self.__test_data_path.joinpath('no_groups').iterdir())
    output_name_single = 'test_history_single.nc'
    output_name_multi = 'test_history_multi.nc'

    def check_history(dataset, source_files):
        # The newest entry (appended last) must describe this concise run
        expected_names = [os.path.basename(file_name) for file_name in source_files]
        latest_entry = json.loads(dataset.getncattr('history_json'))[-1]
        assert 'date_time' in latest_entry
        assert latest_entry.get('program') == 'concise'
        assert latest_entry.get('derived_from') == expected_names
        assert latest_entry.get('version') == importlib_metadata.distribution('podaac-concise').version
        assert 'input_files=' in latest_entry.get('parameters')
        assert latest_entry.get('program_ref') == 'https://cmr.earthdata.nasa.gov:443/search/concepts/S2153799015-POCLOUD'
        assert latest_entry.get('$schema') == 'https://harmony.earthdata.nasa.gov/schemas/history/0.1.0/history-v0.1.0.json'

    # Exercise single-core then multi-core execution paths
    for output_name, process_count in ((output_name_single, 1), (output_name_multi, 2)):
        output_path = self.__output_path.joinpath(output_name)
        merge.merge_netcdf_files(
            input_files=input_files,
            output_file=output_path,
            process_count=process_count
        )
        dataset = nc.Dataset(output_path)
        check_history(dataset, input_files)
        dataset.close()

    # Run again, but use l2ss-py output which contains existing
    # history_json. Concise history should contain new entry plus
    # all entries from input files
    input_files = list(self.__test_data_path.joinpath('l2ss_py_output').iterdir())
    merge.merge_netcdf_files(
        input_files=input_files,
        output_file=self.__output_path.joinpath(output_name_single),
        process_count=1
    )
    dataset = nc.Dataset(self.__output_path.joinpath(output_name_single))
    check_history(dataset, input_files)
    full_history = json.loads(dataset.getncattr('history_json'))
    assert len(full_history) == 3
    assert full_history[0]['program'] == 'l2ss-py'
    assert full_history[1]['program'] == 'l2ss-py'
    assert full_history[2]['program'] == 'concise'

0 comments on commit 62ac5f4

Please sign in to comment.