Skip to content

Commit

Permalink
Merge branch 'redis_integration'
Browse files Browse the repository at this point in the history
  • Loading branch information
vmaksymiv committed Oct 7, 2016
2 parents 055cdc7 + 4bebcc1 commit 4ba625b
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 9 deletions.
88 changes: 79 additions & 9 deletions openprocurement/contracting/api/databridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,53 @@
DATABRIDGE_COPY_CONTRACT_ITEMS, DATABRIDGE_MISSING_CONTRACT_ITEMS,
DATABRIDGE_GET_EXTRA_INFO, DATABRIDGE_WORKER_DIED, DATABRIDGE_START,
DATABRIDGE_GOT_EXTRA_INFO, DATABRIDGE_CREATE_CONTRACT, DATABRIDGE_EXCEPTION,
DATABRIDGE_CONTRACT_CREATED, DATABRIDGE_RETRY_CREATE,
DATABRIDGE_CONTRACT_CREATED, DATABRIDGE_RETRY_CREATE, DATABRIDGE_INFO,
DATABRIDGE_TENDER_PROCESS, DATABRIDGE_SKIP_NOT_MODIFIED,
DATABRIDGE_SYNC_SLEEP, DATABRIDGE_SYNC_RESUME, DATABRIDGE_CACHED)


logger = logging.getLogger("openprocurement.contracting.api.databridge")
# logger = logging.getLogger(__name__)

from lazydb import Db

db = Db('databridge_cache_db')
class Db(object):
""" Database proxy """

def __init__(self, config):
self.config = config

self._backend = None
self._db_name = None
self._port = None
self._host = None

if 'cache_host' in self.config:
import redis
self._backend = "redis"
self._host = self.config.get('cache_host')
self._port = self.config.get('cache_port') or 6379
self._db_name = self.config.get('cache_db_name') or 0
self.db = redis.StrictRedis(host=self._host, port=self._port,
db=self._db_name)
self.set_value = self.db.set
self.has_value = self.db.exists
else:
from lazydb import Db
self._backend = "lazydb"
self._db_name = self.config.get('cache_db_name') or 'databridge_cache_db'
self.db = Db(self._db_name)
self.set_value = self.db.put
self.has_value = self.db.has


def get(self, key):
return self.db.get(key)

def put(self, key, value):
self.set_value(key, value)

def has(self, key):
return self.has_value(key)


def generate_req_id():
Expand All @@ -58,6 +94,21 @@ class ContractingDataBridge(object):
def __init__(self, config):
super(ContractingDataBridge, self).__init__()
self.config = config

self.cache_db = Db(self.config.get('main'))

self._backend = "redis"
self._host = self.config.get('cache_host')
self._port = self.config.get('cache_port') or 6379
self._db_name = self.config.get('cache_db_name') or 0

logger.info("Caching backend: '{}', db name: '{}', host: '{}', port: '{}'".format(self.cache_db._backend,
self.cache_db._db_name,
self.cache_db._host,
self.cache_db._port),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_INFO}, {}))


self.on_error_delay = self.config_get('on_error_sleep_delay') or 5
self.jobs_watcher_delay = self.config_get('jobs_watcher_delay') or 15
queue_size = self.config_get('buffers_size') or 500
Expand Down Expand Up @@ -99,6 +150,7 @@ def __init__(self, config):
self.handicap_contracts_queue = Queue(maxsize=queue_size)
self.contracts_put_queue = Queue(maxsize=queue_size)
self.contracts_retry_put_queue = Queue(maxsize=queue_size)
self.basket = {}

def config_get(self, name):
return self.config.get('main').get(name)
Expand Down Expand Up @@ -163,12 +215,19 @@ def get_tenders(self, params={}, direction=""):
logger.debug('{} {}'.format(direction, params))
response = self.tenders_sync_client.sync_tenders(params, extra_headers={'X-Client-Request-ID': generate_req_id()})

def _put_tender_in_cache_by_contract(self, contract, tender_id):
dateModified = self.basket.get(contract['id'])
if dateModified:
# TODO: save tender in cache only if all active contracts are
# handled successfully
self.cache_db.put(tender_id, dateModified)
self.basket.pop(contract['id'], None)

def _get_tender_contracts(self):
try:
tender_to_sync = self.tenders_queue.get()
tender = self.tenders_sync_client.get_tender(tender_to_sync['id'],
extra_headers={'X-Client-Request-ID': generate_req_id()})['data']
db.put(tender_to_sync['id'], {'dateModified': tender_to_sync['dateModified']})
except Exception, e:
logger.warn('Fail to get tender info {}'.format(tender_to_sync['id']), extra=journal_context({"MESSAGE_ID": DATABRIDGE_EXCEPTION}, params={"TENDER_ID": tender_to_sync['id']}))
logger.exception(e)
Expand All @@ -182,11 +241,13 @@ def _get_tender_contracts(self):
for contract in tender['contracts']:
if contract["status"] == "active":

self.basket[contract['id']] = tender_to_sync['dateModified']
try:
if not db.has(contract['id']):
if not self.cache_db.has(contract['id']):
self.contracting_client_ro.get_contract(contract['id'])
else:
logger.info('Contract {} exists in local db'.format(contract['id']), extra=journal_context({"MESSAGE_ID": DATABRIDGE_CACHED}, params={"CONTRACT_ID": contract['id']}))
self._put_tender_in_cache_by_contract(contract, tender_to_sync['id'])
continue
except ResourceNotFound:
logger.info('Sync contract {} of tender {}'.format(contract['id'], tender['id']), extra=journal_context(
Expand All @@ -200,9 +261,10 @@ def _get_tender_contracts(self):
self.tenders_queue.put(tender_to_sync)
break
else:
db.put(contract['id'], True)
self.cache_db.put(contract['id'], True)
logger.info('Contract exists {}'.format(contract['id']), extra=journal_context({"MESSAGE_ID": DATABRIDGE_CONTRACT_EXISTS},
{"TENDER_ID": tender_to_sync['id'], "CONTRACT_ID": contract['id']}))
self._put_tender_in_cache_by_contract(contract, tender_to_sync['id'])
continue

contract['tender_id'] = tender['id']
Expand Down Expand Up @@ -291,14 +353,17 @@ def put_contracts(self):
self.contracting_client.create_contract(data)
logger.info("Successfully created contract {} of tender {}".format(contract['id'], contract['tender_id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CONTRACT_CREATED}, {"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
db.put(contract['id'], True)
except Exception, e:
logger.info("Unsuccessful put for contract {0} of tender {1}".format(contract['id'], contract['tender_id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_EXCEPTION}, {"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
logger.exception(e)
logger.info("Schedule retry for contract {0}".format(contract['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_RETRY_CREATE}, {"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
self.contracts_retry_put_queue.put(contract)
else:
self.cache_db.put(contract['id'], True)
self._put_tender_in_cache_by_contract(contract, contract['tender_id'])

gevent.sleep(0)

@retry(stop_max_attempt_number=15, wait_exponential_multiplier=1000 * 60)
Expand All @@ -317,8 +382,13 @@ def retry_put_contracts(self):
try:
contract = self.contracts_retry_put_queue.get()
self._put_with_retry(contract)
logger.info("Successfully created contract {} of tender {}".format(contract['id'], contract['tender_id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CONTRACT_CREATED}, {"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
except:
logger.warn("Can't create contract {}".format(contract['id']), extra=journal_context({"MESSAGE_ID": DATABRIDGE_EXCEPTION}, {"TENDER_ID": contract['tender_id'], "CONTRACT_ID": contract['id']}))
else:
self.cache_db.put(contract['id'], True)
self._put_tender_in_cache_by_contract(contract, contract['tender_id'])
gevent.sleep(0)

def get_tender_contracts_forward(self):
Expand All @@ -341,8 +411,8 @@ def get_tender_contracts_backward(self):
params = {'opt_fields': 'status,lots', 'descending': 1, 'mode': '_all_'}
try:
for tender_data in self.get_tenders(params=params, direction="backward"):
stored = db.get(tender_data['id'])
if stored and stored['dateModified'] == tender_data['dateModified']:
stored = self.cache_db.get(tender_data['id'])
if stored and stored == tender_data['dateModified']:
logger.info('Tender {} not modified from last check. Skipping'.format(tender_data['id']), extra=journal_context(
{"MESSAGE_ID": DATABRIDGE_SKIP_NOT_MODIFIED}, {"TENDER_ID": tender_data['id']}))
continue
Expand Down
1 change: 1 addition & 0 deletions openprocurement/contracting/api/journal_msg_ids.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@
DATABRIDGE_WORKER_DIED = "c_bridge_worker_died"
DATABRIDGE_EXCEPTION = "c_bridge_exception"
DATABRIDGE_CACHED = "c_bridge_cached"
DATABRIDGE_INFO = "c_bridge_info"
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
databridge_requires = requires + [
'PyYAML',
'gevent',
'redis',
'LazyDB',
'ExtendedJournalHandler',
'openprocurement_client>=1.0b2'
Expand Down

0 comments on commit 4ba625b

Please sign in to comment.