Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/fix_lot_items_processing'
Browse files Browse the repository at this point in the history
  • Loading branch information
vmaksymiv committed Dec 19, 2016
2 parents 62495e5 + 09a4617 commit 5d5ba7a
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 31 deletions.
93 changes: 63 additions & 30 deletions openprocurement/tender/competitivedialogue/databridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import os
import argparse
import copy
from functools import partial
from restkit.errors import ResourceError

from retrying import retry
Expand Down Expand Up @@ -63,8 +64,11 @@ def journal_context(record=None, params=None):

def get_item_by_related_lot(items, lot_id):
for item in items:
if item['relatedLot'] == lot_id:
yield item
try:
if item['relatedLot'] == lot_id:
yield item
except KeyError:
raise KeyError('Item should contain \'relatedLot\' field.')


def get_lot_by_id(tender, lot_id):
Expand Down Expand Up @@ -134,6 +138,7 @@ def __init__(self, config):

self.dialog_set_complete_queue = Queue(maxsize=500)
self.dialog_retry_set_complete_queue = Queue(maxsize=500)
self.jobs_watcher_delay = self.config_get('jobs_watcher_delay') or 15

def config_get(self, name):
return self.config.get('main').get(name)
Expand Down Expand Up @@ -210,7 +215,7 @@ def get_tenders(self, params, direction=""):
def get_competitive_dialogue_data(self):
while True:
try:
tender_to_sync = self.competitive_dialogues_queue.get() # Get competitive dialogue which we want to sync
tender_to_sync = self.competitive_dialogues_queue.peek() # Get competitive dialogue which we want to sync
tender = self.tenders_sync_client.get_tender(tender_to_sync['id'])['data'] # Try get data by tender id
except Exception, e:
# If we have something problems then put tender back to queue
Expand Down Expand Up @@ -295,6 +300,7 @@ def get_competitive_dialogue_data(self):
if feature['relatedItem'] in old_lots.keys():
new_tender['features'].append(feature)
new_tender['shortlistedFirms'] = short_listed_firms.values()
self.competitive_dialogues_queue.get()
self.handicap_competitive_dialogues_queue.put(new_tender)

def prepare_new_tender_data(self):
Expand Down Expand Up @@ -591,35 +597,62 @@ def get_competitive_dialogue_backward(self):
else:
logger.info('Backward data sync finished.')

def run(self):
def catch_exception(self, exc, name):
"""Restarting job"""
if name == 'get_competitive_dialogue_data':
tender = self.competitive_dialogues_queue.get() # delete invalid tender from queue
logger.info('Remove invalid tender {}'.format(tender.id))
self.immortal_jobs[name] = gevent.spawn(getattr(self, name))
self.immortal_jobs[name].link_exception(partial(self.catch_exception, name=name))

def _start_competitive_sculptors(self):
logger.info('Start Competitive Dialogue Data Bridge')
self.immortal_jobs = [
gevent.spawn(self.get_competitive_dialogue_data),
gevent.spawn(self.prepare_new_tender_data),
gevent.spawn(self.put_tender_stage2),
gevent.spawn(self.retry_put_tender_stage2),
gevent.spawn(self.patch_dialog_add_stage2_id),
gevent.spawn(self.retry_patch_dialog_add_stage2_id),
gevent.spawn(self.patch_new_tender_status),
gevent.spawn(self.retry_patch_new_tender_status),
gevent.spawn(self.path_dialog_status),
gevent.spawn(self.retry_patch_dialog_status)
self.immortal_jobs = {
'get_competitive_dialogue_data': gevent.spawn(self.get_competitive_dialogue_data),
'prepare_new_tender_data': gevent.spawn(self.prepare_new_tender_data),
'put_tender_stage2': gevent.spawn(self.put_tender_stage2),
'retry_put_tender_stage2': gevent.spawn(self.retry_put_tender_stage2),
'patch_dialog_add_stage2_id': gevent.spawn(self.patch_dialog_add_stage2_id),
'retry_patch_dialog_add_stage2_id': gevent.spawn(self.retry_patch_dialog_add_stage2_id),
'patch_new_tender_status': gevent.spawn(self.patch_new_tender_status),
'retry_patch_new_tender_status': gevent.spawn(self.retry_patch_new_tender_status),
'path_dialog_status': gevent.spawn(self.path_dialog_status),
'retry_patch_dialog_status': gevent.spawn(self.retry_patch_dialog_status)
}
for name, job in self.immortal_jobs.items():
job.link_exception(partial(self.catch_exception, name=name))

def _start_competitive_wokers(self):
self.jobs = [
gevent.spawn(self.get_competitive_dialogue_backward),
gevent.spawn(self.get_competitive_dialogue_forward),
]
while True:
try:
logger.info('Starting forward and backward sync workers')
self.jobs = [
gevent.spawn(self.get_competitive_dialogue_backward),
gevent.spawn(self.get_competitive_dialogue_forward),
]
gevent.joinall(self.jobs)
except KeyboardInterrupt:
logger.info('Exiting...')
gevent.killall(self.jobs, timeout=5)
break
except Exception, e:
logger.exception(e)
gevent.joinall(self.jobs)

def _restart_synchronization_workers(self):
logger.warn("Restarting synchronization", extra=journal_context({"MESSAGE_ID": DATABRIDGE_RESTART}, {}))
for j in self.jobs:
j.kill()
self._start_competitive_wokers()

def run(self):
self._start_competitive_sculptors()
self._start_competitive_wokers()
backward_worker, forward_worker = self.jobs

try:
while True:
gevent.sleep(self.jobs_watcher_delay)
if forward_worker.dead or (backward_worker.dead and not backward_worker.successful()):
self._restart_synchronization_workers()
backward_worker, forward_worker = self.jobs
logger.info('Starting forward and backward sync workers')
except KeyboardInterrupt:
logger.info('Exiting...')
gevent.killall(self.jobs, timeout=5)
gevent.killall(self.immortal_jobs, timeout=5)
except Exception, e:
logger.exception(e)
logger.warn("Restarting synchronization", extra=journal_context({"MESSAGE_ID": DATABRIDGE_RESTART}))


Expand All @@ -638,4 +671,4 @@ def main():


if __name__ == "__main__":
main()
main()
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages
import os

version = '1.0.4'
version = '1.0.6'

requires = [
'setuptools'
Expand Down

0 comments on commit 5d5ba7a

Please sign in to comment.