# PubMed Extraction

Extract selected fields (PMID, title, abstract and MeSH headings) from the PubMed XML files into CSV files for use in NLP.

In [36]:
from xml.etree import ElementTree as etree
from pathlib import Path
import gzip, csv
from multiprocessing import Lock
from fastai.text import *

In [37]:
# Directory containing the PubMed *.xml.gz files; a symlink to your data drive works well here.
path = Path("./data/sample")
# Directory the output csv files are written to. If `path` is not on an SSD,
# pointing `dest` at a different drive speeds things up.
dest = Path(".")
# Alternative: write the csv files next to the input files.
#dest = path

# Download data

In [38]:
# Run once to download the PubMed data into `path`: change `if 0` to `if 1` to enable.
# Info: 1200+ xml.gz files totalling ~35GB compressed (as of 2019-05).
if 0:
    ! wget -m -np -nd ftp://ftp.ncbi.nlm.nih.gov/pubmed/baseline/    -P {path}
    ! wget -m -np -nd ftp://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/ -P {path}

In [39]:
# Show the files found in `path` (`ls` is not a stdlib `Path` method; presumably added by the fastai star-import).
path.ls()

[PosixPath('data/sample/pubmed19n1198.xml.gz'),
 PosixPath('data/sample/pubmed19n1157.xml.gz'),
 PosixPath('data/sample/pubmed19n1185.xml.gz'),
 PosixPath('data/sample/pubmed19n1152.xml.gz'),
 PosixPath('data/sample/pubmed19n1205.xml.gz'),
 PosixPath('data/sample/pubmed19n1172.xml.gz')]

# Extraction

In [40]:
def parallel_thread(func, arr:Collection, max_workers:int=None):
    "Call `func(item, index)` for every item of `arr` on a thread pool; return the results unless every one is None."
    # `max_workers` is resolved through `ifnone` against `defaults.cpus`.
    n_workers = ifnone(max_workers, defaults.cpus)
    n_items = len(arr)
    if n_workers >= 2:
        results = []
        with ThreadPoolExecutor(max_workers=n_workers) as pool:
            pending = [pool.submit(func, item, idx) for idx, item in enumerate(arr)]
            # Results are collected in completion order, not submission order.
            for done in progress_bar(concurrent.futures.as_completed(pending), total=n_items):
                results.append(done.result())
    else:
        # Fewer than two workers: run the calls one after another in the calling thread.
        results = [func(item, idx) for idx, item in progress_bar(enumerate(arr), total=n_items)]
    for res in results:
        if res is not None: return results
    return None


In [41]:
# Data to extract:

# Root: PubmedArticleSet.PubmedArticle.MedlineCitation

#  Condition: # only english articles
# -- .Language == eng

# 1. pmid | title | abstract | mesh (pubmed-abstracts.csv)
# - .PMID
# - .Article:
# -- .ArticleTitle
# -- .Abstract.AbstractText
#
# 2.  MeSH data 
# - .MeshHeadingList: converted into a single record with '|' separated entries of each MeshHeading
#    and each MeshHeading is converted into 'UI/MajorTopicYN' for DescriptorName, 
#    and optional 'UI/MajorTopicYN' for QualifierName joined with '-' 
#    here is a sample of what a record might look like: D000339/Y|D004650/N-Q000706/Y 
# 
#   the lookup table for these ids is saved in a separate pubmed-mesh.csv file

csv_fn_main = dest/"pubmed-abstracts.csv"
csv_fn_mesh = dest/"pubmed-mesh.csv"

#lock = threading.Lock()
# Counters that `extract` declares as globals.
total, total_eng, total_no_abstract = 0, 0, 0
# MeSH id -> text lookup, filled through `mesh_db_update`.
mesh_db = {}
# The handle stays open for the whole run because `extract` writes rows through
# `csv_writer_main`. newline="" is what the csv module asks for on files it writes
# (otherwise line endings get translated a second time on some platforms), and
# utf-8 keeps non-ASCII titles/abstracts independent of the locale.
f_out_main = open(csv_fn_main, "w", newline="", encoding="utf-8")
csv_writer_main = csv.writer(f_out_main)
csv_writer_main.writerow(["pmid", "title", "abstract", "mesh"])

# Held by `extract` while it writes to the shared csv writer.
lock = Lock()

def mesh_db_update(k, v):
    "Store `v` under `k` in the global `mesh_db` unless `k` is already present (the first value wins)."
    mesh_db.setdefault(k, v)
    
def mesh_db_write():
    "Overwrite `csv_fn_mesh` with a header row followed by every (meshid, text) pair currently in `mesh_db`."
    print(f"Saving MeSh records ({len(mesh_db)})")
    # newline="" is what the csv module asks for on files it writes (otherwise line
    # endings get translated a second time on some platforms); utf-8 keeps
    # non-ASCII text independent of the locale.
    with open(csv_fn_mesh, "w", newline="", encoding="utf-8") as f_out_mesh:
        csv_writer_mesh = csv.writer(f_out_mesh)
        csv_writer_mesh.writerow(["meshid", "text"])
        csv_writer_mesh.writerows(mesh_db.items())

# https://stackoverflow.com/a/26435241/9201239 efficient RAM usage
def parse_entries(f, tag):
    """Lazily yield every completed *tag* element of the xml in *f* (file name or file handle).

    The root element is cleared each time the generator is resumed after a yield,
    so finish with an element before asking for the next one.
    """
    events = iter(etree.iterparse(f, events=('start', 'end')))
    root = next(events)[1]  # the first event belongs to the root element
    for kind, node in events:
        if kind != 'end' or node.tag != tag: continue
        yield node
        root.clear()  # release what has been parsed so far to free memory

def meshhead2rec(mh):
    """Convert one `MeshHeading` element into 'UI/MajorTopicYN' entries joined with '-'.

    The descriptor comes first, followed by every qualifier of the heading, e.g.
    'D004650/N-Q000706/Y'. Each UI is also registered with its text through
    `mesh_db_update`. Returns '' when `mh` is None.
    """
    if mh is None: return ''
    # A heading may carry several QualifierName elements; `find` would keep only
    # the first one and silently drop the rest, so collect them all.
    elems = [mh.find('DescriptorName'), *mh.findall('QualifierName')]
    l = []
    for e in elems:
        if e is None: continue
        ui, mt = e.attrib['UI'], e.attrib['MajorTopicYN']
        l.append(f"{ui}/{mt}")
        mesh_db_update(ui, e.text)
    return "-".join(l)
                
def meshlist2rec(mhl):
    "Join the `meshhead2rec` record of every `MeshHeading` child of `mhl` with '|'; '' when `mhl` is None."
    if mhl is None: return ''
    headings = mhl.findall('MeshHeading')
    return "|".join(meshhead2rec(heading) for heading in headings)

#def extract(f_in, fn, files_cnt, csv_writer_main):
def _elem_text(elem):
    "Return the stripped text of `elem`, including text inside nested inline tags; '' when `elem` is None."
    if elem is None: return ''
    # `.text` stops at the first child element (e.g. <i>, <sub>); `itertext` does not.
    return "".join(elem.itertext()).strip()


def extract(f_in, fn, files_cnt):
    """Extract the English records of one PubMed xml stream into the main csv.

    f_in: xml file name or handle, handed to `parse_entries`.
    fn: file name, used only in the progress line.
    files_cnt: index of the file, used only in the progress line.

    Every `MedlineCitation` that has an `Article` whose first `Language` is 'eng'
    becomes a row [pmid, title, abstract, mesh]. All `AbstractText` sections of an
    abstract are joined with a single space; records without an `Abstract` get ''.
    Under `lock` the per-file counts are added to the global totals, the rows are
    written through `csv_writer_main`, and `mesh_db_write` rewrites the mesh csv.
    """
    global total, total_eng, total_no_abstract
    c, c_eng, c_no_abstract = 0, 0, 0
    rows = []
    for e in parse_entries(f_in, 'MedlineCitation'):
        c += 1
        pmid = e.find('PMID').text

        # 1. Abstracts
        a = e.find('Article')
        if a is None: continue

        # only english articles
        lang = a.find('Language')
        if lang is None or lang.text != 'eng': continue

        c_eng += 1
        title    = _elem_text(a.find('ArticleTitle'))
        abstract = a.find('Abstract')
        if abstract is not None:
            # Structured abstracts have one AbstractText per section; keep them all
            # and skip sections that turn out to be empty.
            sections = (_elem_text(t) for t in abstract.findall('AbstractText'))
            abstract_text = " ".join(s for s in sections if s)
        else:
            abstract_text = ''
            c_no_abstract += 1

        # 2. MeSH Data
        mesh = meshlist2rec(e.find('MeshHeadingList'))

        # Rows are buffered per file so the lock is taken once per file, not per row.
        rows.append([pmid, title, abstract_text, mesh])

    print(f"{files_cnt:0>4d} {fn}: recs {c:0>5d}, eng {c_eng:0>5d}, no abstract {c_no_abstract:0>5d}")
    with lock:
        total += c
        total_eng += c_eng
        total_no_abstract += c_no_abstract
        csv_writer_main.writerows(rows)
        mesh_db_write()

def extract_parallel(fn, index):
    "Open the gzip file `fn` for binary reading and pass the handle, `fn` and `index` on to `extract`."
    with gzip.open(fn, 'rb') as gz_stream:
        extract(gz_stream, fn, index)
        
def process():

    files = sorted(path.glob("*.xml.gz"))
    print(f"Total files to process: {len(files)}")
    parallel(extract_parallel, files, max_workers=8)           

    # XXX: mesh writing is broken in multiproc - need to save each separately and then merge at the end of everything, as simple as `cat mesh*csv | uniq > mesh.csv` hack will do
    
    # summary
    ! wc -l {csv_fn_main} {csv_fn_mesh}
    
# Run the whole extraction and report how long it took (IPython `%time` magic).
%time process()    

26

Total files to process: 6


0000 data/sample/pubmed19n1152.xml.gz: recs 01637, eng 01627, no abstract 00257
0005 data/sample/pubmed19n1205.xml.gz: recs 02848, eng 02845, no abstract 00442
0002 data/sample/pubmed19n1172.xml.gz: recs 03217, eng 03161, no abstract 00401
0004 data/sample/pubmed19n1198.xml.gz: recs 04936, eng 04612, no abstract 00296
0001 data/sample/pubmed19n1157.xml.gz: recs 05062, eng 05040, no abstract 00338
0003 data/sample/pubmed19n1185.xml.gz: recs 06561, eng 06433, no abstract 00693
Grand total of 0 English recs (no abstract 0) out of 0 pubmed recs
Saving MeSh records (0)
   24366 pubmed-abstracts.csv
       1 pubmed-mesh.csv
   24367 total
CPU times: user 11.3 ms, sys: 61.6 ms, total: 72.8 ms
Wall time: 4.89 s
