# Twitter Data: Word2Vec and collaborative filtering

### Spark Data Analytics

This project implements 2 analysis workloads using Twitter tweets data.

1. Finding the top 5 users with similar interests to a given Twitter user ID. This involves using tweet information to create a document representation of the tweet IDs each user ID has interacted with, then using Word2Vec to calculate the similarity.

2. The second workload uses an ALS (Alternating Least Squares) collaborative filtering algorithm to suggest, for a given user ID, users to mention in a tweet. The suggestions are based on similarity to the users they have mentioned in the past.

In [1]:
from pyspark import SparkConf,SparkContext
from pyspark.sql import DataFrame,SQLContext
from pyspark.sql import functions as function
from pyspark.ml import Pipeline
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.feature import Word2Vec
from pyspark.sql.types import StructField,IntegerType, StructType,StringType
from pyspark.ml.linalg import Vectors, SparseVector
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import arrays_zip, col
from pyspark.sql import Row
from pyspark.ml.recommendation import ALS
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import SparkSession
from pyspark import SparkConf

import math
import csv
import datetime

# Spark configuration reference:
# https://spark.apache.org/docs/latest/configuration.html

# Settings for this run, kept as (key, value) pairs so they are easy to scan.
spark_settings = [
    ('spark.executor.instances', '2'),
    ('spark.executor.memory', '1g'),
    ('spark.driver.memory','1g'),
    ('spark.executor.cores', '10'),
    ('spark.sql.shuffle.partitions', '100'),
]
spark_conf = SparkConf().setAppName("Configuration 5").setAll(spark_settings)

# sc.stop()
# NOTE(review): getOrCreate returns an already-running context when there is
# one, in which case spark_conf appears not to be applied (the getConf() output
# recorded below shows app name 'Defaults') - confirm, and stop the old context
# first if these settings must take effect.
sc = SparkContext.getOrCreate(spark_conf)

# SQL entry point used by the rest of the notebook.
sqlContext = SQLContext(sc)

In [2]:
sc.getConf().getAll()

[('spark.eventLog.enabled', 'true'),
 ('spark.app.startTime', '1622442881930'),
 ('spark.app.id', 'local-1622442884027'),
 ('spark.executor.id', 'driver'),
 ('spark.yarn.historyServer.address', 'http://localhost:18080'),
 ('spark.driver.port', '33771'),
 ('spark.app.name', 'Defaults'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '2d0bccdca3bb'),
 ('spark.eventLog.dir', 'file:///home/jovyan/work/spark-history'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.history.fs.logDirectory', 'file:///home/jovyan/work/spark-history')]

In [3]:
# Can use SparkRDD, SparkSQL and SparkML
# Read the tweets JSON file into a DataFrame; the multiLine option is passed
# through to the JSON reader exactly as before.
tweet = sqlContext.read.option("multiLine", "true").json("tweets.json")
# Kept as the last expression so the notebook echoes the DataFrame's schema.
tweet.cache()

DataFrame[created_at: string, hash_tags: array<struct<indices:array<bigint>,text:string>>, id: bigint, replyto_id: bigint, replyto_user_id: bigint, retweet_id: bigint, retweet_user_id: bigint, text: string, user_id: bigint, user_mentions: array<struct<id:bigint,indices:array<bigint>>>]

In [4]:
tweet.printSchema()
tweet.show(5,truncate=False)

root
 |-- created_at: string (nullable = true)
 |-- hash_tags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |-- id: long (nullable = true)
 |-- replyto_id: long (nullable = true)
 |-- replyto_user_id: long (nullable = true)
 |-- retweet_id: long (nullable = true)
 |-- retweet_user_id: long (nullable = true)
 |-- text: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)

+-------------------+---------------------+-------------------+----------+---------------+-------------------+---------------+---------------------------------------------------------------------------

## Workload 1. Top 5 Recommended Users with Similar Tweet Interests

### Preprocessing

In [5]:
# Reply interactions: one (tweet_id, user_id) row per tweet that replies to a
# user, where user_id is the user being replied to.
reply_rows = (
    tweet.select("id", "replyto_user_id")
    .where(tweet.replyto_user_id.isNotNull())
    .withColumnRenamed("id", "tweet_id")
    .withColumnRenamed("replyto_user_id", "user_id")
    .cache()
)

# Tweet IDs are turned into strings because they are later used as tokens.
tweet_reply = reply_rows.withColumn("tweet_id", reply_rows["tweet_id"].cast(StringType()))
tweet_reply.show(15)

+-------------------+-------------------+
|           tweet_id|            user_id|
+-------------------+-------------------+
|1390088186945228802|          221718444|
|1390087562425950208|            8083722|
|1390087448093503494|          252794509|
|1390087436747698177| 739595086044876800|
|1390087424571756550|           19881665|
|1390086763901771783|1037346081619804163|
|1390085493057011712|          245587515|
|1390084965640069124|          598921658|
|1390084886459977746|          584972360|
|1390084259327746051|1238850236056924160|
|1390083736436367361|           77509999|
|1390083674432053250|          325542223|
|1390083654534275073|          298217736|
|1390083276308680708|           15012486|
|1390083019558596613|1243269028841492480|
+-------------------+-------------------+
only showing top 15 rows



In [6]:
# Retweet interactions: one (tweet_id, user_id) row per retweet, where user_id
# is the user whose tweet was retweeted.
retweet_rows = (
    tweet.select("id", "retweet_user_id")
    .where(tweet.retweet_user_id.isNotNull())
    .withColumnRenamed("id", "tweet_id")
    .withColumnRenamed("retweet_user_id", "user_id")
    .cache()
)

# Tweet IDs are turned into strings because they are later used as tokens.
tweet_retweet = retweet_rows.withColumn("tweet_id", retweet_rows["tweet_id"].cast(StringType()))
tweet_retweet.show(15)

+-------------------+------------------+
|           tweet_id|           user_id|
+-------------------+------------------+
|1390088382659895296|            807095|
|1390088354717474822|         380648579|
|1390088345288646661|         191807697|
|1390088329530544128|          15115280|
|1390088318906339328|            807095|
|1390088314984697859|          20402945|
|1390088291559608321|          26574283|
|1390088288556388354|            807095|
|1390088280755081221|            807095|
|1390088274245472264|          15115280|
|1390088263403249667|          26574283|
|1390088257292079112|          36326893|
|1390088255958224899|          26574283|
|1390088227365609472|        3094649957|
|1390088227101368322|846411464885747712|
+-------------------+------------------+
only showing top 15 rows



In [7]:
# Append the two tables together vertically: reply rows followed by retweet
# rows. Both frames carry the same (tweet_id, user_id) columns.
tweet_join = tweet_reply.union(tweet_retweet)

In [8]:
# One row per user with the list of tweet IDs they were replied to / retweeted in.
# collect_list keeps every tweet_id, duplicates included (collect_set would
# keep only distinct values).
tweet_set = tweet_join.groupBy("user_id").agg(
    function.collect_list("tweet_id").alias("tweet_set")
)

In [9]:
tweet_set.dtypes

[('user_id', 'bigint'), ('tweet_set', 'array<string>')]

## Feature Extractor 1: Word2Vec

### Document Representation

In [10]:
tweet_set.show(10,truncate=False)
tweet_set.dtypes

+------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id           |tweet_set                                                                                                                                                                                                                                           

[('user_id', 'bigint'), ('tweet_set', 'array<string>')]

### Convert Array to String

In [11]:
# Flatten each user's array of tweet IDs into one space-separated string,
# overwriting the tweet_set column in the new frame.
tweet_set_string = tweet_set.withColumn(
    "tweet_set",
    function.concat_ws(" ", tweet_set["tweet_set"]),
)

### Cosine Similarity Function

In [12]:
import math

def cosine_similarity(rdd):
    """Score one (static user, comparison user) pair by cosine similarity.

    Despite its name, ``rdd`` is a single element of the pair RDD this
    function is mapped over, laid out as
    ``((user_id, vector), (other_id, other_vector))``. Each vector only has to
    provide a ``dot`` method.

    Returns ``((user_id, other_id), similarity)``. The cosine is undefined when
    either vector has zero length; 0.0 is returned in that case so the result
    stays sortable instead of becoming NaN or raising ZeroDivisionError.
    """
    user, x = rdd[0][0], rdd[0][1]
    comp, y = rdd[1][0], rdd[1][1]

    # Product of the two Euclidean norms.
    norm_product = (x.dot(x) ** .5) * (y.dot(y) ** .5)
    if norm_product == 0:
        return ((user, comp), 0.0)
    return ((user, comp), x.dot(y) / norm_product)

### Word2Vec

In [13]:
# Word2Vec over the per-user tweet ID lists: each user's array of tweet IDs is
# the input document and the resulting vector is written to the 'result' column.
word2vec = Word2Vec(
    inputCol='tweet_set',
    outputCol='result',
    vectorSize=300,
    minCount=1,
)
model = word2vec.fit(tweet_set)
result = model.transform(tweet_set)
# Peek at one row to check the output shape.
result.take(1)

[Row(user_id=186520006, tweet_set=['1390049035248193544'], result=DenseVector([-0.0011, -0.0015, -0.0012, 0.0016, -0.0009, 0.0016, -0.0011, 0.001, 0.0008, 0.0016, -0.0016, -0.0007, 0.0002, -0.0005, -0.0006, 0.0011, -0.0012, 0.001, 0.001, 0.0005, 0.0017, -0.0001, 0.0013, -0.0001, -0.0016, -0.0003, -0.0015, -0.0008, 0.0007, -0.0017, 0.0013, -0.0013, 0.001, -0.0011, 0.0012, -0.0005, -0.0, -0.0014, -0.0, -0.0013, 0.0013, -0.0008, -0.0009, -0.0002, 0.001, 0.0011, 0.0004, -0.0009, 0.0006, -0.0004, -0.0004, 0.0013, -0.0012, 0.0002, -0.0004, -0.0011, 0.0011, -0.0012, 0.0013, -0.0009, -0.0005, 0.0003, -0.0014, 0.0014, -0.0012, -0.0015, -0.0005, 0.0002, -0.0003, -0.0013, 0.0014, -0.0005, -0.0006, 0.0013, 0.0014, 0.0003, 0.0003, 0.0013, -0.0013, 0.0005, -0.0015, 0.0004, 0.0016, 0.0012, -0.0004, -0.0016, 0.0014, -0.0006, -0.0007, -0.0014, 0.001, -0.0005, 0.0001, 0.0016, 0.0004, 0.0004, -0.0013, 0.0011, 0.0007, -0.0011, -0.0006, 0.0009, 0.0016, 0.0016, -0.0007, 0.0006, 0.0007, 0.0014, -0.0008, 0.00

### Creating Static User Vector and Comparison Vectors

In [14]:
# The single "static" user whose vector every other user is compared against.
is_target_user = result.user_id == '138168339'
user_id_w2v = result.where(is_target_user).select("user_id", "result")
user_id_w2v.show()

+---------+--------------------+
|  user_id|              result|
+---------+--------------------+
|138168339|[-2.7001406579074...|
+---------+--------------------+



In [15]:
# Every user except the static one; these are the comparison vectors.
is_other_user = result.user_id != '138168339'
all_other_w2v = result.where(is_other_user).select("user_id", "result")
all_other_w2v.show()

+------------------+--------------------+
|           user_id|              result|
+------------------+--------------------+
|         186520006|[-0.0011121596908...|
|        1206227149|[-0.0016537785995...|
|          59811175|[-4.5114144086255...|
|        2791419078|[-8.0588259152136...|
|980129972160360448|[-0.0012216510949...|
|          23085502|[1.60405218861449...|
|          37935680|[-6.7934959952253...|
|         927388044|[2.00922891963273...|
|        1643123766|[8.07363307103514...|
|        2195885540|[-4.1051228451708...|
|          25146372|[-1.7893104813992...|
|         286332872|[6.02257241553161...|
|        2966462296|[2.33374477829784...|
|        3044682647|[-0.0014002505922...|
|          17672825|[1.54426903463900...|
|          32345449|[8.97124991752207...|
|          36326893|[6.01776138258477...|
|          40851610|[1.51855700551095...|
|          76918775|[1.8923262541648E...|
|950322380412342273|[-0.0015861816937...|
+------------------+--------------

In [16]:
# Layout of each element produced by the cartesian product below:
#   [0][0]  static user ID
#   [1][0]  comparison user ID
#   [0][1]  dense vector of the static user
#   [1][1]  dense vector of the comparison user

# word2vec_cosine.map(lambda x: x[1][0]).collect()

In [17]:
# Reduce each row to a (user_id, vector) tuple. The RDDs are used directly:
# collecting them to the driver and re-parallelizing the result only added a
# round-trip through driver memory without changing the data.
user_id_w2v_rdd = user_id_w2v.rdd.map(lambda row: (row[0], row[1]))
all_other_w2v_rdd = all_other_w2v.rdd.map(lambda row: (row[0], row[1]))

# Pair the static user with every other user and score each pair.
word2vec_cosine = user_id_w2v_rdd.cartesian(all_other_w2v_rdd).map(cosine_similarity).cache()

### Word2Vec Recommendation for UserID = 138168339 - Top 5 Users by Cosine Similarity

In [18]:
# Each element is ((user_id, recommended_user_id), cosine_similarity);
# sort by the similarity, highest first, and keep the first 5.
ranked_w2v = word2vec_cosine.sortBy(lambda pair: pair[1], ascending=False)
final_Q1_w2v_desc = ranked_w2v.take(5)
final_Q1_w2v_desc

[((138168339, 4319416272), 0.1679473594982805),
 ((138168339, 91905327), 0.15648947468856134),
 ((138168339, 345745234), 0.15575210018168117),
 ((138168339, 5392522), 0.15574439860369743),
 ((138168339, 21969867), 0.14604886511105758)]

## Feature Extractor 2: TF-IDF

In [19]:
# TF-IDF input: user_id plus the space-joined string form of the tweet IDs.
tfidf_columns = ["user_id", "tweet_set"]
tweet_tfidf = tweet_set_string.select(*tfidf_columns).cache()
tweet_tfidf.show(n=5, truncate=False)
tweet_tfidf.printSchema()

+------------------+-------------------------------------------------------------------------------+
|user_id           |tweet_set                                                                      |
+------------------+-------------------------------------------------------------------------------+
|186520006         |1390049035248193544                                                            |
|1206227149        |1390070125722705921                                                            |
|59811175          |1390067294634602497 1390057954771079181 1390056326949126146 1390046798652313601|
|2791419078        |1390047279814483971                                                            |
|980129972160360448|1390042413578731525                                                            |
+------------------+-------------------------------------------------------------------------------+
only showing top 5 rows

root
 |-- user_id: long (nullable = true)
 |-- tweet_set: string (

In [20]:
# TF-IDF pipeline: tokenize the ID string -> hashed term counts -> IDF weighting.
tokenizer = Tokenizer(inputCol="tweet_set", outputCol="words")
# NOTE(review): with numFeatures=300 different tweet IDs can hash into the same
# bucket - confirm 300 is intended rather than a larger feature space.
vectorizer = HashingTF(
    inputCol=tokenizer.getOutputCol(),
    outputCol="rawFeatures",
    numFeatures=300,
)
idf = IDF(inputCol="rawFeatures", outputCol="features")

tfidf_stages = [tokenizer, vectorizer, idf]
pipeline = Pipeline(stages=tfidf_stages)

# Fit on the full table, then transform the same table; the result is a
# DataFrame with the added words / rawFeatures / features columns.
model = pipeline.fit(tweet_tfidf)
tfidf = model.transform(tweet_tfidf)
tfidf.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- tweet_set: string (nullable = false)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)



### Creating Static User Vector and Comparison Vectors

In [21]:
# The static user whose TF-IDF vector all other users are compared against.
is_target_user = tfidf.user_id == '138168339'
user_id_tdidf = tfidf.where(is_target_user).select("user_id", "features").cache()

In [22]:
# Every user except the static one; these are the comparison vectors.
is_other_user = tfidf.user_id != '138168339'
all_other_tdidf = tfidf.where(is_other_user).select("user_id", "features").cache()

In [23]:
# Calculate Cosine Similarity
# Reduce each row to a (user_id, vector) tuple. The RDDs are used directly:
# collecting them to the driver and re-parallelizing the result only added a
# round-trip through driver memory without changing the data.
user_id_tdidf_rdd = user_id_tdidf.rdd.map(lambda row: (row[0], row[1]))
all_other_tdidf_rdd = all_other_tdidf.rdd.map(lambda row: (row[0], row[1]))

# Pair the static user with every other user and score each pair.
tfidf_cosine = user_id_tdidf_rdd.cartesian(all_other_tdidf_rdd).map(cosine_similarity).cache()

### TF-IDF Recommendation for UserID = 138168339 - Top 5 Users by Cosine Similarity

In [24]:
# Each element is ((user_id, recommended_user_id), cosine_similarity);
# sort by the similarity, highest first, and keep the first 5.
ranked_tfidf = tfidf_cosine.sortBy(lambda pair: pair[1], ascending=False)
final_Q1_idf_desc = ranked_tfidf.take(5)
final_Q1_idf_desc

[((138168339, 1906579968), 0.5135958160821892),
 ((138168339, 2512691774), 0.5135958160821892),
 ((138168339, 1497069360), 0.5135958160821892),
 ((138168339, 902902542857854977), 0.5135958160821892),
 ((138168339, 242898032), 0.5135958160821892)]

### Summary Workload 1

In [25]:
final_Q1_w2v_desc

[((138168339, 4319416272), 0.1679473594982805),
 ((138168339, 91905327), 0.15648947468856134),
 ((138168339, 345745234), 0.15575210018168117),
 ((138168339, 5392522), 0.15574439860369743),
 ((138168339, 21969867), 0.14604886511105758)]

In [26]:
final_Q1_idf_desc

[((138168339, 1906579968), 0.5135958160821892),
 ((138168339, 2512691774), 0.5135958160821892),
 ((138168339, 1497069360), 0.5135958160821892),
 ((138168339, 902902542857854977), 0.5135958160821892),
 ((138168339, 242898032), 0.5135958160821892)]

# Workload 2. User Recommendations

In [27]:
# Input for the collaborative filtering workload: the tweeting user and the
# users they mentioned, for tweets whose user_mentions field is not null.
has_mentions = tweet.user_mentions.isNotNull()
tweet_als = tweet.select("user_id", "user_mentions").where(has_mentions).cache()

# Only user_id and the `id` inside each mention (the future mention_id) are needed.
tweet_als.take(15)
type(tweet_als)

pyspark.sql.dataframe.DataFrame

### Pre-Processing

In [28]:
# Pull the first 8 entries of user_mentions out into columns mention_1..mention_8.
# Number of mention_users selected by trial and error.
tweet_als_2 = tweet_als
for slot in range(8):
    tweet_als_2 = tweet_als_2.withColumn(
        "mention_{}".format(slot + 1),
        tweet_als["user_mentions"].getItem(slot),
    )

tweet_als_2.printSchema()
tweet_als_2.show()

root
 |-- user_id: long (nullable = true)
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |-- mention_1: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- indices: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |-- mention_2: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- indices: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |-- mention_3: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- indices: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |-- mention_4: struct (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- indices: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |-- mention_5: struct (nullable

In [29]:
# Each mention_N column is still a nested struct; unpack it into flat
# (user_id, mention_id) rows, one DataFrame per mention slot.
def _mentions_in_slot(slot):
    """Return cached (user_id, mention_id) rows for column mention_<slot>."""
    slot_name = "mention_{}".format(slot)
    return (
        tweet_als_2.select("user_id", col(slot_name + ".*"))
        .select("user_id", "id")
        .withColumnRenamed("id", "mention_id")
        .filter(tweet_als_2[slot_name].isNotNull())
        .cache()
    )


(expanded_1, expanded_2, expanded_3, expanded_4,
 expanded_5, expanded_6, expanded_7, expanded_8) = [
    _mentions_in_slot(slot) for slot in range(1, 9)
]

# Example
expanded_1.printSchema()
type(expanded_1)
expanded_1.show()

root
 |-- user_id: long (nullable = true)
 |-- mention_id: long (nullable = true)

+-------------------+------------------+
|            user_id|        mention_id|
+-------------------+------------------+
|           17799542|            807095|
|         1166466828|         380648579|
|1343606436149022723|         191807697|
| 930226031276982273|          15115280|
| 920858307392192513|            807095|
|           21458110|          20402945|
| 787062740183552000|          26574283|
|         2955789098|            807095|
|          198453947|            807095|
|         1431726547|          15115280|
|1245145031045980163|          26574283|
|         2181244875|          36326893|
|           34865264|          26574283|
|          179912903|        3094649957|
|1173096863840423941|846411464885747712|
|          491594719|          15250661|
|           40404318|           7788062|
|1326851827879604226|        3094649957|
|            5567892|          26574283|
|           551

In [30]:
# Stack the eight per-slot tables into one; union (which unionAll is an alias
# for) keeps duplicate rows.
union1 = expanded_1.union(expanded_2)
union2 = union1.union(expanded_3)
union3 = union2.union(expanded_4)
union4 = union3.union(expanded_5)
union5 = union4.union(expanded_6)
union6 = union5.union(expanded_7)
union7 = union6.union(expanded_8).cache()

# Result is a DataFrame of (user_id, mention_id) rows.
union7.printSchema()
union7.show()

root
 |-- user_id: long (nullable = true)
 |-- mention_id: long (nullable = true)

+-------------------+------------------+
|            user_id|        mention_id|
+-------------------+------------------+
|           17799542|            807095|
|         1166466828|         380648579|
|1343606436149022723|         191807697|
| 930226031276982273|          15115280|
| 920858307392192513|            807095|
|           21458110|          20402945|
| 787062740183552000|          26574283|
|         2955789098|            807095|
|          198453947|            807095|
|         1431726547|          15115280|
|1245145031045980163|          26574283|
|         2181244875|          36326893|
|           34865264|          26574283|
|          179912903|        3094649957|
|1173096863840423941|846411464885747712|
|          491594719|          15250661|
|           40404318|           7788062|
|1326851827879604226|        3094649957|
|            5567892|          26574283|
|           551

In [31]:
# Rating = number of times user_id mentioned mention_id.
pair_counts = (
    union7.select('user_id', 'mention_id')
    .groupBy('user_id', 'mention_id')
    .count()
)
als_input = (
    pair_counts.withColumnRenamed('count', 'rating')
    .orderBy('user_id', 'mention_id')
    .cache()
)

als_input.printSchema()

root
 |-- user_id: long (nullable = true)
 |-- mention_id: long (nullable = true)
 |-- rating: long (nullable = false)



In [32]:
# Sanity check: pairs where a user mentioned the same account more than twice.
mentioned_often = als_input.rating > 2
multiple_rating = als_input.where(mentioned_often).cache()
multiple_rating.show()

+-------------------+----------+------+
|            user_id|mention_id|rating|
+-------------------+----------+------+
|         3094649957|  24259259|     6|
|         3094649957|3094649957|     3|
| 898230537244073986| 281877818|     3|
|1286417148848480257|  55060090|     3|
+-------------------+----------+------+



### Creating Lookup Tables for User and Mention IDs

In [33]:
# Same data as als_input, as an RDD of plain [user_id, mention_id, rating] lists.
rating_rows = als_input.rdd
als_rdd = rating_rows.map(lambda row: list(row)).cache()
als_rdd.take(10)

[[29283, 45943310, 1],
 [29283, 1349149096909668363, 1],
 [29283, 1354219017721032709, 1],
 [50613, 380648579, 1],
 [661173, 807095, 1],
 [756287, 133081348, 1],
 [762115, 26574283, 1],
 [773664, 23085502, 1],
 [773664, 3290364847, 1],
 [817652, 83616201, 1]]

In [34]:
# Lookup table giving every distinct user_id a small integer index
# (user_als_id) via zipWithIndex; the index is what the ALS model is trained on.
# The ID column is declared LongType to match the bigint user_id in als_input.
# It used to be declared StringType, which made the later joins compare a
# string column against a bigint column through implicit coercion.
from pyspark.sql.types import LongType

user_lut_schema = StructType([
    StructField("userid", LongType(), True),
    StructField("user_als_id", IntegerType(), True),
])
distinct_user_ids = als_input.rdd.map(lambda row: row[0]).distinct()
user_als_id_LUT = sqlContext.createDataFrame(
    distinct_user_ids.zipWithIndex(), user_lut_schema
).cache()

In [35]:
# Lookup table giving every distinct mention_id a small integer index
# (mention_als_id) via zipWithIndex; the index is what the ALS model is trained on.
# The ID column is declared LongType to match the bigint mention_id in
# als_input. It used to be declared StringType, which made the later joins
# compare a string column against a bigint column through implicit coercion.
from pyspark.sql.types import LongType

mention_lut_schema = StructType([
    StructField("mentionid", LongType(), True),
    StructField("mention_als_id", IntegerType(), True),
])
distinct_mention_ids = als_input.rdd.map(lambda row: row[1]).distinct()
mention_als_id_LUT = sqlContext.createDataFrame(
    distinct_mention_ids.zipWithIndex(), mention_lut_schema
).cache()

In [36]:
# Rename the lookup-table ID columns so they carry the same names as the
# als_input columns they are joined against in the next cell.
user_als_id_LUT = user_als_id_LUT.withColumnRenamed("userid","user_id")
mention_als_id_LUT = mention_als_id_LUT.withColumnRenamed("mentionid","mention_id")

In [37]:
# Swap the original IDs for their lookup-table indices.

# Step 1: attach user_als_id by matching on the user ID.
same_user = als_input["user_id"] == user_als_id_LUT["user_id"]
user_join = als_input.join(user_als_id_LUT, on=same_user, how='left')

# Step 2: attach mention_als_id by matching on the mention ID.
same_mention = user_join["mention_id"] == mention_als_id_LUT["mention_id"]
mention_join = user_join.join(mention_als_id_LUT, on=same_mention, how='left')

# Keep only the two indices and the rating.
index_columns = ["user_als_id", "mention_als_id", "rating"]
als_input_index = mention_join.select(*index_columns).cache()

als_input_index.printSchema()
als_input_index.show(5)

root
 |-- user_als_id: integer (nullable = true)
 |-- mention_als_id: integer (nullable = true)
 |-- rating: long (nullable = false)

+-----------+--------------+------+
|user_als_id|mention_als_id|rating|
+-----------+--------------+------+
|       2494|           680|     1|
|       7639|           680|     1|
|       3969|           680|     1|
|       5208|           680|     1|
|       3404|            31|     1|
+-----------+--------------+------+
only showing top 5 rows



## ALS Model Training

In [38]:
# parallelism when doing parameter optimisation
# Split into test and training sets (80% / 20%).
# The seed has to be handed to randomSplit itself: seeding Python's `random`
# module does not reach Spark's sampling, so on its own it left the split
# unseeded.
import random

SPLIT_SEED = 123
random.seed(SPLIT_SEED)  # kept for any driver-side use of `random`
(training, test) = als_input_index.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [39]:
# Configure ALS on the indexed (user, mention, rating) triples.
# coldStartStrategy="drop" is passed through unchanged.
als = ALS(
    userCol="user_als_id",
    itemCol="mention_als_id",
    ratingCol="rating",
    maxIter=5,
    regParam=0.01,
    coldStartStrategy="drop",
)

# Fit on the training split only.
model = als.fit(training)

In [40]:
# Output predictions: apply the fitted ALS model to the held-out test split.
# NOTE(review): `predictions` is not used again in the visible part of this
# notebook - no evaluation metric is computed from it.
predictions = model.transform(test)

In [41]:
# For every user index: the 5 recommended mention indices, each paired with
# its predicted rating, held in the `recommendations` array column.
user_recommendations = model.recommendForAllUsers(numItems=5)
user_recommendations.show(n=20)

+-----------+--------------------+
|user_als_id|     recommendations|
+-----------+--------------------+
|       1580|[{498, 0.9983822}...|
|       4900|[{704, 4.1718445}...|
|       5300|[{590, 0.9978583}...|
|       6620|[{635, 3.3646815}...|
|       7240|[{635, 3.3646815}...|
|       7340|[{340, 2.762645},...|
|       7880|[{340, 2.762645},...|
|        471|[{635, 3.3646815}...|
|       4101|[{635, 3.3646815}...|
|       1342|[{220, 3.2450476}...|
|       2122|[{635, 3.3646815}...|
|       2142|[{704, 4.1718445}...|
|        463|[{704, 4.1718445}...|
|        833|[{704, 4.1718445}...|
|       5803|[{704, 4.1718445}...|
|       7253|[{43, 0.9986244},...|
|       7833|[{635, 3.3646815}...|
|       7993|[{340, 2.762645},...|
|       7554|[{635, 3.3646815}...|
|       7754|[{704, 4.1718445}...|
+-----------+--------------------+
only showing top 20 rows



In [42]:
# Spread the 5 entries of the recommendations array over columns
# mention_1..mention_5.
user_recommendations_split = user_recommendations
for slot in range(5):
    user_recommendations_split = user_recommendations_split.withColumn(
        "mention_{}".format(slot + 1),
        user_recommendations["recommendations"].getItem(slot),
    )

user_recommendations_split.show(20)

+-----------+--------------------+----------------+-----------------+----------------+-----------------+-----------------+
|user_als_id|     recommendations|       mention_1|        mention_2|       mention_3|        mention_4|        mention_5|
+-----------+--------------------+----------------+-----------------+----------------+-----------------+-----------------+
|       1580|[{498, 0.9983822}...|{498, 0.9983822}| {207, 0.5535434}|{359, 0.5484498}|{225, 0.53368735}| {726, 0.5100828}|
|       4900|[{704, 4.1718445}...|{704, 4.1718445}| {628, 3.3765597}|{314, 3.3749497}| {596, 3.1945496}| {729, 3.1508334}|
|       5300|[{590, 0.9978583}...|{590, 0.9978583}|{635, 0.93047214}|{756, 0.7417017}| {311, 0.7230163}| {243, 0.6074761}|
|       6620|[{635, 3.3646815}...|{635, 3.3646815}| {391, 3.3398652}|{358, 3.1223652}| {704, 3.0805924}|  {128, 3.021338}|
|       7240|[{635, 3.3646815}...|{635, 3.3646815}| {391, 3.3398652}|{358, 3.1223652}| {704, 3.0805924}|  {128, 3.021338}|
|       7340|[{3

In [43]:
# First step of mapping back to the original IDs: flatten each mention_N struct
# into (user_als_id, mention_als_id) rows, one DataFrame per recommendation slot.
def _recommended_in_slot(slot):
    """Return cached (user_als_id, mention_als_id) rows for column mention_<slot>."""
    return (
        user_recommendations_split
        .select("user_als_id", col("mention_{}.*".format(slot)))
        .select("user_als_id", "mention_als_id")
        .cache()
    )


rec_expanded_1, rec_expanded_2, rec_expanded_3, rec_expanded_4, rec_expanded_5 = [
    _recommended_in_slot(slot) for slot in range(1, 6)
]

rec_expanded_1.show(20)

+-----------+--------------+
|user_als_id|mention_als_id|
+-----------+--------------+
|       1580|           498|
|       4900|           704|
|       5300|           590|
|       6620|           635|
|       7240|           635|
|       7340|           340|
|       7880|           340|
|        471|           635|
|       4101|           635|
|       1342|           220|
|       2122|           635|
|       2142|           704|
|        463|           704|
|        833|           704|
|       5803|           704|
|       7253|            43|
|       7833|           635|
|       7993|           340|
|       7554|           635|
|       7754|           704|
+-----------+--------------+
only showing top 20 rows



In [44]:
# Translate the ALS indices back to the original IDs using the lookup tables.
def _attach_user_id(rec_df):
    """Left-join the user lookup table onto rec_df by user_als_id."""
    same_user = rec_df.user_als_id == user_als_id_LUT.user_als_id
    return rec_df.join(user_als_id_LUT, same_user, how='left')


def _attach_mention_id(user_df, slot, user_col=None):
    """Left-join the mention lookup table and keep (user id, rec_mention_<slot>).

    When user_col is given, the user_id column is renamed to it.
    """
    same_mention = user_df.mention_als_id == mention_als_id_LUT.mention_als_id
    converted = (
        user_df.join(mention_als_id_LUT, same_mention, how='left')
        .select("user_id", "mention_id")
        .withColumnRenamed("mention_id", "rec_mention_{}".format(slot))
    )
    if user_col is not None:
        converted = converted.withColumnRenamed("user_id", user_col)
    return converted.cache()


u_convert_1 = _attach_user_id(rec_expanded_1)
u_convert_2 = _attach_user_id(rec_expanded_2)
u_convert_3 = _attach_user_id(rec_expanded_3)
u_convert_4 = _attach_user_id(rec_expanded_4)
u_convert_5 = _attach_user_id(rec_expanded_5)

# Slot 1 keeps the plain user_id column; slots 2-5 get user_id_<slot> so the
# key columns stay distinguishable when the tables are joined side by side.
convert_1 = _attach_mention_id(u_convert_1, 1)
convert_2 = _attach_mention_id(u_convert_2, 2, "user_id_2")
convert_3 = _attach_mention_id(u_convert_3, 3, "user_id_3")
convert_4 = _attach_mention_id(u_convert_4, 4, "user_id_4")
convert_5 = _attach_mention_id(u_convert_5, 5, "user_id_5")

convert_1.show(20)

+-------------------+------------------+
|            user_id|     rec_mention_1|
+-------------------+------------------+
|           88313436|          23424533|
|1324418983567757314|          12133382|
|          265373725|         149075155|
|          152337358|        1643123766|
|          575269972|        1643123766|
|          132125175|          85583894|
|          170283587|          85583894|
| 791526635182788609|        1643123766|
|           91268897|        1643123766|
|         1283499631|          24259259|
| 719943446300110849|        1643123766|
|1196958389416296449|          12133382|
|         1549482409|          12133382|
|           66753218|          12133382|
|          104159337|          12133382|
|         3383573572|865004396681207809|
|           23124586|        1643123766|
|         3001455391|          85583894|
|           67225979|        1643123766|
| 839055764136157184|          12133382|
+-------------------+------------------+
only showing top

### Top 5 Recommended Mention Users

In [45]:
# Put the five per-slot tables side by side, matching each one on the user ID.
join_1 = convert_1.join(
    convert_2, on=convert_1["user_id"] == convert_2["user_id_2"], how='left')
join_2 = join_1.join(
    convert_3, on=join_1["user_id"] == convert_3["user_id_3"], how='left')
join_3 = join_2.join(
    convert_4, on=join_2["user_id"] == convert_4["user_id_4"], how='left')

# Drop the helper key columns (user_id_2..user_id_5) from the final table.
final_columns = ["user_id"] + ["rec_mention_{}".format(slot) for slot in range(1, 6)]
join_4 = join_3.join(
    convert_5, on=join_3["user_id"] == convert_5["user_id_5"], how='left'
).select(*final_columns)

# final recommended users list
join_4.show(20)

+-------------------+------------------+------------------+------------------+-------------------+-------------------+
|            user_id|     rec_mention_1|     rec_mention_2|     rec_mention_3|      rec_mention_4|      rec_mention_5|
+-------------------+------------------+------------------+------------------+-------------------+-------------------+
|         1054600164|          14167059|         157981564|         114968487|           24259259|           12133382|
|          107685130|          40851610|          14296273|        1643123766|           59159771|1354219017721032709|
|1124122599250911233|        1643123766|         252794509|         153944899|           12133382|         2876041031|
|1129048802017579008|          12133382|         157981564|         114968487|           16184358|          587591389|
|1250082361229729792|          92555364|          61534021|          55060090|1059652318285709312|           39511166|
|1339354662923808780|          12133382|        

In [47]:
type(join_4)

pyspark.sql.dataframe.DataFrame

In [53]:
final_Q1_w2v_desc

[((138168339, 4319416272), 0.1679473594982805),
 ((138168339, 91905327), 0.15648947468856134),
 ((138168339, 345745234), 0.15575210018168117),
 ((138168339, 5392522), 0.15574439860369743),
 ((138168339, 21969867), 0.14604886511105758)]

In [54]:
final_Q1_idf_desc

[((138168339, 1906579968), 0.5135958160821892),
 ((138168339, 2512691774), 0.5135958160821892),
 ((138168339, 1497069360), 0.5135958160821892),
 ((138168339, 902902542857854977), 0.5135958160821892),
 ((138168339, 242898032), 0.5135958160821892)]

In [55]:
join_4.show(20)

+-------------------+------------------+------------------+-------------+-------------+-------------+
|            user_id|     rec_mention_1|     rec_mention_2|rec_mention_3|rec_mention_4|rec_mention_5|
+-------------------+------------------+------------------+-------------+-------------+-------------+
|           88313436|          23424533|          25049056|     35169299|      2729061|     41704988|
|1324418983567757314|          12133382|         157981564|    114968487|     16184358|    587591389|
|          265373725|         149075155|        1643123766|     17464397|     15012486|     17154865|
|          152337358|        1643123766|         252794509|    153944899|     12133382|   2876041031|
|          575269972|        1643123766|         252794509|    153944899|     12133382|   2876041031|
|          132125175|          85583894|         252794509|     94482117|      9300262|    157981564|
|          170283587|          85583894|         252794509|     94482117|      930

Stop the kernel for this run to show up in the Spark history server.