Skip to content

Commit

Permalink
Fix Postgres connection error in multi-process case (DM-26539)
Browse files Browse the repository at this point in the history
My previous workaround for passing butler to subprocess was broken
because new connection was made in the parent process. Replacing with a
new workaround that passes butler in pickle form across fork.
  • Loading branch information
andy-slac committed Aug 31, 2020
1 parent 37abb6e commit f42c8ff
Showing 1 changed file with 23 additions and 5 deletions.
28 changes: 23 additions & 5 deletions python/lsst/ctrl/mpexec/mpGraphExecutor.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@
# -------------------------------
# Imports of standard modules --
# -------------------------------
import copy
from enum import Enum
import logging
import multiprocessing
import pickle
import time

# -----------------------------
Expand Down Expand Up @@ -74,19 +74,37 @@ def start(self, butler, quantumExecutor):
Executor for single quantum.
"""
# Butler can have live database connections which is a problem with
# fork-type activation. Make a copy of butler, this guarantees that
# no database is open right after copy.
butler = copy.copy(butler)
# fork-type activation. Make a pickle of butler to pass that actross
# fork.
butler_pickle = pickle.dumps(butler)
taskDef = self.qdata.taskDef
quantum = self.qdata.quantum
self.process = multiprocessing.Process(
target=quantumExecutor.execute, args=(taskDef, quantum, butler),
target=self._executeJob,
args=(quantumExecutor, taskDef, quantum, butler_pickle),
name=f"task-{self.qdata.index}"
)
self.process.start()
self.started = time.time()
self.state = JobState.RUNNING

def _executeJob(self, quantumExecutor, taskDef, quantum, butler_pickle):
"""Execute a job with arguments.
Parameters
----------
quantumExecutor : `QuantumExecutor`
Executor for single quantum.
taskDef : `~lsst.pipe.base.TaskDef`
Task definition structure.
quantum : `~lsst.daf.butler.Quantum`
Quantum for this task execution.
butler_pickle : `bytes`
Data butler instance in pickled form.
"""
butler = pickle.loads(butler_pickle)
quantumExecutor.execute(taskDef, quantum, butler)

def stop(self):
"""Stop the process.
"""
Expand Down

0 comments on commit f42c8ff

Please sign in to comment.