Commit

Merge pull request #341 from inspirehep/improve-logging
pipeline: addition of structlog
drjova committed Oct 11, 2023
2 parents cdafeb1 + 5c3c99d commit 8cdbdd0
Showing 2 changed files with 47 additions and 14 deletions.
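
Alongside the existing stdlib logging calls, this commit binds a structlog logger per spider run so that every event carries the spider name and Scrapy job id as key-value fields, and per-item identifiers (titles, DOIs, arXiv eprints, report numbers) are attached as keywords on each call. A condensed, standalone sketch of that pattern (the spider name, job id, and identifiers below are made-up illustration values, not taken from hepcrawl):

import structlog

# bind() returns a new logger that attaches these key-value pairs
# to every event it emits afterwards.
logger = structlog.get_logger().bind(
    name="Harvesting pipeline",
    spider="example_spider",        # hypothetical spider name
    scrape_job="12345",             # hypothetical SCRAPY_JOB value
)

# Per-item fields are passed as keyword arguments on the call itself.
logger.info(
    'Processing item.',
    dois=['10.1234/example.doi'],   # hypothetical identifiers
    arxiv_eprints=['2101.00001'],
)

With structlog's out-of-the-box configuration each call renders as a single line containing the event text plus the bound and per-call key-value pairs.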
60 changes: 46 additions & 14 deletions hepcrawl/pipelines.py
@@ -18,23 +18,22 @@
import os
from six.moves.urllib.parse import urlparse

import shutil
import pprint
import logging

import requests
from inspire_utils.record import get_value
from scrapy import Request

from scrapy.pipelines.files import FilesPipeline
from scrapy.utils.project import get_project_settings
import logging
import pprint
import requests
import shutil
import structlog

from .api import CrawlResult
from .settings import FILES_STORE
from .utils import RecordFile


LOGGER = logging.getLogger(name=__name__)

STRUCT_LOGGER = structlog.get_logger()

class DocumentsPipeline(FilesPipeline):
"""Download all the documents the record passed to download.
@@ -104,6 +103,11 @@ def __init__(self):

    def open_spider(self, spider):
        self.results_data = []
        self.logger = STRUCT_LOGGER.bind(
            name="Harvesting pipeline",
            spider=spider.name,
            scrape_job=os.environ.get('SCRAPY_JOB'),
        )

    def process_item(self, item, spider):
        """Add the crawl result to the results data after processing it.
@@ -130,6 +134,19 @@ def process_item(self, item, spider):
        )
        crawl_result = CrawlResult.from_parsed_item(item).to_dict()
        self.results_data.append(crawl_result)
        ids = []
        titles = get_value(crawl_result, 'titles.title', default=[])
        dois = get_value(crawl_result, 'dois.value', default=[])
        arxiv_eprints = get_value(crawl_result, 'arxiv_eprints.value', default=[])
        report_numbers = get_value(crawl_result, 'report_numbers.value', default=[])

        self.logger.info(
            'Processing item.',
            titles=titles,
            dois=dois,
            arxiv_eprints=arxiv_eprints,
            report_numbers=report_numbers,
        )
        return crawl_result

    def _prepare_payload(self, spider):
@@ -145,18 +162,20 @@ def _prepare_payload(self, spider):
            ) for result in self.results_data
        ]
        if spider.state.get('errors'):
            errors = [
                {'exception': str(err['exception']), 'sender':str(err['sender'])}
                for err in spider.state['errors']
            ]
            payload_list.append(
                dict(
                    job_id=os.environ['SCRAPY_JOB'],
                    results_uri=os.environ['SCRAPY_FEED_URI'],
                    results_data=[],
                    log_file=None,
                    errors=[
                        {'exception': str(err['exception']), 'sender':str(err['sender'])}
                        for err in spider.state['errors']
                    ]
                    errors=errors,
                )
            )
            self.logger.error("Error.", errors=errors)
        return payload_list

    @staticmethod
@@ -212,6 +231,8 @@ def open_spider(self, spider):

        ))
        super(InspireCeleryPushPipeline, self).open_spider(spider=spider)
        self.logger.info('Start processing.')


    def close_spider(self, spider):
        """Post results to BROKER API."""
@@ -236,16 +257,27 @@ def close_spider(self, spider):
            logger.info('Triggering celery task: %s.', task_endpoint)

            for kwargs in self._prepare_payload(spider):
                self.logger.info(
                    'Finish Processing.',
                    number_of_results=self.count,
                )
                logger.debug(
                    ' Sending results:\n %s',
                    pprint.pformat(kwargs),
                )

                res = self.celery.send_task(task_endpoint, kwargs=kwargs)
                celery_task_info_payload = {
                    'celery_task_id': res.id,
                    'scrapy_job_id': os.environ.get('SCRAPY_JOB')
                }
                logger.info('Sent celery task %s', pprint.pformat(celery_task_info_payload))

                self.logger.info(
                    'Celery task sent.',
                    celery_task_id=res.id,
                )
        else:
            self.logger.info(
                'No results.',
                number_of_results=self.count,
            )
        self._cleanup(spider)
1 change: 1 addition & 0 deletions setup.py
@@ -49,6 +49,7 @@
    'pylatexenc~=2.9',
    'queuelib==1.5.0',
    'sentry-sdk==1.3.0',
    'structlog==20.1.0',
]

tests_require = [
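
setup.py pins structlog==20.1.0, but no renderer configuration appears in this diff, so events use structlog's default console output. As an assumption-laden sketch (not part of this commit), the same events could be written as JSON lines by configuring the processor chain once at startup:

import structlog

# Assumed configuration, not present in this diff: add an ISO timestamp
# to each event and render the final event dict as one JSON object per line.
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ],
)

structlog.get_logger().bind(spider="example_spider").info("Start processing.")
# prints something like:
# {"spider": "example_spider", "event": "Start processing.", "timestamp": "2023-10-11T..."}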