Skip to content

Commit

Permalink
Merge pull request #193 from kyocum/fix/py38spawn
Browse files Browse the repository at this point in the history
Support p38 MP spawn context; simplify internals
  • Loading branch information
kyocum committed Aug 12, 2021
2 parents 6c74499 + 69ea5cc commit c3ef7e8
Show file tree
Hide file tree
Showing 19 changed files with 856 additions and 799 deletions.
44 changes: 38 additions & 6 deletions disdat/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,10 @@ def __init__(self,
self.data_context = local_context
elif isinstance(local_context, str):
self.data_context = self._fs.get_context(local_context)
if self.data_context is None:
raise Exception("Unable to create Bundle: no context found with name[{}]".format(local_context))
else:
raise Exception("Unable to create Bundle because no context found with name[{}]".format(local_context))
raise Exception("Unable to create Bundle: local_context is not str or DataContext")
except Exception as e:
_logger.error("Unable to allocate bundle in context: {} ".format(local_context, e))
return
Expand Down Expand Up @@ -177,6 +179,35 @@ def __init__(self,
self.add_data(data)
self.close()

def __getstate__(self):
""" Manual serialization for pickling bundles
We need to remove references to the data context, the DisdatFS, and
underlying protobuf. Data contexts have DB connections, DisdatFS points
to the current data context, and protobufs should be serialized to string using
the protobuf serializer.
"""
state = self.__dict__.copy()
# convert the data context object to a name
state['data_context'] = self.data_context.get_local_name()
# no need to carry around the fs
del state['_fs']
# convert underlying pb to a string
state['pb'] = self.pb.SerializeToString()
return state

def __setstate__(self, state):
""" Deserialize the context, fs, and pb fields """
self.__dict__.update(state)
# Restore a pointer to DisdatFS
self._fs = _get_fs()
# Restore a pointer to the data context
assert isinstance(self.data_context, str)
self.data_context = self._fs.get_context(self.data_context)
# Restore pb object
pb = self._pb_type()
pb.ParseFromString(self.pb)
self.pb = pb

def abandon(self):
""" Remove on-disk state of the bundle if it is abandoned before it is closed.
that were left !closed have their directories harvested.
Expand All @@ -186,7 +217,7 @@ def abandon(self):
a forked child process might have closed their copy while the parent deletes theirs.
"""
self._check_open()
_logger.debug(f"Disdat api clean_abandoned removing bundle obj [{id(self)}] process[{os.getpid()}")
_logger.debug(f"Disdat api abandon bundle obj [{id(self)}] process[{os.getpid()}] uuid[{self.uuid}]")
PipeBase.rm_bundle_dir(self._local_dir, self.uuid)

def _check_open(self):
Expand Down Expand Up @@ -364,19 +395,20 @@ def add_dependencies(self, bundles, arg_names=None):
Args:
bundles (Union[list `api.Bundle`, `api.Bundle`]): Another bundle that may have been used to produce this one
arg_names (list[str]): argument names of the dependencies. Optional, otherwise default 'arg_<i>' used.
arg_names (Union[list str, str]): Optional argument names of the dependencies. Default 'arg_<i>' used.
Returns:
self
"""
self._check_open()
curr_count = LineageRecord.dependency_count(self.pb.lineage)
if isinstance(bundles, collections.Iterable):
if arg_names is None:
arg_names = ['_arg_{}'.format(i) for i in range(0, len(bundles))]
arg_names = ['_arg_{}'.format(i) for i in range(0+curr_count, len(bundles)+curr_count)]
LineageRecord.add_deps_to_lr(self.pb.lineage, [(b.processing_name, b.uuid, an) for an, b in zip(arg_names, bundles)])
else:
if arg_names is None:
arg_names = '_arg_0'
arg_names = '_arg_{}'.format(curr_count)
LineageRecord.add_deps_to_lr(self.pb.lineage, [(bundles.processing_name, bundles.uuid, arg_names)])
return self

Expand Down Expand Up @@ -499,7 +531,7 @@ def open(self, force_uuid=None):
force_uuid (str): DEPRECATING - do not use. Force to open a bundle with a specific bundle.
Returns:
None
Bundle
"""
if self._closed:
_logger.error("Bundle is closed -- unable to re-open.")
Expand Down

0 comments on commit c3ef7e8

Please sign in to comment.