This notebook contains the source code, with comments, for the retriever and reader modules of a question-answering task.

In [1]:
# -*- coding: utf-8 -*- 

import os 

from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.linalg import Vectors
from schema import *


In [2]:
# cluster mode
# Create (or reuse) the Spark session on the standalone cluster.
# NOTE(review): the master URL, UI port and spark.local.dir are hard-coded for one
# specific cluster/user; adjust them (or switch to the local-mode block below) when
# running elsewhere.
# The Kryo serializer with a large buffer and the long heartbeat/network timeouts are
# presumably there to cope with large vector/JSON payloads — TODO confirm.
spark = SparkSession \
    .builder \
    .appName("sddm") \
    .master("spark://132.229.44.220:7272") \
    .config("spark.driver.memory", "50g")\
    .config("spark.executor.memory",'10g') \
    .config("spark.memory.storageFraction", 0.1)\
    .config("spark.ui.port",4077)\
    .config("spark.driver.maxResultSize","5g")\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.executor.heartbeatInterval", 600)\
    .config("spark.network.timeout", 1000)\
    .config('spark.local.dir', '/home/s2465922/tmp')\
    .getOrCreate()

# # local mode
# spark = SparkSession \
#     .builder \
#     .appName("sddm") \
#     .master("local[20]") \
#     .config("spark.driver.memory", "50g")\
#     .config("spark.driver.maxResultSize","10g")\
#     .config("spark.ui.port",4077)\
#     .config('spark.local.dir', '/home/s2465922/tmp')\
#     .getOrCreate()

# import sparknlp
# spark=sparknlp.start()

In [18]:
# spark.sparkContext.uiWebUrl

# Stop the Spark session only on request. This cell sits directly after the session is
# created, so running it unconditionally (e.g. with "Restart & Run All") would stop the
# session that every following cell uses. Set STOP_SPARK = True once the work is finished.
STOP_SPARK = False
if STOP_SPARK:
  spark.stop()

### Load data
#### read metadata

In [9]:
# root directory of the CORD-19 data (metadata.csv and document_parses/ are read from it);
# keep the trailing slash: file paths below are built by plain string concatenation
path="/data/s2465922/cord_19/"

In [10]:
# read metadata
# metadata.csv: the header row gives the column names; column types are inferred
metadata=spark.read.csv(path+'metadata.csv',header=True,inferSchema=True,sep=',')

In [7]:
# metadata.count()

138794

In [11]:
# extract the data with full text
# keep only metadata records that reference at least one parsed full-text JSON file
metadata.createOrReplaceTempView('md')
has_full_text = 'pdf_json_files is not null or pmc_json_files is not null'
meta_with_text = spark.sql('select * from md where ' + has_full_text)

In [12]:
# extract filename from pmc or pdf json files
# prefer the PMC-derived parse when a record has one, otherwise fall back to the PDF parse
pmc_or_pdf = (F.when(F.col('pmc_json_files') != '', F.col('pmc_json_files'))
                .otherwise(F.col('pdf_json_files')))
meta_with_text = meta_with_text.withColumn('file', pmc_or_pdf)

# remove records that have incorrect filenames (not under document_parses/),
# then keep a single record per file
meta_with_text.createOrReplaceTempView('mwt')
meta_with_text = (spark.sql('select * from mwt where file RLIKE "^document_parses"')
                  .dropDuplicates(subset=["file"]))

In [7]:
# meta_with_text.count()

68000

#### read text files

In [7]:
# get filenames
# collect the selected file reference of every record to the driver
filenames=meta_with_text.select('file').rdd.flatMap(lambda x: x).collect()

# A reference may list several files separated by '; ' -- expand them, de-duplicate, and
# sort so the list (and hence the read order) is the same on every run. Iterating a bare
# set of strings depends on the per-process hash seed.
fs=[]
for f in filenames:
  fs.extend(f.split('; '))

filepath=[path+f for f in sorted(set(fs))]

In [8]:
# read body_text data
# `schema` comes from the local `schema` module (star-imported at the top); per the
# printSchema output below it keeps paper_id, abstract[].text and body_text[].{text, section}.
# multiLine=True lets one JSON record span several lines (files presumably pretty-printed — TODO confirm)
texts=spark.read.schema(schema).json(filepath, multiLine=True)

In [34]:
# texts.printSchema()

root
 |-- paper_id: string (nullable = true)
 |-- abstract: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |-- body_text: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |    |    |-- section: string (nullable = true)



In [9]:
# extract paper_id and text part of body_text
# Concatenate all body_text paragraphs of a paper into one string, one paragraph per line.
# Note: the "\n" in this Python literal is already a real newline character when Spark
# parses the SQL, so the separator is a newline (text2paragraph later splits on it).
texts.createOrReplaceTempView("papers")
ts=spark.sql('select paper_id,  concat_ws("\n",body_text.text) as text from papers')

### Data preprocessing

* common data preprocessing functions

In [3]:
import re

def text2paragraph(text):
  '''
  Split a text into paragraphs at every run of one or more newline characters.
  '''
  paragraphs = re.split(r'\n+', text)
  return paragraphs

def paragraph2sentence(paragraph):
  '''
  Split a paragraph into sentences.

  A sentence boundary is whitespace that follows '.', '!' or '?'. Every sentence is
  stripped of surrounding whitespace, and pieces that are empty after stripping
  (e.g. a whitespace-only paragraph) are dropped.
  '''
  # strip first, then test for emptiness: testing the raw piece let "   " through as ""
  stripped = (s.strip() for s in re.split(r"(?<=[.!?])\s+", paragraph))
  return [s for s in stripped if s]

def text2sentence(text):
  '''
  Split a text into a flat list of sentences: first into paragraphs (newline runs),
  then every paragraph into sentences with paragraph2sentence.

  A missing text (None, e.g. a null value handed in by a Spark UDF) yields [].
  '''
  if not text:
    return []
  sentences=[]
  for paragraph in text2paragraph(text):
    sentences.extend(paragraph2sentence(paragraph))
  return sentences

# Spark UDF wrapper: string column -> array<string> of sentences
text2sentence_udf=F.udf(lambda x: text2sentence(x), ArrayType(StringType()))

def sentence2tokens(sentence):
  '''
  Tokenize a sentence: drop leading/trailing '.', '?' and '!' characters, lower-case it,
  and return every maximal run of characters that is neither whitespace nor one of , " : ; ( )
  '''
  cleaned = sentence.strip('.?!').lower()
  return re.findall(r'[^\s,":;()]+', cleaned)
  
# Spark UDF wrapper: string column -> array<string> of lower-cased tokens
sentence2tokens_udf=F.udf(sentence2tokens, ArrayType(StringType()) )

def text2tokens(text):
  '''
  Tokenize a whole text: split it into paragraphs and sentences, tokenize every sentence
  with sentence2tokens, and return all tokens in order as one flat list.

  A missing text (None, e.g. a null paragraph handed in by a Spark UDF) yields [].
  '''
  if not text:
    return []
  tokens=[]
  for paragraph in text2paragraph(text):
    for sentence in paragraph2sentence(paragraph):
      tokens.extend(sentence2tokens(sentence))
  return tokens

# Spark UDF wrapper: string column -> array<string> of tokens for the whole text
text2tokens_udf=F.udf(text2tokens, ArrayType(StringType()))

* find articles that mention COVID-19 or a related term (e.g. SARS-CoV-2, 2019-nCoV)

In [11]:
# Regular expressions, matched against the lower-cased text, that identify COVID-19 /
# SARS-CoV-2 related papers; covid_tag tags a paper if any of them matches.
keywords = [r"2019[\-\s]?n[\-\s]?cov", "2019 novel coronavirus", "coronavirus 2019", r"coronavirus disease (?:20)?19",
            r"covid(?:[\-\s]?19)?", r"n\s?cov[\-\s]?2019", r"sars-cov-?2", r"wuhan (?:coronavirus|cov|pneumonia)"]

In [12]:
import re

def covid_tag(text, patterns=None):
  '''
  Return True if `text` matches any COVID-19 related regular expression.

  text: the document text; matching is done on its lower-cased form.
        None (a null value from Spark) is treated as not matching.
  patterns: iterable of regex strings; defaults to the module-level `keywords` list.
  '''
  if text is None:
    return False
  if patterns is None:
    patterns = keywords
  # lower-case once instead of once per pattern
  lowered = text.lower()
  return any(re.search(p, lowered) for p in patterns)

covid_tag_udf=F.udf(lambda x: covid_tag(x), BooleanType())

# Tag every paper, keep the COVID-related ones, then project to the columns needed later.
# The filter runs before the select so its `covid` column is still part of the DataFrame,
# instead of relying on Spark to resolve a column the select has already dropped.
ts_covid = (ts.withColumn('covid', covid_tag_udf('text'))
              .filter(F.col('covid'))
              .select('paper_id', 'text'))

In [14]:
# number of COVID-related papers kept for retrieval (18683 in the recorded run)
ts_covid.count()

18683

### Retriever

* fit tf-idf model using text data

In [13]:
# create tf-idf  pipeline
# Stages: 'tokens' -> English stop words removed ('filtered') -> term-count vectors over
# the vocabulary learned by CountVectorizer ('tf') -> tf-idf weighted vectors ('tfidf').

from pyspark.ml.feature import StopWordsRemover, CountVectorizer, IDF

stopwords =StopWordsRemover.loadDefaultStopWords('english')
sw_remover = StopWordsRemover(inputCol='tokens', outputCol='filtered', stopWords=stopwords)

tf = CountVectorizer(inputCol='filtered', outputCol='tf')

idf = IDF(inputCol='tf', outputCol='tfidf')

td_pipeline = Pipeline(stages=[sw_remover, tf, idf])

In [14]:
# transform each document to a list of tokens
# (paragraphs -> sentences -> lower-cased tokens via text2tokens) in a new 'tokens' column
ts_tokenized=ts_covid.withColumn('tokens', text2tokens_udf('text'))

In [15]:
# fit tf-idf model using text data
# learns the CountVectorizer vocabulary and the IDF weights from the COVID-tagged papers
from time import time
start=time()
td_model = td_pipeline.fit(ts_tokenized)
print('Running time: %.3f min'%((time()-start)/60))

Running time: 2.465 min


* apply tf-idf model to query and text data

In [16]:
# load questions
# qs.csv is expected to have a header row with a `query` column (one question per row)
qs=spark.read.csv("qs.csv", header=True)\
  .withColumn('tokens', sentence2tokens_udf('query'))

In [17]:
# apply tf-idf model to query
# qtf_lst[i] is the term-count ('tf') vector of the i-th collected query; presumably the
# collected order matches the row order of qs.csv (small single-file read) — TODO confirm
qs_td=td_model.transform(qs)
qtf_lst=[u[0] for u in qs_td.select('tf').collect()]

In [18]:
# apply tf-idf model to text data
# not cached: the transformed DataFrame is recomputed by every action run on it
ts_td = td_model.transform(ts_tokenized)

* for each query, retrieve the 100 papers with the highest tf-idf scores

In [19]:
from time import time
import pickle

# number of candidate papers kept per query
TOP_K_PAPERS = 100

start=time()

candidate_papers=[]
for q_tf in qtf_lst:
  # Score every paper for this query: dot product of the paper's tf-idf vector with the
  # query's term-count vector. q_tf is bound as a default argument so the UDF does not
  # depend on the value of the loop variable when the closure is serialized.
  get_score_udf=F.udf(lambda x, q=q_tf: float(x.dot(q)), FloatType())
  ts_score=ts_td.withColumn('score', get_score_udf('tfidf'))

  # Keep the ids of the best-scoring papers. Sort before projecting so the sort key is
  # still a column of the DataFrame being ordered. (ts_score is never persisted, so
  # there is nothing to unpersist afterwards.)
  candidate=ts_score.orderBy('score', ascending=False).select('paper_id').take(TOP_K_PAPERS)
  candidate_papers.append([x[0] for x in candidate])

print('Running time: %.3f min'%((time()-start)/60))

# save candidate papers into the disk
with open('candidate_papers_0718', 'wb') as fp:
    pickle.dump(candidate_papers, fp)


Running time: 6.671 min


### Reader

#### Convert text into sentences

In [13]:
# read candidate papers
import pickle

# per-query candidate paper ids saved by the retriever (a file this notebook wrote itself)
with open('candidate_papers_0718', 'rb') as fp:
    candidate_papers = pickle.load(fp)

def _paper_json_path(paper_id):
  '''Map a paper id to its parsed-JSON file: ids starting with "PMC" live under
  pmc_json/ (*.xml.json), all other ids under pdf_json/ (*.json).'''
  if paper_id.startswith("PMC"):
    return path+"document_parses/pmc_json/"+paper_id+".xml.json"
  return path+"document_parses/pdf_json/"+paper_id+".json"

# get filenames of candidate papers: filepath[i] lists the JSON files for query i
filepath = [[_paper_json_path(pid) for pid in papers] for papers in candidate_papers]

In [15]:
# extract keywords from query
# query keywords = the query's tokens without English stop words, Porter-stemmed

from pyspark.ml.feature import StopWordsRemover
stopwords = StopWordsRemover.loadDefaultStopWords('english')

from nltk.stem import PorterStemmer
ps = PorterStemmer()

import pandas as pd
qs_tokens = pd.read_csv("qs.csv")['query'].apply(sentence2tokens).tolist()

# qs_kw[i] holds the stemmed keywords of the i-th query
qs_kw = [[ps.stem(token) for token in qtokens if token not in stopwords]
         for qtokens in qs_tokens]

In [16]:
# Spark UDF: Porter-stem every token of an array<string> column (uses the `ps` stemmer)
stem_udf=F.udf(lambda xs: [ps.stem(x) for x in xs], ArrayType(StringType()))

def is_question(sentence):
  '''
  Return True when the sentence ends with a question mark, i.e. is treated as a question.
  '''
  return sentence[-1:] == '?'

# Spark UDF wrapper: string column -> boolean (ends with '?')
is_question_udf=F.udf(is_question, BooleanType())

def is_match(pg,q_kw):
  '''
  Check whether a paragraph shares at least one keyword with the query.

  Input: pg, the paragraph's tokens (the caller passes stemmed tokens); q_kw, the query keywords
  Return: True if any query keyword occurs in pg, otherwise False
  '''
  return any(keyword in pg for keyword in q_kw)

In [17]:
# clean text and split text into sentences
# For each query i, write the candidate sentences of its candidate papers to "ss<i>.txt",
# one "paper_id<TAB>sentence" line per sentence.
# NOTE: range(10) assumes qs.csv holds (at least) 10 questions; keep this count in sync
# with the other per-query loops.

for i in range(10):
  texts_k=spark.read.schema(schema).json(filepath[i], multiLine=True)

  # Keep only paragraphs that contain at least one stemmed keyword of the query.
  # The keyword list is bound as a default argument so the UDF does not depend on the
  # value of the loop variable when the closure is serialized.
  is_match_udf=F.udf(lambda x, kw=qs_kw[i]: is_match(x, kw), BooleanType())

  # filter before selecting, so the predicate's input column is still part of the DataFrame
  pg_k = (texts_k.withColumn('paragraph', F.explode('body_text.text'))
                 .withColumn('pg_tokens', text2tokens_udf('paragraph'))
                 .withColumn('pg_token_stem', stem_udf('pg_tokens'))
                 .filter(is_match_udf('pg_token_stem'))
                 .select('paper_id', 'paragraph'))

  # split paragraph into sentences
  # remove sentences ending with ?
  # remove sentences shorter than 7 tokens
  # Tabs / carriage returns inside a sentence are replaced by spaces: the output is a
  # tab-separated, line-oriented text file, so either character would corrupt a record.
  ss_k = (pg_k.withColumn('sentences', text2sentence_udf('paragraph'))
              .withColumn('sentence', F.explode('sentences'))
              .withColumn('tokens', sentence2tokens_udf('sentence'))
              .filter(~is_question_udf('sentence') & (F.size('tokens') >= 7))
              .select('paper_id',
                      F.regexp_replace('sentence', r'[\t\r]', ' ').alias('sentence')))

  ss_k.rdd.map(lambda x: '\t'.join([x[0],x[1]])).saveAsTextFile("ss"+str(i)+".txt")
  spark.catalog.clearCache()

#### BlueBERT

* load BlueBERT model

In [4]:
from transformers import BertTokenizer
from transformers import BertModel
from transformers import BertConfig
import torch

# local BlueBERT checkpoint (judging by the directory name: PubMed + MIMIC, uncased,
# 12 layers, hidden size 768 — confirm against bert_config.json)
bluebert_dir='/data/s2465922/NCBI_BERT_pubmed_mimic_uncased_L-12_H-768_A-12'
# load BlueBERT tokenizer
bluebert_tokenizer = BertTokenizer.from_pretrained(bluebert_dir)
# load BlueBERT model
# the model config is read explicitly from bert_config.json in the same directory
bluebert_config = BertConfig.from_json_file(bluebert_dir+'/bert_config.json')
bluebert_model = BertModel.from_pretrained(bluebert_dir, config=bluebert_config)

In [5]:
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

def to_tokens(sentence):
  '''
  Convert a sentence to BlueBERT token ids, without [CLS]/[SEP].

  input: a sentence
  return: a list of token ids

  The sentence is first split into lower-cased words with sentence2tokens. The words are
  then re-joined with spaces, so the BERT tokenizer can split them into WordPiece units.
  If the word list itself is passed to `encode`, every word is looked up as a single
  vocabulary entry: out-of-vocabulary words become [UNK], and an empty list is rejected.
  '''
  tokens=sentence2tokens(sentence)
  token_ids=bluebert_tokenizer.encode(' '.join(tokens), add_special_tokens=False)
  return token_ids

def fix_len(x, max_len=128):
  '''
  Pad a list of token ids with 0 (treated as padding by bluebert_embed), or truncate it,
  to exactly max_len entries.

  input: a list of tokens
  return: a new list of tokens with a fixed length
  '''
  return (x + [0] * max_len)[:max_len]

def bluebert_embed(batch):
  '''
  Embed a batch of sentences with BlueBERT.

  input: number of sentences * token ids with fixed length (id 0 = padding)
  return: numpy array, number of sentences * average of the model's outputs[0] vectors
          over the real (non-padding) tokens of each sentence

  Padding positions are excluded from the average. Including them would let the amount
  of padding (sentence length vs. max_len) dominate the embedding, and would make
  embeddings padded to different lengths (e.g. 50 vs. 128) inconsistent.
  A row with no real tokens yields a zero vector.
  NOTE(review): the similarity threshold applied to these embeddings in
  mapPartition_func was chosen with the earlier padding-inclusive mean; re-tune it.
  '''
  input_ids=torch.tensor(np.asarray(batch), dtype=torch.long)
  attention_mask=(input_ids != 0).long()
  # inference only: no autograd graph is needed
  with torch.no_grad():
    outputs=bluebert_model(input_ids, attention_mask)
  hidden=outputs[0]                                   # presumably (batch, seq_len, hidden)
  mask=attention_mask.unsqueeze(-1).to(hidden.dtype)  # (batch, seq_len, 1)
  summed=(hidden*mask).sum(1)
  counts=mask.sum(1).clamp(min=1.0)                   # avoid 0/0 for all-padding rows
  return (summed/counts).numpy()

def predict_similarity(batch, q):
  '''
  Cosine similarity between every sentence of a batch and one query.

  input: 1. number of sentences * token ids with fixed length,
  2. sentence embedding of the given query (reshaped into a single row)
  return: a 1-D array of cosine similarities, one per sentence
  '''
  sentence_emb = bluebert_embed(batch)
  query_row = q.reshape(1, -1)
  return cosine_similarity(sentence_emb, query_row).flatten()

def row_preprocess(row):
  '''
  Parse one "paper_id<TAB>sentence" line into [paper_id, sentence].

  Only the first tab separates the fields, so a tab inside the sentence does not
  truncate it.
  '''
  return row.split('\t', 1)

def row_postprocess(row, sim):
  '''
  Serialize a parsed row and its similarity as "paper_id<TAB>sentence<TAB>similarity",
  with the similarity formatted to 3 decimals.
  '''
  paper_id, sentence = row[0], row[1]
  return '\t'.join((paper_id, sentence, '{:.3f}'.format(sim)))
  
def mapPartition_func(partition, q_emb, batch_size=32, threshold=0.72):
  '''
  Score every "paper_id<TAB>sentence" line of one RDD partition against a query.

  Lines are embedded in batches of `batch_size`. For each line whose cosine similarity
  to `q_emb` is >= `threshold`, yield "paper_id<TAB>sentence<TAB>similarity".
  '''
  def _flush(rows, token_batch):
    # score one batch and emit the rows that pass the threshold
    sims = predict_similarity(token_batch, q_emb)
    for parsed, sim in zip(rows, sims):
      if sim >= threshold:
        yield row_postprocess(parsed, sim)

  pending_rows, pending_ids = [], []
  for line in partition:
    parsed = row_preprocess(line)
    pending_rows.append(parsed)
    pending_ids.append(fix_len(to_tokens(parsed[1])))
    if len(pending_rows) == batch_size:
      yield from _flush(pending_rows, pending_ids)
      pending_rows, pending_ids = [], []

  # last, possibly partial, batch
  if pending_rows:
    yield from _flush(pending_rows, pending_ids)
  

* query processing

In [6]:
import pandas as pd

qs_df=pd.read_csv('qs.csv')

import numpy as np

# convert each sentence to a sequence of tokens
# queries are padded/truncated to 50 token ids (sentences use fix_len's default of 128)
qs_tokenized=qs_df['query'].apply(lambda x: fix_len(to_tokens(x), max_len=50))

# get the bluebert embeddings for each sentence
# qs_emb[i] is the embedding of the i-th query (row order of qs.csv)
qs_emb=bluebert_embed(qs_tokenized.tolist())


* BlueBERT model prediction

In [7]:
from time import time

start=time()

sc=spark.sparkContext
spark.catalog.clearCache()

# Score the sentences of each query's candidate papers (ss<i>.txt) against that query and
# save the lines above the similarity threshold to result<i>.
# NOTE: range(10) assumes (at least) 10 queries; keep in sync with the other per-query loops.
# NOTE(review): mapPartition_func uses the driver-side bluebert_model / bluebert_tokenizer
# globals, which are presumably shipped to the executors inside every task's closure.
for i in range(10):
  # bind this query's embedding as a default argument so the closure does not depend on
  # the value of the loop variable when it is serialized
  ss_k_sim = (sc.textFile('ss'+str(i)+".txt")
                .mapPartitions(lambda part, q=qs_emb[i]: mapPartition_func(part, q)))

  ss_k_sim.saveAsTextFile("result"+str(i))
  spark.catalog.clearCache()

print('Running time: %.3f min'%((time()-start)/60))



Running time: 99.648 min


* process the result

In [8]:
import numpy as np
import pandas as pd

sc=spark.sparkContext

# number of best-scoring sentences kept per query
TOP_N_SENTENCES = 20

def _parse_result_line(line):
  '''
  Parse a "paper_id<TAB>sentence<TAB>similarity" line into (paper_id, sentence, similarity).

  The id runs up to the first tab and the similarity follows the last tab, so a tab inside
  the sentence stays part of the sentence instead of shifting the similarity field.
  '''
  paper_id, rest = line.split('\t', 1)
  sentence, sim = rest.rsplit('\t', 1)
  return paper_id, sentence, float(sim)

res=[]

# load the result of each query and keep its TOP_N_SENTENCES most similar sentences
# NOTE: range(10) assumes (at least) 10 queries; keep in sync with the other per-query loops.
for i in range(10):
  res_i=sc.textFile('result'+str(i)).map(_parse_result_line).collect()

  sims=np.array([x[2] for x in res_i], dtype=float)
  ix=sims.argsort()[::-1][:TOP_N_SENTENCES]

  for j in ix:
    res.append((i, res_i[j][0], res_i[j][1], sims[j]))

# embed the selected sentences with BlueBERT and save the final result to disk
res_df=pd.DataFrame(res, columns=['query_id', 'paper_id', 'sentence', 'similarity'])

token_ids=res_df['sentence'].apply(lambda x: fix_len(to_tokens(x))).tolist()
res_df['embedding']=bluebert_embed(token_ids).tolist()

res_df.to_pickle('result_bert_0722.pkl')
res_df.to_csv('result_bert_0722.csv')

#### Word2vec

* Word2vec model training

In [11]:
# split document into sentence
# One row per sentence of every paper in `ts` (all papers, not only the COVID-tagged
# subset), with its lower-cased tokens; the Word2Vec pipeline below is fitted on these.
ss=ts.withColumn('sentences', text2sentence_udf('text'))\
    .withColumn('sentence', F.explode('sentences'))\
    .withColumn('tokens', sentence2tokens_udf('sentence'))\
    .select('paper_id', 'tokens')

In [13]:
#ss.count()

14953775

In [14]:
# create word embedding pipeline
from pyspark.ml.feature import StopWordsRemover, Normalizer, Word2Vec
stopwords = StopWordsRemover.loadDefaultStopWords('english')
sw_remover = StopWordsRemover(inputCol='tokens', outputCol='filtered', stopWords=stopwords)
word2vec = Word2Vec(vectorSize=100, minCount=5,seed=56,numPartitions=10, maxIter=5, 
                    windowSize=7, inputCol = 'filtered', outputCol = 'embedding')
normalizer=Normalizer(inputCol="embedding", outputCol="norm_embedding", p=2.0)

w2v_pipeline=Pipeline(stages=[sw_remover, word2vec, normalizer])

In [15]:
# fit word2vec model 
from time import time
start=time()

w2v_model=w2v_pipeline.fit(ss)
w2v_model.write().overwrite().save('w2v_model_0712')
print('Running time: %.3f min'%((time()-start)/60))

Running time: 132.906 min


* Query processing

In [27]:
#load word2vec model
# reload the fitted pipeline saved by the training cell, so training need not be re-run
loaded_w2v=PipelineModel.load("w2v_model_0712")

In [28]:
# apply word2vec model to query

# load questions
qs = (spark.read.csv("qs.csv", header=True)
        .withColumn('tokens', sentence2tokens_udf('query')))

# Get word2vec sentence embeddings for each query.
# Named qs_w2v (not qs_emb) so it does not overwrite the BlueBERT query embeddings
# `qs_emb` that the BlueBERT prediction cell indexes. Re-running that cell after this
# one would otherwise index a Spark DataFrame.
qs_w2v=loaded_w2v.transform(qs)
# qe_lst[i]: L2-normalized embedding of the i-th collected query
qe_lst=[u[0] for u in qs_w2v.select('norm_embedding').collect()]

* Word2vec model prediction

In [29]:
# schema of a per-query sentence record: (paper_id, sentence), both nullable strings
ss_schema=StructType([ 
  StructField("paper_id", StringType(), True),
  StructField("sentence", StringType(), True)
])

In [30]:
# compute the cosine similarity between each sentence and query
import pyspark.sql.functions as F
from time import time
import pickle

start=time()
result=[]
spark.catalog.clearCache()

# "paper_id<TAB>sentence": the id runs up to the first tab, the sentence is the rest of the line
SS_LINE_PATTERN = r'^([^\t]*)\t(.*)$'

for i in range(len(qe_lst)):

  # get embedding vector for the given query
  q_emb=qe_lst[i]

  # Read the sentence file for query i in the tab-separated format the sentence-extraction
  # cell writes ("ss<i>.txt", one "paper_id<TAB>sentence" line per sentence).
  # NOTE(review): this cell used to read 'ss<i>.json', which no cell of this notebook
  # produces. Switch back if a separately generated JSON sentence set is intended.
  ss_k = (spark.read.text('ss'+str(i)+'.txt')
            .select(F.regexp_extract('value', SS_LINE_PATTERN, 1).alias('paper_id'),
                    F.regexp_extract('value', SS_LINE_PATTERN, 2).alias('sentence'))
            .withColumn('tokens', sentence2tokens_udf('sentence')))

  # apply the word2vec pipeline to the sentences
  ss_k_emb=loaded_w2v.transform(ss_k)

  # Similarity between the query and each sentence. Both embeddings are L2-normalized,
  # so the dot product is their cosine. q_emb is bound as a default argument so the UDF
  # does not depend on the value of the loop variable when it is serialized.
  get_sim=F.udf(lambda x, q=q_emb: float(x.dot(q)), FloatType())
  ss_k_sim=ss_k_emb.withColumn('similarity', get_sim('norm_embedding'))

  # rank the similarity and obtain the top 20 sentences
  ss_k_sim.createOrReplaceTempView("ssim")
  rank=spark.sql("select paper_id, sentence, embedding, similarity from ssim ORDER BY similarity DESC limit 20")

  result.append(rank.rdd.map(tuple).collect())

  spark.catalog.clearCache()

print('Running time: %.3f min'%((time()-start)/60))

# save the word2vec results to disk
with open('result_w2v_0718', 'wb') as fp:
    pickle.dump(result, fp)

Running time: 6.581 min


* convert the result into dataframe

In [33]:
import pickle

# results pickled by the word2vec prediction cell of this notebook (only unpickle trusted files)
filename='result_w2v_0718'
with open(filename, 'rb') as fp:
    data = pickle.load(fp)

# create dataframe: one row per (query, retrieved sentence); query_id is the position of
# the query's result list in `data`
res = [[query_id] + list(record)
       for query_id, records in enumerate(data)
       for record in records]

res_df = pd.DataFrame(res, columns=['query_id','paper_id', 'sentence', 'embedding','similarity'])
res_df.to_pickle(filename+'.pkl')
res_df.to_csv(filename+'.csv')