Skip to content

Commit

Permalink
Merge pull request #126 from ITVaan/make_bridge_immortal
Browse files Browse the repository at this point in the history
Make bridge immortal
  • Loading branch information
vmaksymiv committed Feb 21, 2017
2 parents cdbd48f + 7e1de03 commit e91fcb0
Showing 1 changed file with 26 additions and 7 deletions.
33 changes: 26 additions & 7 deletions openprocurement/tender/competitivedialogue/databridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -651,10 +651,6 @@ def get_competitive_dialogue_backward(self):
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS},
{"TENDER_ID": tender_data['id']}))
self.competitive_dialogues_queue.put(tender_data)
except ResourceError as re:
logger.warn('Backward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {}))
logger.error("Error response {}".format(re.message))
raise re
except Exception, e:
# TODO reset queues and restart sync
logger.warn('Backward worker died!', extra=journal_context({"MESSAGE_ID": DATABRIDGE_WORKER_DIED}, {}))
Expand Down Expand Up @@ -697,21 +693,44 @@ def _start_competitive_wokers(self):
def _restart_synchronization_workers(self):
logger.warn("Restarting synchronization", extra=journal_context({"MESSAGE_ID": DATABRIDGE_RESTART}, {}))
for j in self.jobs:
j.kill()
j.kill(timeout=5)
self._start_competitive_wokers()

def run(self):
self._start_competitive_sculptors()
self._start_competitive_wokers()
backward_worker, forward_worker = self.jobs

counter = 0
try:
while True:
gevent.sleep(self.jobs_watcher_delay)
if counter == 20:
logger.info(
"""Current state: First stages in processing {competitive_dialogues_queue};
Prepared data for second stage {handicap_competitive_dialogues_queue};
Prepared data with owner and token {dialogs_stage2_put_queue};
Retry prepared data with owner and token {dialogs_stage2_retry_put_queue};
Data with second stage ID {dialog_stage2_id_queue};
Retry data with second stage ID {dialog_retry_stage2_id_queue};
Data with new status and first stage ID {dialogs_stage2_patch_queue}
Retry data with new status and first stage ID {dialogs_stage2_retry_patch_queue}
Data with new status for first stage {dialog_set_complete_queue}
Retry data with new status for first stage {dialog_retry_set_complete_queue}""".format(
competitive_dialogues_queue=self.competitive_dialogues_queue.qsize(),
handicap_competitive_dialogues_queue=self.handicap_competitive_dialogues_queue.qsize(),
dialogs_stage2_put_queue=self.dialogs_stage2_put_queue.qsize(),
dialogs_stage2_retry_put_queue=self.dialogs_stage2_retry_put_queue.qsize(),
dialog_stage2_id_queue=self.dialog_stage2_id_queue.qsize(),
dialog_retry_stage2_id_queue=self.dialog_retry_stage2_id_queue.qsize(),
dialogs_stage2_patch_queue=self.dialogs_stage2_patch_queue.qsize(),
dialogs_stage2_retry_patch_queue=self.dialogs_stage2_retry_patch_queue.qsize(),
dialog_set_complete_queue=self.dialog_set_complete_queue.qsize(),
dialog_retry_set_complete_queue=self.dialog_retry_set_complete_queue.qsize()))
counter = 0
counter += 1
if forward_worker.dead or (backward_worker.dead and not backward_worker.successful()):
self._restart_synchronization_workers()
backward_worker, forward_worker = self.jobs
logger.info('Starting forward and backward sync workers')
except KeyboardInterrupt:
logger.info('Exiting...')
gevent.killall(self.jobs, timeout=5)
Expand Down

0 comments on commit e91fcb0

Please sign in to comment.