# Initial Data Processing

In [41]:
import pandas as pd
import numpy as np
import pickle
import os
import matplotlib.pyplot as plt
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pickle
import re
import pandas as pd
from scipy import sparse, io
import matplotlib.pylab as plt
import numpy as np
from glob import glob
from pyspark.sql import SparkSession
from tqdm import tqdm
from collections import defaultdict, Counter
import sys

os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

RAW_TWEETS_PATH = "/data/dnc2020/raw_tweets/"
PARQUET_TWEET_PATH = '/data/navid/processed_tweets'
PARQUET_USER_PATH = "/data/navid/processed_users"
PARQUET_RTNET_PATH = "/data/kenny/processed_rt_network"

In [42]:
## We use spark to analyze the data efficiently. You may opt for a different format
## Also note we here assume a machine with a reasonably large number of cores (16) 
## and RAM (at least 32 GB). Adjust below accordingly
spark = (
    SparkSession
    .builder
    .master("local[{}]".format(16))
    .config("spark.driver.memory", "{}g".format(50))
    .config("spark.driver.maxResultSize", f"{10}g")
    .getOrCreate()
)


# 1. Move data from raw to processed

Generate processed tweets, users, RT graph (easier loading/analysis)

In [43]:
from util import run_data_prep, create_retweet_graph

In [44]:
# Processed tweet format
if not os.path.exists(PARQUET_TWEET_PATH):
    run_data_prep(stream_path_template=os.path.join(RAW_TWEETS_PATH,"{date}*/*.gz"),
                         output_path_template=os.path.join(PARQUET_TWEET_PATH,"processed_tweets_{date}/"),
                         begin_date="2020-01-01",
                         end_date="2020-12-31",
                         batch_size=100000,
                         mode='tweet'
    )
else:
    print(f"path {PARQUET_TWEET_PATH} already exists, assuming parquet has been generated already")
    


path /data/navid/processed_tweets already exists, assuming parquet has been generated already


In [45]:
# Processed user format
if not os.path.exists(PARQUET_USER_PATH):
    run_data_prep(stream_path_template=os.path.join(RAW_TWEETS_PATH,"{date}*/*.gz"),
                         output_path_template=os.path.join(PARQUET_TWEET_PATH,"processed_users_{date}/"),
                         begin_date="2020-01-01",
                         end_date="2020-12-31",
                         batch_size=100000,
                         mode='user'
    )
else:
    print(f"path {PARQUET_USER_PATH} already exists, assuming parquet has been generated already")

path /data/navid/processed_users already exists, assuming parquet has been generated already


In [46]:
# Processed retweet graph 
if not os.path.exists(PARQUET_RTNET_PATH):
    spark = (
            SparkSession
            .builder
            .master("local[{}]".format(16))
            .config("spark.driver.memory", "{}g".format(50))
            .config("spark.driver.maxResultSize", f"{10}g")
            .getOrCreate()
        )

    print(f"Creating RT graph at {PARQUET_RTNET_PATH}")
    create_retweet_graph(spark,
                         output_path=PARQUET_RTNET_PATH, 
                         all_tweets_path=PARQUET_TWEET_PATH)
    spark.stop()
else:
    print(f"path {PARQUET_RTNET_PATH} already exists, assuming parquet has been generated already")
    

path /data/kenny/processed_rt_network already exists, assuming parquet has been generated already


# 2. Read in data and get some basic numbers for paper

In [7]:
all_tweets_df = spark.read.parquet(os.path.join(PARQUET_TWEET_PATH, '*', '*'))

                                                                                

In [8]:
n_tweets = all_tweets_df.count()
n_unique_users = all_tweets_df.select(F.countDistinct("uid")).collect()
non_rt_cnt = all_tweets_df.filter(F.col("rt_id").isNull()).count()

f"N total tweets: {n_tweets}  N unique users: {n_unique_users} N non-retweets: {non_rt_cnt}"

                                                                                

'N total tweets: 589729860  N unique users: [Row(count(DISTINCT uid)=20928178)] N non-retweets: 169614594'

## Generate Data for Figure 1

In [None]:
#### Data for Figure 1
from datetime import date,timedelta

def daterange(start_date, end_date):
    for n in range(int((end_date - start_date).days)):
        yield start_date + timedelta(n)
        
start_date = date(2020, 1, 1)
end_date = date(2020, 12, 31)
dat = []
for single_date in daterange(start_date, end_date):
    
    datestr= single_date.strftime("%Y-%m-%d")
    filpath = os.path.join(PARQUET_TWEET_PATH, 
                                        f'processed_tweets_{datestr}', 
                                        '*')
    g = glob.glob(filpath)
    #print(datestr,filpath,len(g))
    if not len(g):
        dat.append((datestr,0))
    else:
        p = spark.read.parquet(filpath)
        z = p.withColumn("date",F.from_unixtime(F.col("created_at"),"yyyy-MM-dd")).groupby("date").count().collect()
        dat += [(x[0], x[1]) for x in z]
pd.DataFrame(dat, columns=['date','count']).to_csv("data/tweet_counts.csv",index=False)

## Compute fraction of data from top retweets

In [14]:
all_tweets_df = spark.read.parquet(os.path.join(PARQUET_TWEET_PATH, '*', '*'))

rt_counts = (all_tweets_df.filter(F.col("rt_id").isNotNull())
             .groupby("rt_id")
             .agg(F.count(F.lit(1)).alias('rt_times'))
             .filter(F.col("rt_times") >= 10000)
            )
rt_pd = rt_counts.toPandas()
print("N tweets RT'd >10K times: ", len(rt_pd))
print("% of all tweets: ",rt_pd.rt_times.sum()/589729860)



N tweets RT'd >10K times:  4355
% of all tweets:  0.14926089548865645


                                                                                

### Basic Stats on RT network

In [17]:
rtd_users = spark.read.parquet(PARQUET_RTNET_PATH)
rtd_users.show(5,False)

+-------------------+-------------------+--------+
|uid                |rt_uid             |rt_times|
+-------------------+-------------------+--------+
|350383251          |1122623265673891840|17      |
|14940699           |1177630382084124672|1       |
|243795934          |1177630382084124672|1       |
|2675291408         |1134899032348712960|12      |
|1173361710477934598|1159600736021504006|13      |
+-------------------+-------------------+--------+
only showing top 5 rows



In [19]:
rtd_users_cnt = rtd_users.groupby("uid").count()
print("N edges in RT net: ", rtd_users.count())
print("N users in RT net: ", rtd_users_cnt.count())
print("Percent of all users sending >=1 RT:", 14244949/20928178)
print("Total N rts", rtd_users.agg({'rt_times':"sum"}).collect())
print("% of all tweets that are RTs: ", 431081836/589729860)

N edges in RT net:  166021415


                                                                                

N users in RT net:  14244949
Percent of all users sending >=1 RT: 0.6806588227603951




Total N rts [Row(sum(rt_times)=431081836)]
% of all tweets that are RTs:  0.7309818702414017


                                                                                

### Write out all users who have RT'd at least once and have a description

In [None]:
all_users_df = (spark.read.parquet(os.path.join(PARQUET_USER_PATH, '*', '*'))[
                                F.col("description").isNotNull()].
                                            dropDuplicates(subset=['id','description'])
               )
print("N users w/ description: ", all_users_df.select(F.countDistinct('id')).collect())
print("% with description:", 15953966/20928178)

In [None]:
rtd_user_descript = rtd_users_cnt.join(all_users_df, rtd_users_cnt.uid == all_users_df.id)
rtd_user_descript.write.parquet("/data/dnc2020/user_descript", mode="overwrite")

# 3 Create Trimmed Retweet Network

In [None]:
USER_THRESHOLD = 12
RTD_USER_THRESHOLD = 500

In [None]:
rt_net = spark.read.parquet(PARQUET_RTNET_PATH)
rt_net.show(2,False)

+---------+-------------------+--------+
|uid      |rt_uid             |rt_times|
+---------+-------------------+--------+
|350383251|1122623265673891840|17      |
|14940699 |1177630382084124672|1       |
+---------+-------------------+--------+
only showing top 2 rows



In [None]:
a = rt_net.agg(F.countDistinct(F.col("uid"))).collect()
b = rt_net.agg(F.countDistinct(F.col("rt_uid"))).collect()
c = rt_net.count()
d = rt_net.agg({"rt_times" : "sum"}).collect()

print(f"""Prefiltering:

Users: {a}
RT'd accounts: {b}
Total Links: {c}
Total RTs: {d}
""")

                                                                                

Prefiltering:

Users: [Row(count(uid)=14244949)]
RT'd accounts: [Row(count(rt_uid)=2308610)]
Total Links: 166021415
Total RTs: [Row(sum(rt_times)=431081836)]



In [None]:
user_counts = rt_net.groupby("uid").count().filter(F.col("count") >= USER_THRESHOLD)
rtd_counts = rt_net.groupby("rt_uid").count().filter(F.col("count") >= RTD_USER_THRESHOLD)

rt_net = rt_net.join(user_counts.select("uid"), ["uid"])
rt_net = rt_net.join(rtd_counts.select("rt_uid"), ["rt_uid"])

In [None]:
rt_net.write.parquet("data/trimmed_rtnet", mode="overwrite")

                                                                                

In [None]:
trimmed_net = spark.read.parquet("data/trimmed_rtnet")

In [None]:
a = trimmed_net.agg(F.countDistinct(F.col("uid"))).collect()
b = trimmed_net.agg(F.countDistinct(F.col("rt_uid"))).collect()
c = trimmed_net.count()
d = trimmed_net.agg({"rt_times" : "sum"}).collect()

print(f"""Post filtering:

Users: {a}
RT'd accounts: {b}
Total Links: {c}
Total RTs: {d}
""")

                                                                                

Post filtering:

Users: [Row(count(uid)=1791912)]
RT'd accounts: [Row(count(rt_uid)=25395)]
Total Links: 111840107
Total RTs: [Row(sum(rt_times)=325107353)]

