Make SQL-based task store work nicely with forking.
This fixes an issue in `SessionBasedDaemon`, where after forking the
daemon would refuse to perform any operation on the DB with an error
like "OperationalError: (...) could not receive data from server:
Transport endpoint is not connected".
riccardomurri committed Apr 10, 2018
1 parent 0ce8fe2 commit f2e16dd
Showing 3 changed files with 52 additions and 2 deletions.
3 changes: 3 additions & 0 deletions gc3libs/cmdline.py
@@ -2495,6 +2495,7 @@ def _main(self):
"Keep running in foreground"
" as requested with `-F`/`--foreground` option ...")
else:
self.session.store.pre_fork()
# redirect all output
logfile = open(os.path.join(self.params.working_dir, 'daemon.log'), 'w')
os.dup2(logfile.fileno(), 1)
@@ -2517,6 +2518,8 @@ def _main(self):
# prematurely removed while the daemon is still
# preparing...
atexit.register(rm_f, lockfile_path)
# un-suspend session store functionality
self.session.store.post_fork()
self._start_inboxes()
self._start_server()
self.running = True
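
The change to `cmdline.py` brackets the daemonization step with the new store hooks. A minimal sketch of that sequence is below; `daemonize`, its parameters, and the use of `os.remove` are illustrative stand-ins, not the actual `SessionBasedDaemon` code, and the real daemon does more work (lock files, PID handling) than shown here.

```python
import atexit
import os


def daemonize(store, logfile_path, lockfile_path):
    """Sketch of the fork sequence around a persistence store."""
    # Suspend DB connections: sockets opened before fork() cannot be
    # shared safely between the parent and the child process.
    store.pre_fork()

    # Redirect stdout/stderr to a log file, since the daemon detaches
    # from the controlling terminal.
    logfile = open(logfile_path, 'w')
    os.dup2(logfile.fileno(), 1)
    os.dup2(logfile.fileno(), 2)

    if os.fork() > 0:
        # Parent process exits; the child continues as the daemon.
        os._exit(0)

    # Child: make sure the lock file is removed on exit.
    atexit.register(os.remove, lockfile_path)

    # Re-enable the store; connections are re-created lazily on first use.
    store.post_fork()
```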
25 changes: 23 additions & 2 deletions gc3libs/persistence/sql.py
@@ -212,8 +212,10 @@ def _delayed_init(self):
See `GC3Pie issue #550 <https://github.com/uzh/gc3pie/issues/550>`_
for more details and motivation.
"""
self._real_engine = sqla.create_engine(
self._to_sqlalchemy_url(self.url))
url = self._to_sqlalchemy_url(self.url)
gc3libs.log.debug(
"Initializing SQLAlchemy engine for `%s`...", url)
self._real_engine = sqla.create_engine(url)

# create schema
meta = sqla.MetaData(bind=self._real_engine)
@@ -244,6 +246,25 @@ def _delayed_init(self):
self._real_tables = meta.tables[self.table_name]


def pre_fork(self):
"""
Dispose current SQLAlchemy engine (if any).
A new SQLAlchemy engine will be initialized
upon the next interaction with a DB.
        This method only exists to allow `SessionBasedDaemon`:class:
        and similar applications that perform DB operations after
        fork()ing to keep working, without running into a
        SQLAlchemy "OperationalError: (...) could not receive data
        from server: Transport endpoint is not connected".
"""
if self._real_engine:
self._real_engine.dispose()
self._real_engine = None
self._real_extra_fields = None
self._real_tables = None


@property
def _engine(self):
if self._real_engine is None:
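
The interplay between `pre_fork()` and the lazy `_engine` property is the core of the fix: the engine is disposed before `fork()`, and the first DB access after the fork re-creates it. The following condensed sketch follows the attribute names in the diff but is not the actual `SqlStore` code; the class name and constructor are assumptions made for illustration.

```python
import sqlalchemy as sqla


class LazyEngineStore(object):
    """Illustrative stand-in for the lazy-engine pattern in `SqlStore`."""

    def __init__(self, url):
        self.url = url
        self._real_engine = None

    def _delayed_init(self):
        # Create the SQLAlchemy engine only when it is first needed.
        self._real_engine = sqla.create_engine(self.url)

    @property
    def _engine(self):
        # Lazy initialization: this also re-creates the engine after
        # `pre_fork()` has torn it down.
        if self._real_engine is None:
            self._delayed_init()
        return self._real_engine

    def pre_fork(self):
        # Drop all pooled connections before fork(); the child process
        # would otherwise inherit sockets it cannot use.
        if self._real_engine is not None:
            self._real_engine.dispose()
            self._real_engine = None

    def post_fork(self):
        # Nothing to do: the next access to `_engine` re-initializes it.
        pass
```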
26 changes: 26 additions & 0 deletions gc3libs/persistence/store.py
@@ -55,6 +55,32 @@ def __init__(self, url=None):
url = Url(url)
self.url = url

def pre_fork(self):
"""
Make preparations for `fork()`ing the current process.
This should close open network connections or any other
sockets or file descriptors that cannot be used by both the
parent and child process.
The default implementation of this method does nothing; as of
2018-04-10, the only subclass making use of this functionality
is `SqlStore`:class:, which needs to dispose the SQLAlchemy
engine and re-create it after forking.
"""
pass

def post_fork(self):
"""
        Restore functionality that was suspended in `pre_fork`:meth:.
This method will be called after forking/daemonizing has been
successfully accomplished.
The default implementation of this method does nothing.
"""
pass

def list(self, **extra_args):
"""
Return list of IDs of saved `Job` objects.
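
The base-class hooks are deliberately no-ops, so backends without long-lived connections need no changes. A hypothetical subclass that does hold an open socket could override them as sketched below; `SocketBackedStore`, `_connect`, the host/port, and the assumption that the base class is importable as `gc3libs.persistence.store.Store` are all illustrative, not part of the actual codebase.

```python
import socket

from gc3libs.persistence.store import Store


class SocketBackedStore(Store):
    """Hypothetical store that talks to a remote service over a socket."""

    def __init__(self, url=None):
        super(SocketBackedStore, self).__init__(url)
        self._sock = None

    def _connect(self):
        # Open the connection lazily / on demand.
        self._sock = socket.create_connection(('localhost', 7777))

    def pre_fork(self):
        # Close the connection: a socket shared across fork() would be
        # corrupted by concurrent use from parent and child.
        if self._sock is not None:
            self._sock.close()
            self._sock = None

    def post_fork(self):
        # Re-connect in the (now forked) child process.
        self._connect()
```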
