Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/a330126682794336_fix_bridge_q_ov…
Browse files Browse the repository at this point in the history
…erflow'
  • Loading branch information
vmaksymiv committed Jul 6, 2017
2 parents 5d9f17e + a231f63 commit 31c23f5
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 20 deletions.
55 changes: 36 additions & 19 deletions openprocurement/tender/competitivedialogue/databridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,22 +157,24 @@ def __init__(self, config):
api_version=self.config_get('tenders_api_version'),
)

def_queue_size = 500
def_watcher_delay = 15
self.initial_sync_point = {}
self.initialization_event = gevent.event.Event()
self.competitive_dialogues_queue = Queue(maxsize=500) # Id tender which need to check
self.handicap_competitive_dialogues_queue = Queue(maxsize=500)
self.dialogs_stage2_put_queue = Queue(maxsize=500) # queue with new tender data
self.dialogs_stage2_retry_put_queue = Queue(maxsize=500)
self.competitive_dialogues_queue = Queue(maxsize=def_queue_size) # Id tender which need to check
self.handicap_competitive_dialogues_queue = Queue(maxsize=def_queue_size)
self.dialogs_stage2_put_queue = Queue(maxsize=def_queue_size) # queue with new tender data
self.dialogs_stage2_retry_put_queue = Queue(maxsize=def_queue_size)

self.dialog_stage2_id_queue = Queue(maxsize=500)
self.dialog_retry_stage2_id_queue = Queue(maxsize=500)
self.dialog_stage2_id_queue = Queue(maxsize=def_queue_size)
self.dialog_retry_stage2_id_queue = Queue(maxsize=def_queue_size)

self.dialogs_stage2_patch_queue = Queue(maxsize=500)
self.dialogs_stage2_retry_patch_queue = Queue(maxsize=500)
self.dialogs_stage2_patch_queue = Queue(maxsize=def_queue_size)
self.dialogs_stage2_retry_patch_queue = Queue(maxsize=def_queue_size)

self.dialog_set_complete_queue = Queue(maxsize=500)
self.dialog_retry_set_complete_queue = Queue(maxsize=500)
self.jobs_watcher_delay = self.config_get('jobs_watcher_delay') or 15
self.dialog_set_complete_queue = Queue(maxsize=def_queue_size)
self.dialog_retry_set_complete_queue = Queue(maxsize=def_queue_size)
self.jobs_watcher_delay = self.config_get('jobs_watcher_delay') or def_watcher_delay

def config_get(self, name):
return self.config.get('main').get(name)
Expand Down Expand Up @@ -256,9 +258,15 @@ def get_competitive_dialogue_data(self):
except Exception, e:
# If we have something problems then put tender back to queue
logger.exception(e)
logger.info('Put tender {} back to tenders queue'.format(tender_to_sync['id']),
logger.info('Putting tender {} back to tenders queue...'.format(tender_to_sync['id']),
extra=journal_context(params={"TENDER_ID": tender_to_sync['id']}))

self.competitive_dialogues_queue.get() # Remove erroneous item from the queue, to put it as the last item
# What if another thread put one more item after get but before put?
self.competitive_dialogues_queue.put(tender_to_sync)

logger.info('Tender {} put back to tenders queue. Que size: {}'.format(
tender_to_sync['id'], self.competitive_dialogues_queue.qsize()))
else:
if 'stage2TenderID' in tender:
try:
Expand All @@ -269,12 +277,20 @@ def get_competitive_dialogue_data(self):
{"TENDER_ID": tender['id']}))
else:
if tender_stage2.get('status') in self.allowed_statuses:
logger.info('For dialog {0} tender stage 2 already exists, need only patch'.format(tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_ONLY_PATCH},
{"TENDER_ID": tender['id']}))
patch_data = {"id": tender['id'],
"status": "complete"}
self.dialog_set_complete_queue.put(patch_data)
if tender.get('status') == 'complete':
logger.warn('Dialog {0} already has complete status - silently removing from initial queue.'.format(tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_ONLY_PATCH},
{"TENDER_ID": tender['id']}))
self.competitive_dialogues_queue.get() # Remove from the queue
else:
logger.info('For dialog {0} tender stage 2 already exists, need only patch'.format(tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_ONLY_PATCH},
{"TENDER_ID": tender['id']}))
patch_data = {"id": tender['id'],
"status": "complete"}
self.competitive_dialogues_queue.get() # Remove from the queue
self.dialog_set_complete_queue.put(patch_data)

continue
elif tender_stage2.get('status') in self.rewrite_statuses:
logger.info('Tender stage 2 id={0} has bad status need to create new '.format(tender['id']),
Expand Down Expand Up @@ -721,7 +737,8 @@ def run(self):
gevent.sleep(self.jobs_watcher_delay)
if counter == 20:
logger.info(
"""Current state: First stages in processing {competitive_dialogues_queue};
"""Current state:
First stages in processing {competitive_dialogues_queue};
Prepared data for second stage {handicap_competitive_dialogues_queue};
Prepared data with owner and token {dialogs_stage2_put_queue};
Retry prepared data with owner and token {dialogs_stage2_retry_put_queue};
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from setuptools import setup, find_packages
import os

version = '1.0.15'
version = '1.0.17'

requires = [
'setuptools'
Expand Down

0 comments on commit 31c23f5

Please sign in to comment.