-
Notifications
You must be signed in to change notification settings - Fork 30
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
17 changed files
with
3,209 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,190 @@ | ||
import glob | ||
import os | ||
import shutil | ||
import tempfile | ||
import xml.etree.ElementTree as et | ||
import zipfile | ||
|
||
import boto3 | ||
import requests | ||
|
||
from ..parsers import ElsevierParser | ||
from ..utils import ParsedItem, strict_kwargs | ||
|
||
|
||
class ElsevierSpider:
    """Harvest Elsevier CONSYN article packages into S3 and parse them.

    Workflow: read the CONSYN Atom batch feed, upload any zip packages not
    yet present in the packages bucket, extract their XML files into the
    files bucket keyed by article DOI, and parse those XML files into HEP
    records via :class:`ElsevierParser`.
    """

    def __init__(
        self,
        acces_key_id,  # NOTE: misspelled upstream; kept for caller compatibility
        secret_access_key,
        packages_bucket_name,
        files_bucket_name,
        elsevier_consyn_key,
        s3_host="https://s3.cern.ch",
    ):
        """Store credentials/bucket names and open the S3 connections.

        Raises:
            Exception: if any credential or bucket name is falsy.
        """
        self.access_key_id = acces_key_id
        self.secret_access_key = secret_access_key
        self.packages_bucket_name = packages_bucket_name
        self.files_bucket_name = files_bucket_name
        self.elsevier_consyn_key = elsevier_consyn_key
        self.new_packages = set()   # package keys found in feed but not in bucket
        self.new_xml_files = set()  # "<doi>.xml" keys uploaded in this session
        self.s3_host = s3_host

        # BUGFIX: the original tested `not (a, b, c, d)` — a non-empty tuple
        # is always truthy, so missing credentials were never detected.
        # `all()` checks each value individually.
        if not all(
            (
                self.access_key_id,
                self.secret_access_key,
                self.packages_bucket_name,
                self.files_bucket_name,
            )
        ):
            raise Exception("Missing parameters necessary to establish s3 connection")
        self.s3_connection = self.create_s3_connection()
        self.s3_packages_bucket_conn = self.s3_bucket_connection(
            self.packages_bucket_name
        )
        self.s3_files_bucket_conn = self.s3_bucket_connection(self.files_bucket_name)

    def create_s3_connection(self):
        """Return a boto3 S3 resource bound to ``self.s3_host``."""
        session = boto3.Session(
            aws_access_key_id=self.access_key_id,
            aws_secret_access_key=self.secret_access_key,
        )
        return session.resource("s3", endpoint_url=self.s3_host)

    def s3_bucket_connection(self, bucket_name):
        """Return a boto3 Bucket handle for *bucket_name*."""
        return self.s3_connection.Bucket(bucket_name)

    def _get_keys_names_from_bucket(self):
        """Return the set of object keys currently in the packages bucket."""
        return {obj.key for obj in self.s3_packages_bucket_conn.objects.all()}

    def _download_elsevier_metadata(self):
        """Fetch the Elsevier CONSYN batch Atom feed.

        Returns:
            requests.Response: raw HTTP response carrying the feed XML.
        """
        elsevier_batch_download_url = (
            "https://consyn.elsevier.com/batch/atom?key=" + self.elsevier_consyn_key
        )
        return requests.get(elsevier_batch_download_url)

    @staticmethod
    def _package_basename(package):
        """Strip a trailing ``.zip``/``.ZIP`` extension from *package*.

        BUGFIX: the original used ``package.lstrip(".ZIP")``, which removes
        any *leading* run of the characters ``.``, ``Z``, ``I``, ``P`` (so a
        name like ``"PIZZA.zip"`` lost its prefix) and never touched the
        suffix it was meant to drop, yielding ``name.zip.zip`` paths.
        """
        root, ext = os.path.splitext(package)
        return root if ext.lower() == ".zip" else package

    def _get_package_urls_from_elsevier(self):
        """
        Extracts names and urls of the zip packages from elsevier batch feed
        Returns:
            dict(name: url): dict of zip packages names and urls
        """
        packages_metadata = self._download_elsevier_metadata()
        # Parse the raw bytes: feeding a decoded str to fromstring raises
        # ValueError when the XML prolog declares its own encoding.
        feed_root = et.fromstring(packages_metadata.content)
        urls_for_packages = {}
        # Element.getchildren() was removed in Python 3.9; iterate directly.
        for entry in feed_root:
            if "entry" in entry.tag:
                file_data = list(entry)
                # assumes each <entry> keeps the name element first and the
                # <link href=...> element second — TODO confirm against feed
                urls_for_packages[file_data[0].text] = file_data[1].attrib["href"]
        return urls_for_packages

    def _get_all_new_packages(self):
        """
        Checks which packages from elsevier batch feed are not in the s3 bucket yet
        Returns:
            dict(name: url): dict of zip packages names and urls
        """
        urls_for_packages = self._get_package_urls_from_elsevier()
        bucket_data = self._get_keys_names_from_bucket()
        packages_not_in_bucket = {
            name: urls_for_packages[name]
            for name in urls_for_packages.keys() - bucket_data
        }
        self.new_packages = set(packages_not_in_bucket)
        return packages_not_in_bucket

    def populate_s3_bucket_with_elsevier_packages(self):
        """
        Uploads to s3 bucket new zip folders containing xml-s for elsevier articles
        """
        for name, url in self._get_all_new_packages().items():
            if name.lower().endswith("zip"):
                # stream=True + .raw lets boto3 read the body without
                # buffering the whole package in memory.
                response = requests.get(url, stream=True)
                self.s3_packages_bucket_conn.upload_fileobj(response.raw, name)

    @staticmethod
    def _get_doi_for_xml_file(xml_file):
        """Return the DOI extracted from an Elsevier XML document string."""
        parser = ElsevierParser(xml_file)
        return parser.get_identifier()

    def extract_zip_packages_to_s3(self):
        """
        Extracts the files from zip folders downloaded from elsevier and
        uploads them with a correct name (article doi) to the correct s3 bucket.
        """
        for package in self.new_packages:
            tempdir = tempfile.mkdtemp()
            # try/finally so the temp dir is removed even if download,
            # extraction, or parsing fails (the original leaked it).
            try:
                zip_path = "{tempdir}/{name}.zip".format(
                    tempdir=tempdir, name=self._package_basename(package)
                )
                self.s3_packages_bucket_conn.download_file(package, zip_path)
                with zipfile.ZipFile(zip_path, "r") as zip_package:
                    zip_package.extractall(tempdir)
                # the glob pattern already restricts matches to *.xml, so no
                # extra endswith() check is needed
                for xml_path in glob.iglob(
                    "{tempdir}/**/*.xml".format(tempdir=tempdir), recursive=True
                ):
                    with open(xml_path) as f:
                        elsevier_xml = f.read()
                    file_doi = self._get_doi_for_xml_file(elsevier_xml)
                    key = "{file_doi}.xml".format(file_doi=file_doi)
                    self.s3_files_bucket_conn.upload_file(xml_path, key)
                    self.new_xml_files.add(key)
            finally:
                shutil.rmtree(tempdir)

    def parse_items_from_s3(self, new=True):
        """
        Parse xml files in the s3 bucket
        Yields:
            HEP records
        """
        tempdir = tempfile.mkdtemp()
        try:
            if new:
                files_to_parse = self.new_xml_files
            else:
                files_to_parse = [
                    key.key for key in self.s3_files_bucket_conn.objects.all()
                ]
            for filename in files_to_parse:
                # BUGFIX: the original format string had lost its {filename}
                # placeholder, so every object was downloaded onto the same
                # constant path, clobbering earlier files.
                file_path = "{tempdir}/{filename}".format(
                    tempdir=tempdir, filename=filename.replace("/", "_")
                )
                self.s3_files_bucket_conn.download_file(filename, file_path)
                with open(file_path) as f:
                    elsevier_xml = f.read()
                yield self.parse_record(elsevier_xml)
        finally:
            # runs on generator close/exhaustion; original leaked the dir
            # whenever iteration stopped early or an exception was raised
            shutil.rmtree(tempdir)

    @staticmethod
    def parse_record(selector):
        """Parse an elsevier XML exported file into a HEP record."""
        parser = ElsevierParser(selector)
        return ParsedItem(record=parser.parse(), record_format="hep")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
1 change: 1 addition & 0 deletions
1
tests/functional/elsevier/fixtures/elsevier/parsed_records/j.geomphys.2020.103892.xml
Large diffs are not rendered by default.
Oops, something went wrong.
Oops, something went wrong.