In [1]:
# Graph analytics on Twitter user data using Spark GraphFrames

In [2]:
# Load the Twitter follower graph's vertex table.
# Explicit import instead of `from graphframes import *`: GraphFrame is the only
# graphframes name this notebook uses, and a wildcard import hides where names come from.
from graphframes import GraphFrame
from pyspark.sql import SQLContext

# NOTE(review): SQLContext is deprecated since Spark 3.0 in favour of SparkSession
# (`spark.sql(...)`); kept because later cells use `sqlContext`.
sqlContext = SQLContext(sc)

# Vertex table: one row per user (GraphFrame requires an `id` column).
vertices = sqlContext.sql("Select * from nodes_test_csv")

In [3]:
# Number of vertices (users) in the graph.
print(f"Total no of users present: {vertices.count()}")

In [4]:
# Edge table: GraphFrame expects `src` and `dst` columns (directed edge src -> dst).
edges = (
    sqlContext
    .sql("Select * from edges_test_csv")
)

In [5]:
# Number of edges (follow relationships) in the graph.
print(f"Total no of connections present: {edges.count()}")

In [6]:
# Build the directed follower graph from the vertex and edge DataFrames.
g = GraphFrame(v=vertices, e=edges)
print(g)

In [7]:
# In-degree = number of incoming edges of a vertex, i.e. the user's follower count
# (an edge a -> b means "a follows b").
# GraphFrames omits vertices with in-degree 0 from this DataFrame.
in_pydf = g.inDegrees

In [8]:
# Frequency table of in-degree values: how many users have exactly k followers.
# Columns: inDegree, count.
indegree_count_df = (
    in_pydf
    .groupBy('inDegree')
    .count()
    .toPandas()
)

In [9]:
# Show the in-degree frequency table (columns: inDegree, count).
indegree_count_df

In [10]:
# Preview of the in-degree values as strings (the form used for bar-chart tick labels).
indegree_count_df.inDegree.astype('str')

In [11]:
import matplotlib.pyplot as plt
import numpy as np

# Bar chart of the most frequent in-degree (follower-count) values.
# Fixes vs. the original cell:
#  * a single figure: `plt.subplots()` followed by `plt.figure()` created an extra empty one;
#  * no in-place sort / reassignment of `indegree_count_df`, so re-running the cell no
#    longer drops one more row from the table each time.
# Rows are ranked by frequency and positions [SKIP_TOP, TOP_N) are plotted; the single
# most frequent in-degree is skipped, as the original `[1:50]` slice did.
SKIP_TOP, TOP_N = 1, 50
indegree_top_df = (
    indegree_count_df
    .sort_values('count', ascending=False)
    .iloc[SKIP_TOP:TOP_N]
)
inD = indegree_top_df['inDegree'].astype(str)
count = indegree_top_df['count']
x = np.arange(len(inD))

fig, ax = plt.subplots(figsize=(16, 16))
ax.bar(x, count)
ax.set_xticks(x)
ax.set_xticklabels(inD)
ax.set(title='Most frequent follower counts',
       xlabel='inDegree (number of followers)',
       ylabel='number of users')
display(plt.show())

In [12]:
# Exact (relativeError=0.0) quantiles of the follower counts.
# Findings recorded from the original run (users with no followers are absent from
# g.inDegrees, so the minimum is 1 by construction):
#   min followers: 1
#   max followers: 564512
#   90% of these users have less than 10 followers
in_pydf.approxQuantile(
    "inDegree",
    [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.55, 0.6, 0.7, 0.8, 0.9, 1.0],
    relativeError=0.0,
)

In [13]:
# Out-degree = number of outgoing edges of a vertex, i.e. how many users this user follows.
# GraphFrames omits vertices with out-degree 0 from this DataFrame.
out_pydf = g.outDegrees

In [14]:
# Exact (relativeError=0.0) quantiles of how many users each user follows.
# Findings recorded from the original run (users following nobody are absent from
# g.outDegrees, so the minimum is 1 by construction):
#   min following: 1
#   max following: 214381
#   90% of these users follow less than 7 people
out_pydf.approxQuantile(
    "outDegree",
    [0.0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0],
    relativeError=0.0,
)

In [15]:
# Per-user out-degree table (columns: id, outDegree).
display(out_pydf)

In [16]:
# Frequency table of out-degree values: how many users follow exactly k others.
# Columns: outDegree, count.
outdegree_count_df = (
    out_pydf
    .groupBy('outDegree')
    .count()
    .toPandas()
)

In [17]:
# Preview of the out-degree values as strings (the form used for bar-chart tick labels).
outdegree_count_df.outDegree.astype('str')

In [18]:
import matplotlib.pyplot as plt
import numpy as np

# Bar chart of the most frequent out-degree ("following" count) values.
# Fixes vs. the original cell:
#  * a single figure: `plt.subplots()` followed by `plt.figure()` created an extra empty one;
#  * no in-place sort / reassignment of `outdegree_count_df`, so re-running the cell no
#    longer drops one more row from the table each time.
# Rows are ranked by frequency and positions [SKIP_TOP, TOP_N) are plotted; the single
# most frequent out-degree is skipped, as the original `[1:50]` slice did.
SKIP_TOP, TOP_N = 1, 50
outdegree_top_df = (
    outdegree_count_df
    .sort_values('count', ascending=False)
    .iloc[SKIP_TOP:TOP_N]
)
outD = outdegree_top_df['outDegree'].astype(str)
count = outdegree_top_df['count']
x = np.arange(len(outD))

fig, ax = plt.subplots(figsize=(16, 16))
ax.bar(x, count)
ax.set_xticks(x)
ax.set_xticklabels(outD)
ax.set(title='Most frequent following counts',
       xlabel='outDegree (number of users followed)',
       ylabel='number of users')
display(plt.show())

In [19]:
# Split users by how many accounts they follow (out-degree):
#   active  : follows more than ACTIVE_MIN_FOLLOWING users
#   dormant : follows ACTIVE_MIN_FOLLOWING or fewer, including users who follow nobody
# Fixes vs. the original cell:
#  * `> 10` / `< 10` put users following exactly 10 in neither group; `<=` closes the gap;
#  * g.outDegrees omits vertices with no outgoing edges, so those users were missing from
#    `dormant_nodes`; they are added back here with outDegree 0;
#  * filter first, then project to `id` (the original filtered on a column it had
#    already dropped with select('id')).
# NOTE(review): the quantile notes above give 10 as the in-degree 90% mark and 7 as the
# out-degree one; the thresholds here and in the influence split look swapped — confirm.
# The original comment also recorded "826734" here; presumably the active count from the
# first run (before these fixes).
ACTIVE_MIN_FOLLOWING = 10
out_degree_all = (
    g.vertices.select('id')
    .join(g.outDegrees, on='id', how='left')
    .fillna(0, subset=['outDegree'])
)
active_nodes = out_degree_all.filter(f'outDegree > {ACTIVE_MIN_FOLLOWING}').select('id')
dormant_nodes = out_degree_all.filter(f'outDegree <= {ACTIVE_MIN_FOLLOWING}').select('id')

In [20]:
# Split users by follower count (in-degree):
#   influential     : more than INFLUENTIAL_MIN_FOLLOWERS followers
#   non-influential : INFLUENTIAL_MIN_FOLLOWERS or fewer, including users with no followers
# Fixes vs. the original cell:
#  * `> 7` / `< 7` put users with exactly 7 followers in neither group; `<=` closes the gap;
#  * g.inDegrees omits vertices with no incoming edges, so users without followers were
#    missing from `non_inf_nodes`; they are added back here with inDegree 0;
#  * filter first, then project to `id`.
INFLUENTIAL_MIN_FOLLOWERS = 7
in_degree_all = (
    g.vertices.select('id')
    .join(g.inDegrees, on='id', how='left')
    .fillna(0, subset=['inDegree'])
)
inf_nodes = in_degree_all.filter(f'inDegree > {INFLUENTIAL_MIN_FOLLOWERS}').select('id')
non_inf_nodes = in_degree_all.filter(f'inDegree <= {INFLUENTIAL_MIN_FOLLOWERS}').select('id')

In [21]:
# Size of the non-influential group (the original run recorded 51113 users; the value
# depends on how `non_inf_nodes` is defined in the cell above).
non_inf_nodes.count()

In [22]:
# Bar chart comparing the sizes of the active, dormant, influential and non-influential
# groups. Each .count() runs a separate Spark job.
# Fixes vs. the original cell:
#  * heights passed as a list — a `dict_values` view is not a sequence and is not reliably
#    accepted by matplotlib as bar heights;
#  * a single figure: `plt.subplots()` followed by `plt.figure()` created an extra empty one.
type_d = {
    'active': active_nodes.count(),
    'dormant': dormant_nodes.count(),
    'inf': inf_nodes.count(),
    'non_inf': non_inf_nodes.count(),
}

fig, ax = plt.subplots(figsize=(16, 16))
positions = list(range(len(type_d)))
ax.bar(positions, list(type_d.values()), align='center')
ax.set_xticks(positions)
ax.set_xticklabels(list(type_d.keys()), rotation='vertical')
ax.set(title='User groups', ylabel='number of users')

display(plt.show())

In [23]:

# Cross the activity split with the influence split: set intersection on user id.
# Every *_nodes DataFrame above holds only the `id` column, so they intersect directly.
act_inf_nodes = active_nodes.intersect(inf_nodes)
act_non_inf_nodes = active_nodes.intersect(non_inf_nodes)

dor_inf_nodes = dormant_nodes.intersect(inf_nodes)
dor_non_inf_nodes = dormant_nodes.intersect(non_inf_nodes)


In [24]:
# Bar chart of the four activity x influence combinations.
# Each .count() runs a separate Spark job.
# Fixes vs. the original cell:
#  * heights passed as a list — a `dict_values` view is not a sequence and is not reliably
#    accepted by matplotlib as bar heights;
#  * a single figure: `plt.subplots()` followed by `plt.figure()` created an extra empty one.
type_d = {
    'act_inf': act_inf_nodes.count(),
    'act_non_inf': act_non_inf_nodes.count(),
    'dor_inf': dor_inf_nodes.count(),
    'dor_non_inf': dor_non_inf_nodes.count(),
}

fig, ax = plt.subplots(figsize=(16, 16))
positions = list(range(len(type_d)))
ax.bar(positions, list(type_d.values()), align='center')
ax.set_xticks(positions)
ax.set_xticklabels(list(type_d.keys()), rotation='vertical')
ax.set(title='Activity x influence groups', ylabel='number of users')

display(plt.show())

In [25]:
# Pairs of users that follow each other: A follows B and B follows A.
# Each mutual pair matches this pattern twice, as (A, B) and as (B, A). Keeping only
# a.id < b.id lists each pair once, and it also drops self-loops (a == b).
motifs = g.find("(a)-[e]->(b); (b)-[e2]->(a)").filter("a.id < b.id")

display(motifs)
# motifs.count()

# Original note: "220 users follow each other". That count was taken before
# de-duplication, so it presumably counted each mutual pair twice (about 110 pairs).

In [26]:
# Follow suggestions ("friend of a friend"): A follows B, B follows C, but A does not
# follow C, so C is a candidate suggestion for A.
# Fix: exclude C == A. When A and B follow each other, C can be A itself. The negated
# edge (a)->(a) is absent unless there is a self-loop, so A was suggested to themself.
motifs = (
    g.find("(a)-[e]->(b); (b)-[e2]->(c); !(a)-[]->(c)")
    .filter("a.id != c.id")
)
display(motifs)
# motifs.count()  # original run, before excluding C == A: 234620 rows (suggestion triples, not users)
# These rows can be further filtered or ranked to pick suggestions per user.

In [27]:
# Strongly connected components: a directed graph is strongly connected if there is a
# path between every pair of vertices. Each user is assigned the `component` of its
# strongly connected component (used here as a community label).
result = g.stronglyConnectedComponents(maxIter=10)

# Component sizes, largest first (columns: component, count).
result1 = (
    result
    .groupBy('component')
    .count()
    .orderBy('count', ascending=False)
)


In [28]:
# Strongly-connected-component sizes (columns: component, count), largest first.
display(result1)

In [29]:
# Label propagation: community affiliation.
# Each node starts in its own community. At every superstep, nodes send their community
# label to all neighbours and adopt the most frequent (mode) label among incoming
# messages. Runs for 5 supersteps here.
labels = g.labelPropagation(maxIter=5)
display(labels)

In [30]:
# Community sizes from label propagation, largest (dominant) communities first.
label_df = (
    labels
    .groupBy('label')
    .count()
    .orderBy('count', ascending=False)
)
display(label_df)

In [31]:
# PageRank: scores each vertex by the importance of the vertices linking to it.
# resetProbability is the random-jump probability; tol is the convergence tolerance.
pr = g.pageRank(
    resetProbability=0.15,
    tol=0.01,
)
display(pr.vertices)

In [32]:
# Edges of the PageRank result graph (GraphFrames adds a `weight` column).
display(pr.edges)

In [33]:
# Triangle count: number of triangles passing through each vertex (a local clustering
# measure often used as a community-structure signal; not a community assignment itself).
results = g.triangleCount()
display(results)

In [34]:
# Frequency table of triangle counts: how many users sit in exactly k triangles.
# Renaming the per-vertex `count` up front avoids two columns both named `count`.
tc_df = (
    results
    .selectExpr("`count` AS triangle_count")
    .groupBy('triangle_count')
    .count()
    .toPandas()
)

In [35]:
# Triangle-count frequency table (columns: triangle_count, count).
tc_df

In [36]:
# Bar chart of triangle-count values ranked by how many users have them.
# NOTE: the original comment said "top 30", but its `[1:30]` slice skips the most frequent
# value and keeps ranks 2-30 (29 bars). That selection is kept and made explicit here.
# Also fixed:
#  * a single figure instead of an extra empty one;
#  * no in-place sort / reassignment of `tc_df`, so re-running gives the same chart.
SKIP_TOP, TOP_N = 1, 30
tc_top_df = tc_df.sort_values('count', ascending=False).iloc[SKIP_TOP:TOP_N]
outD = tc_top_df['triangle_count'].astype(str)
count = tc_top_df['count']
x = np.arange(len(outD))

fig, ax = plt.subplots(figsize=(16, 16))
ax.bar(x, count)
ax.set_xticks(x)
ax.set_xticklabels(outD)
ax.set(title='Most frequent triangle counts',
       xlabel='triangles through a user',
       ylabel='number of users')
display(plt.show())

In [37]:
# For each user A: how many (B, C) rows of the friend-of-friend `motifs` A appears in,
# i.e. how many accounts A reaches through a followee without following them, highest
# first. (`spam_users` is redefined in the cells below.)
spam_df = motifs
spam_users = (
    spam_df
    .groupBy('a')
    .count()
    .orderBy('count', ascending=False)
)

In [38]:
# Number of accounts each user A follows: outgoing edges (a) -> (b) grouped by A.
# NOTE: despite its name, `no_of_followers` counts users that A *follows*.
# The original `.alias('count2')` was removed. DataFrame.alias only names the DataFrame
# for qualified column references; it does not rename the `count` column. That rename
# happens explicitly in a later cell.
motifs2 = g.find("(a)-[e]->(b)")
no_of_followers = motifs2.groupBy('a').count()

In [40]:
# Closed triangles anchored at A: A follows B, B follows C, and A also follows C.
# Hypothesis: users with few such mutual connections are more likely spammers, since
# spammers tend to follow random people. The fewer rows a user A has here, the more
# suspicious.
spam_users = g.find(
    "(a)-[e]->(b);(b)-[e2]->(c);(a)-[]->(c)"
)

In [41]:
# Closed-triangle motif rows (columns a, e, b, e2, c; the anonymous edge is not returned).
display(spam_users)

In [42]:
# For each user A: number of closed triangles A -> B -> C with A -> C, highest first.
# A low count relative to how many users A follows is treated as a spam signal later on.
# The aggregate is rebuilt from the motif query, which is the same pattern as the cell
# that first defines `spam_users`. The original `spam_users = spam_users....` could not be
# re-run: after one run `spam_users` is already aggregated and has no `c` column.
spam_users = (
    g.find("(a)-[e]->(b);(b)-[e2]->(c);(a)-[]->(c)")
    .groupBy('a')
    .count()
    .sort('count', ascending=False)
)

In [43]:
# Number of accounts each user A follows: outgoing edges (a) -> (b) grouped by A.
# NOTE: despite its name, `no_of_followers` counts users that A *follows*.
# This recomputes the same table as an earlier cell, so the join below starts from an
# un-renamed `count` column.
# The original `.alias('count2')` was removed: DataFrame.alias names the DataFrame, not
# the `count` column (the rename is done in the next cell).
motifs2 = g.find("(a)-[e]->(b)")
no_of_followers = motifs2.groupBy('a').count()

In [44]:
# Rename the following count to `count2` so it does not clash with `count` in
# `spam_users` when the two are joined.
# withColumnRenamed is a no-op once the column has been renamed, so re-running this cell
# is safe. The original `select('a', col('count').alias('count2'))` failed on a second run.
# The wildcard `from pyspark.sql.functions import *` was dropped: nothing here needs it,
# and it shadows Python builtins such as sum/min/max/round.
no_of_followers = no_of_followers.withColumnRenamed('count', 'count2')

In [45]:
# NOTE(review): this wildcard import shadows Python builtins (sum, min, max, round, ...).
from pyspark.sql.functions import *

# Attach each user's following count (count2) to their closed-triangle count (count),
# inner-joining on the user struct column `a`.
df1, df2 = spam_users.alias('df1'), no_of_followers.alias('df2')
df_merged = df1.join(df2, on=['a'])

In [46]:
# Print the first 20 joined rows (columns: a, count, count2).
df_merged.show(n=20)

In [47]:
# avg_non_mut_frnd = count2 / count, where:
#   count2 = number of users A follows;
#   count  = number of closed triangles A -> B -> C with A -> C (from spam_users).
# A high ratio means A follows many accounts that are not connected to each other. This
# analysis treats that as a spam signal, because follow requests usually go to people one
# already shares connections with. `count` is at least 1 for every joined row, so the
# division is safe.
# Fixes vs. the original cell:
#  * no `.collect()`. The result stays a DataFrame instead of a Python list pulled to the
#    driver, and re-running the cell works because withColumn replaces the column;
#  * sorted by the ratio, highest first, so the ranking is reproducible from code rather
#    than from sorting the displayed table by hand.
df_merged = (
    df_merged
    .withColumn('avg_non_mut_frnd', df_merged['count2'] / df_merged['count'])
    .orderBy('avg_non_mut_frnd', ascending=False)
)

In [48]:
# Spam ranking: one row per user A with `count` (closed triangles), `count2` (users A
# follows) and their ratio `avg_non_mut_frnd`.
display(df_merged)
# Original finding (rows sorted by avg_non_mut_frnd, descending): user 25215 follows 1092
# users, of whom only 4 are mutual, so this user is a likely spammer.