Skip to content

Commit

Permalink
Merge branch 'a142022530357637_fix_journal_logging'
Browse files Browse the repository at this point in the history
  • Loading branch information
vmaksymiv committed Jun 22, 2016
2 parents b0020cd + 935ad79 commit 6fce783
Showing 1 changed file with 36 additions and 34 deletions.
70 changes: 36 additions & 34 deletions openprocurement/contracting/api/databridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ def generate_req_id():
return b'contracting-data-bridge-req-' + str(uuid4()).encode('ascii')


def journal_context(extra):
return dict([("JOURNAL_" + k, v) for k, v in extra.items()])
def journal_context(record={}, params={}):
for k, v in params.items():
record["JOURNAL_" + k] = v
return record


class ContractingDataBridge(object):
Expand Down Expand Up @@ -87,11 +89,11 @@ def config_get(self, name):
@retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000)
def get_tender_credentials(self, tender_id):
self.client.headers.update({'X-Client-Request-ID': generate_req_id()})
logger.info("Getting credentials for tender {}".format(tender_id), extra=journal_context({"MESSAGE_ID": DATABRIDGE_GET_CREDENTIALS,
"TENDER_ID": tender_id}))
logger.info("Getting credentials for tender {}".format(tender_id), extra=journal_context({"MESSAGE_ID": DATABRIDGE_GET_CREDENTIALS},
{"TENDER_ID": tender_id}))
data = self.client.extract_credentials(tender_id)
logger.info("Got tender {} credentials".format(tender_id), extra=journal_context({"MESSAGE_ID": DATABRIDGE_GOT_CREDENTIALS,
"TENDER_ID": tender_id}))
logger.info("Got tender {} credentials".format(tender_id), extra=journal_context({"MESSAGE_ID": DATABRIDGE_GOT_CREDENTIALS},
{"TENDER_ID": tender_id}))
return data

@retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000)
Expand Down Expand Up @@ -131,15 +133,15 @@ def get_tenders(self, params={}, direction=""):
if hasattr(tender, "lots"):
if any([1 for lot in tender['lots'] if lot['status'] == "complete"]):
logger.info('{} sync: Found multilot tender {} in status {}'.format(direction.capitalize(), tender['id'], tender['status']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_FOUND_MULTILOT_COMPLETE, "TENDER_ID": tender['id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_FOUND_MULTILOT_COMPLETE}, {"TENDER_ID": tender['id']}))
yield tender
elif tender['status'] == "complete":
logger.info('{} sync: Found tender in complete status {}'.format(direction.capitalize(), tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_FOUND_NOLOT_COMPLETE, "TENDER_ID": tender['id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_FOUND_NOLOT_COMPLETE}, {"TENDER_ID": tender['id']}))
yield tender
else:
logger.debug('{} sync: Skipping tender {} in status {}'.format(direction.capitalize(), tender['id'], tender['status']),
extra=journal_context({"TENDER_ID": tender['id']}))
extra=journal_context(params={"TENDER_ID": tender['id']}))

logger.info('Sleep {} sync...'.format(direction), extra=journal_context({"MESSAGE_ID": DATABRIDGE_SYNC_SLEEP}))
gevent.sleep(delay)
Expand All @@ -156,11 +158,11 @@ def get_tender_contracts(self):
db.put(tender_to_sync['id'], {'dateModified': tender_to_sync['dateModified']})
except Exception, e:
logger.exception(e)
logger.info('Put tender {} back to tenders queue'.format(tender_to_sync['id']), extra=journal_context({"TENDER_ID": tender_to_sync['id']}))
logger.info('Put tender {} back to tenders queue'.format(tender_to_sync['id']), extra=journal_context(params={"TENDER_ID": tender_to_sync['id']}))
self.tenders_queue.put(tender_to_sync)
else:
if 'contracts' not in tender:
logger.warn('!!!No contracts found in tender {}'.format(tender['id']), extra=journal_context({"TENDER_ID": tender['id']}))
logger.warn('!!!No contracts found in tender {}'.format(tender['id']), extra=journal_context(params={"TENDER_ID": tender['id']}))
continue
for contract in tender['contracts']:
if contract["status"] == "active":
Expand All @@ -169,28 +171,28 @@ def get_tender_contracts(self):
if not db.has(contract['id']):
self.contracting_client.get_contract(contract['id'])
else:
logger.info('Contract {} exists in local db'.format(contract['id']), extra=journal_context({"CONTRACT_ID": contract['id']}))
logger.info('Contract {} exists in local db'.format(contract['id']), extra=journal_context(params={"CONTRACT_ID": contract['id']}))
continue
except ResourceNotFound:
logger.info('Sync contract {} of tender {}'.format(contract['id'], tender['id']), extra=journal_context(
{"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id'], "MESSAGE_ID": DATABRIDGE_CONTRACT_TO_SYNC}))
{"MESSAGE_ID": DATABRIDGE_CONTRACT_TO_SYNC}, {"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']}))
except Exception, e:
logger.exception(e)
logger.info('Put tender {} back to tenders queue'.format(tender_to_sync['id']), extra=journal_context({"TENDER_ID": tender_to_sync['id']}))
logger.info('Put tender {} back to tenders queue'.format(tender_to_sync['id']), extra=journal_context(params={"TENDER_ID": tender_to_sync['id']}))
self.tenders_queue.put(tender_to_sync)
break
else:
db.put(contract['id'], True)
logger.info('Contract exists {}'.format(contract['id']), extra=journal_context({"MESSAGE_ID": DATABRIDGE_CONTRACT_EXISTS,
"CONTRACT_ID": contract['id']}))
logger.info('Contract exists {}'.format(contract['id']), extra=journal_context({"MESSAGE_ID": DATABRIDGE_CONTRACT_EXISTS},
{"CONTRACT_ID": contract['id']}))
continue

contract['tender_id'] = tender['id']
contract['procuringEntity'] = tender['procuringEntity']

if not contract.get('items'):
logger.info('Copying contract {} items'.format(contract['id']), extra=journal_context({"MESSAGE_ID": DATABRIDGE_COPY_CONTRACT_ITEMS,
"CONTRACT_ID": contract['id']}))
logger.info('Copying contract {} items'.format(contract['id']), extra=journal_context({"MESSAGE_ID": DATABRIDGE_COPY_CONTRACT_ITEMS},
{"CONTRACT_ID": contract['id']}))
if tender.get('lots'):
related_awards = [aw for aw in tender['awards'] if aw['id'] == contract['awardID']]
if related_awards:
Expand All @@ -203,16 +205,16 @@ def get_tender_contracts(self):
contract['items'] = [item for item in tender['items'] if item['relatedLot'] == award['lotID']]
else:
logger.warn('Not found related award for contact {} of tender {}'.format(contract['id'], tender['id']),
extra=journal_context({"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']}))
extra=journal_context(params={"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']}))
else:
logger.debug('Copying all tender {} items into contract {}'.format(tender['id'], contract['id']),
extra=journal_context({"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']}))
extra=journal_context(params={"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']}))
contract['items'] = tender['items']

if not contract.get('items'):
logger.warn('Contact {} of tender {} does not contain items info'.format(contract['id'], tender['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_MISSING_CONTRACT_ITEMS,
"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_MISSING_CONTRACT_ITEMS},
{"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']}))

self.handicap_contracts_queue.put(contract)

Expand All @@ -221,16 +223,16 @@ def prepare_contract_data(self):
contract = self.handicap_contracts_queue.get()
try:
logger.info("Getting extra info for tender {}".format(contract['tender_id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_GET_EXTRA_INFO, "TENDER_ID": contract['tender_id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_GET_EXTRA_INFO}, {"TENDER_ID": contract['tender_id']}))
tender_data = self.get_tender_credentials(contract['tender_id'])
except Exception, e:
logger.exception(e)
logger.info("Can't get tender credentials {}".format(contract['tender_id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_MISSING_CREDENTIALS, "TENDER_ID": contract['tender_id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_MISSING_CREDENTIALS}, {"TENDER_ID": contract['tender_id']}))
self.handicap_contracts_queue.put(contract)
else:
logger.debug("Got extra info for tender {}".format(contract['tender_id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_GOT_EXTRA_INFO, "TENDER_ID": contract['tender_id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_GOT_EXTRA_INFO}, {"TENDER_ID": contract['tender_id']}))
data = tender_data.data
if data.get('mode'):
contract['mode'] = data['mode']
Expand All @@ -244,18 +246,18 @@ def put_contracts(self):
contract = self.contracts_put_queue.get()
try:
logger.info("Creating contract {} of tender {}".format(contract['id'], contract['tender_id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CREATE_CONTRACT, "CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CREATE_CONTRACT}, {"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
data = {"data": contract.toDict()}
self.contracting_client.create_contract(data)
logger.info("Successfully created contract {} of tender {}".format(contract['id'], contract['tender_id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CONTRACT_CREATED, "CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CONTRACT_CREATED}, {"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
db.put(contract['id'], True)
except Exception, e:
logger.exception(e)
logger.info("Unsuccessful put for contract {0} of tender {1}".format(contract['id'], contract['tender_id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESSFUL_CREATE, "CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESSFUL_CREATE}, {"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
logger.info("Schedule retry for contract {0}".format(contract['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_RETRY_CREATE, "CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_RETRY_CREATE}, {"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
self.contracts_retry_put_queue.put(contract)
gevent.sleep(0)

Expand All @@ -264,7 +266,7 @@ def _put_with_retry(self, contract):
try:
data = {"data": contract.toDict()}
logger.info("Creating contract {} of tender {}".format(contract['id'], contract['tender_id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CREATE_CONTRACT, "CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CREATE_CONTRACT}, {"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
self.contracting_client.create_contract(data)
except Exception, e:
logger.exception(e)
Expand All @@ -278,7 +280,7 @@ def retry_put_contracts(self):
except:
contract = self.contracts_retry_put_queue.get()
del contract['tender_token'] # do not reveal tender credentials in logs
logger.warn("Can't create contract {}".format(contract), extra=journal_context({"MESSAGE_ID": DATABRIDGE_CREATE_ERROR, "CONTRACT_ID": contract['id']}))
logger.warn("Can't create contract {}".format(contract), extra=journal_context({"MESSAGE_ID": DATABRIDGE_CREATE_ERROR}, {"CONTRACT_ID": contract['id']}))
else:
self.contracts_retry_put_queue.get()
gevent.sleep(0)
Expand All @@ -289,7 +291,7 @@ def get_tender_contracts_forward(self):
try:
for tender_data in self.get_tenders(params=params, direction="forward"):
logger.info('Forward sync: Put tender {} to process...'.format(tender_data['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS, "TENDER_ID": tender_data['id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS}, {"TENDER_ID": tender_data['id']}))
self.tenders_queue.put(tender_data)
except Exception, e:
# TODO reset queues and restart sync
Expand All @@ -306,10 +308,10 @@ def get_tender_contracts_backward(self):
stored = db.get(tender_data['id'])
if stored and stored['dateModified'] == tender_data['dateModified']:
logger.info('Tender {} not modified from last check. Skipping'.format(tender_data['id']), extra=journal_context(
{"MESSAGE_ID": DATABRIDGE_SKIP_NOT_MODIFIED, "TENDER_ID": tender_data['id']}))
{"MESSAGE_ID": DATABRIDGE_SKIP_NOT_MODIFIED}, {"TENDER_ID": tender_data['id']}))
continue
logger.info('Backward sync: Put tender {} to process...'.format(tender_data['id']),
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS, "TENDER_ID": tender_data['id']}))
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS}, {"TENDER_ID": tender_data['id']}))
self.tenders_queue.put(tender_data)
except Exception, e:
# TODO reset queues and restart sync
Expand Down

0 comments on commit 6fce783

Please sign in to comment.