In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from pyspark.sql import Row
from pyspark.sql.types import StringType, FloatType, IntegerType
import pyspark.sql.functions as F
from pyspark.ml.feature import HashingTF, IDF, Normalizer, Word2Vec, StringIndexer, IndexToString
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator

spark = SparkSession \
        .builder \
        .appName("Workload One") \
        .getOrCreate()

# Workload 1

In [2]:
file_path = "tweets.json"
tweets = spark.read.option("multiline","true").json(file_path)

In [3]:
# Select 3 columns: user_id, replyto_id, and retweet_id; and change their type to Spark's StringType
tweets1 = tweets.select("user_id", "replyto_id", "retweet_id") \
                .withColumn("user_id", tweets["user_id"].cast(StringType())) \
                .withColumn("replyto_id", tweets["replyto_id"].cast(StringType())) \
                .withColumn("retweet_id", tweets["retweet_id"].cast(StringType()))

# Using "user_id" as the Group By key, and collect all replyto_ids and retweet_ids associated with a particular user_id 
# and put the collected list into 2 separate columns (renamed as "replyto_id_list" and "retweet_id_list", respectively).
tweets_uID_tID = tweets1.groupBy("user_id") \
                        .agg \
                            (
                                F.collect_list("replyto_id"), \
                                F.collect_list("retweet_id") \
                            ) \
                        .toDF("user_id", "replyto_id_list", "retweet_id_list")

# Concatenate 2 Dataframe columns, replyto_id_list and retweet_id_list, into 1 single list.
tweets_uID_doc = tweets_uID_tID.select("user_id", \
                                       F.concat(tweets_uID_tID.replyto_id_list, tweets_uID_tID.retweet_id_list)) \
                               .toDF("User_id","Documentation")

In [4]:
# Manually set a given user ID here
given_uID = '1054600164'

In [5]:
# The function cosine_similarity_top_n below does the following:
# 1. Cross Join (outer product) a given user ID to the dataframe (excluding the given user ID) to obtain 2 columns of 
# normalised vectors: 1 for the given user ID, and 1 that contains normalised vectors of all other user IDs.
# 2. Calculate the dot product between each pair of normalised vectors (this dot product is equivalent to cosine similarity, 
# as no division is needed since the norm of any normalised vector is 1.
# 3. Sort the resultant dataframe by cosine similarity, in descending order.
# 4. Take top 5 and convert the Spark dataframe to Pandas dataframe, and return this result.

def cosine_similarity_top_n(df, given_uID, norm_vector_name="norm", top_n=5):
    dot_udf = F.udf(lambda x,y: float(x.dot(y)), FloatType())
    result = df.where(df.User_id == given_uID).alias("i").crossJoin(df.where(df.User_id != given_uID).alias("j"))\
                       .select( \
                               F.col("i.User_id"), \
                               F.col("j.User_id"), \
                               dot_udf("i."+norm_vector_name, "j."+norm_vector_name).alias("cosine similarity") \
                              ) \
                       .orderBy("cosine similarity", ascending=False) \
                       .limit(top_n)
    return result

In [6]:
# Using Word2Vec
word2Vec = Word2Vec(inputCol="Documentation", outputCol="word2vec")
model = word2Vec.fit(tweets_uID_doc)
word2vec = model.transform(tweets_uID_doc)

# Compute L2-norm (L2-normalising all vectors)
normalizer = Normalizer(inputCol="word2vec", outputCol="norm")
word2vec_norm = normalizer.transform(word2vec)

result_word2vec = cosine_similarity_top_n(word2vec_norm, given_uID)
print(f"Using Word2Vec, top 5 users with similar interest as given user ID {given_uID} are:")
result_word2vec.collect()

Using Word2Vec, top 5 users with similar interest as given user ID 1054600164 are:


[Row(User_id='1054600164', User_id='397424454', cosine similarity=1.0),
 Row(User_id='1054600164', User_id='608382409', cosine similarity=1.0),
 Row(User_id='1054600164', User_id='47771310', cosine similarity=1.0),
 Row(User_id='1054600164', User_id='714835347214110720', cosine similarity=1.0),
 Row(User_id='1054600164', User_id='2619879655', cosine similarity=1.0)]

In [7]:
# Compute TF-IDF
hashingTF = HashingTF(inputCol="Documentation", outputCol="tf")
tf = hashingTF.transform(tweets_uID_doc)

idf = IDF(inputCol="tf", outputCol="tfidf").fit(tf)
tfidf = idf.transform(tf)

# Compute L2-norm (L2-normalising all vectors)
normalizer = Normalizer(inputCol="tfidf", outputCol="norm")
tfidf_norm = normalizer.transform(tfidf)

# Compute cosine similarity between 1 vector to the rest
# Note: for any pair of normalised vectors, we only need their vector product to calculate cosine similarity,
# because the norm of any normalised vectors (done above) = 1

dot_udf = F.udf(lambda x,y: float(x.dot(y)), FloatType())

result_tfidf = cosine_similarity_top_n(tfidf_norm, given_uID)

print(f"Using TF-IDF, top 5 users with similar interest as given user ID {given_uID} are:")
result_tfidf.collect()

Using TF-IDF, top 5 users with similar interest as given user ID 1054600164 are:


[Row(User_id='1054600164', User_id='47771310', cosine similarity=1.0),
 Row(User_id='1054600164', User_id='397424454', cosine similarity=1.0),
 Row(User_id='1054600164', User_id='714835347214110720', cosine similarity=1.0),
 Row(User_id='1054600164', User_id='2619879655', cosine similarity=1.0),
 Row(User_id='1054600164', User_id='845893680', cosine similarity=1.0)]

# Workload 2

In [8]:
tweets \
      .select("user_id","user_mentions") \
      .show()

+-------------------+--------------------+
|            user_id|       user_mentions|
+-------------------+--------------------+
|           17799542| [[807095, [3, 11]]]|
|         1166466828|[[380648579, [3, ...|
|1343606436149022723|[[191807697, [3, ...|
| 930226031276982273|[[15115280, [3, 1...|
| 920858307392192513| [[807095, [3, 11]]]|
|           21458110|[[20402945, [3, 8...|
| 787062740183552000|[[26574283, [3, 1...|
|          392646132|                null|
|         2955789098| [[807095, [3, 11]]]|
| 792380204287164416|                null|
|          198453947| [[807095, [3, 11]]]|
|         1431726547|[[15115280, [3, 1...|
|1245145031045980163|[[26574283, [3, 1...|
|         2181244875|[[36326893, [3, 1...|
|           34865264|[[26574283, [3, 1...|
|          179912903|[[3094649957, [3,...|
|1173096863840423941|[[846411464885747...|
|          491594719|[[15250661, [119,...|
|           40404318|[[7788062, [3, 11]]]|
|1326851827879604226|[[3094649957, [3,...|
+----------

In [9]:
# Key function: explode

tweets \
      .select("user_id","user_mentions") \
      .withColumn("user_mention", F.explode("user_mentions")) \
      .show()

+-------------------+--------------------+--------------------+
|            user_id|       user_mentions|        user_mention|
+-------------------+--------------------+--------------------+
|           17799542| [[807095, [3, 11]]]|   [807095, [3, 11]]|
|         1166466828|[[380648579, [3, ...| [380648579, [3, 7]]|
|1343606436149022723|[[191807697, [3, ...|[191807697, [3, 16]]|
| 930226031276982273|[[15115280, [3, 1...| [15115280, [3, 16]]|
| 920858307392192513| [[807095, [3, 11]]]|   [807095, [3, 11]]|
|           21458110|[[20402945, [3, 8...|  [20402945, [3, 8]]|
|           21458110|[[20402945, [3, 8...|[21802625, [130, ...|
| 787062740183552000|[[26574283, [3, 1...| [26574283, [3, 11]]|
|         2955789098| [[807095, [3, 11]]]|   [807095, [3, 11]]|
|          198453947| [[807095, [3, 11]]]|   [807095, [3, 11]]|
|         1431726547|[[15115280, [3, 1...| [15115280, [3, 16]]|
|1245145031045980163|[[26574283, [3, 1...| [26574283, [3, 11]]|
|         2181244875|[[36326893, [3, 1..

In [10]:
# Key function 1: explode
# Key function 2: getField

tweets \
      .select("user_id","user_mentions") \
      .withColumn("user_mention", F.explode("user_mentions")) \
      .withColumn("user_mention_id", F.col("user_mention").getField("id")) \
      .select("user_id", "user_mention_id") \
      .show()

+-------------------+-------------------+
|            user_id|    user_mention_id|
+-------------------+-------------------+
|           17799542|             807095|
|         1166466828|          380648579|
|1343606436149022723|          191807697|
| 930226031276982273|           15115280|
| 920858307392192513|             807095|
|           21458110|           20402945|
|           21458110|           21802625|
| 787062740183552000|           26574283|
|         2955789098|             807095|
|          198453947|             807095|
|         1431726547|           15115280|
|1245145031045980163|           26574283|
|         2181244875|           36326893|
|         2181244875|1146329871418843136|
|         2181244875|           42183750|
|         2181244875|1003895325025865730|
|           34865264|           26574283|
|          179912903|         3094649957|
|          179912903|           24259259|
|          179912903|           21802625|
+-------------------+-------------

In [11]:
df = tweets \
      .select("user_id","user_mentions") \
      .withColumn("user_mention", F.explode("user_mentions")) \
      .withColumn("user_mention_id", F.col("user_mention").getField("id")) \
      .select("user_id", "user_mention_id") \
      .groupBy("user_id", "user_mention_id", ).count() \
      .orderBy("count", ascending=False) \

df.show()

+-------------------+------------------+-----+
|            user_id|   user_mention_id|count|
+-------------------+------------------+-----+
|         3094649957|          24259259|    6|
| 898230537244073986|         281877818|    3|
|         3094649957|        3094649957|    3|
|1286417148848480257|          55060090|    3|
|           97950174|         380648579|    2|
|1343557199394525186|         380648579|    2|
|1379485472158842887|          56488059|    2|
| 722337933907398656|         380648579|    2|
|1331611799053955073|          17674244|    2|
|          175604092|842678603305377793|    2|
| 918084577377136640|            807095|    2|
|          138011877|         380648579|    2|
|          372409362|           9814812|    2|
|          473714282|          19881665|    2|
|1345652009618255872|        1643123766|    2|
|          205463662|         380648579|    2|
|          109025128|          61863570|    2|
|          480875170|         380648579|    2|
|          16

In [12]:
indexer_user_id = StringIndexer(inputCol="user_id", outputCol="user_id_indexed")
indexer_user_mention_id = StringIndexer(inputCol="user_mention_id", outputCol="user_mention_id_indexed")

df_indexed1 = indexer_user_id.fit(df).transform(df)
df_indexed2 = indexer_user_mention_id \
                                          .fit(df_indexed1).transform(df_indexed1)
df_indexed_final = df_indexed2 \
                      .withColumn("user_id_indexed", df_indexed2["user_id_indexed"].cast(IntegerType())) \
                      .withColumn("user_mention_id_indexed", df_indexed2["user_mention_id_indexed"].cast(IntegerType())) \
                      .withColumn("count", df_indexed2["count"].cast(FloatType()))

df_indexed_final.show()

+-------------------+---------------+-----+---------------+-----------------------+
|            user_id|user_mention_id|count|user_id_indexed|user_mention_id_indexed|
+-------------------+---------------+-----+---------------+-----------------------+
|         3094649957|       24259259|  6.0|            262|                    164|
|         3094649957|     3094649957|  3.0|            262|                    161|
| 898230537244073986|      281877818|  3.0|            904|                    140|
|1286417148848480257|       55060090|  3.0|            438|                    138|
|1343557199394525186|      380648579|  2.0|            271|                      2|
| 722337933907398656|      380648579|  2.0|            298|                      2|
|          138011877|      380648579|  2.0|           5527|                      2|
|           97950174|      380648579|  2.0|           7534|                      2|
|          166579962|      380648579|  2.0|           4889|                 

In [13]:
df_indexed_final.where(df_indexed_final.user_mention_id_indexed == 608).show()

+------------------+-------------------+-----+---------------+-----------------------+
|           user_id|    user_mention_id|count|user_id_indexed|user_mention_id_indexed|
+------------------+-------------------+-----+---------------+-----------------------+
|822561271115550723|1238850236056924160|  1.0|             27|                    608|
+------------------+-------------------+-----+---------------+-----------------------+



In [14]:
(training, test) = df_indexed_final.randomSplit([0.8, 0.2])

# Build the recommendation model using ALS on the training data
als = ALS(userCol="user_id_indexed", itemCol="user_mention_id_indexed", ratingCol="count")
model = als.fit(training)

# Evaluate the model by computing the RMSE on the test data
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="count", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

In [15]:
predictions.show()

+-------------------+------------------+-----+---------------+-----------------------+-----------+
|            user_id|   user_mention_id|count|user_id_indexed|user_mention_id_indexed| prediction|
+-------------------+------------------+-----+---------------+-----------------------+-----------+
| 824684311827738624|        4319416272|  1.0|            293|                    471|        NaN|
|           95273513|          35240968|  1.0|           7598|                    737|        NaN|
|          226390770|           1917731|  1.0|           6610|                     31|        NaN|
|          320346750|           1917731|  1.0|            194|                     31|-0.09003855|
| 952740986064703488|           1917731|  1.0|             62|                     31|-0.09651223|
|           16172991|           1917731|  1.0|            354|                     31|        NaN|
|          197489550|           1917731|  1.0|           4283|                     31|        NaN|
|         

In [16]:
top_n = 5
userRecs = model.recommendForAllUsers(top_n)

In [17]:
userRecs.show()

+---------------+--------------------+
|user_id_indexed|     recommendations|
+---------------+--------------------+
|           1580|[[164, 1.6341316]...|
|           4900|[[164, 1.6341316]...|
|           5300|[[1, 0.9006183], ...|
|           6620|[[164, 1.6341316]...|
|           7340|[[1, 0.9006183], ...|
|           7880|[[1, 0.9006183], ...|
|           1591|[[308, 0.8998625]...|
|           1342|[[1, 0.9006183], ...|
|           2142|[[1, 0.9006183], ...|
|           7982|[[300, 1.1054603]...|
|            463|[[164, 1.9966516]...|
|            833|[[164, 1.1863202]...|
|           5803|[[1, 0.9006183], ...|
|           7253|[[1, 0.9006183], ...|
|           7833|[[164, 1.6341316]...|
|           6654|[[164, 0.9159051]...|
|           7754|[[300, 0.9351011]...|
|           3175|[[1, 0.9006183], ...|
|           4935|[[26, 0.90466726]...|
|            496|[[1, 0.9006183], ...|
+---------------+--------------------+
only showing top 20 rows



In [18]:
userRecs1 = userRecs \
                        .withColumn("recommendation", F.explode("recommendations")) \
                        .withColumn("user_mention_id_indexed", F.col("recommendation").getField("user_mention_id_indexed")) \
                        .select("user_id_indexed", "user_mention_id_indexed")

userRecs1.show()

+---------------+-----------------------+
|user_id_indexed|user_mention_id_indexed|
+---------------+-----------------------+
|           1580|                    164|
|           1580|                    298|
|           1580|                    337|
|           1580|                    722|
|           1580|                      0|
|           4900|                    164|
|           4900|                    298|
|           4900|                    337|
|           4900|                    722|
|           4900|                      0|
|           5300|                      1|
|           5300|                    502|
|           5300|                    126|
|           5300|                    688|
|           5300|                    201|
|           6620|                    164|
|           6620|                    298|
|           6620|                    337|
|           6620|                    722|
|           6620|                      0|
+---------------+-----------------

In [19]:
new_index = df_indexed_final.select("user_id","user_id_indexed").distinct()
new_index1 = df_indexed_final.select("user_mention_id","user_mention_id_indexed").distinct()


In [20]:
new_index1.show()

+-------------------+-----------------------+
|    user_mention_id|user_mention_id_indexed|
+-------------------+-----------------------+
|1108054880139264000|                    123|
| 887306762801614848|                    687|
|1343992358371799042|                    413|
|         2248932158|                    712|
|           15766082|                    160|
|           66723691|                    344|
|          528820859|                    227|
| 962490153007710208|                    386|
|           56488059|                    722|
|1358117090268446721|                     51|
|           97273575|                    242|
| 867744392223887360|                    142|
|          271319718|                    472|
|           15907183|                    110|
|1063180083769176064|                    545|
|           29083969|                    755|
|         2162764543|                    346|
|1352453439796768768|                    738|
|1341019687614820358|             

In [21]:
userRecs2 = userRecs1.join(new_index, on ="user_id_indexed").select("user_id","user_mention_id_indexed").orderBy("user_id", ascending=False) 
userRecs2.show()

+-------------------+-----------------------+
|            user_id|user_mention_id_indexed|
+-------------------+-----------------------+
|1390066304611192832|                      1|
|1390066304611192832|                    502|
|1390066304611192832|                    688|
|1390066304611192832|                    201|
|1390066304611192832|                    126|
|1390042324105760768|                     14|
|1390042324105760768|                    337|
|1390042324105760768|                    525|
|1390042324105760768|                     66|
|1390042324105760768|                    722|
|1389884106394243075|                    327|
|1389884106394243075|                    258|
|1389884106394243075|                     80|
|1389884106394243075|                     43|
|1389884106394243075|                    156|
|1389558006837391363|                    115|
|1389558006837391363|                    231|
|1389558006837391363|                      5|
|1389558006837391363|             

In [22]:
userRecs3 = new_index1.join(userRecs2, on="user_mention_id_indexed",how = "outer").select("user_id", "user_mention_id").orderBy("user_id", ascending=False) 
userRecs3.show()

+-------------------+-------------------+
|            user_id|    user_mention_id|
+-------------------+-------------------+
|1390066304611192832|         3300064909|
|1390066304611192832|          185025785|
|1390066304611192832|           26574283|
|1390066304611192832| 996693014251585536|
|1390066304611192832|1203444885287821312|
|1390042324105760768|           20457806|
|1390042324105760768|            5392522|
|1390042324105760768|         2361630415|
|1390042324105760768|            9300262|
|1390042324105760768|           56488059|
|1389884106394243075| 918579142416830466|
|1389884106394243075|            1652541|
|1389884106394243075|          221718444|
|1389884106394243075|           36326893|
|1389884106394243075|          298217736|
|1389558006837391363|           15115280|
|1389558006837391363|          224329419|
|1389558006837391363|          281877818|
|1389558006837391363|         2318270108|
|1389558006837391363| 807357676300730368|
+-------------------+-------------

In [23]:
userRecs4 = userRecs3.groupBy("user_id").agg(F.collect_list("user_mention_id").alias("Recommendation"))
userRecs4.show()

+-------------------+--------------------+
|            user_id|      Recommendation|
+-------------------+--------------------+
|1390066304611192832|[9966930142515855...|
|1390042324105760768|[2361630415, 5392...|
|1389884106394243075|[9185791424168304...|
|1389558006837391363|[8073576763007303...|
|1389350768348737537|[32828872, 242592...|
|1388954835656007681|[9966930142515855...|
|1388879909712498688|[24259259, 124401...|
|1388359473056690180|[9966930142515855...|
|1388207843812151296|[9966930142515855...|
|1387168414221938690|[250917489, 32043...|
|1386511248666234882|[1203444885287821...|
|1386393460685430786|[24259259, 124401...|
|1385988241518592006|[24259259, 124401...|
|1385979253104205825|[9966930142515855...|
|1385930603581771776|[24259259, 124401...|
|1385678014919348225|[32828872, 100508...|
|1385654656248938496|[32828872, 100508...|
|1385358484573761537|[24259259, 124401...|
|1385355916376215556|[24259259, 124401...|
|1385344825965400067|[428333, 90918200...|
+----------

In [27]:
userRecs4.withColumn("Recommendation 1",userRecs4.Recommendation.getItem(0))\
         .withColumn("Recommendation 2",userRecs4.Recommendation.getItem(1))\
         .withColumn("Recommendation 3",userRecs4.Recommendation.getItem(2))\
         .withColumn("Recommendation 4",userRecs4.Recommendation.getItem(3))\
         .withColumn("Recommendation 5",userRecs4.Recommendation.getItem(4))\
         .select("user_id", "Recommendation 1", "Recommendation 2", "Recommendation 3", "Recommendation 4", "Recommendation 5").show()

+-------------------+-------------------+-------------------+-------------------+----------------+------------------+
|            user_id|   Recommendation 1|   Recommendation 2|   Recommendation 3|Recommendation 4|  Recommendation 5|
+-------------------+-------------------+-------------------+-------------------+----------------+------------------+
|1390066304611192832| 996693014251585536|1203444885287821312|           26574283|      3300064909|         185025785|
|1390042324105760768|         2361630415|            5392522|           20457806|        56488059|           9300262|
|1389884106394243075| 918579142416830466|          298217736|            1652541|        36326893|         221718444|
|1389558006837391363| 807357676300730368|          281877818|           15115280|       224329419|        2318270108|
|1389350768348737537|           32828872|           24259259|           94482117|         4970411|         380648579|
|1388954835656007681| 996693014251585536|120344488528782