In [1]:
import os
import pyspark
conf = pyspark.SparkConf()
conf.set('spark.ui.proxyBase', '/user/' + os.environ['JUPYTERHUB_USER'] + '/proxy/4040')

sc = pyspark.SparkContext(conf=conf)
spark = pyspark.sql.SparkSession(sc)
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/15 04:49:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/03/15 04:49:33 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
from pyspark.sql.functions import *
from pyspark.sql.window import Window

In [3]:
# Read the input file and create a data frame with user ID and friends list
users_df = spark.read.option("delimiter", "\t").csv("Friends.txt") \
                    .withColumnRenamed("_c0", "user_id") \
                    .withColumnRenamed("_c1", "friends") \
                    .select("user_id", split("friends", ",").alias("friends_list"))

users_df.printSchema()
users_df.count()
users_df.show(5)

                                                                                

root
 |-- user_id: string (nullable = true)
 |-- friends_list: array (nullable = true)
 |    |-- element: string (containsNull = false)



                                                                                

+-------+--------------------+
|user_id|        friends_list|
+-------+--------------------+
|      0|[1, 2, 3, 4, 5, 6...|
|      1|[0, 5, 20, 135, 2...|
|      2|[0, 117, 135, 122...|
|      3|[0, 12, 41, 55, 1...|
|      4|[0, 8, 14, 15, 18...|
+-------+--------------------+
only showing top 5 rows



In [4]:
# Find pairs of users with mutual friends and store in a new data frame
mutual_friends_df = users_df.alias("u1") \
                     .join(users_df.alias("u2"), 
                           (col("u1.user_id") != col("u2.user_id")) & 
                           (size(array_intersect(col("u1.friends_list"), col("u2.friends_list"))) > 0)) \
                     .select(col("u1.user_id").alias("user1_id"), 
                             col("u2.user_id").alias("user2_id"), 
                             size(array_intersect(col("u1.friends_list"), col("u2.friends_list"))).alias("num_mutual_friends"))

mutual_friends_df.printSchema()
mutual_friends_df.count()
mutual_friends_df.show(5)

root
 |-- user1_id: string (nullable = true)
 |-- user2_id: string (nullable = true)
 |-- num_mutual_friends: integer (nullable = false)



                                                                                

+--------+--------+------------------+
|user1_id|user2_id|num_mutual_friends|
+--------+--------+------------------+
|       0|       1|                 2|
|       0|       3|                 3|
|       0|       4|                 9|
|       0|       5|                 2|
|       0|       6|                 1|
+--------+--------+------------------+
only showing top 5 rows



In [5]:
# Group by user1_id and collect the user2_id with mutual friends
mutual_friends_users_df = mutual_friends_df \
    .groupBy("user1_id") \
    .agg(collect_set(col("user2_id")).alias("mutual_friends_users")) \
    .join(users_df, (col("user1_id") == col("user_id")))

mutual_friends_users_df.printSchema()
mutual_friends_users_df.count()
mutual_friends_users_df.show(5)

root
 |-- user1_id: string (nullable = true)
 |-- mutual_friends_users: array (nullable = false)
 |    |-- element: string (containsNull = false)
 |-- user_id: string (nullable = true)
 |-- friends_list: array (nullable = true)
 |    |-- element: string (containsNull = false)



[Stage 21:>                                                         (0 + 1) / 1]

+--------+--------------------+-------+--------------------+
|user1_id|mutual_friends_users|user_id|        friends_list|
+--------+--------------------+-------+--------------------+
|       0|[27554, 35776, 29...|      0|[1, 2, 3, 4, 5, 6...|
|       1|[12846, 43447, 44...|      1|[0, 5, 20, 135, 2...|
|      10|[1532, 29319, 295...|     10|[0, 12, 16, 30, 6...|
|   10240|[35678, 1085, 351...|  10240|[19, 601, 660, 74...|
|   10623|[12846, 35648, 43...|  10623|[1, 2249, 4376, 5...|
+--------+--------------------+-------+--------------------+
only showing top 5 rows



                                                                                

In [6]:
# Select the recommended users who are not already friends
recommendations_df = mutual_friends_users_df.select(
    "user1_id", 
    array_except(col("mutual_friends_users"), col("friends_list")).alias("recommended_users")
)

# Explode the recommended users column into separate rows and rename columns
recommendations_df = recommendations_df.select(
    col("user1_id").alias("user1_id"),
    explode(col("recommended_users")).alias("user2_id")
)

recommendations_df.printSchema()
recommendations_df.count()
recommendations_df.show(5)

root
 |-- user1_id: string (nullable = true)
 |-- user2_id: string (nullable = false)



[Stage 34:>                                                         (0 + 1) / 1]

+--------+--------+
|user1_id|user2_id|
+--------+--------+
|       0|   27554|
|       0|   35776|
|       0|   29319|
|       0|   35678|
|       0|   19079|
+--------+--------+
only showing top 5 rows



                                                                                

In [23]:
# Join mutual_friends_df and recommendations_df on user1_id and user2_id to get the score
score_df = mutual_friends_df.join(
    recommendations_df.select("user1_id", "user2_id"),
    ["user1_id", "user2_id"],
    "inner"
).withColumnRenamed("num_mutual_friends", "score")

score_df.printSchema()
score_df.count()
score_df.show(5)

root
 |-- user1_id: string (nullable = true)
 |-- user2_id: string (nullable = true)
 |-- score: integer (nullable = false)



                                                                                

+--------+--------+-----+
|user1_id|user2_id|score|
+--------+--------+-----+
|       0|   49226|    1|
|       0|   45046|    1|
|       1|      41|    2|
|    1085|   49226|    2|
|    1100|     439|   24|
+--------+--------+-----+
only showing top 5 rows



In [24]:
# Create a window function to rank the scores for each user1_id and user2_id pair
window_spec = Window.partitionBy(col("user1_id")).orderBy(col("score").desc(), col("user2_id").asc())

In [25]:
# Rank the scores for each user1_id and user2_id pair
rank_df = score_df.withColumn("rank", rank().over(window_spec))

rank_df.printSchema()
rank_df.count()
rank_df.show()

root
 |-- user1_id: string (nullable = true)
 |-- user2_id: string (nullable = true)
 |-- score: integer (nullable = false)
 |-- rank: integer (nullable = false)



                                                                                

+--------+--------+-----+----+
|user1_id|user2_id|score|rank|
+--------+--------+-----+----+
|       0|   38737|    5|   1|
|       0|    1532|    3|   2|
|       0|   22939|    3|   3|
|       0|   30691|    3|   4|
|       0|   35589|    3|   5|
|       0|   49678|    3|   6|
|       0|   12570|    2|   7|
|       0|   12636|    2|   8|
|       0|   12846|    2|   9|
|       0|     135|    2|  10|
|       0|   19044|    2|  11|
|       0|   19079|    2|  12|
|       0|    2644|    2|  13|
|       0|   27679|    2|  14|
|       0|   28193|    2|  15|
|       0|   29724|    2|  16|
|       0|   29791|    2|  17|
|       0|   30257|    2|  18|
|       0|   31232|    2|  19|
|       0|   32317|    2|  20|
+--------+--------+-----+----+
only showing top 20 rows



In [26]:
# Filter out only the top 10 users for each user1_id
rank_df = rank_df.filter(col("rank") <= 10)

rank_df.printSchema()
rank_df.count()
rank_df.show()

root
 |-- user1_id: string (nullable = true)
 |-- user2_id: string (nullable = true)
 |-- score: integer (nullable = false)
 |-- rank: integer (nullable = false)



                                                                                

+--------+--------+-----+----+
|user1_id|user2_id|score|rank|
+--------+--------+-----+----+
|       0|   38737|    5|   1|
|       0|    1532|    3|   2|
|       0|   22939|    3|   3|
|       0|   30691|    3|   4|
|       0|   35589|    3|   5|
|       0|   49678|    3|   6|
|       0|   12570|    2|   7|
|       0|   12636|    2|   8|
|       0|   12846|    2|   9|
|       0|     135|    2|  10|
|       1|   13793|    4|   1|
|       1|    2659|    4|   2|
|       1|    3734|    3|   3|
|       1|   49574|    3|   4|
|       1|      12|    2|   5|
|       1|      13|    2|   6|
|       1|      19|    2|   7|
|       1|       2|    2|   8|
|       1|    2022|    2|   9|
|       1|   27679|    2|  10|
+--------+--------+-----+----+
only showing top 20 rows



In [28]:
# Group by user1_id and collect the top 10 user2_id for each user1_id
top_users_df = rank_df.groupBy("user1_id").agg(collect_list("user2_id").alias("top_users"))

top_users_df.printSchema()
top_users_df.count()
top_users_df.show()

root
 |-- user1_id: string (nullable = true)
 |-- top_users: array (nullable = false)
 |    |-- element: string (containsNull = false)



                                                                                

+--------+--------------------+
|user1_id|           top_users|
+--------+--------------------+
|       0|[38737, 1532, 229...|
|       1|[13793, 2659, 373...|
|      10|[18, 38, 89, 11, ...|
|   10240|[1100, 439, 2413,...|
|   10623|[8715, 8932, 0, 1...|
|    1085|[35613, 10240, 19...|
|      11|[27552, 10, 12, 3...|
|    1100|[439, 10240, 1220...|
|   11142|[1220, 12633, 169...|
|     117|[13793, 128, 1384...|
|      12|[28, 18, 24, 8, 2...|
|    1220|[1100, 10240, 267...|
|   12347|[134, 35613, 3563...|
|   12453|[2409, 38737, 984...|
|   12562|[12570, 13, 16, 2...|
|   12570|[20533, 20599, 95...|
|   12584|[12570, 2413, 32,...|
|   12633|[2659, 35678, 778...|
|   12636|[38737, 38800, 0,...|
|     128|[12570, 20533, 20...|
+--------+--------------------+
only showing top 20 rows



In [29]:
top_users_df.head()

                                                                                

Row(user1_id='0', top_users=['38737', '1532', '22939', '30691', '35589', '49678', '12570', '12636', '12846', '135'])