Commit

Feature/podaac 5065 (#49)
* put a try/except around running multicore; if multicore fails, fall back to single core

* fix pylint

* update changelog

* add memory calculation to determine whether to run single core or multicore (see the sketch after this list)

* fix pylint and flake8

* fix how to get a variable's byte size
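
For context, the approach these notes describe boils down to comparing the largest per-variable buffer against the shared memory available to the workers. A minimal sketch of that selection logic, not the code in this commit; `pick_worker_mode`, the headroom factor, and the example sizes are hypothetical:

```python
import math


def pick_worker_mode(max_var_bytes, shared_mem_bytes, headroom=0.95):
    """Illustrative only: merge with multiple cores while the largest
    variable fits comfortably in shared memory, else use a single core."""
    if max_var_bytes < shared_mem_bytes * headroom:
        return "multicore"
    return "singlecore"


# Worked example: a float64 variable of shape (3000, 4000) needs
# 3000 * 4000 * 8 = 96,000,000 bytes (~91.6 MiB) of shared memory.
var_bytes = math.prod((3000, 4000)) * 8
print(pick_worker_mode(var_bytes, shared_mem_bytes=64 * 1024**2))  # -> singlecore
```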
sliu008 authored Nov 22, 2022
1 parent c8ff819 commit e419465
Showing 2 changed files with 46 additions and 3 deletions.
CHANGELOG.md (2 changes: 2 additions & 0 deletions)
@@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Updated cmr-umm-updater to newer version and added input argument to disable association removal
- PODAAC-4976
- Added dateutil dependency to Jupyter-test workflow
- PODAAC-5065
  - Calculate how much shared memory will be used and determine whether to run concise with multicore or a single core.
### Changed
### Deprecated
### Removed
podaac/merger/merge_worker.py (47 changes: 44 additions & 3 deletions)
@@ -1,5 +1,6 @@
"""Preprocessing methods and the utilities to automagically run them in single-thread/multiprocess modes"""

import math
import multiprocessing
from multiprocessing.shared_memory import SharedMemory
import queue
@@ -25,6 +26,37 @@ def shared_memory_size():
return int(default_memory_size)


def max_var_memory(file_list, var_info, max_dims):
"""
    Function to get the maximum shared memory that will be used for variables

    Parameters
    ----------
file_list : list
List of file paths to be processed
    var_info : dict
        Dictionary of variable paths and associated VariableInfo
    max_dims : dict
        Dictionary of dimension paths and maximum dimensions found during preprocessing
    """

max_var_mem = 0
for file in file_list:
with nc.Dataset(file, 'r') as origin_dataset:

for var_path, var_meta in var_info.items():
ds_group, var_name = resolve_group(origin_dataset, var_path)
ds_var = ds_group.variables.get(var_name)

if ds_var is None:
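                    # variable not present in this granule: estimate its size from the
                    # maximum dimensions found during preprocessing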
target_shape = tuple(max_dims[f'/{dim}'] for dim in var_meta.dim_order)
var_size = math.prod(target_shape) * var_meta.datatype.itemsize
max_var_mem = max(var_size, max_var_mem)
else:
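                    # variable present in this file: use its actual shape on disk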
var_size = math.prod(ds_var.shape) * var_meta.datatype.itemsize
max_var_mem = max(var_size, max_var_mem)

return max_var_mem


def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logger):
"""
Automagically run merging in an optimized mode determined by the environment
@@ -44,15 +76,22 @@ def run_merge(merged_dataset, file_list, var_info, max_dims, process_count, logg
"""

if process_count == 1:
-        _run_single_core(merged_dataset, file_list, var_info, max_dims)
+        _run_single_core(merged_dataset, file_list, var_info, max_dims, logger)
    else:
        # Merging is bottlenecked at the write process which is single threaded
        # so spinning up more than 2 processes for read/write won't scale the
        # optimization
-        _run_multi_core(merged_dataset, file_list, var_info, max_dims, 2, logger)
+
+        max_var_mem = max_var_memory(file_list, var_info, max_dims)
+        max_memory_size = round(shared_memory_size() * .95)
+
+        if max_var_mem < max_memory_size:
+            _run_multi_core(merged_dataset, file_list, var_info, max_dims, 2, logger)
+        else:
+            _run_single_core(merged_dataset, file_list, var_info, max_dims, logger)


-def _run_single_core(merged_dataset, file_list, var_info, max_dims):
+def _run_single_core(merged_dataset, file_list, var_info, max_dims, logger):
"""
Run the variable merge in the current thread/single-core mode
@@ -67,6 +106,8 @@ def _run_single_core(merged_dataset, file_list, var_info, max_dims):
max_dims : dict
Dictionary of dimension paths and maximum dimensions found during preprocessing
"""

logger.info("Running single core ......")
for i, file in enumerate(file_list):
with nc.Dataset(file, 'r') as origin_dataset:
origin_dataset.set_auto_maskandscale(False)
