Skip to content

Commit

Permalink
Merge branch 'a142022530357637_fix_journal_logging'
Browse files Browse the repository at this point in the history
  • Loading branch information
vmaksymiv committed Jun 9, 2016
2 parents 7b02e25 + 92ff6a5 commit 8b3c9a0
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 33 deletions.
84 changes: 52 additions & 32 deletions openprocurement/contracting/api/databridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@
from openprocurement_client.contract import ContractingClient
from openprocurement_client.client import ResourceNotFound
from yaml import load
from openprocurement.contracting.api.journal_msg_ids import (
DATABRIDGE_RESTART, DATABRIDGE_GET_CREDENTIALS, DATABRIDGE_GOT_CREDENTIALS,
DATABRIDGE_FOUND_MULTILOT_COMPLETE, DATABRIDGE_FOUND_NOLOT_COMPLETE,
DATABRIDGE_CONTRACT_TO_SYNC, DATABRIDGE_CONTRACT_EXISTS,
DATABRIDGE_COPY_CONTRACT_ITEMS, DATABRIDGE_MISSING_CONTRACT_ITEMS,
DATABRIDGE_GET_EXTRA_INFO, DATABRIDGE_MISSING_CREDENTIALS,
DATABRIDGE_GOT_EXTRA_INFO, DATABRIDGE_CREATE_CONTRACT,
DATABRIDGE_CONTRACT_CREATED, DATABRIDGE_UNSUCCESSFUL_CREATE,
DATABRIDGE_RETRY_CREATE, DATABRIDGE_CREATE_ERROR, DATABRIDGE_TENDER_PROCESS,
DATABRIDGE_SKIP_NOT_MODIFIED, DATABRIDGE_SYNC_SLEEP, DATABRIDGE_SYNC_RESUME)


logger = logging.getLogger("openprocurement.contracting.api.databridge")
Expand All @@ -37,6 +47,10 @@ def generate_req_id():
return b'contracting-data-bridge-req-' + str(uuid4()).encode('ascii')


def journal_context(extra):
return dict([("JOURNAL_" + k, v) for k, v in extra.items()])


class ContractingDataBridge(object):
""" Contracting Data Bridge """

Expand Down Expand Up @@ -73,9 +87,11 @@ def config_get(self, name):
@retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000)
def get_tender_credentials(self, tender_id):
self.client.headers.update({'X-Client-Request-ID': generate_req_id()})
logger.info("Getting credentials for tender {}".format(tender_id), extra={"TENDER_ID": tender_id})
logger.info("Getting credentials for tender {}".format(tender_id), extra=journal_context({"MESSAGE_ID": DATABRIDGE_GET_CREDENTIALS,
"TENDER_ID": tender_id}))
data = self.client.extract_credentials(tender_id)
logger.info("Got tender {} credentials".format(tender_id), extra={"TENDER_ID": tender_id})
logger.info("Got tender {} credentials".format(tender_id), extra=journal_context({"MESSAGE_ID": DATABRIDGE_GOT_CREDENTIALS,
"TENDER_ID": tender_id}))
return data

@retry(stop_max_attempt_number=5, wait_exponential_multiplier=1000)
Expand Down Expand Up @@ -115,19 +131,19 @@ def get_tenders(self, params={}, direction=""):
if hasattr(tender, "lots"):
if any([1 for lot in tender['lots'] if lot['status'] == "complete"]):
logger.info('{} sync: Found multilot tender {} in status {}'.format(direction.capitalize(), tender['id'], tender['status']),
extra={"TENDER_ID": tender['id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_FOUND_MULTILOT_COMPLETE, "TENDER_ID": tender['id']}))
yield tender
elif tender['status'] == "complete":
logger.info('{} sync: Found tender in complete status {}'.format(direction.capitalize(), tender['id']),
extra={"TENDER_ID": tender['id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_FOUND_NOLOT_COMPLETE, "TENDER_ID": tender['id']}))
yield tender
else:
logger.debug('{} sync: Skipping tender {} in status {}'.format(direction.capitalize(), tender['id'], tender['status']),
extra={"TENDER_ID": tender['id']})
extra=journal_context({"TENDER_ID": tender['id']}))

logger.info('Sleep {} sync...'.format(direction))
logger.info('Sleep {} sync...'.format(direction), extra=journal_context({"MESSAGE_ID": DATABRIDGE_SYNC_SLEEP}))
gevent.sleep(delay)
logger.info('Restore {} sync'.format(direction))
logger.info('Restore {} sync'.format(direction), extra=journal_context({"MESSAGE_ID": DATABRIDGE_SYNC_RESUME}))
logger.debug('{} {}'.format(direction, params))
response = self.tenders_sync_client.sync_tenders(params, extra_headers={'X-Client-Request-ID': generate_req_id()})

Expand All @@ -140,11 +156,11 @@ def get_tender_contracts(self):
db.put(tender_to_sync['id'], {'dateModified': tender_to_sync['dateModified']})
except Exception, e:
logger.exception(e)
logger.info('Put tender {} back to tenders queue'.format(tender_to_sync['id']), extra={"TENDER_ID": tender_to_sync['id']})
logger.info('Put tender {} back to tenders queue'.format(tender_to_sync['id']), extra=journal_context({"TENDER_ID": tender_to_sync['id']}))
self.tenders_queue.put(tender_to_sync)
else:
if 'contracts' not in tender:
logger.warn('!!!No contracts found in tender {}'.format(tender['id']), extra={"TENDER_ID": tender['id']})
logger.warn('!!!No contracts found in tender {}'.format(tender['id']), extra=journal_context({"TENDER_ID": tender['id']}))
continue
for contract in tender['contracts']:
if contract["status"] == "active":
Expand All @@ -153,26 +169,28 @@ def get_tender_contracts(self):
if not db.has(contract['id']):
self.contracting_client.get_contract(contract['id'])
else:
logger.info('Contract {} exists in local db'.format(contract['id']), extra={"CONTRACT_ID": contract['id']})
logger.info('Contract {} exists in local db'.format(contract['id']), extra=journal_context({"CONTRACT_ID": contract['id']}))
continue
except ResourceNotFound:
logger.info('Sync contract {} of tender {}'.format(contract['id'], tender['id']), extra={"CONTRACT_ID": contract['id'],
"TENDER_ID": tender['id']})
logger.info('Sync contract {} of tender {}'.format(contract['id'], tender['id']), extra=journal_context(
{"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id'], "MESSAGE_ID": DATABRIDGE_CONTRACT_TO_SYNC}))
except Exception, e:
logger.exception(e)
logger.info('Put tender {} back to tenders queue'.format(tender_to_sync['id']), extra={"TENDER_ID": tender_to_sync['id']})
logger.info('Put tender {} back to tenders queue'.format(tender_to_sync['id']), extra=journal_context({"TENDER_ID": tender_to_sync['id']}))
self.tenders_queue.put(tender_to_sync)
break
else:
db.put(contract['id'], True)
logger.info('Contract exists {}'.format(contract['id']), extra={"CONTRACT_ID": contract['id']})
logger.info('Contract exists {}'.format(contract['id']), extra=journal_context({"MESSAGE_ID": DATABRIDGE_CONTRACT_EXISTS,
"CONTRACT_ID": contract['id']}))
continue

contract['tender_id'] = tender['id']
contract['tender_id'] = tender['id']
contract['procuringEntity'] = tender['procuringEntity']

if not contract.get('items'):
logger.info('Copying contract {} items'.format(contract['id']), extra={"CONTRACT_ID": contract['id']})
logger.info('Copying contract {} items'.format(contract['id']), extra=journal_context({"MESSAGE_ID": DATABRIDGE_COPY_CONTRACT_ITEMS,
"CONTRACT_ID": contract['id']}))
if tender.get('lots'):
related_awards = [aw for aw in tender['awards'] if aw['id'] == contract['awardID']]
if related_awards:
Expand All @@ -185,15 +203,16 @@ def get_tender_contracts(self):
contract['items'] = [item for item in tender['items'] if item['relatedLot'] == award['lotID']]
else:
logger.warn('Not found related award for contact {} of tender {}'.format(contract['id'], tender['id']),
extra={"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']})
extra=journal_context({"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']}))
else:
logger.debug('Copying all tender {} items into contract {}'.format(tender['id'], contract['id']),
extra={"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']})
extra=journal_context({"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']}))
contract['items'] = tender['items']

if not contract.get('items'):
logger.warn('Contact {} of tender {} does not contain items info'.format(contract['id'], tender['id']),
extra={"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_MISSING_CONTRACT_ITEMS,
"CONTRACT_ID": contract['id'], "TENDER_ID": tender['id']}))

self.handicap_contracts_queue.put(contract)

Expand All @@ -202,16 +221,16 @@ def prepare_contract_data(self):
contract = self.handicap_contracts_queue.get()
try:
logger.info("Getting extra info for tender {}".format(contract['tender_id']),
extra={"TENDER_ID": contract['tender_id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_GET_EXTRA_INFO, "TENDER_ID": contract['tender_id']}))
tender_data = self.get_tender_credentials(contract['tender_id'])
except Exception, e:
logger.exception(e)
logger.info("Can't get tender credentials {}".format(contract['tender_id']),
extra={"TENDER_ID": contract['tender_id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_MISSING_CREDENTIALS, "TENDER_ID": contract['tender_id']}))
self.handicap_contracts_queue.put(contract)
else:
logger.debug("Got extra info for tender {}".format(contract['tender_id']),
extra={"TENDER_ID": contract['tender_id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_GOT_EXTRA_INFO, "TENDER_ID": contract['tender_id']}))
data = tender_data.data
if data.get('mode'):
contract['mode'] = data['mode']
Expand All @@ -225,18 +244,18 @@ def put_contracts(self):
contract = self.contracts_put_queue.get()
try:
logger.info("Creating contract {} of tender {}".format(contract['id'], contract['tender_id']),
extra={"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CREATE_CONTRACT, "CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
data = {"data": contract.toDict()}
self.contracting_client.create_contract(data)
logger.info("Successfully created contract {} of tender {}".format(contract['id'], contract['tender_id']),
extra={"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CONTRACT_CREATED, "CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
db.put(contract['id'], True)
except Exception, e:
logger.exception(e)
logger.info("Unsuccessful put for contract {0} of tender {1}".format(contract['id'], contract['tender_id']),
extra={"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_UNSUCCESSFUL_CREATE, "CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
logger.info("Schedule retry for contract {0}".format(contract['id']),
extra={"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_RETRY_CREATE, "CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
self.contracts_retry_put_queue.put(contract)
gevent.sleep(0)

Expand All @@ -245,7 +264,7 @@ def _put_with_retry(self, contract):
try:
data = {"data": contract.toDict()}
logger.info("Creating contract {} of tender {}".format(contract['id'], contract['tender_id']),
extra={"CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_CREATE_CONTRACT, "CONTRACT_ID": contract['id'], "TENDER_ID": contract['tender_id']}))
self.contracting_client.create_contract(data)
except Exception, e:
logger.exception(e)
Expand All @@ -259,7 +278,7 @@ def retry_put_contracts(self):
except:
contract = self.contracts_retry_put_queue.get()
del contract['tender_token'] # do not reveal tender credentials in logs
logger.warn("Can't create contract {}".format(contract), extra={"CONTRACT_ID": contract['id']})
logger.warn("Can't create contract {}".format(contract), extra=journal_context({"MESSAGE_ID": DATABRIDGE_CREATE_ERROR, "CONTRACT_ID": contract['id']}))
else:
self.contracts_retry_put_queue.get()
gevent.sleep(0)
Expand All @@ -270,7 +289,7 @@ def get_tender_contracts_forward(self):
try:
for tender_data in self.get_tenders(params=params, direction="forward"):
logger.info('Forward sync: Put tender {} to process...'.format(tender_data['id']),
extra={"TENDER_ID": tender_data['id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS, "TENDER_ID": tender_data['id']}))
self.tenders_queue.put(tender_data)
except Exception, e:
# TODO reset queues and restart sync
Expand All @@ -286,10 +305,11 @@ def get_tender_contracts_backward(self):
for tender_data in self.get_tenders(params=params, direction="backward"):
stored = db.get(tender_data['id'])
if stored and stored['dateModified'] == tender_data['dateModified']:
logger.info('Tender {} not modified from last check. Skipping'.format(tender_data['id']), extra={"TENDER_ID": tender_data['id']})
logger.info('Tender {} not modified from last check. Skipping'.format(tender_data['id']), extra=journal_context(
{"MESSAGE_ID": DATABRIDGE_SKIP_NOT_MODIFIED, "TENDER_ID": tender_data['id']}))
continue
logger.info('Backward sync: Put tender {} to process...'.format(tender_data['id']),
extra={"TENDER_ID": tender_data['id']})
extra=journal_context({"MESSAGE_ID": DATABRIDGE_TENDER_PROCESS, "TENDER_ID": tender_data['id']}))
self.tenders_queue.put(tender_data)
except Exception, e:
# TODO reset queues and restart sync
Expand Down Expand Up @@ -369,7 +389,7 @@ def run(self):
except Exception, e:
logger.exception(e)

logger.warn("Restarting synchronization")
logger.warn("Restarting synchronization", extra=journal_context({"MESSAGE_ID": DATABRIDGE_RESTART}))


def main():
Expand Down
21 changes: 21 additions & 0 deletions openprocurement/contracting/api/journal_msg_ids.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
DATABRIDGE_RESTART = "c_bridge_restart"
DATABRIDGE_GET_CREDENTIALS = "c_bridge_get_tender_credentials"
DATABRIDGE_GOT_CREDENTIALS = "c_bridge_got_tender_credentials"
DATABRIDGE_FOUND_MULTILOT_COMPLETE = "c_bridge_found_multilot"
DATABRIDGE_FOUND_NOLOT_COMPLETE = "c_bridge_found_nolot"
DATABRIDGE_CONTRACT_TO_SYNC = "c_bridge_contract_to_sync"
DATABRIDGE_CONTRACT_EXISTS = "c_bridge_contract_exists"
DATABRIDGE_COPY_CONTRACT_ITEMS = "c_bridge_prepare_items"
DATABRIDGE_MISSING_CONTRACT_ITEMS = "c_bridge_missing_c_items"
DATABRIDGE_GET_EXTRA_INFO = "c_bridge_get_credentials"
DATABRIDGE_MISSING_CREDENTIALS = "c_bridge_missing_credentials"
DATABRIDGE_GOT_EXTRA_INFO = "c_bridge_got_credentials"
DATABRIDGE_CREATE_CONTRACT = "c_bridge_create_contract"
DATABRIDGE_CONTRACT_CREATED = "c_bridge_contract_created"
DATABRIDGE_UNSUCCESSFUL_CREATE = "c_bridge_unsuccessful_create"
DATABRIDGE_RETRY_CREATE = "c_bridge_create_retry"
DATABRIDGE_CREATE_ERROR = "c_bridge_create_error"
DATABRIDGE_TENDER_PROCESS = "c_bridge_tender_process"
DATABRIDGE_SKIP_NOT_MODIFIED = "c_bridge_not_modified"
DATABRIDGE_SYNC_SLEEP = "c_bridge_sleep"
DATABRIDGE_SYNC_RESUME = "c_bridge_resume"
2 changes: 1 addition & 1 deletion openprocurement/contracting/api/views/contract.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def post(self):
if save_contract(self.request):
self.LOGGER.info('Created contract {} ({})'.format(contract.id, contract.contractID),
extra=context_unpack(self.request, {'MESSAGE_ID': 'contract_create'},
{'contract_id': contract.id, 'contractID': contract.contractID}))
{'contract_id': contract.id, 'contractID': contract.contractID or ''}))
self.request.response.status = 201
return {
'data': contract.serialize("view"),
Expand Down

0 comments on commit 8b3c9a0

Please sign in to comment.