# Name: Umad ul hassan Rai

# Experiment 3

In [233]:
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.types import StringType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql import Row
from pyspark.sql.functions import concat, col, lit, split, udf, size, lit
from pyspark.sql import functions as sf
from operator import add
import pandas as pd
import pyspark
import time
import csv
import re


In [313]:
# Start (or reuse) the Spark session with the broadcast-join timeout raised to
# 36000 seconds, then expose its SparkContext and a SQLContext for later cells.
sparkSession = (
    SparkSession.builder
    .appName("Exercise1")
    .config("spark.sql.broadcastTimeout", "36000")
    .getOrCreate()
)
sc = sparkSession.sparkContext
sqlC = SQLContext(sc)

# Exercise 3.1 Vector Representation

In [235]:
# Build paperTermsRDD: (paper_id, text) pairs read from papers.csv.
# Each line is parsed with the csv module so quoted commas are respected.
# NUL bytes are stripped first (older csv.reader versions raise on them).
# The text is field 13 and field 14 joined by a single space.
# NOTE(review): textFile splits on newlines, so a quoted field that contains a
# newline would be broken across records — confirm papers.csv has none.
def _parse_paper_line(line):
    fields = next(csv.reader([line.replace("\x00", "")]))
    return fields[0], fields[13] + " " + fields[14]


rdd2 = sparkSession.sparkContext.textFile("papers.csv")
paperTermsRDD = rdd2.map(_parse_paper_line)
paperTermsRDD.take(1)

[('80546',
  "the arbitrariness of the genetic code the genetic code has been regarded as arbitrary in the sense that the codon-amino acid assignments could be different than they actually are. this general idea has been spelled out differently by previous, often rather implicit accounts of arbitrariness. they have drawn on the frozen accident theory, on evolutionary contingency, on alternative causal pathways, and on the absence of direct stereochemical interactions between codons and amino acids. it has also been suggested that the arbitrariness of the genetic code justifies attributing semantic information to macromolecules, notably to {dna}. i argue that these accounts of arbitrariness are unsatisfactory. i propose that the code is arbitrary in the sense of jacques monod's concept of chemical arbitrariness: the genetic code is arbitrary in that any codon requires certain chemical and structural properties to specify a particular amino acid, but these properties are not required in 

In [236]:
# Turn the (paper_id, text) RDD into a DataFrame with named columns.
sentenceDataFrame = paperTermsRDD.toDF(["Paper_Id", "Text"])
sentenceDataFrame.show(1)

+--------+--------------------+
|Paper_Id|                Text|
+--------+--------------------+
|   80546|the arbitrariness...|
+--------+--------------------+
only showing top 1 row



In [None]:
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, size, udf
from pyspark.sql.types import IntegerType

# Split the text on every character that is not an ASCII letter, "_" or "-".
# With RegexTokenizer's defaults the pattern acts as the delimiter (gaps=True).
# The input is lower-cased (toLowercase=True), and empty tokens between
# adjacent delimiters are dropped (minTokenLength=1).
# A raw string is used because "\_" and "\-" are invalid Python escape
# sequences in a normal string literal and trigger a warning.
regexTokenizer = RegexTokenizer(inputCol="Text", outputCol="Words", pattern=r"[^a-zA-Z_\-]")

# Python UDF kept so the name stays available to other cells. The count below
# uses the built-in size(), which avoids a round-trip through Python workers.
countTokens = udf(lambda words: len(words), IntegerType())

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
# "tokens" is informational only: the number of tokens per paper.
regexTokenized = regexTokenized.select("Paper_Id", "Words") \
    .withColumn("tokens", size(col("Words")))
regexTokenized.show(1, False)

In [238]:
# UDF helper: delete hyphens and underscores from every token.
def remove_hyp_uscore(x):
    """Return a new list with every "-" and "_" removed from each word of *x*."""
    strip_table = str.maketrans("", "", "-_")
    return [word.translate(strip_table) for word in x]

# UDF helper: drop tokens shorter than three characters.
def rem_2len(x):
    """Return the words of *x* that have at least 3 characters, in order."""
    return list(filter(lambda word: len(word) > 2, x))

In [239]:
# UDF that strips "-" and "_" from every token of the tokenizer output.
rem_hyp_udf = udf(remove_hyp_uscore, ArrayType(StringType()))

# Reference the column with its exact casing ("Words") so this also works
# when spark.sql.caseSensitive is enabled.
removed_hyp_df = regexTokenized.withColumn("Removed_Hyp", rem_hyp_udf(regexTokenized["Words"])) \
    .select("Paper_Id", "Removed_Hyp")

# UDF that drops tokens shorter than 3 characters.
rem_2len_words = udf(rem_2len, ArrayType(StringType()))

removed_2len_df = removed_hyp_df.withColumn("Rem_2len", rem_2len_words(removed_hyp_df["Removed_Hyp"])) \
    .select("Paper_Id", "Rem_2len")


In [None]:
# Inspect the first row after each cleaning step (arrays shown in full).
removed_hyp_df.show(n=1, truncate=False)
removed_2len_df.show(n=1, truncate=False)

In [241]:
# Drop stop words using StopWordsRemover's defaults (English list, case-insensitive).
from pyspark.ml.feature import StopWordsRemover

remover = StopWordsRemover(inputCol="Rem_2len", outputCol="Filtered")
stopwords_removed = (
    remover.transform(removed_2len_df)
    .select("Paper_Id", "Filtered")
)

In [None]:
# Peek at one paper after stop-word removal.
stopwords_removed.show(n=1, truncate=False)

In [243]:
# Stem every token with NLTK's Porter stemmer.
from nltk.stem import PorterStemmer

# A single shared stemmer instance. It is bound as the default argument of
# stemming(), so it is serialized together with the UDF rather than a new
# PorterStemmer being constructed for every row.
porter = PorterStemmer()


def stemming(x, stemmer=porter):
    """Return the Porter stem of each word in *x*, preserving order.

    :param x: list of word strings (one paper's tokens).
    :param stemmer: object with a ``stem(word)`` method; defaults to the
        shared ``porter`` instance.
    :return: list of stemmed strings, same length as *x*.
    """
    return [stemmer.stem(word) for word in x]


# UDF wrapper: array<string> -> array<string>. Spark passes one argument,
# so the default stemmer is used.
stem_udf = udf(stemming, ArrayType(StringType()))

# Stem the tokens that survived stop-word removal.
stemmedDF = stopwords_removed.withColumn("Stemmed", stem_udf(stopwords_removed["Filtered"])) \
    .select("Paper_Id", "Stemmed")


In [None]:
# Peek at one paper's stemmed tokens.
stemmedDF.show(n=1, truncate=False)

In [245]:
# One row per (paper, stemmed word) occurrence, ready for groupBy/aggregation by word.
exploded_stems = stemmedDF.select(
    stemmedDF.Paper_Id,
    sf.explode(stemmedDF.Stemmed).alias("Word"),
)
exploded_stems.show(5, False)

+--------+---------+
|Paper_Id|Word     |
+--------+---------+
|80546   |arbitrari|
|80546   |genet    |
|80546   |code     |
|80546   |genet    |
|80546   |code     |
+--------+---------+
only showing top 5 rows



In [247]:
# For each word, collect the papers it occurs in:
#   Papers     - one entry per occurrence (duplicates kept; used for term counts later)
#   Papers_set - each paper once (used to filter by number of papers)
grouped = exploded_stems.groupBy("Word").agg(
    sf.collect_list("Paper_Id").alias("Papers"),
    sf.collect_set("Paper_Id").alias("Papers_set"),
)
grouped.show(2, False)

+-------+---------+----------+
|Word   |Papers   |Papers_set|
+-------+---------+----------+
|aaomega|[1270054]|[1270054] |
|abbrev |[758630] |[758630]  |
+-------+---------+----------+
only showing top 2 rows



In [248]:
# Document frequency: the number of distinct papers each word appears in.
grouped_withCount = grouped.withColumn("Count", size("Papers_set"))

In [249]:
# "Count" (size of the distinct-paper set) lets us drop words that occur in
# more than 10% of the papers or in fewer than 20 papers, and then sort to
# keep the top 1000 words. "Papers" (with duplicates) is kept for building
# the paper-term vectors later.
grouped_withCount.show(n=1, truncate=False)

+-------+---------+----------+-----+
|Word   |Papers   |Papers_set|Count|
+-------+---------+----------+-----+
|aaomega|[1270054]|[1270054] |1    |
+-------+---------+----------+-----+
only showing top 1 row



In [250]:
# Keep words that appear in at most 10% of all papers and in at least 20
# papers. "Count" already holds each word's document frequency, so plain
# filters are enough here (no sorting needed).
# NOTE(review): TOTAL_PAPERS is hard-coded; it could be derived from the data
# (e.g. paperTermsRDD.count()) if papers.csv holds exactly one record per line.
TOTAL_PAPERS = 172079      # total number of papers in papers.csv
MAX_DOC_FRACTION = 0.1     # drop words found in more than this share of papers
MIN_DOC_COUNT = 20         # drop words found in fewer than this many papers

final_words = grouped_withCount.filter(
    (grouped_withCount.Count <= MAX_DOC_FRACTION * TOTAL_PAPERS)
    & (grouped_withCount.Count >= MIN_DOC_COUNT)
)

In [None]:
# Sample of the words that pass the frequency filters.
final_words.show(n=2, truncate=False)

In [252]:
# Top 1000 words by document frequency. Ties on Count are broken
# alphabetically so the selected vocabulary is the same on every run.
final_1000 = final_words.orderBy(final_words.Count.desc(), final_words.Word.asc()).limit(1000)

In [253]:
# Check the columns and types of the selected vocabulary table.
final_1000.printSchema()

root
 |-- Word: string (nullable = true)
 |-- Papers: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Papers_set: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Count: integer (nullable = false)



In [258]:
# Keep only each vocabulary word and its per-occurrence paper list.
# "Papers_set" and "Count" are no longer needed. This feeds the paper-term vectors.
paper_term = final_1000.select(final_1000["Word"], final_1000["Papers"])

In [259]:
# Inspect the word -> paper-list table.
paper_term.printSchema()
paper_term.show(n=2, truncate=False)

root
 |-- Word: string (nullable = true)
 |-- Papers: array (nullable = true)
 |    |-- element: string (containsNull = false)



In [260]:
# One row per (word, paper) occurrence; duplicates are kept on purpose.
exploded_paperTerm = paper_term.select("Word", sf.explode("Papers").alias("Papers"))

In [261]:
# A word repeats once per occurrence in a paper. These repeats are what let
# CountVectorizer count term frequencies later.
exploded_paperTerm.show(n=2)

+-----+-------+
| Word| Papers|
+-----+-------+
|activ|8823677|
|activ| 138401|
+-----+-------+
only showing top 2 rows



In [262]:
# Regroup by paper: each row becomes (paper id, list of its vocabulary words,
# repetitions included).
grouped_paperTerm = exploded_paperTerm.groupBy("Papers").agg(
    sf.collect_list("Word").alias("Words")
)

In [263]:
# Sample of papers with their vocabulary words.
grouped_paperTerm.show(n=2, truncate=False)

+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Papers|Words                                                                                                                                                                                                                                                                                          |
+------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|100   |[network, simpl, network, build, network, evolv, design, block, complex, motif, scienc]              

In [None]:
# Turn each paper's word list into a sparse term-frequency vector.
from pyspark.ml.feature import CountVectorizer

cv = CountVectorizer(inputCol="Words", outputCol="features")
# Learns the vocabulary: only words from the selected top-1000 list reach this input.
model = cv.fit(grouped_paperTerm)

result = model.transform(grouped_paperTerm)
result.show(n=2, truncate=False)

In [265]:
# Keep the paper id and its term-frequency vector under clearer names.
finalModel = result.select(
    col("Papers").alias("paper_id"),
    col("features").alias("TermFrequencyVector"),
)

In [268]:
# Sample of the per-paper term-frequency vectors.
finalModel.show(n=2, truncate=False)

+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|paper_id|TermFrequencyVector                                                                                                                                                                                             |
+--------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|100     |(1000,[2,6,17,184,226,406,499,546,790],[3.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0])                                                                                                                                   |
|100001  |(1000,[2,5,9,14,70,81,88,122,139,262,302,338,342,405,409,414,462,535,550,560,749,805,808,954,961,997],[1.0,2.0

# Exercise 3.2 TF-IDF

In [270]:
# TF-IDF: learn IDF weights from the term-frequency vectors, then rescale them.
from pyspark.ml.feature import IDF, Tokenizer

idf = IDF(inputCol="TermFrequencyVector", outputCol="features")
idfModel = idf.fit(finalModel)
rescaledData = idfModel.transform(finalModel)

rescaledData.select("paper_id", "features").show(n=2, truncate=False)

+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|paper_id|features                                                                                                                                                                                                                                                                                                                                                                                                          

In [271]:
# (paper_id, ptf_idf) table: the TF-IDF vector under its final column name.
tf_idf = rescaledData.select("paper_id", col("features").alias("ptf_idf"))

In [272]:
# Sample of the per-paper TF-IDF vectors.
tf_idf.show(n=2, truncate=False)

+--------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|paper_id|ptf_idf                                                                                                                                                                                                                                                                                                                                                                                                           

# Exercise 3.3 LDA

In [273]:
# Fit a 40-topic LDA model on the raw term-frequency vectors.
from pyspark.ml.clustering import LDA

# Explicit seed: PySpark's default seed is hash() of the estimator's class
# name, and Python randomizes str hashes per process. Without a seed the
# learned topics therefore differ between runs.
LDA_SEED = 0
lda = LDA(k=40, maxIter=10, featuresCol="TermFrequencyVector", seed=LDA_SEED)
model = lda.fit(finalModel)

# Top 5 terms (by weight) for each topic.
topics = model.describeTopics(5)

# Adds a "topicDistribution" column holding each paper's topic mixture.
transformed = model.transform(finalModel)


In [None]:
# Inspect the learned topics and the per-paper topic mixtures.
topics.show(n=5, truncate=False)
transformed.show(truncate=False)

In [304]:
# Keep just the paper id and its LDA topic mixture.
# NOTE: this rebinds `lda`, which held the LDA estimator before this cell.
lda = transformed.select(transformed.paper_id, transformed.topicDistribution)

In [305]:
# Confirm the (paper_id, topicDistribution) schema.
lda.printSchema()

root
 |-- paper_id: string (nullable = false)
 |-- topicDistribution: vector (nullable = true)



In [None]:
# Sample of per-paper topic distributions.
lda.show(n=2, truncate=False)


# Exercise 3.4 User Profiling

In [277]:
# Load users_libraries.txt: ";"-separated lines of <user hash>;<paper ids>.
# Both columns are read as strings. Papers_ID is split into an array in a
# later cell.
schema = StructType([
    StructField("User", StringType(), False),
    StructField("Papers_ID", StringType(), True),
])

userDF = (
    sqlC.read.schema(schema)
    .option("header", 'False')
    .option("delimiter", ";")
    .csv("users_libraries.txt")
)

userDF.printSchema()
userDF.show(2, False)


root
 |-- User: string (nullable = true)
 |-- Papers_ID: string (nullable = true)



In [278]:
# Split the comma-separated Papers_ID string into a list so it can be exploded.
def str_to_arr(my_list):
    """Split a comma-separated string of paper ids into a list of strings.

    A null input (Papers_ID is declared nullable) yields None, following
    Spark's null semantics, instead of raising AttributeError inside the UDF.

    :param my_list: comma-separated ids, or None.
    :return: list of id strings, or None.
    """
    if my_list is None:
        return None
    return my_list.split(",")

# UDF: comma-separated string -> array<string>.
str_to_arr_udf = udf(str_to_arr, ArrayType(StringType()))

# Replace the string column with its array form (new column name: PapersID).
userDF = userDF.withColumn('PapersID', str_to_arr_udf(userDF["Papers_ID"])).drop("Papers_ID")
userDF.printSchema()
userDF.show(n=2)

root
 |-- User: string (nullable = true)
 |-- PapersID: array (nullable = true)
 |    |-- element: string (containsNull = true)

+--------------------+--------------------+
|                User|            PapersID|
+--------------------+--------------------+
|f05bcffe7951de9e5...|[1158654, 478707,...|
|28d3f81251d94b097...|[3929762, 503574,...|
+--------------------+--------------------+
only showing top 2 rows

root
 |-- User: string (nullable = true)
 |-- PapersID: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [280]:
# One row per (user, paper) pair.
exploded_UserDF = userDF.select("User", sf.explode("PapersID").alias("paper_id"))

In [281]:
# Sample of (user, paper) rows.
exploded_UserDF.show(n=2, truncate=False)

+--------------------------------+--------+
|User                            |paper_id|
+--------------------------------+--------+
|f05bcffe7951de9e5a32fff4a42eb088|1158654 |
|f05bcffe7951de9e5a32fff4a42eb088|478707  |
+--------------------------------+--------+
only showing top 2 rows



# User Profiling using TF-IDF

In [332]:
def idf_user_profile(df1, df2, column1, column2):
    """Aggregate TF-IDF weights per user.

    Inner-joins *df1* with *df2* on *column1*. Each paper's vector in
    *column2* is reduced to the sum of its entries, and those sums are added
    up per "User".

    NOTE(review): the result is one scalar per user ("total_IDF"), not a summed
    vector — confirm that is the intended user profile.

    :param df1: DataFrame with a "User" column and *column1*.
    :param df2: DataFrame with *column1* and a vector column *column2*.
    :param column1: join column name, e.g. "paper_id".
    :param column2: vector column name, e.g. "ptf_idf".
    :return: DataFrame with columns "User" and "total_IDF".
    """
    vector_total = udf(lambda vec: float(vec.values.sum()), DoubleType())
    per_paper = df1.join(df2, column1).withColumn("idf_sum", vector_total(column2))
    return per_paper.groupBy("User").agg(sf.sum("idf_sum").alias("total_IDF"))


In [335]:
# TF-IDF based profile for every user.
user_profile_idf = idf_user_profile(exploded_UserDF, tf_idf, column1="paper_id", column2="ptf_idf")

In [336]:
# Sample of TF-IDF user profiles.
user_profile_idf.show(n=10, truncate=False)

+--------------------------------+------------------+
|User                            |total_IDF         |
+--------------------------------+------------------+
|f1e1cd4ff25018273aafc0c68fbb5a2f|23181.273284519546|
|188b2723f7349804c6a237f47b089982|724.0752267183952 |
|fc0d51c63591e5b0b12289c002b065c7|24986.54222147031 |
|236ef17c11f7f43ac467ce810bdfca2f|1831.3183879490953|
|9b87c5cc8095466b44b974a676ff39c3|1127.3446720314141|
|5e117499b0c8001c963aebbbf11646e2|9157.817445900286 |
|5b0c4c6f84f09ea85bdd9528ce8f9e42|9135.996828178533 |
|c1f57a368451d12e22abaad7e45c5f1d|10006.843481476979|
|4ae30712c901a64fdb7ad84375df3e28|4785.868851030114 |
|0d875b8672933b9a5bb121edc560791b|228.85625105158246|
+--------------------------------+------------------+
only showing top 10 rows



# User Profiling using LDA

In [316]:
def lda_user_profile(df1, df2, column1, column2):
    """Aggregate LDA topic weights per user.

    Inner-joins *df1* with *df2* on *column1*. Each paper's vector in
    *column2* is reduced to the sum of its entries, and those sums are added
    up per "User".

    NOTE(review): the result is one scalar per user ("total_LDA"), not a summed
    topic vector. If each topicDistribution sums to 1 (the near-integer
    totals in the sample output suggest so), total_LDA is roughly the number
    of the user's papers found in *df2* — confirm this is the intended profile.

    :param df1: DataFrame with a "User" column and *column1*.
    :param df2: DataFrame with *column1* and a vector column *column2*.
    :param column1: join column name, e.g. "paper_id".
    :param column2: vector column name, e.g. "topicDistribution".
    :return: DataFrame with columns "User" and "total_LDA".
    """
    entry_total = udf(lambda vec: float(vec.values.sum()), DoubleType())
    per_paper = df1.join(df2, column1).withColumn("lda_sum", entry_total(column2))
    return per_paper.groupBy("User").agg(sf.sum("lda_sum").alias("total_LDA"))

In [317]:
# LDA based profile for every user.
user_profile_lda = lda_user_profile(exploded_UserDF, lda, column1="paper_id", column2="topicDistribution")

In [318]:
# Sample of LDA user profiles.
user_profile_lda.show(n=5, truncate=False)

+--------------------------------+-----------------+
|User                            |total_LDA        |
+--------------------------------+-----------------+
|f1e1cd4ff25018273aafc0c68fbb5a2f|168.0            |
|188b2723f7349804c6a237f47b089982|3.0              |
|fc0d51c63591e5b0b12289c002b065c7|236.0            |
|236ef17c11f7f43ac467ce810bdfca2f|9.999999999999998|
|9b87c5cc8095466b44b974a676ff39c3|10.0             |
+--------------------------------+-----------------+
only showing top 5 rows



# Exercise 3.5 Sampling and Data Preparation

In [325]:
def sampler(userDF, tf_idf, lda, column1, column2, column3, *,
            user_fraction=0.7, train_fraction=0.8, seed=0,
            idf_path="TF-IDF", lda_path="LDA", mode="errorifexists",
            return_test=False):
    """Sample users, split their papers into train/test and build training profiles.

    Steps:
    1. Sample ``user_fraction`` of the users (without replacement).
    2. Explode each sampled user's "PapersID" into (User, paper_id) rows.
    3. Stratified sampling by user: keep ``train_fraction`` of every user's
       rows as training data. The remaining rows form the test set.
    4. Build TF-IDF and LDA user profiles from the training rows only and
       write both as CSV (with header) to *idf_path* / *lda_path*.

    :param userDF: DataFrame with "User" and array column "PapersID".
    :param tf_idf: DataFrame with *column1* and TF-IDF vector column *column2*.
    :param lda: DataFrame with *column1* and topic vector column *column3*.
    :param column1: join column, e.g. "paper_id".
    :param column2: TF-IDF vector column, e.g. "ptf_idf".
    :param column3: LDA vector column, e.g. "topicDistribution".
    :param user_fraction: fraction of users sampled (default 0.7).
    :param train_fraction: per-user fraction of papers used for training (default 0.8).
    :param seed: seed for both sampling steps (default 0).
    :param idf_path: output directory for the TF-IDF profiles.
    :param lda_path: output directory for the LDA profiles.
    :param mode: DataFrameWriter save mode. The default "errorifexists" fails
        if the output already exists; pass "overwrite" to re-run the cell.
    :param return_test: when True, also return the held-out test DataFrame.
    :return: ``(sampled_tr_idf, sampled_tr_lda)``, or
        ``(sampled_tr_idf, sampled_tr_lda, test)`` when *return_test* is True.
    """
    sampled_users = userDF.sample(False, user_fraction, seed)
    exploded_sample = sampled_users.select("User", sf.explode("PapersID").alias("paper_id"))

    # Same fraction for every user, so sampleBy yields a per-user stratified
    # split. The distinct users are collected to the driver to build the map.
    fractions = {row["User"]: train_fraction
                 for row in exploded_sample.select("User").distinct().collect()}
    training = exploded_sample.sampleBy("User", fractions, seed)
    # Held-out rows. subtract() is a set difference, so duplicate rows collapse.
    test = exploded_sample.subtract(training)

    sampled_tr_idf = idf_user_profile(training, tf_idf, column1, column2)
    sampled_tr_lda = lda_user_profile(training, lda, column1, column3)

    # Persist the training profiles as CSV for the next experiment.
    sampled_tr_idf.write.mode(mode).option("sep", ",").option("header", "true").csv(idf_path)
    sampled_tr_lda.write.mode(mode).option("sep", ",").option("header", "true").csv(lda_path)

    if return_test:
        return sampled_tr_idf, sampled_tr_lda, test
    return sampled_tr_idf, sampled_tr_lda
    

In [326]:
# Build (and write) the training-set user profiles.
sampled_tr_idf, sampled_tr_lda = sampler(
    userDF, tf_idf, lda,
    column1="paper_id", column2="ptf_idf", column3="topicDistribution",
)

In [329]:
# Sample of TF-IDF profiles built from the training split.
sampled_tr_idf.show(n=10, truncate=False)

+--------------------------------+------------------+
|User                            |total_IDF         |
+--------------------------------+------------------+
|f1e1cd4ff25018273aafc0c68fbb5a2f|18879.434963006985|
|fc0d51c63591e5b0b12289c002b065c7|21059.515189072376|
|236ef17c11f7f43ac467ce810bdfca2f|1273.2746527179975|
|9b87c5cc8095466b44b974a676ff39c3|907.4855924678919 |
|5e117499b0c8001c963aebbbf11646e2|7067.655195121627 |
|5b0c4c6f84f09ea85bdd9528ce8f9e42|7375.065700218626 |
|c1f57a368451d12e22abaad7e45c5f1d|7611.099838311228 |
|4ae30712c901a64fdb7ad84375df3e28|3313.55374336235  |
|0d875b8672933b9a5bb121edc560791b|228.85625105158246|
|f422cf79c5cc56e92da4c5ab618a5e3c|547.6101949031281 |
+--------------------------------+------------------+
only showing top 10 rows



In [330]:
# Sample of LDA profiles built from the training split.
sampled_tr_lda.show(n=10, truncate=False)

+--------------------------------+------------------+
|User                            |total_LDA         |
+--------------------------------+------------------+
|f1e1cd4ff25018273aafc0c68fbb5a2f|129.0             |
|fc0d51c63591e5b0b12289c002b065c7|190.0             |
|236ef17c11f7f43ac467ce810bdfca2f|6.999999999999998 |
|9b87c5cc8095466b44b974a676ff39c3|8.0               |
|5e117499b0c8001c963aebbbf11646e2|44.0              |
|5b0c4c6f84f09ea85bdd9528ce8f9e42|41.0              |
|c1f57a368451d12e22abaad7e45c5f1d|40.0              |
|4ae30712c901a64fdb7ad84375df3e28|15.999999999999998|
|0d875b8672933b9a5bb121edc560791b|1.9999999999999998|
|f422cf79c5cc56e92da4c5ab618a5e3c|4.0               |
+--------------------------------+------------------+
only showing top 10 rows

