Skip to content

Commit

Permalink
Merge branch 'a166080492734376_immortal_jobs'
Browse files Browse the repository at this point in the history
  • Loading branch information
vmaksymiv committed Aug 20, 2016
2 parents 1e05755 + 87ce70a commit 509cf75
Showing 1 changed file with 42 additions and 26 deletions.
68 changes: 42 additions & 26 deletions openprocurement/contracting/api/databridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def __init__(self, config):
super(ContractingDataBridge, self).__init__()
self.config = config
self.on_error_delay = self.config_get('on_error_sleep_delay') or 5
self.jobs_watcher_delay = self.config_get('jobs_watcher_delay') or 15
queue_size = self.config_get('buffers_size') or 500
self.full_stack_sync_delay = self.config_get('full_stack_sync_delay') or 15
self.empty_stack_sync_delay = self.config_get('empty_stack_sync_delay') or 101
Expand Down Expand Up @@ -93,6 +94,7 @@ def __init__(self, config):
)

self.initial_sync_point = {}
self.initialization_event = gevent.event.Event()
self.tenders_queue = Queue(maxsize=queue_size)
self.handicap_contracts_queue = Queue(maxsize=queue_size)
self.contracts_put_queue = Queue(maxsize=queue_size)
Expand All @@ -111,22 +113,19 @@ def get_tender_credentials(self, tender_id):
{"TENDER_ID": tender_id}))
return data

@retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000)
def initialize_sync(self, params=None, direction=None):
# TODO use gevent.Event to wake up forward sync instead of checking
# initial sync point
if direction == "backward":
assert params['descending']
response = self.tenders_sync_client.sync_tenders(params, extra_headers={'X-Client-Request-ID': generate_req_id()})
# set values in reverse order due to 'descending' option
self.initial_sync_point = {'forward_offset': response.prev_page.offset,
'backward_offset': response.next_page.offset}
self.initialization_event.set() # wake up forward worker
logger.info("Initial sync point {}".format(self.initial_sync_point))
return response
elif not self.initial_sync_point:
raise ValueError
else:
assert 'descending' not in params
gevent.wait([self.initialization_event])
params['offset'] = self.initial_sync_point['forward_offset']
logger.info("Starting forward sync from offset {}".format(params['offset']))
return self.tenders_sync_client.sync_tenders(params, extra_headers={'X-Client-Request-ID': generate_req_id()})
Expand Down Expand Up @@ -405,30 +404,47 @@ def sync_single_tender(self, tender_id):
logger.info("Tender {} does not contain contracts to transfer".format(tender_id))


def _start_synchronization_workers(self):
logger.info('Starting forward and backward sync workers')
self.jobs = [gevent.spawn(self.get_tender_contracts_backward),
gevent.spawn(self.get_tender_contracts_forward)]

def _restart_synchronization_workers(self):
logger.warn("Restarting synchronization", extra=journal_context({"MESSAGE_ID": DATABRIDGE_RESTART}, {}))
for j in self.jobs:
j.kill()
self._start_synchronization_workers()

def _start_contract_sculptors(self):
self.immortal_jobs = {'get_tender_contracts': gevent.spawn(self.get_tender_contracts),
'prepare_contract_data': gevent.spawn(self.prepare_contract_data),
'put_contracts': gevent.spawn(self.put_contracts),
'retry_put_contracts': gevent.spawn(self.retry_put_contracts)}

def run(self):
logger.info('Start Contracting Data Bridge', extra=journal_context({"MESSAGE_ID": DATABRIDGE_START}, {}))
self.immortal_jobs = [
gevent.spawn(self.get_tender_contracts),
gevent.spawn(self.prepare_contract_data),
gevent.spawn(self.put_contracts),
gevent.spawn(self.retry_put_contracts),
]
while True:
try:
logger.info('Starting forward and backward sync workers')
self.jobs = [
gevent.spawn(self.get_tender_contracts_backward),
gevent.spawn(self.get_tender_contracts_forward),
]
gevent.joinall(self.jobs)
except KeyboardInterrupt:
logger.info('Exiting...')
gevent.killall(self.jobs, timeout=5)
break
except Exception, e:
logger.exception(e)
self._start_contract_sculptors()
self._start_synchronization_workers()
backward_worker, forward_worker = self.jobs

logger.warn("Restarting synchronization", extra=journal_context({"MESSAGE_ID": DATABRIDGE_RESTART}, {}))
try:
while True:
gevent.sleep(self.jobs_watcher_delay)
if forward_worker.dead or (backward_worker.dead and not backward_worker.successful()):
self._restart_synchronization_workers()
backward_worker, forward_worker = self.jobs

for name, job in self.immortal_jobs.items():
if job.dead:
logger.warn('Restarting {} worker'.format(name))
self.immortal_jobs[name] = gevent.spawn(getattr(self, name))

except KeyboardInterrupt:
logger.info('Exiting...')
gevent.killall(self.jobs, timeout=5)
gevent.killall(self.immortal_jobs, timeout=5)
except Exception, e:
logger.exception(e)


def main():
Expand Down

0 comments on commit 509cf75

Please sign in to comment.