Feature/podaac 5065 #49

Merged · 6 commits · Nov 22, 2022
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 - Updated cmr-umm-updater to newer version and added input argument to disable association removal
 - PODAAC-4976
   - Added dateutil dependency to Jupyter-test workflow
+- PODAAC-5065
+  - Calculate how much shared memory will be used and determine whether to run concise in multi-core or single-core mode.
 ### Changed
 ### Deprecated
 ### Removed
47 changes: 44 additions & 3 deletions podaac/merger/merge_worker.py
@@ -1,5 +1,6 @@
"""Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes"""

import math
import multiprocessing
from multiprocessing.shared_memory import SharedMemory
import queue
@@ -25,6 +26,37 @@ def shared_memory_size():
     return int(default_memory_size)
 
 
+def max_var_memory(file_list, var_info, max_dims):
+    """
+    Function to get the maximum amount of shared memory that will be used for variables.
+
+    Parameters
+    ----------
+    file_list : list
+        List of file paths to be processed
+    var_info : dict
+        Dictionary of variable paths and associated VariableInfo
+    max_dims : dict
+        Dictionary of dimension paths and maximum dimensions found during preprocessing
+    """
+
+    max_var_mem = 0
+    for file in file_list:
+        with nc.Dataset(file, 'r') as origin_dataset:
+
+            for var_path, var_meta in var_info.items():
+                ds_group, var_name = resolve_group(origin_dataset, var_path)
+                ds_var = ds_group.variables.get(var_name)
+
+                if ds_var is None:
+                    target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
+                    var_size = math.prod(target_shape) * var_meta.datatype.itemsize
+                    max_var_mem = max(var_size, max_var_mem)
+                else:
+                    var_size = math.prod(ds_var.shape) * var_meta.datatype.itemsize
+                    max_var_mem = max(var_size, max_var_mem)
+
+    return max_var_mem
+
+
 def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logger):
     """
     Automagically run merging in an optimized mode determined by the environment
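For reference, max_var_memory estimates each variable as element count times element width, in bytes. A minimal sketch of the same arithmetic (the shape and dtype below are made up for illustration, not taken from the PR):

```python
import math

import numpy as np

# Hypothetical variable: 3000 x 512 elements of float32 (4 bytes each)
shape = (3000, 512)
dtype = np.dtype('float32')

var_bytes = math.prod(shape) * dtype.itemsize
print(var_bytes)  # 3000 * 512 * 4 = 6144000 bytes, roughly 5.9 MiB
```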
@@ -44,15 +76,22 @@ def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logger):
"""

if process_count == 1:
_run_single_core(merged_dataset, file_list, var_info, max_dims)
_run_single_core(merged_dataset, file_list, var_info, max_dims, logger)
else:
# Merging is bottlenecked at the write process which is single threaded
# so spinning up more than 2 processes for read/write won't scale the
# optimization
_run_multi_core(merged_dataset, file_list, var_info, max_dims, 2, logger)

max_var_mem = max_var_memory(file_list, var_info, max_dims)
max_memory_size = round(shared_memory_size() * .95)

def _run_single_core(merged_dataset, file_list, var_info, max_dims):
if max_var_mem < max_memory_size:
_run_multi_core(merged_dataset, file_list, var_info, max_dims, 2, logger)
else:
_run_single_core(merged_dataset, file_list, var_info, max_dims, logger)


def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger):
"""
Run the variable merge in the current thread/single-core mode

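The new branch in run_merge amounts to a simple decision rule: use multi-core only when the largest variable fits within roughly 95% of the available shared memory. A self-contained sketch of that rule (the function name and example values are illustrative, not part of the PR):

```python
import math

def pick_mode(max_var_bytes, shared_mem_bytes, process_count):
    """Mirror run_merge's selection between single-core and multi-core."""
    if process_count == 1:
        return 'single'
    if max_var_bytes < round(shared_mem_bytes * 0.95):
        return 'multi'  # the largest variable fits in shared memory
    return 'single'  # too large for SharedMemory, so stay single-core

# A 1000 x 1000 float64 variable (8 MB) against 64 MiB of shared memory
var_bytes = math.prod((1000, 1000)) * 8
print(pick_mode(var_bytes, 64 * 1024 ** 2, process_count=2))  # 'multi'
```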
@@ -67,6 +106,8 @@ def _run_single_core(merged_dataset, file_list, var_info, max_dims):
     max_dims : dict
         Dictionary of dimension paths and maximum dimensions found during preprocessing
     """
+
+    logger.info("Running single core ......")
     for i, file in enumerate(file_list):
         with nc.Dataset(file, 'r') as origin_dataset:
             origin_dataset.set_auto_maskandscale(False)
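max_var_memory also leans on resolve_group, which this diff does not show. As a hypothetical sketch only, assuming slash-delimited variable paths such as '/group/subgroup/var', the helper plausibly behaves like this (the real implementation in podaac/merger may differ):

```python
def resolve_group(dataset, var_path):
    """Walk the nested netCDF4 groups named in var_path; return (group, variable name)."""
    parts = var_path.strip('/').split('/')
    group = dataset
    for name in parts[:-1]:
        group = group.groups[name]  # netCDF4 exposes subgroups as a name -> Group mapping
    return group, parts[-1]
```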