# COVID-411 Project
## Data & Pre-Processing Report + Code
---

For the purposes of this project, the 2020-07-15 release of the CORD-19 dataset is used. The full
dataset consists of ```85,451``` parsed PDF-origin papers and ```61,966``` parsed XML-origin
papers from PubMed Central, along with supporting and metadata files.

Given the scope of the project, it became necessary to narrow the data involved, so we elected to use only the
PDF-origin parses to build the final product. This document briefly outlines the data and the pre-processing steps
taken to prepare it for generating our model.

A local PySpark environment in standalone mode was ultimately used for simplicity, as it was not feasible for the
authors to constantly spin cloud services up and down during development. Initially, a more complex distributed Apache
Spark cluster was trialed on a single machine using Docker, consisting of an HDFS volume, a Master with several Worker
nodes, and a JupyterLab instance for running code. While effective (particularly for parallel processing), this
approach was ultimately abandoned because of difficult interoperability with later stages of the project and
performance issues, mostly related to the Windows Docker Desktop WSL2 backend and its memory handling. A copy of the
Docker files (with attributions to where they were derived from) will be included in the project codebase for reference.

The use of the standalone PySpark environment presented some challenges with memory management. Processing the full
dataset as a Resilient Distributed Dataset (RDD) and then collecting all of the results at once tended to exhaust the
Java heap (```OutOfMemoryError```). To mitigate this, the data was first partitioned on disk into chunks of 10,000 files,
which were processed sequentially, with the results written to an intermediate database. Switching from ```map```
(followed by a collect) to ```foreach``` produced a more stable environment, since ```foreach``` performs its work on
the executors and returns nothing to the driver. We nevertheless kept the partitioning because it made it easier to
recover from a mid-process crash.

The first iteration, built on the distributed cluster environment, used a PostgreSQL server in a Docker container on the
same virtual network as the Spark cluster, which allowed the worker nodes to commit processed data to the database in
parallel streams. However, we determined that a simpler approach was needed and switched to SQLite, which keeps the
whole database in a single file that is easier to manipulate. Because SQLite allows only one writer at a time, writing
to it in parallel would negate the benefit of parallelizing the data stream, so the standalone Spark environment suited
our purposes well.

The code (and output) for partitioning and initial data pre-processing is included below.

```python
# PARTITIONING STEP

import math
import os
import shutil

SRC_PATH = '../data/2020-07-15/document_parses/pdf_json/'
DEST_PATH = './data/'
PART_SIZE = 10000  # files per partition

# Sort so the file-to-partition assignment is reproducible between runs
files = sorted(os.listdir(SRC_PATH))
nfiles = len(files)

# 85,451 files at 10,000 per chunk gives 9 partitions (p0-p8)
nparts = math.ceil(nfiles / PART_SIZE)
part_dirs = [f'p{i}' for i in range(nparts)]

for cdir in part_dirs:
    os.makedirs(os.path.join(DEST_PATH, cdir), exist_ok=True)

print("Partition directories: ", part_dirs)

for i, cdir in enumerate(part_dirs):
    # The last partition simply receives whatever files remain
    for fn in range(i * PART_SIZE, min((i + 1) * PART_SIZE, nfiles)):
        cfile = files[fn]

        # Show the progress of the partitioning
        print(f'Processing partition {i+1} of {nparts} [{cdir}]: Processing file {fn+1} [{cfile}] of {nfiles}.', end='\r')

        # Also considered hardlinking rather than copying, but disk space was not a concern for this part
        shutil.copy(os.path.join(SRC_PATH, cfile), os.path.join(DEST_PATH, cdir))
```

```
Partition directories:  ['p0', 'p1', 'p2', 'p3', 'p4', 'p5', 'p6', 'p7', 'p8']
```
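Because each partition is a self-contained unit of work, it can double as a checkpoint. A minimal sketch of that idea,
assuming a hypothetical ```completed_partitions``` table and a caller-supplied ```process_partition``` function (neither
exists in the project code), records each finished partition in SQLite and skips it on restart:

```python
import sqlite3


def run_partitions(conn, partitions, process_partition):
    """Process partitions in order, skipping any already marked complete (hypothetical sketch)."""
    conn.execute("CREATE TABLE IF NOT EXISTS completed_partitions (name TEXT PRIMARY KEY)")
    done = {name for (name,) in conn.execute("SELECT name FROM completed_partitions")}
    processed = []
    for name in partitions:
        if name in done:
            continue  # finished before the previous run stopped
        process_partition(name)  # may raise; the partition is then retried on the next run
        with conn:  # mark complete only after the partition succeeded
            conn.execute("INSERT INTO completed_partitions (name) VALUES (?)", (name,))
        processed.append(name)
    return processed


conn = sqlite3.connect(":memory:")
print(run_partitions(conn, ["p0", "p1"], lambda name: None))
```

A crash part-way through a partition leaves some of its rows in the database, so a rerun of that partition would hit
primary-key conflicts; writing with ```INSERT OR IGNORE``` is one way to make such a retry safe.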


```python
# ENVIRONMENT SETUP AND INITIAL PRE-PROCESSING PREPARATIONS

import os
from datetime import datetime as dt

nbstime = dt.now()

FILES = '*.json'
# Must point at the directory the partitioning step wrote to (DEST_PATH above)
PARTS = './data'

DB = './covid411.db'

# Only the partition directories (p0-p8), in order
parts = sorted(d for d in os.listdir(PARTS) if d.startswith('p'))

# Session setup used only during the Docker cluster trial:
# from pyspark.sql import SparkSession

# spark = SparkSession.\
#         builder.\
#         appName("pyspark-notebook").\
#         master("spark://spark-master:7077").\
#         config("spark.executor.memory", "1024m").\
#         getOrCreate()

# sc = spark.sparkContext

# sc is already provided by the pyspark shell that spawned the Jupyter server
sc
```

```python
import os
import sqlite3

# Helper functions

# Remove the database file if it exists
if os.path.isfile(DB):
    os.remove(DB)

def get_db_conn(db):
    return sqlite3.connect(db)


def show_record_counts(cur):
    q_papers = "SELECT COUNT(*) FROM papers;"
    q_sentences = "SELECT COUNT(*) FROM sentences;"

    cur.execute(q_papers)
    n_papers = cur.fetchone()[0]

    cur.execute(q_sentences)
    n_sentences = cur.fetchone()[0]

    print(f"There are currently {n_papers} papers and {n_sentences} sentences present in the database.")


# Initialize DB

with get_db_conn(DB) as conn:
    cur = conn.cursor()

    cur.execute('''
        CREATE TABLE papers (
            paper_id text PRIMARY KEY NOT NULL,
            title text NOT NULL
        );
    ''')

    cur.execute('''
        CREATE TABLE sentences (
            paper_id text NOT NULL,
            sentence_number integer NOT NULL,
            sentence text,
            PRIMARY KEY (paper_id, sentence_number),
            CONSTRAINT fk_paper
                FOREIGN KEY (paper_id)
                    REFERENCES papers (paper_id)
        );
    ''')

    conn.commit()

    show_record_counts(cur)
```

```
There are currently 0 papers and 0 sentences present in the database.
```


The bulk of the processing is shown in the code below. The first task was to load the JSON files into a Spark RDD and
parse them. Next, the following pieces of data were targeted:

| Field | Description
| :- | :-
| **```paper_id```** | a unique id hash
| **```title```** | title of the paper
| **```body_text```** | the main text of the paper, consisting of all sections

This information was extracted into one Python dictionary per paper via the RDD ```map``` function. NLTK was then used
to detect the written language of each paper by counting how many distinct words it shares with each language's
stopword list, with the aim of filtering out all non-English papers. Finally, the ```body_text``` was split into
sentences using an NLTK tokenizer, and all the relevant data was written to the intermediate database. This process was
run one partition at a time, following the partitioning approach discussed in the first section of this report.
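The stopword-overlap idea behind the language filter can be shown without NLTK. In this minimal sketch the three
stopword sets are hypothetical, heavily truncated stand-ins for NLTK's lists, and the regular-expression tokenizer only
approximates ```wordpunct_tokenize```:

```python
import re

# Hypothetical, heavily truncated stopword sets (NLTK's real lists are much longer)
STOPWORDS = {
    "english": {"the", "and", "of", "to", "in", "is", "with", "that", "for"},
    "spanish": {"el", "la", "de", "que", "y", "en", "los", "con", "por"},
    "french": {"le", "la", "de", "et", "les", "des", "en", "un", "pour"},
}

# Runs of word characters, or runs of punctuation (approximating wordpunct_tokenize)
TOKEN_RE = re.compile(r"\w+|[^\w\s]+")


def likely_language(text, stopwords=STOPWORDS):
    """Return the language whose stopword set shares the most distinct tokens with text."""
    tokens = {tok.lower() for tok in TOKEN_RE.findall(text)}
    scores = {lang: len(tokens & words) for lang, words in stopwords.items()}
    return max(scores, key=scores.get)  # ties go to the first language listed


print(likely_language("The spread of the virus is linked to contact with droplets."))
```

As in ```text_lang_likely```, ```max``` returns the first language in dictionary order when scores tie, so text with no
stopword matches at all (for example an empty ```body_text```) is assigned the first language listed rather than being
flagged as unknown.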

The code (and output) for these tasks follows:

```python
# DATA PROCESSING HELPER FUNCTIONS

import json
from contextlib import closing

from nltk.corpus import stopwords
from nltk.tokenize import wordpunct_tokenize

def load_paper_json(raw_json):
    """
    Returns the loaded JSON object, or '' if the file cannot be parsed.
    This was necessary because one file would not parse properly and
    would otherwise fail the whole operation.
    """
    try:
        return json.loads(raw_json)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return ''


def parse_paper_json(loaded_json):
    """
    Builds and returns a dict containing the paper_id, title, and
    body_text (joined sections) for further processing

    :param loaded_json: loaded JSON object
    :return: dict containing targeted information
    """
    return {
        'paper_id':  loaded_json['paper_id'],
        'title':     loaded_json['metadata']['title'],
        'body_text': " ".join(x['text'] for x in loaded_json['body_text'])
    }


sw_fileids = stopwords.fileids()
# Build each stopword set once rather than on every call
sw_dict = {lang: set(stopwords.words(lang)) for lang in sw_fileids}

def text_lang_likely(text):
    """
    Compares tokenized text to set of stopwords for each language contained
    within the NLTK stopwords corpus and outputs the likely language based on
    the number of common words.

    Adapted from http://blog.alejandronolla.com/2013/05/15/detecting-text-language-with-python-and-nltk/

    :param text: body of text
    :return: most likely language of text
    """
    wp_words = set(wd.lower() for wd in wordpunct_tokenize(text))
    lang_scores = {lang: len(wp_words & sw_dict[lang]) for lang in sw_fileids}
    return max(lang_scores, key=lang_scores.get)  # return language with highest score


def lang_likely_wrapper(data):
    # import nltk ; was initially necessary to load NLTK on each worker node

    # Adds a dictionary field for the detected language of the paper
    data['lang'] = text_lang_likely(data['body_text'])
    return data


def process_sentences(body_text):
    from nltk.tokenize import sent_tokenize

    # Gets a list of sentences using the NLTK sentence tokenizer on the whole text body
    sentences = sent_tokenize(body_text)

    # Returns a list of dictionaries assigning a number to each sentence for use as
    # a composite key in the intermediate database along with paper_id
    return [{'sentence_number': i, 'sentence': sentence}
            for i, sentence in enumerate(sentences)]


# Define the SQL statements for inserting data into the intermediate database
add_paper = "INSERT INTO papers (paper_id, title) VALUES (?, ?);"
add_sentence = "INSERT INTO sentences (paper_id, sentence_number, sentence) VALUES (?, ?, ?);"

def db_transact(row):
    """
    Writes one paper and all of its sentences in a single transaction;
    called once per paper from the Spark RDD foreach call
    """
    # closing() releases the connection; "with conn" commits, or rolls back on error
    with closing(get_db_conn(DB)) as conn:
        with conn:
            conn.execute(add_paper, (row['paper_id'], row['title']))
            conn.executemany(add_sentence, (
                (row['paper_id'], s['sentence_number'], s['sentence'])
                for s in process_sentences(row['body_text'])
            ))


def db_transact_wrapper(x):
    # Prints to the PySpark console to keep track of progress
    print(f"Processing paper: {x['paper_id']}", end='\r')
    db_transact(x)
```

```python
# MAIN PROCESS DRIVER

for part in parts:
    stime_in = dt.now()

    # Load, parse, keep English papers only, and write each paper to SQLite
    (sc.wholeTextFiles(f'{PARTS}/{part}/{FILES}').values()
        .map(load_paper_json)
        .filter(lambda x: x != '')  # drop files that failed to parse
        .map(parse_paper_json)
        .map(lang_likely_wrapper)
        .filter(lambda x: x['lang'] == 'english')
        .foreach(db_transact_wrapper))

    print(f"Partition [{part}] completed in {(dt.now() - stime_in).total_seconds()} seconds")

print("-----------")
print(f"Data processing completed in {(dt.now() - nbstime).total_seconds()} seconds")
```

```
Partition [p0] completed in 154.623591 seconds
Partition [p1] completed in 193.394659 seconds
Partition [p2] completed in 212.696997 seconds
Partition [p3] completed in 202.906467 seconds
Partition [p4] completed in 202.743589 seconds
Partition [p5] completed in 202.516695 seconds
Partition [p6] completed in 200.014015 seconds
Partition [p7] completed in 202.893136 seconds
Partition [p8] completed in 93.316841 seconds
-----------
Data processing completed in 1674.647034 seconds
```


```python
with get_db_conn(DB) as conn:
    show_record_counts(conn.cursor())
```

```
There are currently 82626 papers and 15081367 sentences present in the database.
```
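Comparing the final counts with the number of PDF parses gives a rough picture of what the two filter steps removed.
A short sketch using only numbers stated in this report:

```python
pdf_parses = 85_451          # PDF-origin parses in the 2020-07-15 release
papers_kept = 82_626         # rows in the papers table
sentences_kept = 15_081_367  # rows in the sentences table

dropped = pdf_parses - papers_kept
kept_share = papers_kept / pdf_parses
sentences_per_paper = sentences_kept / papers_kept

print(f"dropped: {dropped:,} papers ({1 - kept_share:.1%})")
print(f"kept: {kept_share:.1%}, about {sentences_per_paper:.0f} sentences per paper")
```

The dropped papers combine files that failed to parse with papers not detected as English; these counts alone do not
separate the two.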
