DM-26302: Use new registry method to reset connection pool. #116

Merged
merged 1 commit into from
Mar 2, 2021
18 changes: 9 additions & 9 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
@@ -84,26 +84,23 @@ def start(self, butler, quantumExecutor, startMethod=None):
startMethod : `str`, optional
Start method from `multiprocessing` module.
"""
- # Butler can have live database connections which is a problem with
- # fork-type activation. Make a pickle of butler to pass that across
- # fork. Unpickling of quantum has to happen after butler, this is why
+ # Unpickling of quantum has to happen after butler, this is why
  # it is pickled manually here.
- butler_pickle = pickle.dumps(butler)
quantum_pickle = pickle.dumps(self.qnode.quantum)
taskDef = self.qnode.taskDef
logConfigState = CliLog.configState
mp_ctx = multiprocessing.get_context(startMethod)
self.process = mp_ctx.Process(
target=_Job._executeJob,
- args=(quantumExecutor, taskDef, quantum_pickle, butler_pickle, logConfigState),
+ args=(quantumExecutor, taskDef, quantum_pickle, butler, logConfigState),
Member

I was surprised pickling works here, because the DimensionUniverse pickle implementation doesn't really save anything. But I see it worked before, and you're just slightly changing how it works. Is there an explicit workaround for the DimensionUniverse pickling that just isn't in this diff?

Collaborator Author

DimensionUniverse is not pickled, but it is re-initialized by the registry when we re-instantiate the butler (or if the existing butler is used with fork). This is why I need to delay Quantum unpickling until after the butler is unpickled (by multiprocessing, if the spawn method is used).
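
To illustrate the ordering constraint described in this reply, here is a minimal sketch. It assumes a simplified model in which the universe pickles as a lookup of process-wide state that only exists once a butler has been built. `FakeUniverse`, `FakeButler`, `FakeQuantum`, and `simulate_child` are hypothetical stand-ins, not the `lsst.daf.butler` classes or their actual pickle implementation.

```python
import pickle

# Hypothetical stand-ins for illustration only; not the lsst.daf.butler API.
_ACTIVE_UNIVERSE = None  # process-wide state, set whenever a FakeButler is built


def _current_universe():
    """Return the universe active in this process, or fail if none is loaded."""
    if _ACTIVE_UNIVERSE is None:
        raise RuntimeError("no universe loaded; unpickle the butler first")
    return _ACTIVE_UNIVERSE


class FakeUniverse:
    def __init__(self, dimensions):
        self.dimensions = dict(dimensions)

    def __reduce__(self):
        # The pickle carries no data: it only says "look up the active universe".
        return (_current_universe, ())


class FakeButler:
    def __init__(self, config):
        global _ACTIVE_UNIVERSE
        self.config = dict(config)
        self.universe = FakeUniverse(self.config)
        _ACTIVE_UNIVERSE = self.universe

    def __reduce__(self):
        # Unpickling re-runs __init__, which re-initializes the universe.
        return (FakeButler, (self.config,))


class FakeQuantum:
    def __init__(self, universe, data_id):
        self.universe = universe
        self.data_id = data_id


def simulate_child(butler_pickle, quantum_pickle, butler_first):
    """Mimic a freshly spawned process by clearing the process-wide state."""
    global _ACTIVE_UNIVERSE
    _ACTIVE_UNIVERSE = None
    if butler_first:
        pickle.loads(butler_pickle)
    return pickle.loads(quantum_pickle)
```

In this model, calling `simulate_child(..., butler_first=False)` raises `RuntimeError`, which is why the quantum is passed as manually pickled bytes and only loaded once the butler exists in the child.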

name=f"task-{self.qnode.nodeId.number}"
)
self.process.start()
self.started = time.time()
self._state = JobState.RUNNING

@staticmethod
- def _executeJob(quantumExecutor, taskDef, quantum_pickle, butler_pickle, logConfigState):
+ def _executeJob(quantumExecutor, taskDef, quantum_pickle, butler, logConfigState):
"""Execute a job with arguments.

Parameters
@@ -114,15 +111,18 @@ def _executeJob(quantumExecutor, taskDef, quantum_pickle, butler_pickle, logConf
Task definition structure.
quantum_pickle : `bytes`
Quantum for this task execution in pickled form.
- butler_pickle : `bytes`
-     Data butler instance in pickled form.
+ butler : `lsst.daf.butler.Butler`
+     Data butler instance.
"""
if logConfigState and not CliLog.configState:
# means that we are in a new spawned Python process and we have to
# re-initialize logging
CliLog.replayConfigState(logConfigState)

- butler = pickle.loads(butler_pickle)
+ # have to reset connection pool to avoid sharing connections
+ if butler is not None:
+     butler.registry.resetConnectionPool()

quantum = pickle.loads(quantum_pickle)
quantumExecutor.execute(taskDef, quantum, butler)
