From b59a03fa728b1b5eaa408cc2e8d44ddc31b8b220 Mon Sep 17 00:00:00 2001
From: James McKinney <26463+jpmckinney@users.noreply.github.com>
Date: Wed, 20 Apr 2022 12:44:49 -0400
Subject: [PATCH] paraguay: Reduce duplication and fix bugs

- paraguay_hacienda: Set a last_request scalar, but the middleware expects a last_requests list.
- Set access_token_scheduled_at on each attempt, not only on the first attempt.
---
 kingfisher_scrapy/downloadermiddlewares.py | 48 +++++++----
 .../spiders/paraguay_dncp_base.py          | 76 ++++++-----------
 .../spiders/paraguay_dncp_records.py       |  6 +-
 .../spiders/paraguay_dncp_releases.py      |  4 +-
 .../spiders/paraguay_hacienda.py           | 83 ++++++++-----------
 5 files changed, 100 insertions(+), 117 deletions(-)

diff --git a/kingfisher_scrapy/downloadermiddlewares.py b/kingfisher_scrapy/downloadermiddlewares.py
index 46a90fa9..4214804d 100644
--- a/kingfisher_scrapy/downloadermiddlewares.py
+++ b/kingfisher_scrapy/downloadermiddlewares.py
@@ -21,6 +21,22 @@ class ParaguayAuthMiddleware:
     Tokens should be generated and assigned just before sending a request, but Scrapy does not provide any way to do
     this, which in turn means that sometimes we accidently send expired tokens. For now, the issue seems to be
     avoided by setting the number of concurrent requests to 1, at cost of download speed.
+
+    .. code-block:: python
+
+        class Paraguay:
+            name = 'paraguay'
+
+            # ParaguayAuthMiddleware
+            access_token = None
+            access_token_scheduled_at = None
+            # The maximum age is less than the API's limit, since we don't precisely control Scrapy's scheduler.
+            access_token_maximum_age = 14 * 60
+            access_token_request_failed = False
+            requests_backlog = []
+
+            def build_access_token_request(self):
+                return scrapy.Request("https://example.com")
     """

     def __init__(self, spider):
@@ -33,36 +49,36 @@ def from_crawler(cls, crawler):
     def process_request(self, request, spider):
         if 'auth' in request.meta and request.meta['auth'] is False:
             return
-        if spider.auth_failed:
-            spider.crawler.engine.close_spider(spider, 'auth_failed')
+        if spider.access_token_request_failed:
+            spider.crawler.engine.close_spider(spider, 'access_token_request_failed')
             raise IgnoreRequest("Max attempts to get an access token reached. Stopping crawl...")
         request.headers['Authorization'] = spider.access_token
         if self._expires_soon(spider):
-            # SAVE the last request to continue after getting the token
-            spider.last_requests.append(request)
-            spider.logger.info('Saving request for after getting the token: %s', request.url)
-            # spider MUST implement the request_access_token method
-            return spider.request_access_token()
+            return self.add_request_to_backlog_and_build_access_token_request(spider, request)

     def process_response(self, request, response, spider):
         if response.status == 401 or response.status == 429:
-            spider.logger.info('Time transcurred: %s', (datetime.now() - spider.start_time).total_seconds())
+            age = (datetime.now() - spider.access_token_scheduled_at).total_seconds()
+            spider.logger.info('Access token age: %ss', age)
             spider.logger.info('%s returned for request to %s', response.status, request.url)
             if not spider.access_token == request.headers['Authorization'] and self._expires_soon(spider):
-                # SAVE the last request to continue after getting the token
-                spider.last_requests.append(request)
-                spider.logger.info('Saving request for after getting the token: %s', request.url)
-                # spider MUST implement the request_access_token method
-                return spider.request_access_token()
+                return self.add_request_to_backlog_and_build_access_token_request(spider, request)
             request.headers['Authorization'] = spider.access_token
             return request
         return response

+    def add_request_to_backlog_and_build_access_token_request(self, spider, request):
+        spider.requests_backlog.append(request)
+        spider.logger.info('Added request to backlog until token received: %s', request.url)
+        return spider.build_access_token_request()
+
     @staticmethod
     def _expires_soon(spider):
-        if spider.start_time and spider.access_token:
-            # The spider must implement the expires_soon method.
-            return spider.expires_soon(datetime.now() - spider.start_time)
+        if spider.access_token and spider.access_token_scheduled_at:
+            age = (datetime.now() - spider.access_token_scheduled_at).total_seconds()
+            if age < spider.access_token_maximum_age:
+                return False
+            spider.logger.info('Access token age: %ss', age)
         return True
diff --git a/kingfisher_scrapy/spiders/paraguay_dncp_base.py b/kingfisher_scrapy/spiders/paraguay_dncp_base.py
index c45e3bb6..4393f365 100644
--- a/kingfisher_scrapy/spiders/paraguay_dncp_base.py
+++ b/kingfisher_scrapy/spiders/paraguay_dncp_base.py
@@ -22,24 +22,23 @@ class ParaguayDNCPBase(SimpleSpider):
     default_from_date = '2010-01-01T00:00:00'
     date_required = True

-    # request limits: since we can't control when Scrapy decides to send a
-    # request, values here are slightly less than real limits.
-    start_time = None
+    # ParaguayAuthMiddleware
     access_token = None
-    auth_failed = False
-    last_requests = []
-    request_time_limit = 13  # in minutes
-    base_url = 'https://contrataciones.gov.py/datos/api/v3/doc'
-    auth_url = f'{base_url}/oauth/token'
-    request_token = None
+    access_token_scheduled_at = None
+    # The maximum age is less than the API's limit, since we don't precisely control Scrapy's scheduler.
+    access_token_maximum_age = 13 * 60
+    access_token_request_failed = False
+    requests_backlog = []
+
+    # Local
     max_attempts = 10
+    url_prefix = 'https://contrataciones.gov.py/datos/api/v3/doc/'

     @classmethod
     def from_crawler(cls, crawler, *args, **kwargs):
         spider = super().from_crawler(crawler, *args, **kwargs)
         spider.request_token = crawler.settings.get('KINGFISHER_PARAGUAY_DNCP_REQUEST_TOKEN')
-
         if spider.request_token is None:
             raise MissingEnvVarError('KINGFISHER_PARAGUAY_DNCP_REQUEST_TOKEN is not set.')

@@ -50,7 +49,7 @@ def start_requests(self):
             yield self.build_request(
                 url,
                 formatter=parameters('fecha_desde'),
-                # send duplicate requests when the token expired and in the continuation of last_requests saved.
+                # Send duplicate requests when the token has expired and when continuing from the saved requests_backlog.
                 dont_filter=True,
                 callback=self.parse_pages
             )
@@ -68,8 +67,8 @@ def urls_builder(self):
             start_date = end_date - interval
             # We request active/complete tenders and planned ones separately to ensure we don't exceed the 10000
             # results per request limit.
-            url_base = f'{self.base_url}/search/processes?fecha_desde={start_date.strftime(self.date_format)}-04:00' \
-                       f'&fecha_hasta={end_date.strftime(self.date_format)}-04:00&items_per_page=10000 '
+            url_base = f'{self.url_prefix}search/processes?fecha_desde={start_date.strftime(self.date_format)}' \
+                       f'-04:00&fecha_hasta={end_date.strftime(self.date_format)}-04:00&items_per_page=10000 '
             # We request the active or successful tenders by using the "publicacion_llamado" filter.
             url_tender = f'{url_base}&tipo_fecha=publicacion_llamado'
             # And the planned ones with the "fecha_release" and tender.id=planned filters.
@@ -77,16 +76,15 @@
             url_planning = f'{url_base}&tipo_fecha=fecha_release&tender.id=planned'
             end_date = start_date - timedelta(seconds=1)
         yield from [url_tender, url_planning]

-    def request_access_token(self):
-        """ Requests a new access token """
-        attempt = 0
-        self.start_time = datetime.now()
+    def build_access_token_request(self, attempt=0):
         self.logger.info('Requesting access token, attempt %s of %s', attempt + 1, self.max_attempts)
+        self.access_token_scheduled_at = datetime.now()
+
         return scrapy.Request(
-            self.auth_url,
+            f'{self.url_prefix}oauth/token',
             method='POST',
-            headers={'accept': 'application/json', 'Content-Type': 'application/json'},
+            headers={'Accept': 'application/json', 'Content-Type': 'application/json'},
             body=json.dumps({'request_token': self.request_token}),
             meta={'attempt': attempt + 1, 'auth': False},
             callback=self.parse_access_token,
@@ -96,43 +94,32 @@

     def parse_access_token(self, response):
         if self.is_http_success(response):
-            r = response.json()
-            token = r.get('access_token')
+            token = response.json().get('access_token')
             if token:
                 self.logger.info('New access token: %s', token)
                 self.access_token = token
                 # continue scraping where it stopped after getting the token
-                while self.last_requests:
-                    yield self.last_requests.pop(0)
+                while self.requests_backlog:
+                    yield self.requests_backlog.pop(0)
             else:
                 attempt = response.request.meta['attempt']
                 if attempt == self.max_attempts:
                     self.logger.error('Max attempts to get an access token reached.')
-                    self.auth_failed = True
+                    self.access_token_request_failed = True
                     raise AccessTokenError()
                 else:
-                    self.logger.info('Requesting access token, attempt %s of %s', attempt + 1, self.max_attempts)
-                    yield scrapy.Request(
-                        self.auth_url,
-                        method='POST',
-                        headers={'accept': 'application/json', 'Content-Type': 'application/json'},
-                        body=json.dumps({'request_token': self.request_token}),
-                        meta={'attempt': attempt + 1, 'auth': False},
-                        callback=self.parse_access_token,
-                        dont_filter=True,
-                        priority=1000
-                    )
+                    yield self.build_access_token_request(attempt=attempt)
         else:
             self.logger.error('Authentication failed. Status code: %s', response.status)
-            self.auth_failed = True
+            self.access_token_request_failed = True
             raise AccessTokenError()

     @handle_http_error
     def parse_pages(self, response):
-        content = response.json()
-        for url in self.get_files_to_download(content):
+        data = response.json()
+        for url in self.get_files_to_download(data):
             yield self.build_request(url, formatter=components(-1), dont_filter=True)
-        pagination = content['pagination']
+        pagination = data['pagination']
         if pagination['current_page'] < pagination['total_pages']:
             page = pagination['current_page'] + 1
             url = replace_parameters(response.request.url, page=page)
@@ -144,14 +131,5 @@
             )

     @abstractmethod
-    def get_files_to_download(self, content):
+    def get_files_to_download(self, data):
         pass
-
-    def expires_soon(self, time_diff):
-        """ Tells if the access token will expire soon (required by
-        ParaguayAuthMiddleware)
-        """
-        if time_diff.total_seconds() < ParaguayDNCPBase.request_time_limit * 60:
-            return False
-        self.logger.info('Time_diff: %s', time_diff.total_seconds())
-        return True
diff --git a/kingfisher_scrapy/spiders/paraguay_dncp_records.py b/kingfisher_scrapy/spiders/paraguay_dncp_records.py
index 8e1f4ea2..d182a4dd 100644
--- a/kingfisher_scrapy/spiders/paraguay_dncp_records.py
+++ b/kingfisher_scrapy/spiders/paraguay_dncp_records.py
@@ -24,6 +24,6 @@ class ParaguayDNCPRecords(ParaguayDNCPBase):
     # SimpleSpider
     data_type = 'record_package'

-    def get_files_to_download(self, content):
-        for record in content['records']:
-            yield f"{self.base_url}/ocds/record/{record['ocid']}"
+    def get_files_to_download(self, data):
+        for record in data['records']:
+            yield f"{self.url_prefix}ocds/record/{record['ocid']}"
diff --git a/kingfisher_scrapy/spiders/paraguay_dncp_releases.py b/kingfisher_scrapy/spiders/paraguay_dncp_releases.py
index 73dfb07f..ff395567 100644
--- a/kingfisher_scrapy/spiders/paraguay_dncp_releases.py
+++ b/kingfisher_scrapy/spiders/paraguay_dncp_releases.py
@@ -21,7 +21,7 @@ class ParaguayDNCPReleases(ParaguayDNCPBase):
     # SimpleSpider
     data_type = 'release_package'

-    def get_files_to_download(self, content):
-        for record in content['records']:
+    def get_files_to_download(self, data):
+        for record in data['records']:
             for release in record['releases']:
                 yield release['url']
diff --git a/kingfisher_scrapy/spiders/paraguay_hacienda.py b/kingfisher_scrapy/spiders/paraguay_hacienda.py
index 5d51ebfb..9b08a36b 100644
--- a/kingfisher_scrapy/spiders/paraguay_hacienda.py
+++ b/kingfisher_scrapy/spiders/paraguay_hacienda.py
@@ -28,16 +28,22 @@ class ParaguayHacienda(BaseSpider):
         'CONCURRENT_REQUESTS': 1,
     }

-    start_time = None
+    # BaseSpider
+    dont_truncate = True
+
+    # ParaguayAuthMiddleware
     access_token = None
-    auth_failed = False
-    last_request = None
+    access_token_scheduled_at = None
+    # The maximum age is less than the API's limit, since we don't precisely control Scrapy's scheduler.
+    access_token_maximum_age = 14 * 60
+    access_token_request_failed = False
+    requests_backlog = []
+
+    # Local
     max_attempts = 5
-    base_list_url = 'https://datos.hacienda.gov.py:443/odmh-api-v1/rest/api/v1/pagos/cdp?page={}'
+    url_prefix = 'https://datos.hacienda.gov.py:443/odmh-api-v1/rest/api/v1/'
+    list_url_prefix = f'{url_prefix}pagos/cdp?page='
     release_ids = []
-    request_time_limit = 14.0
-    data_type = 'release_package'
-    dont_truncate = True

     @classmethod
     def from_crawler(cls, crawler, *args, **kwargs):
@@ -55,27 +61,28 @@ def start_requests(self):
         # Paraguay Hacienda has a service that return all the ids that we need to get the releases packages
         # so we first iterate over this list that is paginated
         yield self.build_request(
-            self.base_list_url.format(1),
+            f'{self.list_url_prefix}1',
             formatter=parameters('page'),
             meta={
                 'meta': True,
                 'first': True,
             },
-            # send duplicate requests when the token expired and in the continuation of last_request saved.
+            # Send duplicate requests when the token has expired and when continuing from the saved requests_backlog.
             dont_filter=True,
         )

     @handle_http_error
     def parse(self, response):
+        package_url_prefix = f'{self.url_prefix}ocds/release-package/'
+
         data = response.json()
-        pattern = 'https://datos.hacienda.gov.py:443/odmh-api-v1/rest/api/v1/ocds/release-package/{}'

         # If is the first URL, we need to iterate over all the pages to get all the process ids to query
         if response.request.meta['first']:
             total = data['meta']['totalPages']
             for page in range(2, total + 1):
                 yield self.build_request(
-                    self.base_list_url.format(page),
+                    f'{self.list_url_prefix}{page}',
                     formatter=parameters('page'),
                     meta={
                         'meta': True,
@@ -92,7 +99,7 @@
             if row['idLlamado'] and row['idLlamado'] not in self.release_ids:
                 self.release_ids.append(row['idLlamado'])
                 yield self.build_request(
-                    pattern.format(row['idLlamado']),
+                    f'{package_url_prefix}{row["idLlamado"]}',
                     formatter=components(-1),
                     meta={
                         'meta': False,
@@ -101,20 +108,21 @@
                     dont_filter=True
                 )
         else:
-            yield self.build_file_from_response(response, data_type=self.data_type)
+            yield self.build_file_from_response(response, data_type='release_package')

-    def request_access_token(self):
-        """ Requests a new access token """
-        attempt = 0
-        self.start_time = datetime.now()
+    def build_access_token_request(self, body=None, attempt=0):
         self.logger.info('Requesting access token, attempt %s of %s', attempt + 1, self.max_attempts)
-        payload = {"clientSecret": self.client_secret}
+
+        if body is None:
+            body = json.dumps({"clientSecret": self.client_secret})
+
+        self.access_token_scheduled_at = datetime.now()

         return scrapy.Request(
-            "https://datos.hacienda.gov.py:443/odmh-api-v1/rest/api/v1/auth/token",
+            f'{self.url_prefix}auth/token',
             method='POST',
-            headers={"Authorization": self.request_token, "Content-Type": "application/json"},
-            body=json.dumps(payload),
+            headers={'Authorization': self.request_token, 'Content-Type': 'application/json'},
+            body=body,
             meta={'attempt': attempt + 1, 'auth': False},
             callback=self.parse_access_token,
             dont_filter=True,
@@ -123,41 +131,22 @@

     def parse_access_token(self, response):
         if self.is_http_success(response):
-            r = response.json()
-            token = r.get('accessToken')
+            token = response.json().get('accessToken')
             if token:
                 self.logger.info('New access token: %s', token)
-                self.access_token = 'Bearer ' + token
+                self.access_token = f'Bearer {token}'
                 # continue scraping where it stopped after getting the token
-                yield self.last_request
+                while self.requests_backlog:
+                    yield self.requests_backlog.pop(0)
             else:
                 attempt = response.request.meta['attempt']
                 if attempt == self.max_attempts:
                     self.logger.error('Max attempts to get an access token reached.')
-                    self.auth_failed = True
+                    self.access_token_request_failed = True
                     raise AccessTokenError()
                 else:
-                    self.logger.info('Requesting access token, attempt %s of %s', attempt + 1, self.max_attempts)
-                    yield scrapy.Request(
-                        "https://datos.hacienda.gov.py:443/odmh-api-v1/rest/api/v1/auth/token",
-                        method='POST',
-                        headers={"Authorization": self.request_token, "Content-Type": "application/json"},
-                        body=response.request.body,
-                        meta={'attempt': attempt + 1, 'auth': False},
-                        callback=self.parse_access_token,
-                        dont_filter=True,
-                        priority=1000
-                    )
+                    yield self.build_access_token_request(response.request.body, attempt=attempt)
         else:
             self.logger.error('Authentication failed. Status code: %s', response.status)
-            self.auth_failed = True
+            self.access_token_request_failed = True
             raise AccessTokenError()
-
-    def expires_soon(self, time_diff):
-        """ Tells if the access token will expire soon (required by
-        ParaguayAuthMiddleware)
-        """
-        if time_diff.total_seconds() < ParaguayHacienda.request_time_limit * 60:
-            return False
-        self.logger.info('Time_diff: %s', time_diff.total_seconds())
-        return True