
Commit

Merge ddeee7d into b471fc0
Vladyslav committed Jan 17, 2018
2 parents b471fc0 + ddeee7d commit b1f2b8d
Showing 3 changed files with 135 additions and 54 deletions.
9 changes: 6 additions & 3 deletions openprocurement/edge/databridge.py
@@ -46,7 +46,9 @@
'retries_count': 10,
'queue_timeout': 3,
'bulk_save_limit': 1000,
'bulk_save_interval': 5
'bulk_save_interval': 5,
'historical': False,
'token': ""
}

DEFAULTS = {
@@ -171,8 +173,9 @@ def create_api_client(self):
try:
api_client = APIClient(
host_url=self.api_host, user_agent=client_user_agent,
api_version=self.api_version, key='',
resource=self.workers_config['resource'])
api_version=self.api_version, key=self.workers_config['token'],
resource=self.workers_config['resource']
)
client_id = uuid.uuid4().hex
logger.info('Started api_client {}'.format(
api_client.session.headers['User-Agent']),
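The net effect of the databridge.py changes above: WORKER_CONFIG gains two defaults, 'historical' and 'token', and create_api_client now passes the configured token to APIClient as its key instead of an empty string. Below is a minimal sketch of how a deployment might override these defaults; the key names are taken from the diff, while the resource name and token value are placeholders rather than real settings.

worker_defaults = {
    'resource': 'tenders',            # existing option, value is a placeholder
    'bulk_save_limit': 1000,
    'bulk_save_interval': 5,
    'historical': True,               # new: mirror every revision, not only the latest doc
    'token': 'my-edge-api-token',     # new: forwarded to APIClient(key=...)
}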
72 changes: 64 additions & 8 deletions openprocurement/edge/tests/workers.py
@@ -3,7 +3,7 @@
import unittest
import uuid
import logging
from copy import deepcopy
from copy import deepcopy, copy
from gevent import sleep, idle
from gevent.queue import Queue, Empty
from mock import MagicMock, patch, call
@@ -33,7 +33,8 @@ class TestResourceItemWorker(unittest.TestCase):
'retries_count': 2,
'queue_timeout': 0.3,
'bulk_save_limit': 1,
'bulk_save_interval': 0.1
'bulk_save_interval': 0.1,
'historical': False
}

def tearDown(self):
@@ -175,6 +176,61 @@ def test__get_resource_item_from_queue(self):
self.assertEqual(resource_item, None)
del worker

@patch('openprocurement_client.client.TendersClient')
def test__get_item_with_historical(self, mock_api_client):
item = {
'id': uuid.uuid4().hex,
'dateModified': datetime.datetime.utcnow().isoformat()
}
api_clients_queue = Queue()
client_dict = {
'id': uuid.uuid4().hex,
'request_interval': 0.02,
'client': mock_api_client
}
api_clients_queue.put(client_dict)
api_clients_info =\
{client_dict['id']: {'drop_cookies': False, 'request_durations': {}}}
retry_queue = Queue()
return_dict = {
'data': {
'id': item['id'],
'dateModified': datetime.datetime.utcnow().isoformat()
},
'x_revision_n': "1"
}
worker = ResourceItemWorker(api_clients_queue=api_clients_queue,
config_dict=self.worker_config,
retry_resource_items_queue=retry_queue,
api_clients_info=api_clients_info)
worker.db = dict()

mock_api_client.get_resource_item_historical.return_value = deepcopy(return_dict)
result = list(worker._get_item_with_historical(client_dict, item))
self.assertEqual(item['id'] + "-1", result[0]['id'])
self.assertEqual(1, len(result))

return_dict["x_revision_n"] = "2"
mock_api_client.get_resource_item_historical.side_effect = [deepcopy(return_dict), deepcopy(return_dict)]

result = list(worker._get_item_with_historical(client_dict, item))
self.assertEqual(item['id'] + "-1", result[0]['id'])
self.assertEqual(item['id'] + "-2", result[1]['id'])
self.assertEqual(2, len(result))

mock_api_client.get_resource_item_historical.side_effect = [deepcopy(return_dict), deepcopy(return_dict)]
worker.db[copy(return_dict['data']['id']) + "-1"] = True

result = list(worker._get_item_with_historical(client_dict, item))
self.assertEqual(item['id'] + "-2", result[0]['id'])
self.assertEqual(1, len(result))
#
mock_api_client.get_resource_item_historical.side_effect = [deepcopy(return_dict), deepcopy(return_dict)]
worker.db[copy(return_dict['data']['id']) + "-2"] = True

result = list(worker._get_item_with_historical(client_dict, item))
self.assertEqual(0, len(result))

@patch('openprocurement_client.client.TendersClient')
def test__get_resource_item_from_public(self, mock_api_client):
item = {
@@ -206,6 +262,7 @@ def test__get_resource_item_from_public(self, mock_api_client):
# Success test
self.assertEqual(worker.api_clients_queue.qsize(), 1)
api_client = worker._get_api_client_dict()
api_client['historical'] = self.worker_config['historical']
self.assertEqual(api_client['request_interval'], 0.02)
self.assertEqual(worker.api_clients_queue.qsize(), 0)
public_item = worker._get_resource_item_from_public(api_client, item)
@@ -335,8 +392,7 @@ def test__add_to_bulk(self):

# Successfull adding to bulk
start_length = len(worker.bulk)
worker._add_to_bulk(resource_item_dict, queue_resource_item,
resource_item_doc_dict)
worker._add_to_bulk(resource_item_dict, resource_item_doc_dict)
end_length = len(worker.bulk)
self.assertGreater(end_length, start_length)

@@ -345,8 +401,7 @@
new_resource_item_dict = deepcopy(resource_item_dict)
new_resource_item_dict['dateModified'] =\
datetime.datetime.utcnow().isoformat()
worker._add_to_bulk(new_resource_item_dict, queue_resource_item,
resource_item_doc_dict)
worker._add_to_bulk(new_resource_item_dict, resource_item_doc_dict)
end_length = len(worker.bulk)
self.assertEqual(start_length, end_length)

@@ -357,7 +412,7 @@
'id': queue_resource_item['id'],
'_id': queue_resource_item['id'],
'dateModified': old_date_modified
}, queue_resource_item, resource_item_dict)
}, resource_item_dict)
end_length = len(worker.bulk)
self.assertEqual(start_length, end_length)
del worker
@@ -449,7 +504,8 @@ def test__run(self, mocked_logger, mock_get_from_public, mocked_save_bulk):
api_client_dict = {
'id': uuid.uuid4().hex,
'client': client,
'request_interval': 0
'request_interval': 0,
'historical': self.worker_config['historical']
}
client.session.headers = {'User-Agent': 'Test-Agent'}
self.api_clients_info = {
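A detail worth noting in the new test__get_item_with_historical above: the mocked get_resource_item_historical is fed deepcopy(return_dict) for every expected call, because the worker mutates the returned document in place (doc['id'] += "-{n}"), and handing the same dict to consecutive calls would let that mutation leak between revisions. The following self-contained sketch shows the same pattern; fetch and consume are hypothetical names, and only the deepcopy-per-call idea mirrors the test.

from copy import deepcopy
from unittest.mock import MagicMock

payload = {'data': {'id': 'abc'}, 'x_revision_n': '2'}

fetch = MagicMock()
# One fresh copy per expected call, so in-place edits don't bleed across calls.
fetch.side_effect = [deepcopy(payload), deepcopy(payload)]

def consume(client):
    doc = client()['data']
    doc['id'] += '-1'       # mutates the returned dict, as the worker does
    return doc

assert consume(fetch)['id'] == 'abc-1'
assert consume(fetch)['id'] == 'abc-1'   # still 'abc-1', not 'abc-1-1'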
108 changes: 65 additions & 43 deletions openprocurement/edge/workers.py
@@ -10,7 +10,6 @@
from iso8601 import parse_date
from pytz import timezone
from requests.exceptions import ConnectionError
import logging
import logging.config
import time
from openprocurement_client.exceptions import (
@@ -117,42 +116,63 @@ def _get_resource_item_from_queue(self):
else:
return None

def _get_item_with_historical(self, api_client_dict, queue_resource_item):
resource_id = queue_resource_item['id']
client = api_client_dict['client']
current = client.get_resource_item_historical(resource_id)
revisions_number = int(current['x_revision_n'])
# there can't be 0 revisions
for index in range(1, revisions_number):
if "{}-{}".format(resource_id, index) not in self.db:
doc = client.get_resource_item_historical(
resource_id, index)['data']
doc['id'] += "-{}".format(index)
yield doc
if "{}-{}".format(resource_id, revisions_number) not in self.db:
doc = current['data']
doc['id'] += "-{}".format(revisions_number)
yield doc

def _get_resource_item_from_public(self, api_client_dict,
queue_resource_item):
try:
logger.debug('Request interval {} sec. for client {}'.format(
api_client_dict['request_interval'],
api_client_dict['client'].session.headers['User-Agent']))
start = time.time()
resource_item = api_client_dict['client'].get_resource_item(
queue_resource_item['id']).get('data')
self.api_clients_info[api_client_dict['id']][
'request_durations'][datetime.now()] = time.time() - start
self.api_clients_info[api_client_dict['id']]['request_interval'] =\
api_client_dict['request_interval']
logger.debug('Recieved from API {}: {} {}'.format(
self.config['resource'][:-1], resource_item['id'],
resource_item['dateModified']))
if api_client_dict['request_interval'] > 0:
api_client_dict['request_interval'] -=\
self.config['client_dec_step_timeout']
if resource_item['dateModified'] <\
queue_resource_item['dateModified']:
logger.info(
'Client {} got not actual {} document {} from public '
'server.'.format(
api_client_dict['client'].session.headers[
'User-Agent'], self.config['resource'][:-1],
queue_resource_item['id']),
extra={'MESSAGE_ID': 'not_actual_docs'})
self.add_to_retry_queue({
'id': queue_resource_item['id'],
'dateModified': queue_resource_item['dateModified']
})
self.api_clients_queue.put(api_client_dict)
logger.debug('PUT API CLIENT: {}'.format(api_client_dict['id']),
extra={'MESSAGE_ID': 'put_client'})
return None # Not actual
if self.config['historical']:
resource_item = self._get_item_with_historical(
api_client_dict=api_client_dict, queue_resource_item=queue_resource_item)
else:
resource_item = api_client_dict['client'].get_resource_item(
queue_resource_item['id']).get('data')
self.api_clients_info[api_client_dict['id']][
'request_durations'][datetime.now()] = time.time() - start
self.api_clients_info[api_client_dict['id']]['request_interval'] =\
api_client_dict['request_interval']
logger.debug('Recieved from API {}: {} {}'.format(
self.config['resource'][:-1], resource_item['id'],
resource_item['dateModified']))
if api_client_dict['request_interval'] > 0:
api_client_dict['request_interval'] -=\
self.config['client_dec_step_timeout']
if resource_item['dateModified'] <\
queue_resource_item['dateModified']:
logger.info(
'Client {} got not actual {} document {} from public '
'server.'.format(
api_client_dict['client'].session.headers[
'User-Agent'], self.config['resource'][:-1],
queue_resource_item['id']),
extra={'MESSAGE_ID': 'not_actual_docs'})
self.add_to_retry_queue({
'id': queue_resource_item['id'],
'dateModified': queue_resource_item['dateModified']
})
self.api_clients_queue.put(api_client_dict)
logger.debug('PUT API CLIENT: {}'.format(api_client_dict['id']),
extra={'MESSAGE_ID': 'put_client'})
return None # Not actual
self.api_clients_queue.put(api_client_dict)
logger.debug('PUT API CLIENT: {}'.format(api_client_dict['id']),
extra={'MESSAGE_ID': 'put_client'})
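The new _get_item_with_historical generator above is the heart of this change: it fetches the current document from the historical endpoint, reads the revision count from x_revision_n, and yields one document per revision whose suffixed id is not yet in the local db, appending "-{revision}" to each id. The following standalone sketch walks through that logic with concrete values; FakeClient and the plain set db are stand-ins for the real API client and storage, and revisions_to_sync simply restates the generator for illustration.

class FakeClient(object):
    # Stand-in client: pretends every resource has exactly 3 revisions.
    def get_resource_item_historical(self, resource_id, revision=None):
        return {'data': {'id': resource_id}, 'x_revision_n': '3'}

def revisions_to_sync(client, resource_id, db):
    # Yield every revision whose suffixed id is not already stored locally.
    current = client.get_resource_item_historical(resource_id)
    revisions_number = int(current['x_revision_n'])
    for index in range(1, revisions_number):
        if '{}-{}'.format(resource_id, index) not in db:
            doc = client.get_resource_item_historical(resource_id, index)['data']
            doc['id'] += '-{}'.format(index)
            yield doc
    if '{}-{}'.format(resource_id, revisions_number) not in db:
        doc = current['data']
        doc['id'] += '-{}'.format(revisions_number)
        yield doc

db = {'abc123-1'}                  # revision 1 is already saved locally
print([d['id'] for d in revisions_to_sync(FakeClient(), 'abc123', db)])
# -> ['abc123-2', 'abc123-3']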
@@ -256,8 +276,7 @@ def _get_resource_item_from_public(self, api_client_dict,
})
return None

def _add_to_bulk(self, resource_item, queue_resource_item,
resource_item_doc):
def _add_to_bulk(self, resource_item, resource_item_doc=None):
resource_item['doc_type'] = self.config['resource'][:-1].title()
resource_item['_id'] = resource_item['id']
if resource_item_doc:
@@ -364,7 +383,6 @@ def _run(self):
sleep(self.config['worker_sleep'])
continue

# Try get resource item from local storage
try:
# Resource object from local db server
resource_item_doc = self.db.get(queue_resource_item['id'])
@@ -393,6 +411,20 @@
logger.debug('PUT API CLIENT: {}'.format(api_client_dict['id']),
extra={'MESSAGE_ID': 'put_client'})
continue
# Try to get resource item from public server
if self.config['historical']:
resource_items = self._get_resource_item_from_public(api_client_dict, queue_resource_item)
for item in resource_items:
self._add_to_bulk(item)
else:
resource_item = self._get_resource_item_from_public(
api_client_dict, queue_resource_item)
if resource_item is None:
continue

# Add docs to bulk
self._add_to_bulk(resource_item, resource_item_doc)

except Exception as e:
self.api_clients_queue.put(api_client_dict)
logger.debug('PUT API CLIENT: {}'.format(api_client_dict['id']),
extra={'MESSAGE_ID': 'exceptions'})
continue

# Try get resource item from public server
resource_item = self._get_resource_item_from_public(
api_client_dict, queue_resource_item)
if resource_item is None:
continue

# Add docs to bulk
self._add_to_bulk(resource_item, queue_resource_item,
resource_item_doc)

# Save/Update docs in db
self._save_bulk_docs()

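Taken together, the workers.py changes reroute the fetch in _run: with config['historical'] enabled, _run iterates over the items returned by _get_resource_item_from_public and adds each revision document to the bulk on its own; otherwise the old single-document path is kept. _add_to_bulk correspondingly drops the unused queue_resource_item argument and makes resource_item_doc optional, which lets the historical branch queue revision docs that have no local counterpart. The condensed sketch below shows that dispatch; sync_one, fetch_public and add_to_bulk are hypothetical stand-ins for the worker internals.

def sync_one(config, fetch_public, add_to_bulk, api_client, queue_item, local_doc):
    if config['historical']:
        # Iterable of per-revision docs; each becomes its own bulk entry.
        for revision_doc in fetch_public(api_client, queue_item):
            add_to_bulk(revision_doc)              # no local doc to diff against
    else:
        resource_item = fetch_public(api_client, queue_item)
        if resource_item is None:
            return                                 # stale or failed fetch
        add_to_bulk(resource_item, local_doc)      # compare with the stored doc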
