In [39]:
from collections import defaultdict
import gzip
import json
import os
from pathlib import Path
from pprint import pprint

from elasticsearch import Elasticsearch
from elasticsearch.helpers import streaming_bulk
from elasticsearch.helpers.errors import BulkIndexError
from pymongo import MongoClient
import requests
from toolz import assoc, dissoc
from tqdm.notebook import tqdm

In [None]:
import gzip

# Each line of the gzipped TSV is split on tabs; only the last column is kept.
ads_dois = []
with gzip.open('ADS_DOI.tsv.gz') as tsv_file:
    for raw_line in tsv_file:
        columns = raw_line.decode().strip().split("\t")
        ads_dois.append(columns[-1])

In [None]:
# Sanity check: report how many DOIs were loaded from the ADS file and show the first one.
print(f"{len(ads_dois):,} DOIs in ADS. Example: '{ads_dois[0]}'")

In [None]:
# Root URL of the OpenAlex REST API; request paths such as "/works" are appended to it.
base_uri = "https://api.openalex.org"
# Passed as request headers on OpenAlex calls.
# NOTE(review): presumably the mailto address opts into OpenAlex's "polite pool" — confirm.
headers = {"User-Agent": "mailto:donny@polyneme.xyz"}

In [None]:
# Ask OpenAlex to group all works by whether they have a DOI.
response = requests.get(
    base_uri + "/works",
    params={"group_by": "has_doi"},
    headers=headers,
)
# Surface HTTP errors (rate limiting, outages) here rather than as an
# obscure KeyError/StopIteration while parsing an error payload.
response.raise_for_status()

# The count of works with a DOI is the "count" of the group whose key is 'true'.
n_total = next(
    grp["count"] for grp in response.json()["group_by"] if grp["key"] == 'true'
)
print(f"{n_total:,}", "DOIs in OpenAlex, so ratio of ADS/OpenAlex totals is",
      f"{len(ads_dois)/n_total:.1%}.",)

## OpenAlex-snapshot coverage of ADS DOIs

In [None]:
# Total on-disk size of the local OpenAlex snapshot (-h: human-readable, -s: summary only).
!du -hs /data/openalex-snapshot/

In [None]:
# Show the first lines of the snapshot's release notes.
!head /data/openalex-snapshot/RELEASE_NOTES.txt

In [None]:
# Collect every gzipped part file under the works directory (recursively), in sorted order.
works_dir = Path("/data/openalex-snapshot/data/works/")
works_parts = sorted(works_dir.rglob("*.gz"))
len(works_parts)

In [None]:
# The works manifest is a JSON document; its meta.record_count is used as the progress-bar total.
manifest_path = Path("/data/openalex-snapshot/data/works/manifest")
with manifest_path.open() as manifest_file:
    manifest = json.load(manifest_file)
record_count = manifest["meta"]["record_count"]
record_count

In [None]:
# Set copy of the ADS DOIs, used for membership tests when filtering OpenAlex works.
ads_dois_set = set(ads_dois)

In [None]:
# MongoDB client used to store the filtered OpenAlex works.
# NOTE(review): host.docker.internal presumably resolves to the Docker host from inside the container — confirm.
client = MongoClient(host="host.docker.internal")

In [None]:
# The original code sliced off the first 16 characters of each `doi` value,
# which is exactly the length of this prefix.
# NOTE(review): assumes OpenAlex `doi` values are URLs starting with it — confirm.
DOI_URL_PREFIX = "https://doi.org/"
# Number of matching documents buffered before each bulk insert.
BATCH_SIZE = 10000

docs = []

# Start from an empty collection so re-running this cell does not duplicate documents.
client.openalex.drop_collection("works")

with tqdm(total=record_count) as pbar:
    for part in works_parts:
        with gzip.open(part, 'rb') as f:
            # Each line of a part file is one JSON document.
            for line in f:
                doc = json.loads(line)
                doi = doc.get("doi")
                if isinstance(doi, str) and doi[len(DOI_URL_PREFIX):] in ads_dois_set:
                    docs.append(doc)
                pbar.update(1)
                if len(docs) == BATCH_SIZE:
                    print("dumping...")
                    client.openalex.works.insert_many(docs)
                    docs = []

# Flush the final partial batch; without this, up to BATCH_SIZE - 1 matching
# documents would be silently dropped. insert_many rejects an empty list,
# hence the guard.
if docs:
    print("dumping...")
    client.openalex.works.insert_many(docs)
    docs = []

# Indexing in Elasticsearch

In [2]:
# Password for the 'elastic' user generated by Elasticsearch
ELASTIC_PASSWORD = os.environ.get("ELASTIC_PASSWORD")
if not ELASTIC_PASSWORD:
    # Fail early with a clear message rather than passing None as the
    # password and getting an opaque error from the client later.
    raise RuntimeError("ELASTIC_PASSWORD environment variable is not set")

# Create the client instance
client = Elasticsearch(
    "https://es01:9200",
    # docker cp nmdc-elasticsearch_es01_1:/usr/share/elasticsearch/config/certs/ca/ca.crt ./repl/
    ca_certs="ca.crt",
    basic_auth=("elastic", ELASTIC_PASSWORD),
)

# Display basic cluster information as a connectivity check.
client.info().body

{'name': 'es01',
 'cluster_name': 'query-eval-elasticsearch-cluster',
 'cluster_uuid': 'stwEBCc6QouWYh3vev-xzA',
 'version': {'number': '8.5.2',
  'build_flavor': 'default',
  'build_type': 'docker',
  'build_hash': 'a846182fa16b4ebfcc89aa3c11a11fd5adf3de04',
  'build_date': '2022-11-17T18:56:17.538630285Z',
  'build_snapshot': False,
  'lucene_version': '9.4.1',
  'minimum_wire_compatibility_version': '7.17.0',
  'minimum_index_compatibility_version': '7.0.0'},
 'tagline': 'You Know, for Search'}

In [3]:
# Print the one-line cluster health summary returned by the cat health API.
print(client.cat.health())

1669757123 21:25:23 query-eval-elasticsearch-cluster green 3 3 28 14 0 0 0 0 - 100.0%



In [4]:
# Handle to the "openalex" MongoDB database; its `works` collection is the source for indexing.
mdb = MongoClient(host="host.docker.internal")["openalex"]

In [6]:
index_name = 'works'

# Load the index definition first, so a missing or malformed file cannot
# leave the cluster with the old index deleted and no replacement created.
# The definition is read from the "template" key of "<index_name>.json".
with open(f"{index_name}.json") as f:
    index_body = json.load(f)["template"]

# Recreate the index from scratch so stale mappings/documents do not linger.
if client.indices.exists(index=index_name):
    client.indices.delete(index=index_name)

client.indices.create(index=index_name, **index_body)

ObjectApiResponse({'acknowledged': True, 'shards_acknowledged': True, 'index': 'works'})

In [45]:
def generate_actions():
    """Yield one bulk-index action per document in the MongoDB `works` collection.

    Each action is the stored document with Mongo's `_id` removed, the
    document's own `id` used as the Elasticsearch `_id`, and `_index` set to
    `index_name`.

    The `abstract_inverted_index` field (token -> list of positions) is
    flipped into position -> token, so tokens become values rather than
    field names. A missing or null field becomes an empty mapping.
    """
    for d in mdb.works.find():
        action = dict(**dissoc(d, "_id"), _id=d["id"], _index=index_name)
        # .get() tolerates documents that lack the key entirely; `or {}` covers explicit nulls.
        inverted = action.get("abstract_inverted_index") or {}
        action["abstract_inverted_index"] = {
            p: token
            for token, positions in inverted.items()
            for p in positions
        }
        yield action

In [None]:
print("Indexing documents...")
number_of_docs = mdb.works.estimated_document_count()
successes = 0
errors = None
# The context manager closes the progress bar even if indexing stops early.
with tqdm(unit="docs", total=number_of_docs) as pbar:
    try:
        # Use index_name (not a repeated literal) so this stays in sync with
        # the index created earlier and the `_index` set on each action.
        for ok, _item in streaming_bulk(
            client=client, index=index_name, actions=generate_actions(),
        ):
            pbar.update(1)
            successes += ok
    except BulkIndexError as e:
        # Keep the exception so the failed items can be inspected afterwards.
        errors = e
print("Indexed %d/%d documents" % (successes, number_of_docs))

Indexing documents...


  0%|          | 0/8440000 [00:00<?, ?docs/s]

In [42]:
# `errors` stays None when the bulk indexing run raised no BulkIndexError,
# so guard against that instead of failing with AttributeError.
if errors is None:
    print("No bulk indexing errors recorded.")
else:
    for e in errors.errors:
        # Drop the (large) document payload; show only the id, index, and error details.
        pprint(dissoc(e["index"], "data"))

{'_id': 'https://openalex.org/W4220743981',
 '_index': 'works',
 'error': {'caused_by': {'reason': 'field name cannot contain only dots',
                         'type': 'illegal_argument_exception'},
           'reason': 'failed to parse field [abstract_inverted_index] of type '
                     '[flattened] in document with id '
                     "'https://openalex.org/W4220743981'. Preview of field's "
                     "value: 'null'",
           'type': 'mapper_parsing_exception'},
 'status': 400}
{'_id': 'https://openalex.org/W4220851147',
 '_index': 'works',
 'error': {'caused_by': {'reason': 'field name cannot contain only dots',
                         'type': 'illegal_argument_exception'},
           'reason': 'failed to parse field [abstract_inverted_index] of type '
                     '[flattened] in document with id '
                     "'https://openalex.org/W4220851147'. Preview of field's "
                     "value: 'null'",
           'type': 'mapper