## COVID-19 Open Research Project - CS431 Final Project

## Set up data and environment

In [0]:
from pyspark import SparkContext, SparkConf

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SparkSession, Row
from pyspark.ml import Pipeline

In [0]:
import glob

In [0]:
# File location and type
file_location = "/FileStore/tables/COR/"
file_type = "json"

# Read the JSON input at file_location; multiLine=True is passed so that a
# single record may span several lines.
# NOTE(review): file_type is assigned above but not used by this read.
df = spark.read.json(file_location, multiLine=True)

# Preview the first five rows.
df.show(5)

In [0]:
# Total number of rows read from the input.
df.count()

## Preprocess Data

In [0]:
# Inspect the schema Spark inferred for the loaded JSON.
df.printSchema()

In [0]:
# Build one row per paper: explode body_text into one row per entry, keep only
# each entry's `text` field, then join the texts of every paper with single
# spaces into the `body_text` column.
# NOTE(review): collect_list does not promise any particular order after the
# groupby shuffle, so the texts may not come back in document order - confirm
# whether that matters downstream.
grouped_doc = (
    df.select("paper_id", explode("body_text").alias("body_text_flat"))
    .select("paper_id", "body_text_flat.text")
    .groupby("paper_id")
    .agg(concat_ws(" ", collect_list("text").alias("text")).alias("body_text"))
)

# Preprocess Text

In [0]:
%sh

# List the Python 3 binary under /databricks (ls fails if the path is missing).
ls /databricks/python3/bin/python3

In [0]:
from langid.langid import LanguageIdentifier, model
import string
import re
import spacy
# Load the spaCy English model, downloading it only when it is not installed
# yet, so that re-running the notebook does not trigger a download every time.
try:
    nlp = spacy.load('en_core_web_md')
except OSError:
    # spacy.load raises OSError when the model package cannot be found.
    spacy.cli.download("en_core_web_md")
    nlp = spacy.load('en_core_web_md')

In [0]:
import thinc

## Remove Non-ASCII Characters

In [0]:
def strip_non_ascii(data_str):
    '''Return ``data_str`` keeping only characters with code points 1-126.

    Besides non-ASCII characters this also drops NUL (0) and DEL (127),
    because the kept range is exclusive at both ends.

    Args:
        data_str: Text to filter, or None.

    Returns:
        The filtered string, or None when ``data_str`` is None.
    '''
    if data_str is None:
        # Propagate missing values instead of raising TypeError on iteration.
        return None
    return ''.join(c for c in data_str if 0 < ord(c) < 127)

## Check Language

In [0]:
# Holds the langid identifier after its first construction so that later calls
# in the same Python process reuse it instead of rebuilding it per document.
# NOTE(review): if check_lang is first called on the driver and only then
# shipped to Spark workers, the populated cache may be serialized with it -
# confirm that is acceptable.
_LANG_IDENTIFIER_CACHE = {}


def _get_lang_identifier():
    """Return the langid identifier, building it on first use only."""
    if "identifier" not in _LANG_IDENTIFIER_CACHE:
        # Imported locally under a private alias so this helper does not depend
        # on the notebook-level name `model`, which a later cell rebinds to the
        # fitted MinHashLSH model.
        from langid.langid import LanguageIdentifier, model as langid_model
        _LANG_IDENTIFIER_CACHE["identifier"] = LanguageIdentifier.from_modelstring(
            langid_model, norm_probs=True)
    return _LANG_IDENTIFIER_CACHE["identifier"]


def check_lang(data_str, min_confidence=0.0):
    """Predict the language of ``data_str`` with langid.

    Args:
        data_str: Text to classify, or None.
        min_confidence: Lowest normalized confidence accepted for a
            prediction. With the default of 0.0 the predicted label is
            returned regardless of confidence, which is what this function
            always did: its former 0.9 comparison chose between two identical
            branches. Pass 0.9 to actually enforce that threshold.

    Returns:
        The predicted language code, or None when ``data_str`` is None or the
        confidence is below ``min_confidence``.
    """
    if data_str is None:
        return None

    language, confidence = _get_lang_identifier().classify(data_str)
    if confidence < min_confidence:
        return None
    return language

## Remove Stop Words & Lemmatize

In [0]:
# Remove stop words and lemmatize

# Caches the loaded spaCy pipeline so it is built once per Python process.
# NOTE(review): if clean_text is first called on the driver and only then
# shipped to Spark workers, the populated cache may be serialized with it -
# confirm that is acceptable.
_SPACY_PIPELINE_CACHE = {}

# Part-of-speech tags whose lemmas are kept.
_KEPT_POS_TAGS = ("NOUN", "PROPN", "VERB", "ADJ")


def _get_spacy_pipeline():
  """Return the 'en_core_web_md' pipeline, loading it on first use only."""
  if "nlp" not in _SPACY_PIPELINE_CACHE:
    # Local import keeps this helper self-contained.
    import spacy
    _SPACY_PIPELINE_CACHE["nlp"] = spacy.load('en_core_web_md')
  return _SPACY_PIPELINE_CACHE["nlp"]


def clean_text(text):
  """Lemmatize ``text`` and drop stop words and unwanted parts of speech.

  The first token is always kept. Every later token is kept only when it is
  not a stop word and its part-of-speech tag is NOUN, PROPN, VERB or ADJ.
  Kept tokens are replaced by their lemma and joined with single spaces.

  The earlier version assigned ``nlp`` inside the function, which made the
  name local; the model was therefore reloaded with ``spacy.load`` on every
  call. The pipeline is now loaded once and reused.

  Args:
    text: Text to clean, or None.

  Returns:
    The space-joined lemmas, or None when ``text`` is None.
  """
  if text is None:
    return None

  proc = _get_spacy_pipeline()(text)

  cleaned = [
    token.lemma_
    for i, token in enumerate(proc)
    if i == 0 or (token.is_stop is False and token.pos_ in _KEPT_POS_TAGS)
  ]
  return " ".join(cleaned)

## Spark UDF

In [0]:
# Expose the plain-Python helpers as Spark UDFs; each one yields a string column.
strip_non_ascii_udf = udf(strip_non_ascii, returnType=StringType())
check_lang_udf = udf(check_lang, returnType=StringType())
clean_udf = udf(clean_text, returnType=StringType())

In [0]:
# Schema after flattening (paper_id plus the joined body_text).
grouped_doc.printSchema()

In [0]:
# Preview three flattened papers.
grouped_doc.show(3)

In [0]:
# Add the filtered text as `no_ascii` and drop the raw `body_text` column.
proc_df = (
    grouped_doc
    .withColumn("no_ascii", strip_non_ascii_udf(col("body_text")))
    .drop("body_text")
)
proc_df.show(5)

In [0]:
# Tag every paper with the language predicted from its filtered text.
proc_df = proc_df.withColumn(
    "lang",
    check_lang_udf(col("no_ascii")),
)
proc_df.show(5)

In [0]:
# Keep only the papers whose predicted language is English.
proc_df = proc_df.filter(col("lang") == "en")

In [0]:
# Lemmatize and filter tokens of the ASCII-only text into the `clean` column.
proc_df = proc_df.withColumn(
    "clean",
    clean_udf(col("no_ascii")),
)
proc_df.show(5)

# Recommender

In [0]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, NGram
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes, RandomForestClassifier
from pyspark.ml.clustering import LDA
from pyspark.ml.feature import CountVectorizer

# Tokenize the cleaned text into the `words` column. The stages are applied one
# by one in the following cells; no Pipeline object or classifier is built here.
tokenizer = Tokenizer(inputCol="clean", outputCol="words")
data = tokenizer.transform(proc_df)


In [0]:
# Preview the tokenized rows.
data.show(5)

In [0]:
# Build 2-grams (n=2) from the `words` column into the `ngrams` column.
ngram = NGram(n=2, inputCol='words', outputCol='ngrams')
ngram_df = ngram.transform(data)

In [0]:
# Hash the 2-grams into fixed-length vectors with 10000 features (column `vectors`).
# NOTE(review): Spark's HashingTF documentation advises a power of two for
# numFeatures so features map evenly to columns; 10000 is not one - consider
# whether e.g. 2**14 would be a better choice.
hashing = HashingTF(numFeatures=10000, inputCol="ngrams", outputCol="vectors")
hash_df = hashing.transform(ngram_df)

In [0]:
# Preview the hashed feature vectors.
hash_df.show(5)

In [0]:
# Keep the id, the 2-grams, and the hashed vector renamed to `features`, the
# input column the MinHash LSH stage reads.
# MinHash cannot transform an all-zero vector, so papers that produced no
# 2-grams are dropped here instead of failing later in the LSH transform.
final_model = (
    hash_df
    .filter(size(col("ngrams")) > 0)
    .select('paper_id', 'ngrams', col('vectors').alias('features'))
)
# Persisted because the frame is reused by the fit, transform and
# nearest-neighbour cells that follow.
final_model.persist()

# Jaccard Distance with MinHash LSH

In [0]:
from pyspark.ml.feature import MinHashLSH

In [0]:
# Fit a MinHash LSH model with 3 hash tables on the `features` column.
# NOTE(review): this rebinds `model`, a name an earlier cell imports from
# langid.langid. Any code that still reads the global `model` expecting the
# langid model string will get the LSH model once this cell has run; a
# distinct name such as `lsh_model` would avoid the clash.
mh = MinHashLSH(inputCol="features", outputCol="hashes", numHashTables=3)
model = mh.fit(final_model)

In [0]:
# Apply the fitted LSH model (adds the `hashes` column) and preview three rows.
model.transform(final_model).show(3)

In [0]:
# Feature vector of the first corpus row.
final_model.head().features

Let's now try to use the model to find similar articles

In [0]:
# Read the test JSON and flatten it with the same steps used for the corpus:
# explode body_text, keep each entry's `text`, and join the texts per paper.
test_df = spark.read.json('/FileStore/tables/CORD19/test.json', multiLine=True)
grouped_test = (
    test_df.select("paper_id", explode("body_text").alias("body_text_flat"))
    .select("paper_id", "body_text_flat.text")
    .groupby("paper_id")
    .agg(concat_ws(" ", collect_list("text").alias("text")).alias("body_text"))
)

In [0]:
# Show the flattened test rows.
grouped_test.show()

In [0]:
# Preprocess the test rows: filter characters into `no_ascii`, then lemmatize
# into `clean`. Unlike the corpus, `body_text` is kept and no language filter
# is applied here.
test_proc_df = (
    grouped_test
    .withColumn("no_ascii", strip_non_ascii_udf(col("body_text")))
    .withColumn("clean", clean_udf(col("no_ascii")))
)
test_proc_df.show()

In [0]:
# Reuse the corpus transformers (tokenizer -> 2-grams -> hashing) on the test
# rows so their vectors are built the same way as the corpus vectors.
test_data = tokenizer.transform(test_proc_df)
test_ngram_df = ngram.transform(test_data)
test_hash_df = hashing.transform(test_ngram_df)
# Same column layout as final_model: id, 2-grams, and the vector as `features`.
test_final = test_hash_df.select('paper_id', 'ngrams', col('vectors').alias('features'))

In [0]:
# Show the featurized test rows.
test_final.show()

In [0]:
# Use the first test row's feature vector as the query key.
key = test_final.head().features

In [0]:
# Ask the LSH model for the 5 approximate nearest neighbours of `key` among
# the rows of final_model and display them.
model.approxNearestNeighbors(final_model, key, 5).show()