In [2]:
# importing required packages
from pyspark.sql import SparkSession
from pyspark.ml.feature import HashingTF, IDF, Normalizer, Word2Vec 
from pyspark.ml.linalg import DenseVector, Vectors, VectorUDT
from pyspark.sql.functions import col, explode, udf, concat_ws, collect_list, split
from pyspark.ml.recommendation import ALS
from pyspark.sql.types import DoubleType

In [3]:
# Setting a spark session 
spark = SparkSession \
    .builder \
    .appName("Workload-2") \
    .getOrCreate()

# spark.conf.set("spark.sql.shuffle.partitions", 100)
sc = spark.sparkContext
sc.defaultParallelism
sc.getConf().getAll()
spark.conf.set('spark.sql.adaptive.enabled',True)

In [3]:
# reading data
data = spark.read.option("multiline","true").json('tweets.json')
# data.cache()spark.conf
# data.show(truncate=False)

In [4]:
# Data formatting for collaborative filtering
data2_formatting = data.withColumn("user_mentions", explode("user_mentions")).select(
  col('user_id'), col("user_mentions")["id"].alias("mention_users")).cache()
# data2_formatting.show()

data2_required = data2_formatting.groupBy("user_id", "mention_users").count().withColumnRenamed("count","rating").cache()
# data2_required.show()
# data2_required.dtypes
data2_formatting.unpersist()

DataFrame[user_id: bigint, mention_users: bigint]

In [5]:
# Mapping large id values to integer range values
distinct_tweet_uids = data2_required.select("user_id").distinct()
distinct_mention_uids = data2_required.select("mention_users").distinct()
# distinct_user_id.show()
distinct_ids_total = distinct_tweet_uids.union(distinct_mention_uids).distinct()
# distinct_ids_total.show()
indices_for_ids = distinct_ids_total.rdd.zipWithIndex().toDF()
# indices_for_ids.show()

mapping_df_for_uid = indices_for_ids.select("_1.*", "_2").withColumnRenamed("user_id","uid").withColumnRenamed("_2","new_uid").cache()

mapping_df_for_mid = mapping_df_for_uid.withColumnRenamed("new_uid","new_mid")

# print(mapping_df_for_uid.dtypes)
# print(mapping_df_for_mid.dtypes)

In [6]:
# Preparing data with the mapped values 
data_with_new_uid = data2_required.join(mapping_df_for_uid,data2_required.user_id==mapping_df_for_uid.uid,how='inner').drop('uid').cache()
# data_with_new_uid.show()

data_with_new_mid = data_with_new_uid.join(mapping_df_for_mid,data_with_new_uid.mention_users==mapping_df_for_mid.uid).drop('uid')
# data_with_new_mid.show()

In [7]:
# Building the recommendation model
als = ALS(rank=8,maxIter=20,regParam=0.01, implicitPrefs=True,userCol="new_uid", itemCol="new_mid", ratingCol="rating",coldStartStrategy="drop")
als_model = als.fit(data_with_new_mid)

# Recommending 5 items for each user
user_recommendations = als_model.recommendForAllUsers(5)

In [8]:
output = user_recommendations.join(mapping_df_for_uid,user_recommendations.new_uid==mapping_df_for_uid.new_uid).select(col('uid').alias('tweet_users'),col('recommendations').alias('mention_users'))
# output.dtypes

In [9]:
final_output = output.select('tweet_users','mention_users.new_mid')
# final_output.show()

In [10]:
# Re-mapping and getting back the original ids
mapping_dictionary = mapping_df_for_uid.rdd.map(lambda x: (x.new_uid, x.uid)).collectAsMap()
# type(mapping_dictionary)
# mapping_dictionary
mapping_array = udf(lambda x: [mapping_dictionary[key] for key in x])
final_required_output = final_output.withColumn('original_mids', mapping_array(final_output['new_mid']))

In [11]:
# Recommended mention users for each tweet user 
final_required_output.cache()
final_required_output.select('tweet_users',col('original_mids').alias('mention_users')).show(truncate=False)

+-------------------+---------------------------------------------------------------+
|tweet_users        |mention_users                                                  |
+-------------------+---------------------------------------------------------------+
|133350176          |[1349149096909668363, 355989081, 3290364847, 18831926, 939091] |
|1201278424553336835|[15115280, 191807697, 13393052, 18831926, 1349149096909668363] |
|183115377          |[133081348, 191807697, 22429979, 13393052, 18831926]           |
|3820494734         |[807095, 360019454, 428333, 18831926, 21802625]                |
|4096787361         |[133081348, 191807697, 22429979, 13393052, 18831926]           |
|70686448           |[26574283, 996693014251585536, 13850422, 185025785, 1004633989]|
|27884506           |[26574283, 996693014251585536, 13850422, 185025785, 1004633989]|
|23676863           |[133081348, 13393052, 22429979, 191807697, 299273962]          |
|1006173880568942593|[26574283, 996693014251585536, 13

In [4]:
# sc.getConf().getAll()

[('spark.app.startTime', '1622110396408'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.app.id', 'local-1622110398615'),
 ('spark.app.name', 'Workload-2'),
 ('spark.sql.warehouse.dir',
  'file:/home/jovyan/work/Assignment-2/spark-warehouse'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.port', '39613'),
 ('spark.yarn.historyServer.address', 'http://localhost:18080'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '836656c9dccc'),
 ('spark.eventLog.dir', 'file:///home/jovyan/work/spark-history'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.history.fs.logDirectory', 'file:///home/jovyan/work/spark-history')]