In [1]:
import networkx as nx
import community

import random, pickle

import pyspark
from pyspark import SparkContext
from pyspark.sql.functions import countDistinct
from pyspark.storagelevel import StorageLevel
import pandas as pd
import numpy as np



In [3]:
# Reuse the active Spark session if there is one (otherwise create it) and keep
# its SparkContext, which is needed later to parallelize local Python data.
session_builder = pyspark.sql.SparkSession.builder
spark = session_builder.getOrCreate()
sc = spark.sparkContext

# Load tweet data and mentions data to use in network analysis

In [4]:
# Read the tweet and mention datasets and expose each one to Spark SQL under a
# short view name so the SQL queries in later cells can reference them.
# createOrReplaceTempView is the supported replacement for the deprecated
# registerTempTable; it registers the same session-scoped view.
tweets = spark.read.parquet('tweets_all.parquet')
tweets.createOrReplaceTempView('tweets')

mentions = spark.read.parquet('mentions_all.parquet')
mentions.createOrReplaceTempView('mentions')

In [5]:
# Sanity check: count tweets whose text contains an "RT" marker delimited by
# non-alphanumeric characters even though retweet_id is null, i.e. tweets that
# look like retweets but are not explicitly marked as such.
# Only ~5.5k tweets match (5457 in the recorded run), so it is not worth trying
# to integrate them as retweets.
unmarked_rt_query = """
    select screen_name, text
    from tweets
    where text rlike '([^a-zA-Z0-9]|^)RT[^a-zA-Z0-9]+' and retweet_id is null
"""
temp = spark.sql(unmarked_rt_query)
temp.count()

5457

# Network Analysis

### 1. Create edges based on retweets, quotes, mentions, and replies
### 2. Sum those interactions for each dyad to produce weights
### 3. Create dictionary of other user data for graph

In [5]:
# Retweet edges: one row per (retweeter, original author) pair, where n_links
# counts how many times screen_name1 retweeted screen_name2.
# The self-join resolves each retweet_id to the screen name of the tweet that
# was retweeted. Retweets whose original tweet is not in the dataset have no
# author to point at and are dropped; an inner join states that directly and
# returns the same rows as a left join followed by a not-null filter (rows with
# a null retweet_id can never satisfy the join condition).
tweet_rt_edge = spark.sql("""
    select t.screen_name as screen_name1, rt.screen_name as screen_name2, count(*) as n_links
    from tweets as t
    join tweets as rt
    on t.retweet_id = rt.tweet_id
    where rt.screen_name is not null
    group by t.screen_name, rt.screen_name
""")
# createOrReplaceTempView replaces the deprecated registerTempTable.
tweet_rt_edge.createOrReplaceTempView('tweet_rt_edge')
# Keep the result around (memory first, spilling to disk) because it is reused
# by later cells; the trailing ';' suppresses the DataFrame repr.
tweet_rt_edge.persist(StorageLevel.MEMORY_AND_DISK);

In [6]:
# Row count of the retweet edge table (one row per screen_name1/screen_name2 pair).
tweet_rt_edge.count()

679842

In [7]:
# Interaction edges from each user to the people they quoted, replied to, or
# mentioned. Each branch yields one row per (screen_name1, screen_name2) pair
# with n_links = number of such interactions and `desc` naming the edge type.
#
# Guards against edges with a NULL endpoint (which would become a bogus node
# once the rows are loaded into the graph):
#   * reply branch: also require in_reply_to_screen_name to be present;
#   * mention branch: inner join, so mentions whose tweet is missing from
#     `tweets` are dropped. With a left join those rows have t.retweet_id NULL,
#     pass the "is null" filter, and produce an edge with a NULL source.
#
# Mentions inside retweets are excluded (t.retweet_id is null).
# NOTE(review): the quote and reply branches do not exclude retweets, so a
# retweet row that carries quoted/reply fields would add an edge for the
# retweeter as well. Confirm this is intended.
interaction_edge = spark.sql("""
        select screen_name as screen_name1, quoted_screen_name as screen_name2, count(*) as n_links, 'quote' as desc
        from tweets as t
        where quoted_screen_name is not null
        group by screen_name, quoted_screen_name

        union all

        select screen_name as screen_name1, in_reply_to_screen_name as screen_name2, count(*) as n_links, 'reply' as desc
        from tweets as t
        where in_reply_to_tweet_id is not null and in_reply_to_screen_name is not null
        group by screen_name, in_reply_to_screen_name

        union all

        select t.screen_name as screen_name1, m.mention_screen_name as screen_name2, count(*) as n_links,
            'mention' as desc
        from mentions as m
        join tweets as t
        on t.tweet_id = m.tweet_id
        where t.retweet_id is null
        group by t.screen_name, m.mention_screen_name
""")
# createOrReplaceTempView replaces the deprecated registerTempTable.
interaction_edge.createOrReplaceTempView('interaction_edge')
# Persist (memory first, spilling to disk) because later cells reuse this table.
interaction_edge.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[screen_name1: string, screen_name2: string, n_links: bigint, desc: string]

In [8]:
# Row count of the quote/reply/mention edge table (one row per user pair and interaction type).
interaction_edge.count()

283806

In [9]:
# Combine retweet and interaction edges into a single weighted edge list:
# n_links is summed over every interaction type for each ordered user pair.
#
# Edges *from* 'raimu0003' are excluded (only screen_name1 is filtered, so edges
# pointing to that account are kept). That account's three tweets caused an
# issue with the Gephi import, possibly because the trailing 'u0003' is read as
# the escape sequence for a Unicode control character.
all_edge = spark.sql("""
    select screen_name1, screen_name2, sum(n_links) as n_links
    from
        (select screen_name1, screen_name2, n_links
        from tweet_rt_edge

        union all

        select screen_name1, screen_name2, n_links
        from interaction_edge) sub
    where not screen_name1 = 'raimu0003'
    group by screen_name1, screen_name2
""")

# createOrReplaceTempView replaces the deprecated registerTempTable.
all_edge.createOrReplaceTempView('all_edge')

# Bring the edges to the driver as (source, target, attribute-dict) triples,
# the shape networkx's add_edges_from accepts for weighted edges.
all_edges = [
    (row['screen_name1'], row['screen_name2'], {'weight': row['n_links']})
    for row in all_edge.collect()
]

In [11]:
# Number of weighted directed edges collected to the driver.
len(all_edges)

884553

In [10]:
# Per-user profile data to attach to graph nodes.
#   u: for every screen_name, the name/description/followers_count taken from
#      that user's most recent tweet (row_number over created_at desc = 1).
#   c: total retweet_count summed over the user's original tweets only
#      (retweet_id is null), so retweets they posted are not counted.
# Users with no original tweets have no row in c; coalesce gives them 0.
user_data = spark.sql("""
    select u.screen_name, u.name, u.description, u.followers_count,
        coalesce(c.total_rts, 0) as total_rts
    from
       (select screen_name, name, description, followers_count
            from
                (select screen_name, name, description, followers_count,
                    row_number() over (partition by screen_name order by created_at desc) as n_repeats
                from tweets) sub
        where n_repeats = 1) as u
    left join
        (select screen_name, sum(retweet_count) as total_rts
        from tweets
        where retweet_id is null
        group by screen_name) as c
    on c.screen_name = u.screen_name
""")
# createOrReplaceTempView replaces the deprecated registerTempTable.
user_data.createOrReplaceTempView('user_data')

# Driver-side lookup: screen_name -> dict of profile attributes.
user_data_dict = {
    row['screen_name']: {
        'name': row['name'],
        'description': row['description'],
        'followers_count': row['followers_count'],
        'total_rts': row['total_rts'],
    }
    for row in user_data.collect()
}

### 1. Create graph object based on the above edges  
### 2. Find giant component  
### 3. Assign communities to each node in giant component

In [11]:
# Directed interaction graph: an edge u -> v means u retweeted, quoted, replied
# to or mentioned v, weighted by the number of such interactions.
A = nx.DiGraph()
A.add_edges_from(all_edges)

# Remove self RTs or mentions.
# The Graph.selfloop_edges() method no longer exists in current networkx
# releases, so the self-loops are collected with a plain filter that works on
# every version. They are gathered into a list first so the graph is not
# modified while its edges are being iterated.
self_loops = [(source, target) for source, target in A.edges() if source == target]
A.remove_edges_from(self_loops)

In [13]:
# The giant component is vastly larger than all the other connected subgraphs in the network, so it's safe to
# perform analysis only on the giant component.

# Connectivity is evaluated on the undirected version of the graph.
# nx.connected_component_subgraphs no longer exists in current networkx
# releases; building each subgraph from nx.connected_components works on old
# and new versions alike. .copy() keeps every component an independent graph,
# as the removed helper did by default.
A_undirected = A.to_undirected()
A_subgraphs = sorted(
    (A_undirected.subgraph(nodes).copy() for nodes in nx.connected_components(A_undirected)),
    key=len,
    reverse=True,
)
print('Size of largest connected components:', [len(g) for g in A_subgraphs[:20]])

Size of largest connected components: [480331, 163, 89, 76, 73, 41, 39, 34, 30, 28, 26, 25, 25, 23, 23, 23, 22, 21, 20, 19]


In [15]:
# Assign every user in the giant component (the first, i.e. largest, entry of
# A_subgraphs) to a community. The result maps each node to its community id.
# NOTE(review): no random seed is set anywhere in this notebook, so community
# labels may differ between runs. Confirm whether that matters downstream.
giant_component = A_subgraphs[0]
partition = community.best_partition(giant_component.to_undirected())

### 1. Find largest strongly connected component (SCC)  
### 2. Create graph from SCC  
### 3. Calculate PageRank of all users in SCC  
### 4. Save PageRank and Community data to SCC graph 

In [18]:
# Strongly connected components of the directed graph, largest first.
# Each entry is the collection of nodes that make up one component.
A_conn_comp = list(nx.strongly_connected_components(A))
A_conn_comp.sort(key=len, reverse=True)
largest_scc_sizes = [len(g) for g in A_conn_comp[:20]]
print('Size of largest strongly connected components:', largest_scc_sizes)

Size of largest strongly connected components: [14321, 12, 12, 11, 11, 9, 9, 9, 9, 8, 8, 8, 7, 7, 7, 7, 7, 7, 6, 6]


In [18]:
# Restrict the graph to the largest strongly connected component and compute
# PageRank on that subgraph only.
S = nx.subgraph(A, A_conn_comp[0])
pr = nx.pagerank(S)

# Annotate every node of S with its PageRank, its community id and the profile
# fields that should appear in the exported graph.
for node, attrs in S.nodes(data=True):
    attrs['pagerank'] = pr[node]
    attrs['community'] = partition[node]
    profile = user_data_dict[node]
    attrs['name'] = profile['name']
    # 'description' is intentionally left out of the exported attributes.
    attrs['followers_count'] = profile['followers_count']
    attrs['total_rts'] = profile['total_rts']

## Save data

In [19]:
# Write the annotated SCC graph to a GEXF file so it can be opened in Gephi.
gexf_path = 'tweet_network_final.gexf'
nx.write_gexf(S, gexf_path)

In [20]:
# Save community membership (screen_name -> community id, stored as a string)
# for every user in the giant component.
community_rows = [(screen_name, str(community_id)) for screen_name, community_id in partition.items()]
coms = sc.parallelize(community_rows).toDF(['screen_name', 'community'])
# The default save mode raises if the target already exists, which made this
# cell fail on every re-run of the notebook; overwrite keeps it idempotent.
coms.write.mode('overwrite').parquet('communities.parquet')

In [21]:
# Save the PageRank score of every user in the largest strongly connected
# component. Scores are written as strings, matching the existing schema.
pagerank_rows = [(screen_name, str(score)) for screen_name, score in pr.items()]
pageranks = sc.parallelize(pagerank_rows).toDF(['screen_name', 'pagerank'])
# The default save mode raises if the target already exists, which made this
# cell fail on every re-run of the notebook; overwrite keeps it idempotent.
pageranks.write.mode('overwrite').parquet('pageranks.parquet')