# Notebook for training information retrieval models

### Import packages

In [None]:
# Import packages
import numpy as np

from pyspark.sql import SparkSession
from pyspark import SQLContext
from pyspark.sql.functions import udf, size, explode, col, countDistinct, collect_list, monotonically_increasing_id, row_number
from pyspark.ml import Pipeline

from nltk.corpus import stopwords
from gensim.parsing.preprocessing import STOPWORDS as gensim_words
import spacy
sp = spacy.load('en_core_web_sm')

import os

from sparknlp.base import Finisher, DocumentAssembler
from sparknlp.annotator import Tokenizer, Normalizer, LemmatizerModel, StopWordsCleaner

from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.feature import Normalizer as Normalizer_L2

import time

In [None]:
# Build one de-duplicated stop word list out of the NLTK (English, German,
# French), gensim, spaCy and countwordsfree.com word lists.
nltk_stopwords = set()
for language in ('english', 'german', 'french'):
    nltk_stopwords.update(stopwords.words(language))
gensim_stopwords = set(gensim_words)
spacy_stopwords = sp.Defaults.stop_words
# https://countwordsfree.com/stopwords
# Read the file inside a context manager so the handle is closed again, and
# skip blank lines so the empty string does not end up in the stop word list.
with open('stop_words.txt') as stopword_file:
    cwf_stopwords = {line.strip() for line in stopword_file if line.strip()}

all_stopwords = list(
    nltk_stopwords
    .union(gensim_stopwords)
    .union(spacy_stopwords)
    .union(cwf_stopwords)
)

### Create Spark Context and SQL Context

In [None]:
# Environment variables with the Java home and the Python interpreter
# (worker and driver) of the local machine
os.environ.update({
    'JAVA_HOME': '/usr/lib/jvm/java-8-openjdk-amd64',
    'PYSPARK_PYTHON': '/usr/bin/python3.7',
    'PYSPARK_DRIVER_PYTHON': '/usr/bin/python3.7',
})

In [None]:
# Build (or reuse) a local SparkSession that pulls in the Spark NLP package,
# then derive the SparkContext and SQLContext used by the rest of the notebook
spark = (
    SparkSession.builder
    .master('local[*]')
    .appName('SDDM')
    .config('spark.driver.memory', '8g')
    .config('spark.executor.memory', '8g')
    .config('spark.memory.fraction', '0.8')
    .config('spark.executor.cores', '8')
    .config('spark.local.dir', '/home/rikz/Documents/Master/Semester2/SDDM/data/tmp')
    .config('spark.jars.packages', 'com.johnsnowlabs.nlp:spark-nlp_2.11:2.5.0')
    .getOrCreate()
)
print("Created a SparkSession")

sc = spark.sparkContext
print("Created a SparkContext")

sqlContext = SQLContext(sc)
print("Created a SQLContext")

#         .config('spark.local.dir', '/data/s1847503/SDDM/tmp') \

### Load the data into a SQLContext Dataframe

In [None]:
# Read the papers from csv (first line is the header)
# Alternative data location: '/data/s1847503/SDDM/newdata/data.csv'
df = (
    sqlContext.read
    .format('csv')
    .options(header='true', maxColumns=2000000)
    .load('/home/rikz/Documents/Master/Semester2/SDDM/data/data.csv')
)

df.show()

In [None]:
# Read the metadata csv and keep only the columns needed for the summary
# table; 'sha' is renamed to 'paper_id' so it can be joined with the papers
df_metadata = (
    sqlContext.read
    .format('csv')
    .options(header='true')
    .load('/home/rikz/Documents/Master/Semester2/SDDM/data/metadata.csv')
    .select(col('sha').alias('paper_id'), 'publish_time', 'title', 'doi', 'journal')
)

df_metadata.show()

### Initialize Annotators

In [None]:
# Entry point of the text pipeline: wraps the 'full_text' column as 'document'
document_assembler = (
    DocumentAssembler()
    .setInputCol('full_text')
    .setOutputCol('document')
)

# Tokenizer divides the text into tokens
tokenizer = (
    Tokenizer()
    .setInputCols(['document'])
    .setOutputCol('tokens')
)

# Finisher converts tokens to human-readable output
# (the raw tokens are needed for determining the text lengths)
finisher_tokens = (
    Finisher()
    .setInputCols(['tokens'])
    .setCleanAnnotations(False)
)

# Normalizer removes punctuation, numbers etc. and lowercases the tokens
normalizer = (
    Normalizer()
    .setInputCols(['tokens'])
    .setOutputCol('normalized')
    .setLowercase(True)
)

# Lemmatizer changes each word to its lemma (pretrained model)
lemmatizer = (
    LemmatizerModel.pretrained()
    .setInputCols(['normalized'])
    .setOutputCol('lemma')
)

# StopWordsCleaner removes the stop words in all_stopwords, ignoring case
stopwords_cleaner = (
    StopWordsCleaner()
    .setInputCols(['lemma'])
    .setOutputCol('clean_lemma')
    .setCaseSensitive(False)
    .setStopWords(all_stopwords)
)

# Finisher converts clean tokens to human-readable output
finisher = (
    Finisher()
    .setInputCols(['clean_lemma'])
    .setCleanAnnotations(False)
)

### Create Pipeline

In [None]:
# Full preprocessing pipeline: the annotators run in the listed order, the two
# finishers at the end expose the raw tokens and the cleaned lemmas
preprocessing_stages = [
    document_assembler,
    tokenizer,
    normalizer,
    lemmatizer,
    stopwords_cleaner,
    finisher_tokens,
    finisher,
]
pipeline = Pipeline(stages=preprocessing_stages)

### Preprocess questions

In [None]:
# Load the questions and run them through the preprocessing pipeline
# Alternative data location: '/data/s1847503/SDDM/newdata/questions.csv'
questions = (
    sqlContext.read
    .format('csv')
    .options(header='true')
    .load('/home/rikz/Documents/Master/Semester2/SDDM/data/questions.csv')
)
questions_clean = (
    pipeline.fit(questions)
    .transform(questions)
    .select('question_id', 'full_text', col('finished_clean_lemma').alias('preprocessed'))
)
questions_clean.show()

In [None]:
# Number of the question to search for (0 to 9)
question_num = 2

# Keep only that question and remember its original text for the output
questions_clean = questions_clean.where(col('question_id') == question_num)
q = questions_clean.first()['full_text']
q

### Preprocess text

In [None]:
# Timestamp taken right before the papers are preprocessed
time_before = time.time()

In [None]:
# Preprocess the data
# Empty papers and duplicates are removed BEFORE the NLP pipeline runs, so the
# annotators are neither applied to rows that are discarded anyway nor
# re-evaluated by the two count() calls below.
print("Before removing empty papers and duplicates: {} rows.".format(df.count()))
df = df.dropna(subset=['full_text'])
print("Removed empty papers")
df = df.dropDuplicates(subset=['full_text'])
print("Removed duplicates")
print("After removing empty papers and duplicates: {} rows.".format(df.count()))
print()

# Run the preprocessing pipeline and keep the columns used further on;
# 'text_length' is the number of raw tokens of a paper
df = pipeline.fit(df).transform(df)
df = df.select(
                col('_c0').alias('id'),
                'paper_id',
                'title',
                'full_text',
                size('finished_tokens').alias('text_length'),
                col('finished_clean_lemma').alias('preprocessed')
            )

df.show()

In [None]:
# Timestamp taken right after the papers are preprocessed
time_after = time.time()

In [None]:
# Elapsed time in seconds between the two timestamps
print('Preprocessing time: {} sec'.format(time_after-time_before) )

### TF-IDF

In [None]:
# Compute TF-IDF matrix for papers
tf_p = []
tf_idf_papers = []

tf_p = HashingTF(inputCol='preprocessed', outputCol='tf') \
                    .transform(df)

tf_idf_papers = IDF(inputCol='tf', outputCol='feature') \
                        .fit(tf_p) \
                        .transform(tf_p)

In [None]:
# Compute TF-IDF matrix for questions
tf_q = []
tf_idf_questions = []

tf_q = HashingTF(inputCol='preprocessed', outputCol='tf') \
                    .transform(questions_clean)

tf_idf_questions = IDF(inputCol='tf', outputCol='feature') \
                        .fit(tf_p) \
                        .transform(tf_q)

In [None]:
# Keep only the identifier and the TF-IDF vector of every paper and question
tf_idf_papers = tf_idf_papers.select(col('id'), col('feature'))
tf_idf_questions = tf_idf_questions.select(col('question_id'), col('feature'))

In [None]:
# Normalise the TF-IDF vectors of questions and papers into a 'norm' column
normalizer_L2 = Normalizer_L2().setInputCol('feature').setOutputCol('norm')
tf_idf_questions = normalizer_L2.transform(tf_idf_questions)
tf_idf_papers = normalizer_L2.transform(tf_idf_papers)

In [None]:
# Build the distributed matrices needed for the similarity matrix
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix


def _as_block_matrix(frame, index_col):
    """Convert the 'norm' vectors of `frame` into a BlockMatrix.

    Every row of `frame` becomes one matrix row whose index is the value of
    `index_col` and whose entries are the dense form of its 'norm' vector.
    """
    indexed_rows = frame.select(index_col, 'norm') \
        .rdd.map(lambda row: IndexedRow(row[index_col], row['norm'].toArray()))
    return IndexedRowMatrix(indexed_rows).toBlockMatrix()


matrix_q = _as_block_matrix(tf_idf_questions, 'question_id')

matrix_p = _as_block_matrix(tf_idf_papers, 'id')

In [None]:
# Multiply the paper matrix with the transposed question matrix: entry (i, j)
# is the dot product of the 'norm' vectors of paper id i and question id j
sim_matrix = matrix_p.multiply(matrix_q.transpose())

In [None]:
# Convert the distributed product into a local matrix and then into an array,
# so it can be sliced per question below
sim_matrix = sim_matrix.toLocalMatrix().toArray()

In [None]:
# Take the column of the selected question and pair every similarity with its
# row position, which serves as the paper id
relevant = (
    sc.parallelize(sim_matrix[:, question_num].tolist())
    .zipWithIndex()
    .toDF(['similarity', 'id'])
)

# Remove questions from the paper list
# Sort on cosine similarity
# Take the top 10 relevant documents
relevant = (
    relevant
    .where(col('id') > 9)
    .select('id', 'similarity')
    .orderBy(col('similarity').desc())
    .limit(10)
)

relevant.show()

In [None]:
# Look up the paper_id of each of the 10 most relevant papers
relevant_ids = [int(row['id']) for row in relevant.collect()]
print("Query: {}".format(q))
print()
print("Relevant Papers:")
print()
# Restrict the papers to the relevant ids before joining them onto the ranking
matching_papers = df.where(col('id').isin(relevant_ids))
df_relevant = (
    relevant
    .join(matching_papers, on=['id'], how='left_outer')
    .select('paper_id', 'similarity')
)
df_relevant.show()

In [None]:
# Enrich the relevant papers with their metadata and turn the result into a
# pandas DataFrame sorted from most to least similar
df_relevant = (
    df_relevant
    .join(df_metadata, on=['paper_id'], how='left_outer')
    .select('paper_id', 'publish_time', 'title', 'doi', 'journal', 'similarity')
    .toPandas()
    .sort_values(by='similarity', ascending=False)
)
df_relevant.head(10)

In [None]:
# Write the summary table to a csv file named after the lowercased question,
# with spaces replaced by underscores
summary_path = '/home/rikz/Documents/Master/Semester2/SDDM/SDDM/summary_tables/{}.csv' \
    .format(q.lower().replace(' ', '_'))
df_relevant.to_csv(summary_path, index=False)
print("Summary table extracted and sent to csv file.")

### Word2Vec

In [None]:
# Word2Vec estimator that reads the 'preprocessed' tokens and writes its
# output to a 'word_vector' column
from pyspark.ml.feature import Word2Vec
word2Vec = Word2Vec(inputCol='preprocessed', outputCol='word_vector')

In [None]:
# Fit Word2Vec on the preprocessed papers, then add the 'word_vector' column to them
model = word2Vec.fit(df)
df = model.transform(df)

In [None]:
# Inspect the papers together with their new 'word_vector' column
df.show()

In [None]:
# Apply the Word2Vec model fitted on the papers to the selected question and
# keep the vector of the first (remaining) question row
questions_clean = model.transform(questions_clean)
ques_vec = questions_clean.first().word_vector

In [None]:
# Calculate cosine similarity between a document vector and a question vector
def cossim(doc_vec, query_vec=None):
    """Return the cosine similarity of `doc_vec` and the question vector.

    Parameters:
        doc_vec: vector of one document.
        query_vec: vector to compare with; when omitted, the module-level
            `ques_vec` is used.

    Returns:
        dot(doc, query) / (||doc|| * ||query||) as a Python float, or NaN
        when one of the two vectors has length zero.
    """
    if query_vec is None:
        query_vec = ques_vec
    dot_product = np.dot(doc_vec, query_vec)
    # Product of the two Euclidean norms; each norm is the square root of the
    # dot product of a vector with ITSELF.
    norm_product = np.sqrt(np.dot(doc_vec, doc_vec)) * np.sqrt(np.dot(query_vec, query_vec))
    if norm_product == 0:
        # The angle to a zero vector is undefined
        return float('nan')
    return float(dot_product / norm_product)

# Declare the return type explicitly: udf() defaults to StringType when none
# is given, and a text column is compared and sorted as text, not as numbers.
from pyspark.sql.types import DoubleType
cossim_udf = udf(cossim, DoubleType())

In [None]:
# Apply cossim_udf to the 'word_vector' of every paper; the result is the 'similarity' column
df2 = df.select('id', cossim_udf('word_vector').alias('similarity'))

In [None]:
# Inspect the similarity of each paper
df2.show()

In [None]:
# Drop the papers without a usable similarity and rank the remaining ones
from pyspark.sql.functions import isnan

# print("Before removing empty papers: {} rows.".format(df2.count()))
print("x")
df2 = df2.filter(df2.similarity.isNotNull())
print("After removing empty papers 1: {} rows.".format(df2.count()))
# Cast to double so the NaN test and the ordering are numeric even when the
# column arrives as text (udf() defaults to StringType without a return type)
similarity_value = col('similarity').cast('double')
df2 = df2.filter(~isnan(similarity_value))
# df2 = df2.dropna(subset='similarity')
print("After removing empty papers 2: {} rows.".format(df2.count()))
# show() returns None, so keep the sorted DataFrame in df2 and display it separately
df2 = df2.orderBy(similarity_value.desc())
df2.show()

In [None]:
# Print the title of each of the listed paper ids
relevant = [40821, 40931, 40823, 40776, 40831]
for r in relevant:
    # Filter on the id of the current iteration, not on a fixed id
    paper = df.filter(df.id == r).first()
    # first() returns None when no paper has this id
    print(paper.title if paper is not None else "No paper with id {}".format(r))
    print()

### Close Spark Context when done

In [None]:
# Stop the SparkContext created at the top of the notebook
sc.stop()