Skip to content

Commit

Permalink
Merge branch 'a168264359610926_bridge_configuration'
Browse files Browse the repository at this point in the history
  • Loading branch information
vmaksymiv committed Aug 16, 2016
2 parents c227e6b + cec8f61 commit 55ec2f4
Showing 1 changed file with 24 additions and 14 deletions.
38 changes: 24 additions & 14 deletions openprocurement/contracting/api/databridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import os
import argparse

from copy import deepcopy
from retrying import retry
from uuid import uuid4

Expand Down Expand Up @@ -60,29 +59,40 @@ def __init__(self, config):
super(ContractingDataBridge, self).__init__()
self.config = config
self.on_error_delay = self.config_get('on_error_sleep_delay') or 5
queue_size = self.config_get('buffers_size') or 500
self.full_stack_sync_delay = self.config_get('full_stack_sync_delay') or 15
self.empty_stack_sync_delay = self.config_get('empty_stack_sync_delay') or 101

api_server = self.config_get('tenders_api_server')
api_version = self.config_get('tenders_api_version')
ro_api_server = self.config_get('public_tenders_api_server') or api_server

self.tenders_sync_client = TendersClientSync('',
host_url=self.config_get('tenders_api_server'),
api_version=self.config_get('tenders_api_version'),
host_url=ro_api_server, api_version=api_version,
)

self.client = TendersClient(
self.config_get('api_token'),
host_url=self.config_get('tenders_api_server'),
api_version=self.config_get('tenders_api_version'),
host_url=api_server, api_version=api_version,
)

self.contracting_client = ContractingClient(
self.config_get('api_token'),
host_url=self.config_get('tenders_api_server'),
api_version=self.config_get('tenders_api_version')
host_url=api_server, api_version=api_version
)

self.contracting_client_ro = self.contracting_client
if self.config_get('public_tenders_api_server'):
self.contracting_client_ro = ContractingClient(
self.config_get('api_token'),
host_url=ro_api_server, api_version=api_version
)

self.initial_sync_point = {}
self.tenders_queue = Queue(maxsize=500)
self.handicap_contracts_queue = Queue(maxsize=500)
self.contracts_put_queue = Queue(maxsize=500)
self.contracts_retry_put_queue = Queue(maxsize=500)
self.tenders_queue = Queue(maxsize=queue_size)
self.handicap_contracts_queue = Queue(maxsize=queue_size)
self.contracts_put_queue = Queue(maxsize=queue_size)
self.contracts_retry_put_queue = Queue(maxsize=queue_size)

def config_get(self, name):
return self.config.get('main').get(name)
Expand Down Expand Up @@ -124,9 +134,9 @@ def get_tenders(self, params={}, direction=""):
tenders_list = response.data
params['offset'] = response.next_page.offset

delay = 101
delay = self.empty_stack_sync_delay
if tenders_list:
delay = 15
delay = self.full_stack_sync_delay
logger.info("Client {} params: {}".format(direction, params))
for tender in tenders_list:
if tender['status'] in ("active.qualification",
Expand Down Expand Up @@ -171,7 +181,7 @@ def _get_tender_contracts(self):

try:
if not db.has(contract['id']):
self.contracting_client.get_contract(contract['id'])
self.contracting_client_ro.get_contract(contract['id'])
else:
logger.info('Contract {} exists in local db'.format(contract['id']), extra=journal_context({"MESSAGE_ID": DATABRIDGE_CACHED}, params={"CONTRACT_ID": contract['id']}))
continue
Expand Down

0 comments on commit 55ec2f4

Please sign in to comment.