In [1]:
!pip3 install nltk



In [2]:
from pyspark.sql import functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, IntegerType, StringType
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, udf
from operator import add
import time
import csv
import re
from tempfile import NamedTemporaryFile

from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import RegexTokenizer, StopWordsRemover

import numpy as np
from nltk.stem import PorterStemmer

In [3]:
# Build (or reuse) the Spark session for this notebook. The chain is wrapped in
# parentheses so no trailing backslashes are needed.
spark = (
    SparkSession.builder
    .appName("cite")
    # memory settings
    .config("spark.memory.fraction", 0.8)
    .config("spark.executor.memory", "14g")
    .config("spark.driver.memory", "12g")
    # number of partitions used for shuffles in joins/aggregations
    .config("spark.sql.shuffle.partitions", "800")
    # 0 lifts the size limit on results collected to the driver
    .config("spark.driver.maxResultSize", 0)
    .getOrCreate()
)

# handle to the underlying SparkContext
sc = spark.sparkContext


# Loading Data onto Dataframes

In [4]:

# Schema of papers.csv as described in the README, written as
# (column name, column type, nullable) triples in file order.
_papersColumns = [
    ("paper_id", StringType(), False),
    ("type", StringType(), True),
    ("journal", StringType(), True),
    ("book_title", StringType(), True),
    ("series", StringType(), True),
    ("publisher", StringType(), True),
    ("pages", IntegerType(), True),
    ("volume", IntegerType(), True),
    ("number", IntegerType(), True),
    ("year", IntegerType(), True),
    ("month", IntegerType(), True),
    ("postedat", IntegerType(), True),
    ("address", StringType(), True),
    ("title", StringType(), True),
    ("abstract", StringType(), True),
]
papersSchema = StructType(
    [StructField(name, dataType, nullable) for name, dataType, nullable in _papersColumns]
)

# Read the header-less CSV into a DataFrame and keep only the columns used
# downstream - [paper_id, title, abstract]
papersDf = (
    spark.read
    .csv("papers.csv", header=False, schema=papersSchema)
    .select('paper_id', 'title', 'abstract')
)

# Schema of users_libraries.txt as described in the README: one user per line,
# "<user_hash_id>;<comma separated paper ids>"
usersLibrariesSchema = StructType([
    StructField("user_hash_id", StringType(), False),
    StructField("user_library", StringType(), False),
])

# Read the ';'-separated file into a DataFrame - [user_hash_id, user_library]
_rawUsersLibrariesDf = spark.read.csv(
    "users_libraries.txt",
    sep=";",
    header=False,
    schema=usersLibrariesSchema,
)

# Turn the comma separated library string into an array column -
# [user_hash_id, user_library_arr]
usersLibrariesDf = _rawUsersLibrariesDf.select(
    'user_hash_id',
    F.split('user_library', ',').alias('user_library_arr'),
)


# load stop words into a python list (one stop word per line);
# the context manager makes sure the file handle is closed after reading
with open('stopwords_en.txt') as stopWordsFile:
    stopWords = [line.rstrip('\n') for line in stopWordsFile]


# Vector representation for the papers
Steps to generate "important" terms T 

In [5]:
# Merge title and abstract into one space-separated text column per paper -
# [paper_id, content]
_contentCol = F.concat_ws(' ', F.col('title'), F.col('abstract')).alias("content")
papersDf = papersDf.select("paper_id", _contentCol)

In [6]:
# tokenization

# Drop every '-' and '_' from the text first, so that hyphenated / underscored
# words are glued together instead of being split by the tokenizer.
_cleanContentCol = F.regexp_replace(F.col('content'), "[-_]", "").alias('content')
papersDf = papersDf.select('paper_id', _cleanContentCol)

# Regex tokenizer splitting on every run of characters that is not an English
# letter, '-' or '_'. Tokens shorter than three characters are discarded by the
# tokenizer itself (minTokenLength).
regexTokenizer = RegexTokenizer(
    inputCol="content",
    outputCol="tokens",
    pattern="[^A-Za-z-_]+",
    minTokenLength=3,
)

# apply tokenization and drop the raw text - [paper_id, tokens]
_tokenizedDf = regexTokenizer.transform(papersDf)
papersDf = _tokenizedDf.drop("content")


In [7]:
# stop word removal

# remover configured with the stop words read from stopwords_en.txt
stopwordsremover = StopWordsRemover(
    inputCol="tokens",
    outputCol="words",
    stopWords=stopWords,
)

# filter the tokens of every paper and drop the unfiltered column - [paper_id, words]
_withoutStopWordsDf = stopwordsremover.transform(papersDf)
papersDf = _withoutStopWordsDf.drop("tokens")

In [8]:
# stemming

# One row per (paper_id, word) so that the stemmer can be applied to a plain
# string column - [paper_id, terms]
papersDf = papersDf.select('paper_id', F.explode('words').alias('terms'))

# Porter stemmer instance shared by the UDF below
ps = PorterStemmer()

# UDF applying Porter stemming to a single word
# (no return type given, so Spark treats the result as a string)
udf_stem = udf(lambda x: ps.stem(x))

# replace each word by its stem - [paper_id, term]
papersDf = papersDf.select('paper_id', udf_stem(col("terms")).alias("term"))

In [9]:
# removing irrelevant words

# papersDf holds one row per (paper_id, term) occurrence at this point, so the
# number of papers is the number of distinct paper ids, not the row count.
nPapers = papersDf.select('paper_id').distinct().count()

# removing words that appear in more than 10% of the papers
maxThreshold = int(np.floor(0.1*nPapers))
# removing words that appear in less than 20 papers
minThreshold = 20

# Document frequency of each term: the number of distinct papers containing it.
# A plain row count would count a term several times for a paper that uses it
# more than once. - [term, paper_counts]
wordCountDf = papersDf.groupBy("term").agg(F.countDistinct('paper_id').alias('paper_counts'))

# Keep the terms that satisfy both criteria. The bounds are inclusive: a term in
# exactly 20 papers is not "in less than 20 papers", and a term in exactly 10%
# of the papers is not "in more than 10%".
wordCountDf = wordCountDf.filter((wordCountDf.paper_counts >= minThreshold) & (wordCountDf.paper_counts <= maxThreshold))


In [10]:
# Limiting to only the top 1000 words

# Sort by paper counts (descending) and keep the first 1000 terms. The term
# itself is the secondary sort key: without it, terms tied on paper_counts at
# the cut-off could be chosen differently each time this (uncached) DataFrame
# is re-evaluated by the joins that use it.
wordCountDf = wordCountDf.orderBy(wordCountDf.paper_counts.desc(), wordCountDf.term.asc()).limit(1000)
# join with papersDf to select only those terms that remain after the above steps - [paper_id, term]
paperWordsDf = wordCountDf.join(papersDf, papersDf.term == wordCountDf.term).select('paper_id', wordCountDf.term)


### Bag of words representation

In [11]:
# Un-partitioned window ordered by Spark's generated row ids; used to number
# the selected terms consecutively.
w = Window.orderBy(F.monotonically_increasing_id())

# zero-based index per term (row_number starts at 1) - [term, term_index]
_zeroBasedRowNumber = F.row_number().over(w) - 1
wordIndexDf = wordCountDf.withColumn("term_index", _zeroBasedRowNumber).drop('paper_counts')

# attach the index to every (paper_id, term) row and keep a single term
# column - [paper_id, term, term_index]
_sameTerm = paperWordsDf.term == wordIndexDf.term
paperWordsDf = paperWordsDf.join(wordIndexDf, _sameTerm).drop(wordIndexDf.term)


In [12]:
from pyspark.ml.feature import CountVectorizer

# gather the terms of each paper back into one list per paper - [paper_id, terms]
collectedPaperWordsDf = (
    paperWordsDf
    .groupBy('paper_id')
    .agg(F.collect_list(col('term')).alias('terms'))
)

# CountVectorizer turning each list of terms into a vector of term counts
cv = CountVectorizer(inputCol="terms", outputCol="TermFrequencyVector")

# learn the vocabulary from the collected terms
cvModel = cv.fit(collectedPaperWordsDf)

# compute the term frequency vectors and drop the raw term lists -
# [paper_id, TermFrequencyVector]
paperTfDf = cvModel.transform(collectedPaperWordsDf).drop('terms')


## TF-IDF representation of the papers

In [13]:
from pyspark.ml.feature import IDF
from pyspark.ml.linalg import Vectors, VectorUDT


# IDF estimator reading the term frequency vectors
idf = IDF().setInputCol("TermFrequencyVector").setOutputCol("IdfVector")

# learn the inverse document frequencies from the term frequency DataFrame
idfModel = idf.fit(paperTfDf)

# NOTE: per the Spark ML docs, IDFModel.transform rescales the input vector by
# the learned IDF weights, so the "IdfVector" column holds the term frequencies
# multiplied by the IDF, not the bare IDF weights.
# - [paper_id, TermFrequencyVector, IdfVector]
paperTfIdf = idfModel.transform(paperTfDf)

def sparse_multiply(x,y):
    """Multiply two vectors element-wise and return the product as a sparse vector.

    Only the non-zero entries of the product are stored in the result; the
    size of the result is the length of the product.
    """
    product = np.multiply(x, y)
    # positions of the entries that are different from zero
    nonZeroIdx = np.flatnonzero(product)
    return Vectors.sparse(len(product), nonZeroIdx.tolist(), product[nonZeroIdx].tolist())

# user defined function wrapping sparse_multiply (element-wise product of two
# vectors); kept available for cells that want to combine vectors manually
sparse_multiply_udf = udf(sparse_multiply, VectorUDT())

# The TF-IDF vector is the output of IDFModel.transform itself: the model
# already scales the term frequency vector by the IDF weights. Multiplying that
# output by the term frequencies again would give tf^2 * idf, so the column is
# used as-is.
paperTfIdf = paperTfIdf.withColumn('TfIdfVector', paperTfIdf.IdfVector)



## LDA

In [14]:
from pyspark.ml.clustering import LDA
from pyspark.sql.types import ArrayType, StringType

# LDA estimator: 40 topics, Expectation-Maximization optimizer, 5 iterations,
# fixed seed; it reads the raw term frequency vectors
lda = LDA(
    featuresCol='TermFrequencyVector',
    k=40,
    optimizer="em",
    maxIter=5,
    seed=1,
)

# fit the topic model on the term frequency vectors of the papers
ldaModel = lda.fit(paperTfDf)

# add the model's output for every paper; the profiling code below reads it
# from the 'topicDistribution' column
papersLdaDf = ldaModel.transform(paperTfDf)



In [None]:
# describe topics - returns the indices (and weights) of the top terms of each
# topic; five terms are requested to match the "top five" report below
topicsDescription = ldaModel.describeTopics(5)

# initialize vocabulary from the cvModel
vocab = cvModel.vocabulary

# function to convert term indices to words from the vocabulary
def indices_to_terms(termIndices):
    """Map a list of vocabulary indices to the corresponding words in `vocab`."""
    return [vocab[int(idx)] for idx in termIndices]

# user defined function to convert indices to words
indices_to_terms_udf = udf(indices_to_terms, ArrayType(StringType()))

# add column terms containing the words corresponding to the termIndices
topicsDescription = topicsDescription.withColumn("terms", indices_to_terms_udf(topicsDescription.termIndices))

print("top five terms for each topic:")
# we only need the topic and terms columns - drop the others
topFiveTerms = topicsDescription.drop('termIndices', 'termWeights')
# actually display the table announced by the header above (all topics, untruncated)
topFiveTerms.show(topFiveTerms.count(), truncate=False)

## User profiling

To calculate user profiles using (1) tf-idf scores and (2) LDA scores, we first need a utility function that computes the sum of the corresponding vectors. The tf-idf scores are stored as sparse vectors, so they must be converted to numpy arrays before they can be summed. The utility function returns a dense vector and is used as a UDF.

In [17]:
# Flatten the libraries: explode turns every entry of user_library_arr into its
# own row (the opposite of a groupBy) - [user_hash_id, paper_id]
_libraryPaperId = F.explode('user_library_arr').alias('paper_id')
usersPapersDf = usersLibrariesDf.select('user_hash_id', _libraryPaperId)

In [18]:
from pyspark.ml.linalg import SparseVector, VectorUDT, Vectors

def sum_vectors(collectedTfIdfVector):
    """Sum a non-empty list of equally sized vectors and return a dense vector.

    The type of the first element decides how the input is read: if it is a
    plain Python list, the input is treated as a list of lists; otherwise every
    element is converted with its toArray() method (e.g. a SparseVector).
    """
    firstVector = collectedTfIdfVector[0]
    if type(firstVector) is list:
        stacked = np.asarray(collectedTfIdfVector)
    else:
        stacked = np.asarray([vector.toArray() for vector in collectedTfIdfVector])
    # one row per input vector: summing over axis 0 adds them up element-wise
    return Vectors.dense(stacked.sum(axis=0))

# UDF summing a collected list of vectors into a single vector
sumVectorUdf = udf(sum_vectors, VectorUDT())

            

### TF-IDF based profiling

In [19]:
def compute_tf_idf_profiles(usersPapersDf):
    """Compute one tf-idf profile per user as the sum of the tf-idf vectors
    of that user's papers.

    usersPapersDf: DataFrame of the format [user_hash_id, paper_id].
    Returns a DataFrame [user_hash_id, tfIdfProfile].
    """
    # attach the tf-idf data of each paper to the (user, paper) rows
    samePaper = usersPapersDf.paper_id == paperTfIdf.paper_id
    joinedDf = usersPapersDf.join(paperTfIdf, samePaper).drop(paperTfIdf.paper_id)

    # gather all tf-idf vectors of a user into one array
    collectedVectors = F.collect_list('TfIdfVector').alias('collectedTfIdfVector')
    perUserDf = joinedDf.groupBy('user_hash_id').agg(collectedVectors)

    # add the collected vectors up into a single profile vector
    profile = sumVectorUdf('collectedTfIdfVector').alias('tfIdfProfile')
    return perUserDf.select('user_hash_id', profile)

### LDA based profiling

In [21]:
def compute_lda_profiles(usersPapersDf):
    """Compute one LDA profile per user as the sum of the LDA topic
    distribution vectors of that user's papers.

    usersPapersDf: DataFrame of the format [user_hash_id, paper_id].
    Returns a DataFrame [user_hash_id, ldaProfile].
    """
    # attach the LDA output of each paper to the (user, paper) rows
    samePaper = papersLdaDf.paper_id == usersPapersDf.paper_id
    joinedDf = usersPapersDf.join(papersLdaDf, samePaper).drop(papersLdaDf.paper_id)

    # gather all topic distributions of a user into one array
    collectedDistributions = F.collect_list('topicDistribution').alias('collectedTopicDistribution')
    perUserDf = joinedDf.groupBy('user_hash_id').agg(collectedDistributions)

    # add the collected vectors up into a single profile vector
    profile = sumVectorUdf('collectedTopicDistribution').alias('ldaProfile')
    return perUserDf.select('user_hash_id', profile)


## Sampling and Data Preparation

In [22]:
def data_sampler(n, with_test_library=False):
    """Sample roughly `n` users, split their libraries 80/20 and build the
    user profiles from the training part.

    Parameters
    ----------
    n : int
        Desired number of sampled users. The sample is drawn with
        DataFrame.sample using the fraction n / (number of users), so the
        number of users actually drawn is only approximately `n`.
    with_test_library : bool, optional
        When True, the held-out part of every sampled library is returned as
        a third element. Defaults to False, which keeps the two-element
        return value.

    Returns
    -------
    (tfIdfProfiles, ldaProfiles), or
    (tfIdfProfiles, ldaProfiles, testUserLibrarySampleDf) when
    `with_test_library` is True. The last DataFrame has the format
    [user_hash_id, test_library].
    """
    nUsers = usersLibrariesDf.count()

    # The fraction is capped at 1.0 so that asking for more users than exist
    # samples everybody instead of handing Spark an out-of-range fraction; an
    # empty user table yields fraction 0.0 instead of a division by zero.
    fraction = min(1.0, float(n) / nUsers) if nUsers else 0.0

    # generate a sample containing about 'n' users and their libraries
    userLibrariesSampleDf = usersLibrariesDf.sample(fraction=fraction, seed=1)

    # utility function to split the user_library corresponding to the user into train and test sets
    def split_library(user_library):
        # a missing library is treated as an empty one
        if user_library is None:
            return [[], []]
        # first 80% of the entries go to the training set, the rest to the test set
        split_idx = int(0.8 * len(user_library))
        return [user_library[:split_idx], user_library[split_idx:]]

    # user defined function to split a library into two arrays containing train and test samples
    splitUdf = udf(split_library, ArrayType(ArrayType(StringType())))

    # split all the user libraries into train and test set using the above udf
    userLibrariesSampleDf = userLibrariesSampleDf.withColumn(
        'user_library_split', splitUdf(userLibrariesSampleDf.user_library_arr))

    # the train library is the first array of the split - [user_hash_id, train_library]
    trainUserLibrarySampleDf = userLibrariesSampleDf.select(
        'user_hash_id',
        userLibrariesSampleDf.user_library_split[0].alias('train_library'))

    # Explode the train_library into separate paper_ids. Only the two columns
    # the profile functions are documented to take are kept, so the library
    # array is not carried through their joins - [user_hash_id, paper_id]
    explodedTrainUserLibrarySampleDf = trainUserLibrarySampleDf.select(
        'user_hash_id',
        F.explode(trainUserLibrarySampleDf.train_library).alias('paper_id'))

    # generate tf-idf profiles
    tfIdfProfiles = compute_tf_idf_profiles(explodedTrainUserLibrarySampleDf)
    # generate lda profiles
    ldaProfiles = compute_lda_profiles(explodedTrainUserLibrarySampleDf)

    if with_test_library:
        # the test library is the second array of the split - [user_hash_id, test_library]
        testUserLibrarySampleDf = userLibrariesSampleDf.select(
            'user_hash_id',
            userLibrariesSampleDf.user_library_split[1].alias('test_library'))
        return tfIdfProfiles, ldaProfiles, testUserLibrarySampleDf

    return tfIdfProfiles, ldaProfiles
    
