paraguay: Reduce duplication and fix bugs
- paraguay_hacienda: Set a last_request scalar, but the middleware expects a last_requests list.
- Set access_token_scheduled_at on each attempt, not only on the first attempt (see the sketches below).
jpmckinney committed Apr 20, 2022
1 parent 34fc1ee commit b59a03f
Showing 5 changed files with 100 additions and 117 deletions.
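
The first fix matters because more than one request can be deferred while a new token is being fetched; with a scalar attribute, each deferral overwrites the previous one. A minimal sketch of the list-based backlog pattern, using the attribute and method names from the middleware below, with the spider and requests reduced to simple stand-ins:

    class FakeParaguaySpider:
        """Stand-in for a Paraguay spider: only the pieces the middleware touches."""

        def __init__(self):
            self.requests_backlog = []

        def build_access_token_request(self):
            # The real spiders return a scrapy.Request for the token endpoint.
            return 'POST /oauth/token'


    def process_request(spider, request, token_expired):
        # Deferred requests are appended, so none are lost while the token is refreshed.
        if token_expired:
            spider.requests_backlog.append(request)
            return spider.build_access_token_request()
        return None


    spider = FakeParaguaySpider()
    process_request(spider, 'GET /processes?page=1', token_expired=True)
    process_request(spider, 'GET /processes?page=2', token_expired=True)
    assert spider.requests_backlog == ['GET /processes?page=1', 'GET /processes?page=2']
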
48 changes: 32 additions & 16 deletions kingfisher_scrapy/downloadermiddlewares.py
@@ -21,6 +21,22 @@ class ParaguayAuthMiddleware:
Tokens should be generated and assigned just before sending a request, but Scrapy does not provide any way to do
this, which in turn means that sometimes we accidentally send expired tokens. For now, the issue seems to be avoided
by setting the number of concurrent requests to 1, at the cost of download speed.
.. code-block:: python

    class Paraguay:
        name = 'paraguay'

        # ParaguayAuthMiddleware
        access_token = None
        access_token_scheduled_at = None
        # The maximum age is less than the API's limit, since we don't precisely control Scrapy's scheduler.
        access_token_maximum_age = 14 * 60
        access_token_request_failed = False
        requests_backlog = []

        def build_access_token_request(self):
            return scrapy.Request("https://example.com")
"""

def __init__(self, spider):
@@ -33,36 +49,36 @@ def from_crawler(cls, crawler):
def process_request(self, request, spider):
if 'auth' in request.meta and request.meta['auth'] is False:
return
if spider.auth_failed:
spider.crawler.engine.close_spider(spider, 'auth_failed')
if spider.access_token_request_failed:
spider.crawler.engine.close_spider(spider, 'access_token_request_failed')
raise IgnoreRequest("Max attempts to get an access token reached. Stopping crawl...")
request.headers['Authorization'] = spider.access_token
if self._expires_soon(spider):
# SAVE the last request to continue after getting the token
spider.last_requests.append(request)
spider.logger.info('Saving request for after getting the token: %s', request.url)
# spider MUST implement the request_access_token method
return spider.request_access_token()
return spider.add_request_to_backlog_and_build_access_token_request(spider, request)

def process_response(self, request, response, spider):
if response.status == 401 or response.status == 429:
spider.logger.info('Time transcurred: %s', (datetime.now() - spider.start_time).total_seconds())
age = (datetime.now() - spider.access_token_scheduled_at).total_seconds()
spider.logger.info('Access token age: %ss', age)
spider.logger.info('%s returned for request to %s', response.status, request.url)
if not spider.access_token == request.headers['Authorization'] and self._expires_soon(spider):
# SAVE the last request to continue after getting the token
spider.last_requests.append(request)
spider.logger.info('Saving request for after getting the token: %s', request.url)
# spider MUST implement the request_access_token method
return spider.request_access_token()
return self.add_request_to_backlog_and_build_access_token_request(spider, request)
request.headers['Authorization'] = spider.access_token
return request
return response

def add_request_to_backlog_and_build_access_token_request(self, spider, request):
spider.requests_backlog.append(request)
spider.logger.info('Added request to backlog until token received: %s', request.url)
return spider.build_access_token_request()

@staticmethod
def _expires_soon(spider):
if spider.start_time and spider.access_token:
# The spider must implement the expires_soon method.
return spider.expires_soon(datetime.now() - spider.start_time)
if spider.access_token and spider.access_token_scheduled_at:
age = (datetime.now() - spider.access_token_scheduled_at).total_seconds()
if age < spider.access_token_maximum_age:
return False
spider.logger.info('Access token age: %ss', age)
return True
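
For reference, the age check that _expires_soon performs above can be exercised on its own. A minimal sketch, assuming the 14 * 60 second threshold from the docstring example; the helper function and timestamps are illustrative, not part of the codebase:

    from datetime import datetime, timedelta

    ACCESS_TOKEN_MAXIMUM_AGE = 14 * 60  # seconds, as in the docstring example above

    def expires_soon(access_token_scheduled_at, now=None):
        # The age is measured from the most recent token request, which is why the
        # second fix resets access_token_scheduled_at on every attempt, not only the first.
        now = now or datetime.now()
        age = (now - access_token_scheduled_at).total_seconds()
        return age >= ACCESS_TOKEN_MAXIMUM_AGE

    assert expires_soon(datetime.now() - timedelta(minutes=20)) is True   # refresh the token
    assert expires_soon(datetime.now() - timedelta(minutes=2)) is False   # token is still fresh
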


76 changes: 27 additions & 49 deletions kingfisher_scrapy/spiders/paraguay_dncp_base.py
@@ -22,24 +22,23 @@ class ParaguayDNCPBase(SimpleSpider):
default_from_date = '2010-01-01T00:00:00'
date_required = True

# request limits: since we can't control when Scrapy decides to send a
# request, values here are slightly less than real limits.
start_time = None
# ParaguayAuthMiddleware
access_token = None
auth_failed = False
last_requests = []
request_time_limit = 13 # in minutes
base_url = 'https://contrataciones.gov.py/datos/api/v3/doc'
auth_url = f'{base_url}/oauth/token'
request_token = None
access_token_scheduled_at = None
# The maximum age is less than the API's limit, since we don't precisely control Scrapy's scheduler.
access_token_maximum_age = 13 * 60
access_token_request_failed = False
requests_backlog = []

# Local
max_attempts = 10
url_prefix = 'https://contrataciones.gov.py/datos/api/v3/doc/'

@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
spider = super().from_crawler(crawler, *args, **kwargs)

spider.request_token = crawler.settings.get('KINGFISHER_PARAGUAY_DNCP_REQUEST_TOKEN')

if spider.request_token is None:
raise MissingEnvVarError('KINGFISHER_PARAGUAY_DNCP_REQUEST_TOKEN is not set.')

@@ -50,7 +49,7 @@ def start_requests(self):
yield self.build_request(
url,
formatter=parameters('fecha_desde'),
# send duplicate requests when the token expired and in the continuation of last_requests saved.
# Allow duplicates: requests are re-sent when the token expires and when resuming from requests_backlog.
dont_filter=True,
callback=self.parse_pages
)
@@ -68,25 +67,24 @@ def urls_builder(self):
start_date = end_date - interval
# We request active/complete tenders and planned ones separately to ensure we don't exceed the 10000
# results per request limit.
url_base = f'{self.base_url}/search/processes?fecha_desde={start_date.strftime(self.date_format)}-04:00' \
f'&fecha_hasta={end_date.strftime(self.date_format)}-04:00&items_per_page=10000 '
url_base = f'{self.url_prefix}search/processes?fecha_desde={start_date.strftime(self.date_format)}' \
f'-04:00&fecha_hasta={end_date.strftime(self.date_format)}-04:00&items_per_page=10000 '
# We request the active or successful tenders by using the "publicacion_llamado" filter.
url_tender = f'{url_base}&tipo_fecha=publicacion_llamado'
# And the planned ones with the "fecha_release" and tender.id=planned filters.
url_planning = f'{url_base}&tender.id=planned&tipo_fecha=fecha_release'
end_date = start_date - timedelta(seconds=1)
yield from [url_tender, url_planning]

def request_access_token(self):
""" Requests a new access token """
attempt = 0
self.start_time = datetime.now()
def build_access_token_request(self, attempt=0):
self.logger.info('Requesting access token, attempt %s of %s', attempt + 1, self.max_attempts)

self.access_token_scheduled_at = datetime.now()

return scrapy.Request(
self.auth_url,
f'{self.url_prefix}oauth/token',
method='POST',
headers={'accept': 'application/json', 'Content-Type': 'application/json'},
headers={'Accept': 'application/json', 'Content-Type': 'application/json'},
body=json.dumps({'request_token': self.request_token}),
meta={'attempt': attempt + 1, 'auth': False},
callback=self.parse_access_token,
@@ -96,43 +94,32 @@ def parse_access_token(self, response):

def parse_access_token(self, response):
if self.is_http_success(response):
r = response.json()
token = r.get('access_token')
token = response.json().get('access_token')
if token:
self.logger.info('New access token: %s', token)
self.access_token = token
# continue scraping where it stopped after getting the token
while self.last_requests:
yield self.last_requests.pop(0)
while self.requests_backlog:
yield self.requests_backlog.pop(0)
else:
attempt = response.request.meta['attempt']
if attempt == self.max_attempts:
self.logger.error('Max attempts to get an access token reached.')
self.auth_failed = True
self.access_token_request_failed = True
raise AccessTokenError()
else:
self.logger.info('Requesting access token, attempt %s of %s', attempt + 1, self.max_attempts)
yield scrapy.Request(
self.auth_url,
method='POST',
headers={'accept': 'application/json', 'Content-Type': 'application/json'},
body=json.dumps({'request_token': self.request_token}),
meta={'attempt': attempt + 1, 'auth': False},
callback=self.parse_access_token,
dont_filter=True,
priority=1000
)
yield self.build_access_token_request(attempt=attempt)
else:
self.logger.error('Authentication failed. Status code: %s', response.status)
self.auth_failed = True
self.access_token_request_failed = True
raise AccessTokenError()

@handle_http_error
def parse_pages(self, response):
content = response.json()
for url in self.get_files_to_download(content):
data = response.json()
for url in self.get_files_to_download(data):
yield self.build_request(url, formatter=components(-1), dont_filter=True)
pagination = content['pagination']
pagination = data['pagination']
if pagination['current_page'] < pagination['total_pages']:
page = pagination['current_page'] + 1
url = replace_parameters(response.request.url, page=page)
@@ -144,14 +131,5 @@ def parse_pages(self, response):
)

@abstractmethod
def get_files_to_download(self, content):
def get_files_to_download(self, data):
pass

def expires_soon(self, time_diff):
""" Tells if the access token will expire soon (required by
ParaguayAuthMiddleware)
"""
if time_diff.total_seconds() < ParaguayDNCPBase.request_time_limit * 60:
return False
self.logger.info('Time_diff: %s', time_diff.total_seconds())
return True
6 changes: 3 additions & 3 deletions kingfisher_scrapy/spiders/paraguay_dncp_records.py
@@ -24,6 +24,6 @@ class ParaguayDNCPRecords(ParaguayDNCPBase):
# SimpleSpider
data_type = 'record_package'

def get_files_to_download(self, content):
for record in content['records']:
yield f"{self.base_url}/ocds/record/{record['ocid']}"
def get_files_to_download(self, data):
for record in data['records']:
yield f"{self.url_prefix}/ocds/record/{record['ocid']}"
4 changes: 2 additions & 2 deletions kingfisher_scrapy/spiders/paraguay_dncp_releases.py
@@ -21,7 +21,7 @@ class ParaguayDNCPReleases(ParaguayDNCPBase):
# SimpleSpider
data_type = 'release_package'

def get_files_to_download(self, content):
for record in content['records']:
def get_files_to_download(self, data):
for record in data['records']:
for release in record['releases']:
yield release['url']
83 changes: 36 additions & 47 deletions kingfisher_scrapy/spiders/paraguay_hacienda.py
@@ -28,16 +28,22 @@ class ParaguayHacienda(BaseSpider):
'CONCURRENT_REQUESTS': 1,
}

start_time = None
# BaseSpider
dont_truncate = True

# ParaguayAuthMiddleware
access_token = None
auth_failed = False
last_request = None
access_token_scheduled_at = None
# The maximum age is less than the API's limit, since we don't precisely control Scrapy's scheduler.
access_token_maximum_age = 14 * 60
access_token_request_failed = False
requests_backlog = []

# Local
max_attempts = 5
base_list_url = 'https://datos.hacienda.gov.py:443/odmh-api-v1/rest/api/v1/pagos/cdp?page={}'
url_prefix = 'https://datos.hacienda.gov.py:443/odmh-api-v1/rest/api/v1/'
list_url_prefix = f'{url_prefix}pagos/cdp?page='
release_ids = []
request_time_limit = 14.0
data_type = 'release_package'
dont_truncate = True

@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
@@ -55,27 +61,28 @@ def start_requests(self):
# Paraguay Hacienda has a service that returns all the ids that we need to get the release packages,
# so we first iterate over this paginated list
yield self.build_request(
self.base_list_url.format(1),
f'{self.list_url_prefix}1',
formatter=parameters('page'),
meta={
'meta': True,
'first': True,
},
# send duplicate requests when the token expired and in the continuation of last_request saved.
# Allow duplicates: requests are re-sent when the token expires and when resuming from requests_backlog.
dont_filter=True,
)

@handle_http_error
def parse(self, response):
package_url_prefix = f'{self.url_prefix}ocds/release-package/'

data = response.json()
pattern = 'https://datos.hacienda.gov.py:443/odmh-api-v1/rest/api/v1/ocds/release-package/{}'

# If it is the first URL, we need to iterate over all the pages to get all the process ids to query
if response.request.meta['first']:
total = data['meta']['totalPages']
for page in range(2, total + 1):
yield self.build_request(
self.base_list_url.format(page),
f'{self.list_url_prefix}{page}',
formatter=parameters('page'),
meta={
'meta': True,
@@ -92,7 +99,7 @@ def parse(self, response):
if row['idLlamado'] and row['idLlamado'] not in self.release_ids:
self.release_ids.append(row['idLlamado'])
yield self.build_request(
pattern.format(row['idLlamado']),
f'{package_url_prefix}{row["idLlamado"]}',
formatter=components(-1),
meta={
'meta': False,
Expand All @@ -101,20 +108,21 @@ def parse(self, response):
dont_filter=True
)
else:
yield self.build_file_from_response(response, data_type=self.data_type)
yield self.build_file_from_response(response, data_type='release_package')

def request_access_token(self):
""" Requests a new access token """
attempt = 0
self.start_time = datetime.now()
def build_access_token_request(self, body=None, attempt=0):
self.logger.info('Requesting access token, attempt %s of %s', attempt + 1, self.max_attempts)
payload = {"clientSecret": self.client_secret}

if body is None:
body = json.dumps({"clientSecret": self.client_secret})

self.access_token_scheduled_at = datetime.now()

return scrapy.Request(
"https://datos.hacienda.gov.py:443/odmh-api-v1/rest/api/v1/auth/token",
f'{self.url_prefix}auth/token',
method='POST',
headers={"Authorization": self.request_token, "Content-Type": "application/json"},
body=json.dumps(payload),
headers={'Authorization': self.request_token, 'Content-Type': 'application/json'},
body=body,
meta={'attempt': attempt + 1, 'auth': False},
callback=self.parse_access_token,
dont_filter=True,
@@ -123,41 +131,22 @@ def parse_access_token(self, response):

def parse_access_token(self, response):
if self.is_http_success(response):
r = response.json()
token = r.get('accessToken')
token = response.json().get('accessToken')
if token:
self.logger.info('New access token: %s', token)
self.access_token = 'Bearer ' + token
self.access_token = f'Bearer {token}'
# continue scraping where it stopped after getting the token
yield self.last_request
while self.requests_backlog:
yield self.requests_backlog.pop(0)
else:
attempt = response.request.meta['attempt']
if attempt == self.max_attempts:
self.logger.error('Max attempts to get an access token reached.')
self.auth_failed = True
self.access_token_request_failed = True
raise AccessTokenError()
else:
self.logger.info('Requesting access token, attempt %s of %s', attempt + 1, self.max_attempts)
yield scrapy.Request(
"https://datos.hacienda.gov.py:443/odmh-api-v1/rest/api/v1/auth/token",
method='POST',
headers={"Authorization": self.request_token, "Content-Type": "application/json"},
body=response.request.body,
meta={'attempt': attempt + 1, 'auth': False},
callback=self.parse_access_token,
dont_filter=True,
priority=1000
)
yield self.build_access_token_request(response.request.body, attempt=attempt)
else:
self.logger.error('Authentication failed. Status code: %s', response.status)
self.auth_failed = True
self.access_token_request_failed = True
raise AccessTokenError()

def expires_soon(self, time_diff):
""" Tells if the access token will expire soon (required by
ParaguayAuthMiddleware)
"""
if time_diff.total_seconds() < ParaguayHacienda.request_time_limit * 60:
return False
self.logger.info('Time_diff: %s', time_diff.total_seconds())
return True
