In [1]:
# Spark installation on Colab
#!pip install --pyspark

In [2]:
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext("local")
spark = SparkSession.builder.getOrCreate()

Download the data if needed

In [4]:
# Download the data
# ! wget -q https://github.com/trajanov/BigDataAnalytics/raw/master/Spark-Example-Social-Media/data/comment_hasCreator_person.csv.bz2
# ! wget -q https://github.com/trajanov/BigDataAnalytics/raw/master/Spark-Example-Social-Media/data/comment_replyOf_post.csv.bz2
# ! wget -q https://github.com/trajanov/BigDataAnalytics/raw/master/Spark-Example-Social-Media/data/person_knows_person.csv.bz2
# ! wget -q https://github.com/trajanov/BigDataAnalytics/raw/master/Spark-Example-Social-Media/data/person_likes_post.csv.bz2
# ! wget -q https://github.com/trajanov/BigDataAnalytics/raw/master/Spark-Example-Social-Media/data/post_hasCreator_person.csv.bz2

In [5]:
import os
# Set your file path here 
path="data"

# You have 5 files 

fileCommentHasCreator= os.path.join(path,"comment_hasCreator_person.csv.bz2")
fileComment_replyOf_post = os.path.join(path,"comment_replyOf_post.csv.bz2")
filePerson_knows_person= os.path.join(path,"person_knows_person.csv.bz2")
filePerson_likes_post = os.path.join(path,"person_likes_post.csv.bz2")
filePost_hasCreator_person = os.path.join(path,"post_hasCreator_person.csv.bz2")

### Load the data and Create Dataframes

In [7]:
commentHasCreatorDD = spark.read.format('csv').options(header='true', inferSchema='true',  sep ="|").load(fileCommentHasCreator)
# Spark has a problm of using . (dot) in column names so we are converting each dot in the name to "_"
comments = commentHasCreatorDD.toDF(*(c.replace('.', '_') for c in commentHasCreatorDD.columns))
comment_replyOf_postDD = spark.read.format('csv').options(header='true', inferSchema='true',  sep ="|").load(fileComment_replyOf_post)
replays = comment_replyOf_postDD.toDF(*(c.replace('.', '_') for c in comment_replyOf_postDD.columns))
person_knows_personDD = spark.read.format('csv').options(header='true', inferSchema='true',  sep ="|").load(filePerson_knows_person)
knows = person_knows_personDD.toDF(*(c.replace('.', '_') for c in person_knows_personDD.columns))
person_likes_postDD = spark.read.format('csv').options(header='true', inferSchema='true',  sep ="|").load(filePerson_likes_post)
likes = person_likes_postDD.toDF(*(c.replace('.', '_') for c in person_likes_postDD.columns))
post_hasCreator_personDD = spark.read.format('csv').options(header='true', inferSchema='true',  sep ="|").load(filePost_hasCreator_person)
posts = post_hasCreator_personDD.toDF(*(c.replace('.', '_') for c in post_hasCreator_personDD.columns))

### Explore the data

In [8]:
comments.show(3)
replays.show(3)
posts.show(3)
likes.show(3)
knows.show(3)

+----------+---------+
|Comment_id|Person_id|
+----------+---------+
|         0|       74|
|        10|      832|
|        20|      913|
+----------+---------+
only showing top 3 rows

+----------+-------+
|Comment_id|Post_id|
+----------+-------+
|         0|      0|
|        10|      0|
|        30|      0|
+----------+-------+
only showing top 3 rows

+-------+---------+
|Post_id|Person_id|
+-------+---------+
|      0|       38|
|     10|       38|
|     20|       38|
+-------+---------+
only showing top 3 rows

+---------+-------+-------------------+
|Person_id|Post_id|       creationDate|
+---------+-------+-------------------+
|       74|      0|2012-10-15 07:13:41|
|       36|      0|2012-10-18 12:51:39|
|      417|     10|2012-11-18 12:14:47|
+---------+-------+-------------------+
only showing top 3 rows

+----------+----------+
|Person_id0|Person_id1|
+----------+----------+
|        38|       956|
|        38|       962|
|        38|       941|
+----------+----------+
only

In [9]:
# Question 1: 
# Consider that total user activities count to be the number of user’s posts, comments, likes and having friends.
# Which top-10 users have the highest activity counts?

commentsP = comments.groupBy("Person_id").count()
postsP = posts.groupBy("Person_id").count()
likesP = likes.groupBy("Person_id").count()
knowsP = knows.groupBy("Person_id0").count().toDF(*["Person_id","count"])

acitivties = commentsP.union(postsP)
acitivties = acitivties.union(likesP)
acitivties = acitivties.union(knowsP)

acitivtiesP = acitivties.groupBy("Person_id").sum("count").orderBy("sum(count)", ascending=False).limit(10)
acitivtiesP.show()

+---------+----------+
|Person_id|sum(count)|
+---------+----------+
|      918|      6690|
|       40|      6632|
|      959|      6538|
|       23|      6480|
|      838|      6397|
|      557|      6364|
|      280|      6349|
|      296|      6293|
|      922|      6285|
|      135|      6283|
+---------+----------+



In [10]:
# Question 2: 
# Which users wrote the top-10 highest active posts?
# Top-10 active posts are posts that have the highest number of comments and likes.

postLikes = posts.groupBy("Post_id").count()
postReplays = replays.groupBy("Post_id").count()
postsA = postLikes.union(postReplays)
postsActivites = postsA.groupBy("Post_id").sum("count").orderBy("sum(count)", ascending=False).limit(10)

postsActivites = postsActivites.join(posts,"Post_id").orderBy("sum(count)", ascending=False)
postsActivites.show()

+-------+----------+---------+
|Post_id|sum(count)|Person_id|
+-------+----------+---------+
| 445970|        12|      980|
| 596550|        11|      366|
| 618370|        11|      998|
|1335410|        11|      385|
| 675790|        11|      344|
|1059250|        11|      152|
|1289550|        11|      871|
|1716660|        11|      458|
| 938410|        11|      827|
|1095990|        10|        8|
+-------+----------+---------+



In [13]:
# Question 3: 
# Which top-10 users have the highest number of friends-of-friends plus direct friends?
# Each user is connect to a set of other users when we count up the number of connections 
# with 1-step connection plus direction connections.
# Count up for each user the number of unique friends-of-friends plus direct friends and 
# print out the top-10 of them. 

from pyspark.sql import functions as F

# We self join "knows" to create table for FOAF
knowsFOAF = knows.alias("l").join(knows.alias("r"),
        F.col("l.Person_id1") == F.col("r.Person_id0"), "left")\
        .select(*["l.Person_id0", "r.Person_id1"])
# Create a union to have FOAF and direct friends
knowsAll = knows.union(knowsFOAF)
# Collect all friends in an set, so we have only unique friends IDs in the 'knowsAndFOAF' column
knowsAllSet = knowsAll.groupBy('Person_id0').agg(F.collect_set('Person_id1').alias('knowsAndFOAF'))
# Calculate the size of 'knowsAndFOAF'
knowsAllSetNum = knowsAllSet.withColumn("Number",F.size("knowsAndFOAF"))
# Sort the table and take the top 10
knowsAllSetTop10 = knowsAllSetNum.orderBy("Number", ascending=False).limit(10)
knowsAllSetTop10.show()

+----------+--------------------+------+
|Person_id0|        knowsAndFOAF|Number|
+----------+--------------------+------+
|       194|[0, 356, 843, 437...|   809|
|       983|[0, 356, 843, 437...|   802|
|       938|[843, 0, 356, 437...|   795|
|       568|[0, 843, 356, 437...|   794|
|       811|[0, 356, 843, 437...|   791|
|       316|[0, 356, 843, 437...|   786|
|       866|[0, 356, 843, 437...|   783|
|       530|[0, 356, 437, 793...|   782|
|       404|[0, 356, 437, 793...|   780|
|       722|[843, 0, 356, 437...|   779|
+----------+--------------------+------+



In [14]:
# Question: 4
# What is the correlation coefficient between number of posts 
# and number of likes that users do in social network?
# You can calculate correlation coefficient using the following formula:

postsLikes = postsP.withColumnRenamed("count","posts")\
    .join(likesP.withColumnRenamed("count","likes"),"Person_id")
# There is predefined correlation function for DataFrames
correlation = postsLikes.stat.corr("posts","likes")
print(correlation)

0.35536844261116857


In [16]:
# Question 5: 
# What are the top-10 two similar posts (post1, post2) based on users who were active on theseposts?
# A user is active on a post when he posted it, comment on it, or liked it.
# Use the JaccardSimilarity based on the two user sets. Consider posts with more than 10 activities only. 
# Jaccard Similarity of two sets of A and B is defined as the following formula (1):
# J(A,B) = size(A intersect B)/size(A union B)

# Create dataframes wiht active users per post
postPersonCommented = replays.join(comments,"Comment_id").select(["Post_id","Person_id"])
postPersonPosted = posts
postPersonLiked = likes.select(["Post_id","Person_id"])

# Make a union of all user activities in one dataframe
postPersonAllActivities = postPersonCommented.union(postPersonPosted).union(postPersonLiked)

# Aggregate the all user activities in a set of users per post
postPersonAllActivitiesSet = postPersonAllActivities.groupBy('Post_id')\
    .agg(F.collect_set('Person_id').alias('Active_Persons'))\
    .withColumn("Number",F.size("Active_Persons"))

# Take only postst that have more than 12 unique persons that commented
postPersonAllActivitiesSetMin10 = postPersonAllActivitiesSet.where("Number>12")
postPersonAllActivitiesSetMin10 = postPersonAllActivitiesSetMin10.select(*["Post_id","Active_Persons"])

# Create a crossJoin (cartesian product) of posts
postsSim = postPersonAllActivitiesSetMin10.toDF("Post_id0","Active_Persons0")\
    .crossJoin(postPersonAllActivitiesSetMin10).where("Post_id0!=Post_id")

# Calculate the Jaccard similarity
postsJaccard = postsSim.withColumn("Jaccard",
    F.size(F.array_intersect(F.col("Active_Persons0"), F.col("Active_Persons")))/
    F.size(F.array_union(F.col("Active_Persons0"), F.col("Active_Persons")))
    )

# Take the top 10 most similar posts
postsJaccard10 = postsJaccard.orderBy("Jaccard", ascending=False).limit(10)

postsJaccard10.show()

+--------+--------------------+-------+--------------------+------------------+
|Post_id0|     Active_Persons0|Post_id|      Active_Persons|           Jaccard|
+--------+--------------------+-------+--------------------+------------------+
|  494340|[643, 180, 785, 4...| 495350|[643, 180, 785, 4...|               1.0|
|  495350|[643, 180, 785, 4...| 494340|[643, 180, 785, 4...|               1.0|
| 1415610|[15, 426, 495, 10...|1415070|[15, 426, 495, 10...|               1.0|
| 1291960|[951, 200, 564, 7...|1292510|[951, 200, 564, 7...|               1.0|
| 1292510|[951, 200, 564, 7...|1291960|[951, 200, 564, 7...|               1.0|
| 1415070|[15, 426, 495, 10...|1415610|[15, 426, 495, 10...|               1.0|
| 1481610|[778, 880, 557, 9...|1481680|[778, 557, 880, 9...|               1.0|
| 1481680|[778, 557, 880, 9...|1481610|[778, 880, 557, 9...|               1.0|
| 1667890|[219, 342, 300, 2...|1668350|[219, 342, 300, 2...|0.9285714285714286|
| 1668350|[219, 342, 300, 2...|1667890|[