Skip to content

Commit

Permalink
Allow concurrent import in different directories
Browse files Browse the repository at this point in the history
  • Loading branch information
Marc Sommerhalder committed Apr 28, 2014
1 parent 142dfb7 commit 728a43b
Showing 1 changed file with 22 additions and 16 deletions.
38 changes: 22 additions & 16 deletions seantis/dir/events/sources/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,29 +387,34 @@ class ExternalEventImportScheduler(object):
_lock = Lock()

def __init__(self):
self.running = False
self.last_run = datetime.now()
self.running = {}
self.last_run = {}

@synchronized(_lock)
def handle_run(self, do_stop=False):
def handle_run(self, import_directory, do_stop=False):
"""Check if we can start importing or signal that we are finished.
Note that it is possible that requests are still blocked, i.e. when
doing the same request (e.g. '/fetch'), see ZSever/medusa/http_server.
Note that it happend that some threads died while executing, hence
Note that it happend that some threads died while executing, hence
forcing a run every 4 hours.
"""
return_value = False

if import_directory not in self.running:
self.running[import_directory] = False
if import_directory not in self.last_run:
self.last_run[import_directory] = datetime.now()

if do_stop:
self.running = False
self.last_run = datetime.now()
self.running[import_directory] = False
self.last_run[import_directory] = datetime.now()
else:
delta = datetime.now() - self.last_run
if not self.running or delta > timedelta(hours=4):
self.running = True
self.last_run = datetime.now()
delta = datetime.now() - self.last_run[import_directory]
if not self.running[import_directory] or delta > timedelta(hours=4):
self.running[import_directory] = True
self.last_run[import_directory] = datetime.now()
return_value = True

return return_value
Expand All @@ -419,12 +424,13 @@ def run(self, context, reimport=False, source_ids=[], no_shuffle=False):
len_imported = 0
len_sources = 0

if not self.handle_run():
import_directory = '/'.join(context.getPhysicalPath())

if not self.handle_run(import_directory):
log.info('already importing')
return len_imported, len_sources

directory = '/'.join(context.getPhysicalPath())
log.info('begin importing sources from %s' % (directory))
log.info('begin importing sources from %s' % (import_directory))

try:
importer = ExternalEventImporter(context)
Expand All @@ -445,8 +451,8 @@ def run(self, context, reimport=False, source_ids=[], no_shuffle=False):
finally:
context.reindexObject()
IDirectoryCatalog(context).reindex()
log.info('importing sources from %s finished' % (directory))
self.handle_run(True)
log.info('importing sources from %s finished' % (import_directory))
self.handle_run(import_directory, do_stop=True)

return len_imported, len_sources

Expand Down

0 comments on commit 728a43b

Please sign in to comment.