In [1]:
import os
import shutil
import requests
import json
import time
from datetime import datetime
import pandas as pd
from dask.distributed import Client, progress

In [2]:
pango_url = 'https://raw.githubusercontent.com/cov-lineages/pango-designation/master/lineage_notes.txt'

In [3]:
pango = pd.read_csv(pango_url, sep='\t', skiprows=1, dtype=str, names=['lineage', 'description'])

In [4]:
pango['lineage'] = pango['lineage'].str.strip()

In [5]:
pango.head()

Unnamed: 0,lineage,description
0,A,One of the two original haplotypes of the pand...
1,A.1,USA lineage
2,A.2,Mostly Spanish lineage now includes South and ...
3,A.2.2,Australian lineage
4,A.2.3,Scottish lineage


In [6]:
pango = pango[1:100]
pango.shape

(99, 2)

In [7]:
client = Client(n_workers=4, threads_per_worker=1, memory_limit="4 GiB")
client

distributed.diskutils - INFO - Found stale lock file and directory '/Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-wasw_g6b', purging
distributed.diskutils - INFO - Found stale lock file and directory '/Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-0k7_x4il', purging
distributed.diskutils - INFO - Found stale lock file and directory '/Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-jt0apznb', purging
distributed.diskutils - INFO - Found stale lock file and directory '/Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-8znowaii', purging


0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 4,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:50766,Workers: 4
Dashboard: http://127.0.0.1:8787/status,Total threads: 4
Started: Just now,Total memory: 16.00 GiB

0,1
Comm: tcp://127.0.0.1:50781,Total threads: 1
Dashboard: http://127.0.0.1:50783/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:50772,
Local directory: /Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-wy2m8cnv,Local directory: /Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-wy2m8cnv

0,1
Comm: tcp://127.0.0.1:50780,Total threads: 1
Dashboard: http://127.0.0.1:50782/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:50769,
Local directory: /Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-bfs7e_4s,Local directory: /Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-bfs7e_4s

0,1
Comm: tcp://127.0.0.1:50786,Total threads: 1
Dashboard: http://127.0.0.1:50787/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:50771,
Local directory: /Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-_8r832jz,Local directory: /Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-_8r832jz

0,1
Comm: tcp://127.0.0.1:50777,Total threads: 1
Dashboard: http://127.0.0.1:50778/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:50770,
Local directory: /Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-6eqt7pum,Local directory: /Users/Peter/GitRepositories/lineages/notebooks/dask-worker-space/worker-6eqt7pum


In [8]:
def query(record):
    # get articles ids for specific lineage:
    lineage = record.get('lineage')
    url = requests.get(f'https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=(%22{lineage}%22%20AND%20(%22SARS-CoV-2%22%20OR%20%22COVID-19%22)%20AND%20(%22lineage%22%20OR%20%22lineages%22%20OR%20%22strain%22%20OR%20%22strains%22%20OR%20%22variants%22%20OR%20%22variants%22))%20AND%20(FIRST_PDATE:%5b2020-01-01%20)%20AND%20HAS_FT:y%20AND%20%20sort_date:y&resultType=idlist&pageSize=1000&format=json&cursorMark=*')
    text = url.text
    results = json.loads(text)['resultList']['result']
    ids = list(map(lambda x: x['fullTextIdList']['fullTextId'][0], results))
    return {'lineage': lineage, 'ids': ids}

In [9]:
import dask.bag as db

b = db.from_sequence(pango.to_dict('records'))
b.take(1)

({'lineage': 'A.1', 'description': 'USA lineage'},)

In [10]:
start = time.time()
result = b.map(lambda record: query(record))
print(result.compute())
end = time.time()
print('Total time:', end-start)

[{'lineage': 'A.1', 'ids': ['PMC8725908', 'PMC8725896', 'PMC9129230', 'PMC9120130', 'PMC9121641', 'PMC9117272', 'PMC9113741', 'PMC9106700', 'PMC9097882', 'PMC9088089', 'PMC9090742', 'PMC9085353', 'PMC9083482', 'PMC9080054', 'PMC9076030', 'PMC9069976', 'PMC9114873', 'PMC9065598', 'PMC9108166', 'PMC9057982', 'PMC9061721', 'PMC9113349', 'PMC9101745', 'PMC9089897', 'PMC9040489', 'PMC9035363', 'PMC9034743', 'PMC9089791', 'PMC9020587', 'PMC9015644', 'PMC9012253', 'PMC9005225', 'PMC9025891', 'PMC8994061', 'PMC9088647', 'PMC8977517', 'PMC8978495', 'PMC8971476', 'PMC8967087', 'PMC9044950', 'PMC9005350', 'PMC9031820', 'PMC8986086', 'PMC8954379', 'PMC8930394', 'PMC8923830', 'PMC8927725', 'PMC8923340', 'PMC8920075', 'PMC8917961', 'PMC8917962', 'PMC8908544', 'PMC8958014', 'PMC8983140', 'PMC8920961', 'PMC8856871', 'PMC8914012', 'PMC8983041', 'PMC8855624', 'PMC8849905', 'PPR453627', 'PMC8841208', 'PMC8839799', 'PPR452891', 'PMC8840742', 'PPR450180', 'PMC8967715', 'PMC8800542', 'PMC8800493', 'PMC87864