Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/a232725695971924_fix_412'
Browse files Browse the repository at this point in the history
  • Loading branch information
vmaksymiv committed Dec 22, 2016
2 parents 5d5ba7a + cadb4ec commit b01f6c5
Showing 1 changed file with 42 additions and 6 deletions.
48 changes: 42 additions & 6 deletions openprocurement/tender/competitivedialogue/databridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import gevent
from gevent.queue import Queue

from openprocurement_client.client import TendersClient, TendersClientSync
from openprocurement_client.client import TendersClientSync as BaseTendersClientSync
from yaml import load

from openprocurement.tender.competitivedialogue.models_constants import (
Expand Down Expand Up @@ -99,6 +99,39 @@ def prepare_lot(orig_tender, lot_id, items):
return lot


def check_status_response(func):
def func_wrapper(obj, *args, **kwargs):
try:
response = func(obj, *args, **kwargs)
except ResourceError as re:
if re.status_int == 412:
obj.headers['Cookie'] = re.response.headers['Set-Cookie']
response = func(obj, *args, **kwargs)
else:
raise ResourceError(re)
return response
return func_wrapper


class TendersClientSync(BaseTendersClientSync):

@check_status_response
def get_tender(self, *args, **kwargs):
return super(TendersClientSync, self).get_tender(*args, **kwargs)

@check_status_response
def extract_credentials(self, *args, **kwargs):
return super(TendersClientSync, self).extract_credentials(*args, **kwargs)

@check_status_response
def create_tender(self, *args, **kwargs):
return super(TendersClientSync, self).create_tender(*args, **kwargs)

@check_status_response
def patch_tender(self, *args, **kwargs):
return super(TendersClientSync, self).patch_tender(*args, **kwargs)


class CompetitiveDialogueDataBridge(object):
""" Competitive Dialogue Data Bridge """
copy_name_fields = ('title_ru', 'mode', 'procurementMethodDetails', 'title_en', 'description', 'description_en',
Expand All @@ -118,13 +151,14 @@ def __init__(self, config):
api_version=self.config_get('tenders_api_version'),
)

self.client = TendersClient(
self.client = TendersClientSync(
self.config_get('api_token'),
host_url=self.config_get('tenders_api_server'),
api_version=self.config_get('tenders_api_version'),
)

self.initial_sync_point = {}
self.initialization_event = gevent.event.Event()
self.competitive_dialogues_queue = Queue(maxsize=500) # Id tender which need to check
self.handicap_competitive_dialogues_queue = Queue(maxsize=500)
self.dialogs_stage2_put_queue = Queue(maxsize=500) # queue with new tender data
Expand Down Expand Up @@ -162,17 +196,19 @@ def initialize_sync(self, params=None, direction=None):
# initial sync point
if direction == "backward":
assert params['descending']
response = self.tenders_sync_client.sync_tenders(params,
extra_headers={'X-Client-Request-ID': generate_req_id()})
response = self.tenders_sync_client.sync_tenders(params, extra_headers={'X-Client-Request-ID': generate_req_id()})
# set values in reverse order due to 'descending' option
self.initial_sync_point = {'forward_offset': response.prev_page.offset,
'backward_offset': response.next_page.offset}
self.initialization_event.set()
logger.info("Initial sync point {}".format(self.initial_sync_point))
return response
elif not self.initial_sync_point:
raise ValueError
else:
assert 'descending' not in params
gevent.wait([self.initialization_event])
self.initialization_event.clear()
params['offset'] = self.initial_sync_point['forward_offset']
logger.info("Starting forward sync from offset {}".format(params['offset']))
return self.tenders_sync_client.sync_tenders(params,
Expand Down Expand Up @@ -428,8 +464,8 @@ def retry_patch_dialog_add_stage2_id(self):
self._patch_dialog_add_stage2_id_with_retry(dialog)
except:
logger.warn("Can't patch competitive dialogue id={0}".format(dialog['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CD_UNSUCCESSFUL_PATCH_STAGE2_ID,
"TENDER_ID": dialog['id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CD_UNSUCCESSFUL_PATCH_STAGE2_ID},
{"TENDER_ID": dialog['id']}))
self.competitive_dialogues_queue.put({"id": dialog['id']})
else:
data = {"id": dialog['stage2TenderID'],
Expand Down

0 comments on commit b01f6c5

Please sign in to comment.