# Download Publication Information for PANGO Lineages from the CORD-19 Data Set
**[Work in progress]**

This notebook text-mines [PANGO lineage](https://cov-lineages.org/) mentions in the titles and abstracts of publications and preprints from the CORD-19 data set. Note that the text-mined results may contain false positives.

Data sources: [PANGO Lineage Designations](https://github.com/cov-lineages/pango-designation), 
[CORD-19](https://allenai.org/data/cord-19)

References:

Rambaut A, et al., A dynamic nomenclature proposal for SARS-CoV-2 lineages to assist genomic epidemiology (2020) Nature Microbiology [doi:10.1038/s41564-020-0770-5](https://doi.org/10.1038/s41564-020-0770-5).

Wang LL, et al., CORD-19: The COVID-19 Open Research Dataset (2020) [arXiv:2004.10706v4](https://arxiv.org/abs/2004.10706).

Author: Peter Rose (pwrose@ucsd.edu)

In [1]:
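import os
import io
import re
import json
import time
import urllib.error  # provides urllib.error.HTTPError, caught in the download loops
from pathlib import Path
from urllib.request import urlopen
from xml.etree.ElementTree import parse

import dateutil.parser  # `import dateutil` alone does not guarantee the parser submodule is loaded
import nltk
import numpy as np
import pandas as pd
import requests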

In [2]:
pd.options.display.max_rows = None  # display all rows
pd.options.display.max_columns = None  # display all columns

In [3]:
NEO4J_IMPORT = os.getenv('NEO4J_IMPORT', "/Users/lyt/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-a1516f46-b63a-46dd-b67a-1fb59d6c5d05/import")  # fall back to a local Neo4j Desktop import folder
print(NEO4J_IMPORT)

/Users/lyt/Library/Application Support/Neo4j Desktop/Application/relate-data/dbmss/dbms-a1516f46-b63a-46dd-b67a-1fb59d6c5d05/import


## Get PANGO lineages

In [4]:
pango = pd.read_csv(NEO4J_IMPORT + "/00b-PANGOLineage.csv", dtype=str)

In [5]:
pango.sample(5)

Unnamed: 0,lineage,description,alias,predecessor,l0,l1,l2,l3,levels
1402,AY.4.9,"Alias of B.1.617.2.4.9, European lineage, from...",B.1.617.2.4.9,B.1.617.2.4,AY.4.9,AY.4,AY,,3
1245,B.1.487,USA - TX,,,B.1.487,B.1,B,,3
1553,AY.91,"Alias of B.1.617.2.91, lineage in South Africa...",B.1.617.2.91,B.1.617.2,AY.91,AY,,,2
569,B.1.1.482,US Lineage,,,B.1.1.482,B.1.1,B.1,B,4
1907,B.1.200,Withdrawn: Reassigned in the current tree. Pre...,,,B.1.200,B.1,B,,3


In [6]:
lineages = pango['lineage'].unique()

In [7]:
pattern1 = re.compile(r' [A-Z]{1,2}[.]\d+ ', re.IGNORECASE)
pattern2 = re.compile(r' [A-Z]{1,2}[.]\d+[.]\d+ ', re.IGNORECASE)
pattern3 = re.compile(r' [A-Z]{1,2}[.]\d+[.]\d+[.]\d+ ', re.IGNORECASE)

# add WHO lineage names (padded with spaces so that only whole words match)
who_lineage = [' Alpha ', ' Beta ', ' Gamma ', ' Epsilon ', ' Zeta ', ' Eta ', ' Theta ',
               ' Iota ', ' Kappa ', ' Lambda ', ' Mu ']
pattern4 = re.compile("|".join(who_lineage), re.IGNORECASE)

In [8]:
# add WHO names to lineages (stripped, so they compare equal to the stripped regex matches)
lineages = np.append(lineages, [w.strip() for w in who_lineage])

In [9]:
# remove A B
lineages = np.delete(lineages, np.where(lineages == 'A'))
lineages = np.delete(lineages, np.where(lineages == 'B'))
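
Each pattern above consumes the space on both sides of a match, so when two lineages of the same depth are separated by a single space (for example `P.1 B.1`), the first match uses up the separating space and the second mention is missed. A minimal sketch of one alternative, assuming a single combined regex is acceptable: zero-width lookarounds `(?<!\S)` and `(?!\S)` check for surrounding whitespace without consuming it. `PANGO_LIKE` and `find_candidates` are hypothetical names.

```python
import re

# Hypothetical combined pattern: 1-2 letters followed by 1-3 ".<number>" groups.
# The lookarounds require whitespace (or the start/end of the text) around a match
# without consuming it, so neighbouring mentions can share one separating space.
PANGO_LIKE = re.compile(r'(?<!\S)[A-Z]{1,2}(?:[.]\d+){1,3}(?!\S)', re.IGNORECASE)

# The notebook's depth-1 pattern, for comparison.
pattern1 = re.compile(r' [A-Z]{1,2}[.]\d+ ', re.IGNORECASE)


def find_candidates(text):
    """Return lineage-like tokens in order of appearance."""
    return PANGO_LIKE.findall(text)


text = 'variants P.1 B.1 and A.1 were detected'
print(pattern1.findall(text))  # [' P.1 ', ' A.1 ']  (B.1 is missed)
print(find_candidates(text))   # ['P.1', 'B.1', 'A.1']
```

The candidates still have to be checked against `lineages`, as in the notebook. Like the original patterns, this sketch does not match a lineage directly followed by punctuation such as a sentence-final period.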

## Get CORD-19 Metadata

In [10]:
CACHE = Path(NEO4J_IMPORT +'/cache/cord19/2022-03-31/metadata.csv')

In [11]:
metadata = pd.read_csv(CACHE, dtype='str')

In [12]:
metadata.fillna('', inplace=True)
# extract the year and parse publish_time into a datetime
metadata['year'] = metadata['publish_time'].apply(lambda d: d[:4] if len(d) >= 4 else '')
metadata['date'] = metadata['publish_time'].apply(lambda d: dateutil.parser.parse(d) if len(d) > 0 else '')
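
`dateutil.parser.parse` fills any missing fields from today's date, so a year-only `publish_time` such as `2020` becomes a full date in 2020 with the current month and day. A stdlib sketch that keeps year-only values as year-only, assuming `publish_time` is either `YYYY` or `YYYY-MM-DD` (anything else falls back to the year only); `parse_publish_time` is a hypothetical helper.

```python
from datetime import datetime


def parse_publish_time(value):
    """Return (year, date) for a CORD-19 publish_time string.

    Assumes 'YYYY-MM-DD' or 'YYYY'; anything that is not a full date keeps only its
    year and a None date, rather than a date whose month and day are made up.
    """
    value = value.strip()
    if not value:
        return '', None
    year = value[:4]
    try:
        return year, datetime.strptime(value, '%Y-%m-%d').date()
    except ValueError:
        return year, None


print(parse_publish_time('2020-03-31'))  # ('2020', datetime.date(2020, 3, 31))
print(parse_publish_time('2020'))        # ('2020', None)
```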



In [13]:
print("Total number of papers", metadata.shape[0])

Total number of papers 992921


## Extract a list of PANGO lineages

Replace special characters with spaces to simplify parsing of lineages in parentheses, comma-separated lists, etc.

In [14]:
metadata['title'] = metadata['title'].replace('[()/,]', ' ', regex=True)
metadata['abstract'] = metadata['abstract'].replace('[()/,]', ' ', regex=True)

Match PANGO patterns and check them against the list of known lineages.

In [17]:
def get_lineages(row):
    text = ' ' + row.title + ' ' + row.abstract + ' '
    lin = pattern1.findall(text) + pattern2.findall(text) + pattern3.findall(text)
    u_lin = set()
    
    
    for l in lin:
        l = l.strip()
        # check if lineage is valid (e.g., not a withdrawn lineage or false positive)
        if l in lineages:
            u_lin.add(l)
            
    return ";".join(u_lin)
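
`l in lineages` on a NumPy array compares each candidate against every element of the array, and `";".join(u_lin)` emits the lineages in arbitrary set order, which can differ between runs. A sketch of an alternative, assuming the known names are held in a `frozenset` (`KNOWN` is a tiny hypothetical stand-in; in the notebook one would pass `frozenset(lineages)`): set intersection uses hash lookups, and sorting makes the output string reproducible.

```python
import re
from types import SimpleNamespace

# Hypothetical stand-in for the notebook's `lineages` array, stored as a frozenset.
KNOWN = frozenset({'B.1.1.7', 'P.1', 'AY.4'})

# The same three space-delimited patterns as above (raw strings, one dot per level).
PATTERNS = [re.compile(p, re.IGNORECASE) for p in (
    r' [A-Z]{1,2}[.]\d+ ',
    r' [A-Z]{1,2}[.]\d+[.]\d+ ',
    r' [A-Z]{1,2}[.]\d+[.]\d+[.]\d+ ',
)]


def get_lineages_fast(row, known=KNOWN):
    """Return the known lineages in row.title/row.abstract as a sorted 'a;b' string."""
    text = ' ' + row.title + ' ' + row.abstract + ' '
    found = {m.strip() for p in PATTERNS for m in p.findall(text)}
    # set intersection = hash lookups; sorting gives the same string on every run
    return ';'.join(sorted(found & known))


row = SimpleNamespace(title='Spread of B.1.1.7 in the UK',
                      abstract='We compare B.1.1.7 and P.1 and X.9 .')
print(get_lineages_fast(row))  # B.1.1.7;P.1
```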

### Run on whole dataset

In [None]:
metadata['lineages'] = metadata.apply(get_lineages, axis=1)

Keep only papers that map to PANGO lineages

In [None]:
hits = metadata[metadata['lineages'].str.len() > 0].copy()

### Assign CURIEs from [Identifiers.org](https://identifiers.org)

In [None]:
hits['doi'] = hits['doi'].apply(lambda x: 'doi:' + x if len(x) > 0 else '')
hits['pubmed_id'] = hits['pubmed_id'].apply(lambda x: 'pubmed:' + x if len(x) > 0 else '')
hits['pmcid'] = hits['pmcid'].apply(lambda x: 'pmc:' + x if len(x) > 0 else '')
hits['arxiv_id'] = hits['arxiv_id'].apply(lambda x: 'arxiv:' + x if len(x) > 0 else '')

In [None]:
#hits.sort_values(by=['publish_time'], ascending=False, inplace=True)

In [None]:
print("Number of matches", hits.shape[0])

In [None]:
def create_id(row):
    """Creates a unique id using the most commonly available id in priority order"""
    if row.doi != '':
        return row.doi
    elif row.pubmed_id != '':
        return row.pubmed_id
    elif row.pmcid != '':
        return row.pmcid
    elif row.arxiv_id != '':
        return row.arxiv_id
    elif row.url != '':
        return row.url
    else:
        # TODO deal with WHO papers here?
        return ''
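
The `if/elif` chain in `create_id` is a "first non-empty identifier in priority order" lookup. A compact equivalent, sketched on a plain `dict` (a pandas row supports the same `.get` call, so it could be used with `hits.apply(first_id, axis=1)`); `ID_PRIORITY` and `first_id` are hypothetical names, and empty strings count as missing because `metadata` was filled with `''` above.

```python
ID_PRIORITY = ('doi', 'pubmed_id', 'pmcid', 'arxiv_id', 'url')


def first_id(record, priority=ID_PRIORITY):
    """Return the first non-empty identifier in priority order, or '' if there is none."""
    return next((record[k] for k in priority if record.get(k)), '')


paper = {'doi': '', 'pubmed_id': 'pubmed:123', 'pmcid': 'pmc:PMC1', 'arxiv_id': '', 'url': ''}
print(first_id(paper))  # pubmed:123
```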

In [None]:
hits['id'] = hits.apply(create_id, axis=1)

WHO documents seem to be copies of articles that are already present in the dataset and will be ignored for now.

In [None]:
hits.query('id != ""', inplace=True)

In [None]:
print("Total number of matches", hits.shape[0])

In [None]:
hits.to_csv(NEO4J_IMPORT + "/01h-CORDLineages.csv", index=False)

## Full-text regex matching


In [18]:
# get article ids for a specific lineage from the Europe PMC search API (first page of up to 1000 hits)
def get_ids(lineage):
    response = requests.get(f'https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=(%22{lineage}%22%20AND%20(%22SARS-CoV-2%22%20OR%20%22COVID-19%22)%20AND%20(%22lineage%22%20OR%20%22lineages%22%20OR%20%22strain%22%20OR%20%22strains%22%20OR%20%22variant%22%20OR%20%22variants%22))%20AND%20(FIRST_PDATE:%5b2020-01-01%20)%20AND%20HAS_FT:y%20AND%20%20sort_date:y&resultType=idlist&pageSize=1000&format=json&cursorMark=*')
    response.raise_for_status()
    text = response.text
    print(text[:5])
    results = json.loads(text)['resultList']['result']
    ids = [x['fullTextIdList']['fullTextId'][0] for x in results]
    return ids
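
`get_ids` requests a single page (`pageSize=1000`, `cursorMark=*`), so a lineage with more than 1000 matching articles is silently truncated. Europe PMC's search supports cursor-based paging: each response carries a `nextCursorMark` to pass as `cursorMark` on the next request. A sketch of the paging loop, with the HTTP call abstracted as `fetch_page` (a hypothetical callable, e.g. a wrapper around the `requests.get` call in `get_ids` with the cursor substituted) so the loop can be checked offline with fake pages.

```python
def collect_ids(fetch_page):
    """Collect the first full-text id of every hit, following Europe PMC cursor marks.

    fetch_page(cursor) is a hypothetical callable that runs the search with
    cursorMark=cursor and returns the decoded JSON response as a dict.
    """
    ids, cursor = [], '*'
    while True:
        page = fetch_page(cursor)
        results = page.get('resultList', {}).get('result', [])
        for r in results:
            full_text_ids = r.get('fullTextIdList', {}).get('fullTextId', [])
            if full_text_ids:  # skip hits without a full-text id instead of raising KeyError
                ids.append(full_text_ids[0])
        next_cursor = page.get('nextCursorMark')
        # stop on an empty page or when the cursor no longer advances
        if not results or not next_cursor or next_cursor == cursor:
            return ids
        cursor = next_cursor


pages = {  # two fake pages standing in for real API responses
    '*': {'resultList': {'result': [{'fullTextIdList': {'fullTextId': ['PMC1']}}, {'id': 'x'}]},
          'nextCursorMark': 'c2'},
    'c2': {'resultList': {'result': [{'fullTextIdList': {'fullTextId': ['PPR2']}}]},
           'nextCursorMark': 'c2'},
}
print(collect_ids(pages.__getitem__))  # ['PMC1', 'PPR2']
```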

In [19]:
# download an article as JATS XML from Europe PMC and return the text of its paragraphs
def download_article(article_id):
    url = f'https://www.ebi.ac.uk/europepmc/webservices/rest/{article_id}/fullTextXML'
    with urlopen(url) as response:
        root = parse(response).getroot()

    # collect every <p> element (abstract, body, and back matter), each followed by '.\n\n'
    ptext = ""
    for p in root.findall('.//p'):
        ptext += ''.join(p.itertext()) + '.\n\n'
    return ptext
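
`root.findall('.//p')` returns every `<p>` in the article, including the abstract and back matter such as acknowledgments, so mentions in those sections are mined too. A stdlib sketch on a tiny hand-written JATS-like document (`JATS` and `paragraph_texts` are hypothetical) showing how the search could be restricted to `<body>`:

```python
import xml.etree.ElementTree as ET

# A tiny hand-written JATS-like article (hypothetical content).
JATS = """<article>
  <front><article-meta><abstract><p>Abstract mentions P.1.</p></abstract></article-meta></front>
  <body><sec><p>Body mentions <italic>B.1.1.7</italic> here.</p></sec></body>
  <back><ack><p>We thank the AY.4 team.</p></ack></back>
</article>"""


def paragraph_texts(root, body_only=False):
    """Return the text of <p> elements, optionally only those inside <body>."""
    scope = root.find('.//body') if body_only else root
    if scope is None:  # compare with None: an Element without children is falsy
        return []
    return [''.join(p.itertext()) for p in scope.findall('.//p')]


root = ET.fromstring(JATS)
print(paragraph_texts(root))                  # all three paragraphs
print(paragraph_texts(root, body_only=True))  # ['Body mentions B.1.1.7 here.']
```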

In [20]:
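# get lineages for full texts
def get_full_lineage(ptext):
    # case-insensitive lookup from a matched string to its canonical lineage name
    # (str.capitalize() would turn 'AY.4' into 'Ay.4', and WHO names may carry padding spaces)
    canonical = {str(x).strip().lower(): str(x).strip() for x in lineages}

    # tokenize texts into sentences
    p_sentence = nltk.tokenize.sent_tokenize(ptext)

    # record lineages: each lineage is paired with the first sentence that mentions it
    linset = set()
    pair = []
    for s in p_sentence:
        s1 = ' ' + re.sub('[()/,]', ' ', s) + ' '  # remove special chars; pad so edge mentions match
        lin = set(pattern1.findall(s1) + pattern2.findall(s1) + pattern3.findall(s1) + pattern4.findall(s1))
        for l in lin:
            name = canonical.get(l.strip().lower())
            # valid lineage and not recorded yet
            if name is not None and name not in linset:
                linset.add(name)
                pair.append([name, s])
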
    
    """
    ptext = re.subn('[()/,]', ' ', ptext)[0] # remove special chars
    lin = pattern1.findall(ptext) + pattern2.findall(ptext) + pattern3.findall(ptext)
    lin_set = set(lin)
    
    record = []
    if lin_set:
        for l in lin_set:
            
            sen = re.search(r"\.?([^\.]*{}[^\.]*)".format(l), ptext).group()
            record.append([l, sen])
    """
    return pair
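
Normalizing a match with `str.capitalize()` upper-cases only the first character and lower-cases the rest, so a two-letter prefix such as `AY.4` becomes `Ay.4` and no longer matches the list of known lineages. A case-insensitive dictionary lookup avoids this; a sketch with a hypothetical stand-in list `KNOWN` and a hypothetical helper `normalize`:

```python
KNOWN = ['B.1.1.7', 'AY.4', 'P.1', 'Alpha']  # hypothetical stand-in for `lineages`

# Map each lower-cased name back to its canonical spelling.
CANONICAL = {name.lower(): name for name in KNOWN}


def normalize(match):
    """Return the canonical lineage name for a raw regex match, or None if it is unknown."""
    return CANONICAL.get(match.strip().lower())


print(' ay.4 '.strip().capitalize())  # Ay.4 (not in KNOWN)
print(normalize(' ay.4 '))            # AY.4
print(normalize(' ALPHA '))           # Alpha
print(normalize(' X.9 '))             # None
```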

In [None]:
# wrap-up function: takes article ids as input and returns a DataFrame of lineage mentions
def extract_full(ids):
    if not ids:
        return None
    rows = []
    for i in ids:
        try:
            body_text = download_article(i)  # get body text
        except urllib.error.HTTPError:
            time.sleep(5)  # back off for 5 seconds, then skip this article (no retry)
            continue
        # extract lineages in text and attach the article id to each record
        for lin, sentence in get_full_lineage(body_text):
            rows.append([lin, sentence, i])
    return pd.DataFrame(rows, columns=['lineage', 'string', 'ID'])


    

#### test on B.1.1.7


In [None]:
lineage = 'B.1.1.7'
ids = get_ids(lineage)

rows = []
for i in ids:
    path = Path(f'{lineage}/{i}.txt')
    try:
        # if the file does not exist, download the body text and cache it
        if not path.is_file():
            body_text = download_article(i)
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(body_text, encoding='utf-8')
        else:  # otherwise read the cached text
            body_text = path.read_text(encoding='utf-8')
    except urllib.error.HTTPError:
        time.sleep(10)  # back off for 10 seconds, then skip this article (no retry)
        continue

    # get lineages and attach the article id to each lineage record
    for lin, sentence in get_full_lineage(body_text):
        rows.append([lin, sentence, i])

fulltext_lineage = pd.DataFrame(rows, columns=['lineage', 'string contains lineage', 'ID'])

In [None]:
fulltext_lineage.to_csv('B_1_1_7.csv',index=False, header = ['lineage', 'string contains lineage', 'ID'])
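
The B.1.1.7 and P.1 cells differ only in the lineage name; the cache-or-download step they share could be factored into a helper. A sketch, where `cached_text` is a hypothetical name and `download` stands for `download_article` (in the notebook: `body_text = cached_text(i, lineage, download_article)`); the demo uses a fake download function and a temporary directory.

```python
import tempfile
from pathlib import Path


def cached_text(article_id, cache_dir, download):
    """Return the text in '<cache_dir>/<article_id>.txt', downloading it on a cache miss."""
    path = Path(cache_dir) / f'{article_id}.txt'
    if path.is_file():
        return path.read_text(encoding='utf-8')
    text = download(article_id)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(text, encoding='utf-8')
    return text


downloads = []


def fake_download(article_id):  # hypothetical stand-in for download_article
    downloads.append(article_id)
    return f'text of {article_id}'


with tempfile.TemporaryDirectory() as tmp:
    first = cached_text('PMC1', tmp, fake_download)
    second = cached_text('PMC1', tmp, fake_download)  # served from the cache file
print(first == second, downloads)  # True ['PMC1']
```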

#### test on P.1

In [None]:
lineage = 'P.1'
ids = get_ids(lineage)

rows = []
for i in ids:
    path = Path(f'{lineage}/{i}.txt')
    try:
        # if the file does not exist, download the body text and cache it
        if not path.is_file():
            body_text = download_article(i)
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(body_text, encoding='utf-8')
        else:  # otherwise read the cached text
            body_text = path.read_text(encoding='utf-8')
    except urllib.error.HTTPError:
        time.sleep(10)  # back off for 10 seconds, then skip this article (no retry)
        continue

    # get lineages and attach the article id to each lineage record
    for lin, sentence in get_full_lineage(body_text):
        rows.append([lin, sentence, i])

fulltext_lineage = pd.DataFrame(rows, columns=['lineage', 'string contains lineage', 'ID'])

In [None]:
fulltext_lineage.to_csv('P_1.csv',index=False, header = ['lineage', 'string contains lineage', 'ID'])

### Manually check possible false positives
#### B.1.1.7

In [None]:
b117 = pd.read_csv('B_1_1_7.csv')

In [None]:
b117 = b117.drop(columns=['Unnamed: 0'], errors='ignore')  # only present if the CSV was saved with its index
b117.columns = ['lineage', 'string contains lineage', 'ID']

In [None]:
# 1. a sentence that appears in more than one row (e.g., one listing several lineages) is not specific evidence; drop all such rows
b117_sub = b117[~ b117.duplicated('string contains lineage',keep=False)]

In [None]:
# 2. keep articles with exactly one remaining lineage mention; these are the most likely false positives (FP)
IDs = b117_sub.groupby('ID').lineage.count().loc[lambda p : p == 1].index

In [None]:
b117_sub = b117_sub.set_index('ID').loc[IDs]

In [None]:
# manual check
#print(b117_sub['string contains lineage'].str.cat(sep = '\n '))

#### P.1

In [None]:
p1 = pd.read_csv("P_1.csv")
p1_sub = p1[~ p1.duplicated('string contains lineage',keep=False)]
IDs_1 = p1_sub.groupby('ID').lineage.count().loc[lambda p : p == 1].index

In [None]:
p1_sub = p1_sub.set_index('ID').loc[IDs_1]

In [None]:
p1_sub[p1_sub['string contains lineage'].str.contains('The nucleic acid')]
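
The two filtering steps above can be restated without pandas. A sketch over plain `(lineage, sentence, article_id)` tuples (`likely_false_positives` and the example ids are hypothetical) that mirrors `duplicated(..., keep=False)` followed by the one-mention-per-article filter:

```python
from collections import Counter


def likely_false_positives(rows):
    """rows: list of (lineage, sentence, article_id) tuples.

    Step 1 drops every row whose sentence occurs more than once (like
    duplicated(..., keep=False)); step 2 keeps only articles left with exactly one row.
    """
    sentence_counts = Counter(sentence for _, sentence, _ in rows)
    kept = [r for r in rows if sentence_counts[r[1]] == 1]
    per_article = Counter(article_id for _, _, article_id in kept)
    return [r for r in kept if per_article[r[2]] == 1]


rows = [
    ('B.1.1.7', 'B.1.1.7 and P.1 co-circulated.', 'PMC_A'),
    ('P.1', 'B.1.1.7 and P.1 co-circulated.', 'PMC_A'),
    ('B.1.1.7', 'A lone mention of B.1.1.7.', 'PMC_B'),
    ('B.1.1.7', 'B.1.1.7 was dominant.', 'PMC_C'),
    ('P.1', 'P.1 was detected later.', 'PMC_C'),
]
print(likely_false_positives(rows))  # [('B.1.1.7', 'A lone mention of B.1.1.7.', 'PMC_B')]
```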

## Start Generalization

In [22]:
def iter_lineage(l_):
    """Extract mentions of one lineage, reusing the results saved in '{l_}_df.csv' if present."""
    start = time.time()
    print(f'start {l_}')
    ids = get_ids(l_)  # get ids
    path = Path(f'{l_}_df.csv')

    if path.is_file():
        # lineage processed before: run on new ids only and merge with the saved results
        print('exist')
        df = pd.read_csv(path)
        id_to_run = set(ids) - set(df.ID)
        print(f'new id {id_to_run}')
        df_new = extract_full(id_to_run)
        ddf = pd.concat([df, df_new], axis=0)  # pd.concat silently drops a None df_new
        print('done merge')
    else:
        # otherwise run on all ids
        ddf = extract_full(ids)

    if ddf is not None:  # extract_full returns None when there are no ids
        ddf.to_csv(path, index=False)
    end = time.time()
    print(f'done with {l_}, time duration --- seconds --- {end - start} \n')
        


In [None]:
i1 = get_ids('A.1')
i2 = get_ids('A.2')

In [None]:
d1 = extract_full(i1[:10])
d2 = extract_full(i2[:10])

In [None]:
i1[:5],i2[:5]

In [None]:
d1.shape

In [None]:
d2.shape

In [31]:
l_10 = lineages[:10]
l_10


array(['A.1', 'A.2', 'A.2.2', 'A.2.3', 'A.2.4', 'A.2.5', 'A.2.5.1',
       'A.2.5.2', 'A.2.5.3', 'A.3'], dtype=object)

### Parallel running

In [23]:
from dask.distributed import Client, progress

In [24]:
client = Client(n_workers=4, threads_per_worker=1, memory_limit="4 GiB")
client

Client: `distributed.LocalCluster` with 4 workers (1 thread and 4.00 GiB memory each, 16.00 GiB total); dashboard at http://127.0.0.1:8787/status


In [33]:
import dask.bag as db

b = db.from_sequence(l_10[4:5])

In [35]:
start = time.time()
result = b.map(lambda l: get_ids(l))
print(result.compute())
end = time.time()
print('Total time:', end-start)

[['PMC9205280', 'PMC9034743', 'PMC9088647', 'PMC8856871', 'PMC8876558', 'PPR435130', 'PMC8777887', 'PMC8707919', 'PPR428671', 'PPR416637', 'PMC8535385', 'PPR357574', 'PMC8189619', 'PMC8139174', 'PMC8290271', 'PMC8250244', 'PMC7238228']]
Total time: 1.6019988059997559


In [41]:
list(db.from_sequence(result.take(1)[0]))

['PMC9205280',
 'PMC9034743',
 'PMC9088647',
 'PMC8856871',
 'PMC8876558',
 'PPR435130',
 'PMC8777887',
 'PMC8707919',
 'PPR428671',
 'PPR416637',
 'PMC8535385',
 'PPR357574',
 'PMC8189619',
 'PMC8139174',
 'PMC8290271',
 'PMC8250244',
 'PMC7238228']

In [94]:
a = get_ids('A.2')

{"ver


In [95]:
len(a)

255

In [96]:
b = db.from_sequence(a)

In [98]:
b.take(3)

('PMC8725908', 'PMC8725896')

In [91]:
db.from_sequence(range(1, 101), npartitions=10)

(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

### Sequential loop

In [35]:
start = time.time()
for i in l_10[6:9]:
    iter_lineage(i)
end = time.time()

start A.2.5.1
{"ver
exist
new id set()
done merge
done with A.2.5.1, time duration --- seconds --- 2.1773431301116943 

start A.2.5.2
{"ver
exist
new id set()
done merge
done with A.2.5.2, time duration --- seconds --- 1.438655138015747 

start A.2.5.3
{"ver
exist
new id set()
done merge
done with A.2.5.3, time duration --- seconds --- 1.508655071258545 



In [36]:
print('Total time:', end-start)

Total time: 5.12507700920105


In [37]:
b = db.from_sequence(l_10[6:9])
start = time.time()
result = b.map(lambda l: iter_lineage(l))
result.compute()
end = time.time()
print('Total time:', end-start)

Total time: 4.430984973907471


### Bag lineages and run the whole procedure (runtime error)

In [28]:
b = db.from_sequence(l_10)
start = time.time()
result = b.map(lambda l: iter_lineage(l))
result.compute()
end = time.time()
print('Total time:', end-start)

start A.2.5.1
{"ver
done with A.2.5.1, time duration --- seconds --- 19.129374265670776 

start A.2.5.2
{"ver
done with A.2.5.2, time duration --- seconds --- 16.55321502685547 

start A.3
{"ver
done with A.3, time duration --- seconds --- 2249.48464679718 

start A.2.4
{"ver
done with A.2.4, time duration --- seconds --- 103.93581199645996 

start A.2.5
{"ver
done with A.2.5, time duration --- seconds --- 362.15064811706543 

start A.2.5.3
{"ver
done with A.2.5.3, time duration --- seconds --- 10.48508906364441 



KeyboardInterrupt: 

start A.2.2
{"ver
done with A.2.2, time duration --- seconds --- 233.4598832130432 

start A.2.3
{"ver
done with A.2.3, time duration --- seconds --- 56.44733285903931 

start A.2
{"ver
done with A.2, time duration --- seconds --- 2160.0888769626617 





start A.1
{"ver




### Break down and parallelize over article ids

In [29]:
def parallel_iter1(l_):
    """Like iter_lineage, but mine the articles in parallel: one dask bag element per article id."""
    start = time.time()
    print(f'start {l_}')
    ids = get_ids(l_)  # get ids

    frames = []
    path = Path(f'{l_}_df.csv')
    if path.is_file():
        # lineage processed before: keep the saved results and run on new ids only
        df = pd.read_csv(path)
        frames.append(df)
        id_to_run = set(ids) - set(df.ID)
    else:  # otherwise run on all ids
        id_to_run = set(ids)

    if id_to_run:
        bag_id = db.from_sequence(sorted(id_to_run))
        # wrap each id in a list: extract_full_parallel2 loops over a list of ids
        frames.extend(bag_id.map(lambda i: extract_full_parallel2([i])).compute())

    if frames:
        pd.concat(frames, axis=0).to_csv(path, index=False)
    end = time.time()
    print(f'done with {l_}, time duration --- seconds --- {end - start} \n')
        


In [67]:
# no looping: process a single article id (one bag element per article)
def extract_full_parallel(article_id):
    columns = ['lineage', 'string', 'ID']
    try:
        body_text = download_article(article_id)  # get body text
    except urllib.error.HTTPError:
        time.sleep(5)  # back off for 5 seconds; this article is skipped (no retry)
        return pd.DataFrame(columns=columns)

    # extract lineages in text and attach the article id to each record
    record = [[lin, sentence, article_id] for lin, sentence in get_full_lineage(body_text)]
    return pd.DataFrame(record, columns=columns)

In [None]:
# looping over a list of ids
def extract_full_parallel2(ids):
    rows = []
    for i in ids:
        try:
            body_text = download_article(i)  # get body text
        except urllib.error.HTTPError:
            time.sleep(5)  # back off for 5 seconds, then skip this article (no retry)
            continue
        # extract lineages in text and attach the article id to each record
        for lin, sentence in get_full_lineage(body_text):
            rows.append([lin, sentence, i])
    df_fulltext = pd.DataFrame(rows, columns=['lineage', 'string', 'ID'])
    return df_fulltext
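
Downloading full texts is I/O-bound, so threads can overlap the time spent waiting on HTTP responses. As an alternative to the dask bag (this swaps in the standard library's `concurrent.futures`, which the notebook does not use), a sketch follows; `run_parallel` and `fake_process` are hypothetical, and in the notebook `process_one` might be `lambda i: [[lin, s, i] for lin, s in get_full_lineage(download_article(i))]`. A modest `max_workers` keeps the request rate to the Europe PMC service reasonable.

```python
from concurrent.futures import ThreadPoolExecutor


def run_parallel(ids, process_one, max_workers=4):
    """Apply process_one to every id on a thread pool; return all rows in input-id order.

    process_one(article_id) must return a list of rows and should handle its own
    HTTP errors: an exception raised inside it is re-raised here.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        per_article = list(pool.map(process_one, ids))
    # flatten the per-article lists into one list of rows
    return [row for rows in per_article for row in rows]


def fake_process(article_id):  # hypothetical stand-in for download + get_full_lineage
    return [] if article_id == 'PPR0' else [['P.1', 'some sentence', article_id]]


print(run_parallel(['PMC1', 'PPR0', 'PMC2'], fake_process))
# [['P.1', 'some sentence', 'PMC1'], ['P.1', 'some sentence', 'PMC2']]
```

`pool.map` yields results in the order of the input ids, so the output order does not depend on which download finishes first.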

In [None]:
### BAG

In [None]:
def query(lineage):
    # get article ids for a specific lineage; returns {'lineage': ..., 'ids': [...]}
    response = requests.get(f'https://www.ebi.ac.uk/europepmc/webservices/rest/search?query=(%22{lineage}%22%20AND%20(%22SARS-CoV-2%22%20OR%20%22COVID-19%22)%20AND%20(%22lineage%22%20OR%20%22lineages%22%20OR%20%22strain%22%20OR%20%22strains%22%20OR%20%22variant%22%20OR%20%22variants%22))%20AND%20(FIRST_PDATE:%5b2020-01-01%20)%20AND%20HAS_FT:y%20AND%20%20sort_date:y&resultType=idlist&pageSize=1000&format=json&cursorMark=*')
    response.raise_for_status()
    text = response.text
    print(text[:10])
    results = json.loads(text)['resultList']['result']
    ids = [x['fullTextIdList']['fullTextId'][0] for x in results]
    return {'lineage': lineage, 'ids': ids}

In [None]:
lineages[7:8]

In [None]:
## check whether this works on lineages A.1 and A.2.5.2

In [69]:
def parallel_iter(lineage_list):
    # accept a single lineage name as well as a list of names
    # (iterating over a bare string would loop over its characters)
    if isinstance(lineage_list, str):
        lineage_list = [lineage_list]

    for l_ in lineage_list:
        start = time.time()
        print(f'start {l_}')
        # get ids: wrap the name in a list so the bag holds one lineage, not its characters
        b = db.from_sequence([l_])
        result = b.map(lambda record: get_ids(record))
        ids = result.take(1)[0]

        path = Path(f'{l_}_df.csv')
        if path.is_file():
            # lineage processed before: run on new ids only and merge with the saved results
            print('exist')
            df = pd.read_csv(path)
            id_to_run = set(ids) - set(df.ID)
            print(f'new id {id_to_run}')
            df_new = extract_full(id_to_run)
            print(df_new)
            ddf = pd.concat([df, df_new], axis=0)  # pd.concat silently drops a None df_new
            print('merged')
        else:
            # otherwise run on all ids
            ddf = extract_full(ids)

        if ddf is not None:  # extract_full returns None when there are no ids
            ddf.to_csv(path, index=False)
        end = time.time()
        print(f'done with {l_}, time duration --- seconds --- {end - start} \n')



In [78]:
ids = {'PMC9205280', 'PMC8251645', 'PMC7650373', 'PMC7741497'}
b = db.from_sequence(ids,npartitions=2)
b.take(4)

('PMC9205280', 'PMC8251645')

In [70]:
parallel_iter('A.2.2')

hello
start A.2.2
exist
new id {'PMC9205280', 'PMC8251645', 'PMC7650373', 'PMC7741497'}


KeyboardInterrupt: 

In [132]:
s = pd.read_csv('A.2.5.2_df.csv')
s[15:].to_csv('A.2.5.2_df.csv', index=False)
start = time.time()
iter_lineage('A.2.5.2')
end = time.time()
print('Total time:', end-start)

start A.2.5.2
{"ver
exist
new id {'PMC9145602'}
done merge
done with A.2.5.2, time duration --- seconds --- 19.290800094604492 

Total time: 19.291192054748535


In [108]:
# break down iter code for debugging
l_ = 'A.2.5.2'
start = time.time()
b = db.from_sequence([l_])
result = b.map(lambda record: get_ids(record))
path = Path(f'{l_}_df.csv')
if path.is_file():
    print('exist')
    df = pd.read_csv(path)
    ids = result.take(1)[0]
    
    id_ran = set(df.ID)
    print(id_ran)
    print(ids)
    if id_ran != set(ids): 
        print('here')
        dds = result.filter(lambda x: x not in list(id_ran)).map(lambda i: extract_full_parallel2(i))
        print('found')
        df_new = dds.take(1)[0]
        ddf = pd.concat([df, df_new], axis = 0)
end = time.time()
print('Total time:', end-start)

exist
{'PMC9088647', 'PMC8525575'}
['PMC9145602', 'PMC9088647', 'PMC8525575']
here
found
Total time: 15.104039907455444


In [124]:
start = time.time()
for i in l_10[2:4]:
    iter_lineage(i)
end = time.time()

start A.2.2
{"ver
exist
new id {'PMC8920968', 'PMC7650373', 'PMC9205280', 'PMC9012253', 'PMC9230982', 'PMC9088647', 'PMC8251645', 'PMC7741497'}
done merge
done with A.2.2, time duration --- seconds --- 212.11726903915405 

start A.2.3
{"ver
exist
new id {'PPR417362', 'PMC8662802', 'PMC8390340', 'PMC9088647', 'PMC7350918', 'PMC9174147'}
done merge
done with A.2.3, time duration --- seconds --- 79.7303831577301 



In [None]:
end-start

In [134]:
s = pd.read_csv('A.2.2_df.csv')
s[30:].to_csv('A.2.2_df.csv', index=False)

In [135]:
s = pd.read_csv('A.2.3_df.csv')
s[30:].to_csv('A.2.3_df.csv', index=False)

In [112]:
# parallel
b = db.from_sequence(lineages[6:9])
start = time.time()
result = b.map(lambda record: parallel_iter(record))
result.compute()
end = time.time()
print('Total time:', end-start)

Total time: 3.3931920528411865


In [47]:
b = db.from_sequence(lineages[2:4])
start = time.time()
result = b.map(lambda record: parallel_iter(record))
result.compute()
end = time.time()
print('Total time:', end-start)

Function:  execute_task
args:      ((<function safe_take at 0x7fa6d210ad40>, 1, (<function map_chunk at 0x7fa6d210a560>, <function parallel_iter.<locals>.<lambda> at 0x7fa6d6c504d0>, [['PMC9174147']], None, {}), True))
kwargs:    {}
Exception: 'NameError("name \'i\' is not defined")'

Function:  execute_task
args:      ((<function safe_take at 0x7fa4959d7680>, 1, (<function map_chunk at 0x7fa4959d6e60>, <function parallel_iter.<locals>.<lambda> at 0x7fa49a09c050>, [['PMC9230982']], None, {}), True))
kwargs:    {}
Exception: 'NameError("name \'i\' is not defined")'



KeyboardInterrupt: 

In [None]:
# with new id added: 179
# nothing changed: 
# start new: 

## Example: breaking down a single run

In [64]:
s = pd.read_csv('A.2.5.1_df.csv')
s[:-30].to_csv('A.2.5.1_df.csv', index=False)


In [66]:
start = time.time()
parallel_iter('A.2.5.1')
end = time.time()
print('Total time:', end-start)

start A.2.5.1
exist
new id to run


Function:  execute_task
args:      ((<function safe_take at 0x7fac6cb895f0>, 1, (<function map_chunk at 0x7fac6cb88dd0>, <function parallel_iter.<locals>.<lambda> at 0x7fac6c376a70>, [(<class 'filter'>, <function parallel_iter.<locals>.<lambda> at 0x7fac6c22fd40>, (<function map_chunk at 0x7fac6cb88dd0>, <function parallel_iter.<locals>.<lambda> at 0x7fac6c22fcb0>, [['A.2.5.1']], None, {}))], None, {}), True))
kwargs:    {}
Exception: 'TypeError("unhashable type: \'list\'")'



done with A.2.5.1, time duration --- seconds --- 16.260791063308716 

Total time: 16.26163601875305


In [49]:
import dask.bag as db
b = db.from_sequence(lineages[2:4])
print(lineages[6:9])
print(b.take(2))
start = time.time()
result = b.map(lambda record: get_ids(record))
print('get id')
print(result.take(1))
dds = result.map(lambda x: extract_full_parallel2(x))
print(dds.compute())
end = time.time()
print('Total time:', end-start)

['A.2.5.1' 'A.2.5.2' 'A.2.5.3']
('A.2.2',)
get id
(['PMC9230982', 'PMC9205280', 'PMC9012253', 'PMC9088647', 'PMC8920968', 'PMC8883582', 'PPR453676', 'PPR439656', 'PPR439367', 'PMC8688474', 'PPR425039', 'PMC8561373', 'PMC8251645', 'PPR361607', 'PMC7705438', 'PMC7650373', 'PMC7741497'],)


KeyboardInterrupt: 

In [None]:
result.take(2)

In [None]:
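b = db.from_sequence(lineages[7:8])  # a bag of lineage names, not a bag holding one array
print(lineages[7:8])
start = time.time()
result = b.map(lambda record: query(record))
print('get id')
# query returns {'lineage': ..., 'ids': [...]}; mine all ids of each lineage
ddf = result.map(lambda x: extract_full_parallel2(x['ids']))
ddf.compute()
end = time.time()
print(end - start)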

In [None]:
# get dataframe
ddf.take(1)[0]

In [None]:
# download articles in XML and return body paragraph
def process(article_id):
    url = f'https://www.ebi.ac.uk/europepmc/webservices/rest/{article_id}/fullTextXML'
    xmldoc = parse(urlopen(url))
    
    # get full text
    root = xmldoc.getroot()
    text = root.findall('.//p')

    # put body paragraphs together
    ptext = ""
    for p in text:
        ptext += ''.join([x for x in p.itertext()]) + '.\n' + '\n'
    
    # tokenize texts into sentences
    p_sentence = nltk.tokenize.sent_tokenize(ptext)
    
    # record lineages
    linset = set()
    pair = []
    for s in p_sentence:
        s1 = re.subn('[()/,]', ' ', s)[0] # remove special chars
        lin = set(pattern1.findall(s1) + pattern2.findall(s1) + pattern3.findall(s1) + pattern4.findall(s1))

        if lin: 
            for l in lin:
                # valid lineage and not recorded
                l = l.strip()
                l = l.capitalize()
                if (l in lineages) and (l not in linset): 
                    linset.add(l)
                    pair.append([l, s])
                else: continue
    return pair
    
    

In [None]:
result.map(lambda x: 