# Incremental Pre-processor

This notebook is given paths to two different metadata files from two CORD-19 datasets from different dates, and computes the diffs between the two datasets. The first date (`d1`) should always be prior to the second date (`d2`) and the objective is to compute the file diffs between `d1` and `d2`. The notebook will work out:
* deletions -- papers which exist in the dataset for `d1` but were deleted from the dataset for `d2`.
* additions -- papers which did not exist in the dataset for `d1` but were added to the dataset for `d2`.

## Initialize Dask Cluster

In [1]:
from dask_saturn.core import describe_sizes

describe_sizes()

{'medium': 'Medium - 2 cores - 4 GB RAM',
 'large': 'Large - 2 cores - 16 GB RAM',
 'xlarge': 'XLarge - 4 cores - 32 GB RAM',
 '2xlarge': '2XLarge - 8 cores - 64 GB RAM',
 '4xlarge': '4XLarge - 16 cores - 128 GB RAM',
 '8xlarge': '8XLarge - 32 cores - 256 GB RAM',
 '12xlarge': '12XLarge - 48 cores - 384 GB RAM',
 '16xlarge': '16XLarge - 64 cores - 512 GB RAM',
 'g4dnxlarge': 'T4-XLarge - 4 cores - 16 GB RAM - 1 GPU',
 'g4dn4xlarge': 'T4-4XLarge - 16 cores - 64 GB RAM - 1 GPU',
 'g4dn8xlarge': 'T4-8XLarge - 32 cores - 128 GB RAM - 1 GPU',
 'p32xlarge': 'V100-2XLarge - 8 cores - 61 GB RAM - 1 GPU',
 'p38xlarge': 'V100-8XLarge - 32 cores - 244 GB RAM - 4 GPU',
 'p316xlarge': 'V100-16XLarge - 64 cores - 488 GB RAM - 8 GPU'}

In [2]:
from dask.distributed import Client, wait
from dask_saturn import SaturnCluster
import time

n_workers = 10
cluster = SaturnCluster(n_workers=n_workers, scheduler_size='2xlarge', worker_size='4xlarge', nthreads=16)
client = Client(cluster)
cluster

[2020-09-30 23:35:25] INFO - dask-saturn | Starting cluster. Status: pending
[2020-09-30 23:35:31] INFO - dask-saturn | Starting cluster. Status: pending
[2020-09-30 23:35:44] INFO - dask-saturn | Cluster is ready


VBox(children=(HTML(value='<h2>SaturnCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n   …

In [3]:
while len(client.scheduler_info()['workers']) < n_workers:
    print('Waiting for workers, got', len(client.scheduler_info()['workers']))
    time.sleep(30)
print('Done!')

Waiting for workers, got 0
Waiting for workers, got 0
Waiting for workers, got 3
Done!


In [4]:
import boto3
import dask.dataframe as dd
import json
import numpy as np
import os
import pandas as pd
import s3fs

from dask.distributed import Client, progress, get_worker

In [5]:
BUCKET_NAME = "s3://saturn-elsevierinc"

D1 = "2020-08-28"
D2 = "2020-09-28"

METADATA_D1 = "/".join([BUCKET_NAME, "cord19", "metadata.csv"])
METADATA_D2 = "/".join([BUCKET_NAME, D2, "metadata.csv"])

DELETED_FOLDER = "/".join([BUCKET_NAME, "incremental", "deleted"])
ADDED_FOLDER = "/".join([BUCKET_NAME, "incremental", "added"])

## Read Metadata Files

The `metadata.csv` files contains rows with null titles. These typically represent papers which haven't made it into the dataset yet, so they are removed in `prepare_metadata`. We just need the columns named in `col_sublist` for our computations going forward.

There are very few of these, 74 and 52 in our case (check with `df.title_is_null` == 1).

In [6]:
def detect_null(f):
    return 1 if pd.isnull(f) else 0


col_sublist = ["cord_uid", "title", "abstract",
               "pdf_json_files", "pmc_json_files"]


def prepare_metadata(meta_df_path, col_sublist=col_sublist):
    meta_df = dd.read_csv(meta_df_path, dtype=str)
    # remove null titles
    meta_df["title_is_null"] = meta_df.apply(lambda row: detect_null(row.title),
                                              axis=1, meta=("int"))
    meta_df = meta_df[meta_df.title_is_null == 0]
    # show column sublist
    meta_df = meta_df[col_sublist]
    # set index for merging
    meta_df.set_index("cord_uid")
    return meta_df

In [7]:
meta1_df = prepare_metadata(METADATA_D1)
meta1_df.head()

Unnamed: 0,cord_uid,title,abstract,pdf_json_files,pmc_json_files
0,ug7v899j,Clinical features of culture-proven Mycoplasma...,OBJECTIVE: This retrospective chart review des...,document_parses/pdf_json/d1aafb70c066a2068b027...,document_parses/pmc_json/PMC35282.xml.json
1,02tnwd4m,Nitric oxide: a pro-inflammatory mediator in l...,Inflammatory diseases of the respiratory tract...,document_parses/pdf_json/6b0567729c2143a66d737...,document_parses/pmc_json/PMC59543.xml.json
2,ejv2xln0,Surfactant protein-D and pulmonary host defense,Surfactant protein-D (SP-D) participates in th...,document_parses/pdf_json/06ced00a5fc04215949aa...,document_parses/pmc_json/PMC59549.xml.json
3,2b73a28n,Role of endothelin-1 in lung disease,Endothelin-1 (ET-1) is a 21 amino acid peptide...,document_parses/pdf_json/348055649b6b8cf2b9a37...,document_parses/pmc_json/PMC59574.xml.json
4,9785vg6d,Gene expression in epithelial cells in respons...,Respiratory syncytial virus (RSV) and pneumoni...,document_parses/pdf_json/5f48792a5fa08bed9f560...,document_parses/pmc_json/PMC59580.xml.json


In [8]:
len(meta1_df)

238944

In [9]:
meta2_df = prepare_metadata(METADATA_D2)
meta2_df.head()

Unnamed: 0,cord_uid,title,abstract,pdf_json_files,pmc_json_files
0,ug7v899j,Clinical features of culture-proven Mycoplasma...,OBJECTIVE: This retrospective chart review des...,document_parses/pdf_json/d1aafb70c066a2068b027...,document_parses/pmc_json/PMC35282.xml.json
1,02tnwd4m,Nitric oxide: a pro-inflammatory mediator in l...,Inflammatory diseases of the respiratory tract...,document_parses/pdf_json/6b0567729c2143a66d737...,document_parses/pmc_json/PMC59543.xml.json
2,ejv2xln0,Surfactant protein-D and pulmonary host defense,Surfactant protein-D (SP-D) participates in th...,document_parses/pdf_json/06ced00a5fc04215949aa...,document_parses/pmc_json/PMC59549.xml.json
3,2b73a28n,Role of endothelin-1 in lung disease,Endothelin-1 (ET-1) is a 21 amino acid peptide...,document_parses/pdf_json/348055649b6b8cf2b9a37...,document_parses/pmc_json/PMC59574.xml.json
4,9785vg6d,Gene expression in epithelial cells in respons...,Respiratory syncytial virus (RSV) and pneumoni...,document_parses/pdf_json/5f48792a5fa08bed9f560...,document_parses/pmc_json/PMC59580.xml.json


In [10]:
len(meta2_df)

282640

## Deleted Items

We will generate the deleted items and save it directly as a Parquet file for use by the post-processor later.

In [11]:
deleted_df = meta1_df.merge(meta2_df, 
                            how="left",
                            on=["cord_uid"],
                            suffixes=["_d1", "_d2"])

# remove deleted items (present in d1 but not in d2)
deleted_df["rhs_is_null"] = deleted_df.apply(
    lambda row: detect_null(row.title_d2), axis=1, meta=("int"))
deleted_df = deleted_df[deleted_df.rhs_is_null == 1]

# rename the _d1 columns
new_cols = {x + "_d1": x for x in col_sublist[1:]}
deleted_df = deleted_df.rename(columns=new_cols)

# remove the _d2 columns
deleted_df = deleted_df[col_sublist]

deleted_df.head()

Unnamed: 0,cord_uid,title,abstract,pdf_json_files,pmc_json_files
4788,tr1dsqpg,Transvesical natural orifice transluminal endo...,UNLABELLED Study Type - Therapy (case series) ...,,
5072,tlfttlpj,Error in the Text.,,,
5381,n08qtyyb,Cytoreductive surgery plus hyperthermic intrap...,BACKGROUND Advanced colorectal cancer (CRC) is...,,
5574,lwuk232v,Phenylalanine meta-hydroxylase: a single resid...,"The rare non-proteinogenic amino acid, meta- L...",,
5703,b9ucv0n3,DC-SIGN and L-SIGN Are Attachment Factors That...,UNLABELLED It is well established that glycosa...,,


In [12]:
len(deleted_df)

2162

## Added Items

We will compute the added items, and then generate the paragraph dataframe from it using similar logic to `01x-load-and-parse.ipynb`.

In [13]:
added_df = meta1_df.merge(meta2_df,
                          how="right",
                          on=["cord_uid"],
                          suffixes=["_d1", "_d2"])

# remove added items (present in D2 but not in D1)
added_df["lhs_is_null"] = added_df.apply(
    lambda row: detect_null(row.title_d1), axis=1, meta=("int"))
added_df = added_df[added_df.lhs_is_null == 1]

# rename the _d2 columns
new_cols = {x + "_d2": x for x in col_sublist[1:]}
added_df = added_df.rename(columns=new_cols)

# remove the _d1 columns
added_df = added_df[col_sublist]

added_df.head()

Unnamed: 0,cord_uid,title,abstract,pdf_json_files,pmc_json_files
38507,5vdvp2y2,The top 100 most cited articles on bronchoscop...,BACKGROUND: Bronchoscopy is applied broadly in...,document_parses/pdf_json/7b3e2a13aad3b3b185689...,document_parses/pmc_json/PMC7450920.xml.json
38508,fh5tj8qr,Experience with Temporary Centrifugal Pump Bi-...,Though ventricular assist devices (VADs) are a...,document_parses/pdf_json/cf4862c0fda8f258eb918...,document_parses/pmc_json/PMC7451784.xml.json
38509,5fini4ud,Baricitinib: Impact on COVID-19 coagulopathy?,,document_parses/pdf_json/882ae5379c1d79a98b3b2...,
38510,95pp2qf8,Social marketing interventions to promote phys...,BACKGROUND: Falls are a significant source of ...,document_parses/pdf_json/7305eb5c4c054d14dd82b...,document_parses/pmc_json/PMC7456007.xml.json
38511,yu7eudq6,The Impact of Accidental Hypothermia on Mortal...,BACKGROUND: Accidental hypothermia is a known ...,document_parses/pdf_json/338d52f61d8b10f5f803c...,document_parses/pmc_json/PMC7454138.xml.json


In [14]:
len(added_df)

44536

In [15]:
fs = s3fs.S3FileSystem()
if fs.exists(DELETED_FOLDER):
    fs.rm(DELETED_FOLDER, recursive=True)
    
DELETED_FOLDER

's3://saturn-elsevierinc/incremental/deleted'

In [16]:
%%time
deleted_df.to_parquet(DELETED_FOLDER, engine="pyarrow", compression="snappy")

CPU times: user 102 ms, sys: 4.14 ms, total: 106 ms
Wall time: 6.01 s


### Parse Additions to Added Paragraphs

In [17]:
def read_fully(filepath, s3, bucket_name):
    obj = s3.Object(bucket_name, filepath)
    s = obj.get()['Body'].read().decode('utf-8')
    return s


def parse_paragraphs(rows, bucket_name):
    worker = get_worker()
#     try:
#         s3 = worker.s3
#     except:
#         s3 = boto3.resource('s3')
#         worker.s3 = s3
    
#     s3 = None
#     while not s3:
#         try:
#             s3 = boto3.resource("s3")
#             worker.s3 = s3
#         except:
#             s3 = None

    try:
        s3 = worker.s3
    except:
        sess = boto3.session.Session()
        s3 = sess.resource("s3")
        worker.s3 = s3

    paragraphs = []
    filepath = None
    try:
        if pd.notnull(rows.pdf_json_files):
            filepath = rows.pdf_json_files
        elif pd.notnull(rows.pmc_json_files):
            filepath = rows.pmc_json_files
        else:
            pass
        if filepath is not None:
            abs_filepath = "/".join([D2, filepath])
            fdict = json.loads(read_fully(abs_filepath, s3, bucket_name))
            paragraphs.append(("T", fdict["metadata"]["title"]))
            paragraphs.extend([("A{:d}".format(i), x["text"]) 
                for i, x in enumerate(fdict["abstract"])])
            paragraphs.extend([("B{:d}".format(i), x["text"]) 
                for i, x in enumerate(fdict["body_text"])])
        else:
            paragraphs.append(("T", rows["title"]))
            for i, abs_para_text in enumerate(rows["abstract"].split('\n')):
                paragraphs.append(("A{:d}".format(i), abs_para_text))
    except:
        pass
    return paragraphs


In [18]:
paragraph_df = added_df.copy()

In [19]:
paragraph_df = paragraph_df.repartition(npartitions=100)

In [20]:
paragraph_df["paragraphs"] = paragraph_df.apply(
    lambda rows: parse_paragraphs(rows, BUCKET_NAME), meta=("object"), axis=1)
paragraph_df = paragraph_df.drop(columns=["title", "abstract",
                                          "pdf_json_files", "pmc_json_files"])
paragraph_df = paragraph_df.explode("paragraphs")
paragraph_df = paragraph_df.dropna()
paragraph_df["pid"] = paragraph_df.apply(lambda rows: rows.paragraphs[0], 
                                         meta=("str"), axis=1)
paragraph_df["ptext"] = paragraph_df.apply(lambda rows: rows.paragraphs[1], 
                                           meta=("str"), axis=1)
paragraph_df = paragraph_df.drop(columns=["paragraphs"])

In [21]:
fs = s3fs.S3FileSystem()
if fs.exists(ADDED_FOLDER):
    fs.rm(ADDED_FOLDER, recursive=True)

ADDED_FOLDER

's3://saturn-elsevierinc/incremental/added'

In [22]:
%%time
paragraph_df.to_parquet(ADDED_FOLDER, engine="pyarrow", compression="snappy")

CPU times: user 732 ms, sys: 16 µs, total: 732 ms
Wall time: 11.2 s


## Verify Result

In [23]:
DELETED_FOLDER

's3://saturn-elsevierinc/incremental/deleted'

In [24]:
v_deleted_df = dd.read_parquet(DELETED_FOLDER, engine="pyarrow")
v_deleted_df.head()

Unnamed: 0,cord_uid,title,abstract,pdf_json_files,pmc_json_files
4788,tr1dsqpg,Transvesical natural orifice transluminal endo...,UNLABELLED Study Type - Therapy (case series) ...,,
5072,tlfttlpj,Error in the Text.,,,
5381,n08qtyyb,Cytoreductive surgery plus hyperthermic intrap...,BACKGROUND Advanced colorectal cancer (CRC) is...,,
5574,lwuk232v,Phenylalanine meta-hydroxylase: a single resid...,"The rare non-proteinogenic amino acid, meta- L...",,
5703,b9ucv0n3,DC-SIGN and L-SIGN Are Attachment Factors That...,UNLABELLED It is well established that glycosa...,,


In [25]:
len(v_deleted_df)

2162

In [26]:
v_added_df = dd.read_parquet(ADDED_FOLDER, engine="pyarrow")
v_added_df.head()

Unnamed: 0,cord_uid,pid,ptext
38548,l2m8y422,T,Correction: Selective laser trabeculoplasty: p...
38563,kwby80nj,T,Publishing in the transfusion field: “Like a B...
38565,9vbwzi8v,T,Nachfrage nicht zu bremsen
38568,w6bnygac,T,Editor’s Focus
38572,k4bcmeke,T,Introduction to the September 2020 Special Iss...


In [27]:
len(v_added_df)

58621

## Shut down Cluster

In [28]:
# do this if youre done using the cluster
# cluster.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
_GatheringFuture exception was never retrieved
future: <_GatheringFuture finished exception=CancelledError()>
concurrent.futures._base.CancelledError
