In [1]:
from pyspark.sql import SparkSession
import json
from datetime import datetime, date
import unidecode as uni
import pandas as pd
from functools import reduce

In [2]:
# Obtain a SparkSession via the builder (getOrCreate reuses an existing one if present)
spark = (SparkSession
         .builder
         .getOrCreate()
        )

21/10/14 22:35:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Read the tweet archive as text lines and decode each line with json.loads
tweetsRDD = (spark.sparkContext
             .textFile('../data/congress-115-116-tweets.jsonl')
             .map(json.loads)
            )

In [4]:
# Helper that turns a tweet's created_at string into a calendar date
def return_date(created_at):
    """Parse a created_at timestamp and return its date part.

    The string must look like 'Tue Jan 03 00:00:00 +0000 2017'; the '+0000'
    offset is matched literally, so any other offset raises ValueError.
    """
    parsed = datetime.strptime(created_at, '%a %b %d %H:%M:%S +0000 %Y')
    return parsed.date()

# Initial filtering: keep tweets dated on/after 2017-01-03 whose text does not contain 'RT @'
# (the cutoff is presumably the start of the 115th Congress -- confirm against the data source)
tweetsRDD = tweetsRDD.filter(
    lambda tweet: (return_date(tweet['created_at']) >= date(2017, 1, 3)
                   and 'RT @' not in tweet['full_text'])
)

In [5]:
# Build one (screen_name, display name) pair per account;
# when a screen_name occurs more than once, one of its names is kept
twitterAccountsRDD = (
    tweetsRDD
    .map(lambda t: (t['user']['screen_name'], t['user']['name']))
    .reduceByKey(lambda kept, _other: kept)
)

# Transliterate the display name with unidecode, then collect into a pandas dataframe
twitter_accounts = (
    twitterAccountsRDD
    .map(lambda pair: (pair[0], uni.unidecode(pair[1])))
    .toDF(['twitter_account', 'twitter_user'])
    .toPandas()
)

                                                                                

In [6]:
# Read in party info
party_info = pd.read_csv('../data/congress-wikiscrape.csv')

# Snapshot the needed party_info columns as plain dicts once, so the inner loop
# does not re-run DataFrame.iterrows() for every twitter account
party_rows = party_info[['full_name', 'first_name', 'last_name', 'party']].to_dict('records')

# Collect exactly one output record per twitter account. The dataframe is built
# once at the end: DataFrame.append was removed in pandas 2.0 and, where it
# still exists, copies the whole frame on every call (quadratic overall).
twitter_records = []

# Loop through twitter accounts in their original row order
for twitter_account, twitter_user in zip(twitter_accounts['twitter_account'],
                                         twitter_accounts['twitter_user']):

    # Lower-case the display name once per account instead of once per comparison
    twitter_user_lower = twitter_user.lower()

    # Reset match state for this account
    first_name_matches = 0
    last_name_matches = 0
    first_name_match = None
    last_name_match = None
    perfect_match = None

    # Loop through party_info rows
    for party_row in party_rows:

        # NOTE(review): these are substring tests, not whole-word tests, and they
        # only match if the names in the csv are already lower-case -- confirm
        first_name_found = party_row['first_name'] in twitter_user_lower
        last_name_found = party_row['last_name'] in twitter_user_lower

        if not (first_name_found or last_name_found):
            continue

        candidate = {'twitter_account': twitter_account,
                     'twitter_user': twitter_user,
                     'full_name': party_row['full_name'],
                     'party': party_row['party']}

        # Both names present (i.e. perfect match): take it and stop searching
        if first_name_found and last_name_found:
            perfect_match = candidate
            break

        # Otherwise remember the partial match and count it; a partial match is
        # only used later if it turns out to be the single one of its kind
        if first_name_found:
            first_name_match = candidate
            first_name_matches += 1
        else:
            last_name_match = candidate
            last_name_matches += 1

    # Pick the record for this account: perfect match first, then a unique
    # first-name match, then a unique last-name match
    if perfect_match is not None:
        twitter_records.append(perfect_match)

    elif first_name_matches == 1:
        twitter_records.append(first_name_match)

    elif last_name_matches == 1:
        twitter_records.append(last_name_match)

    # Multiple or no matches: keep the account with blank name/party so it can
    # be filled in by hand
    else:
        twitter_records.append({'twitter_account': twitter_account,
                                'twitter_user': twitter_user,
                                'full_name': '',
                                'party': ''})

# Build the dataframe in one step, with the same column order as before
twitter_df = pd.DataFrame(twitter_records,
                          columns = ['twitter_account', 'twitter_user', 'full_name', 'party'])

# Write dataframe to csv
twitter_df.to_csv("../data/twitter-account-with-party.csv", index = False)

In [7]:
# Count accounts whose party is 'D' or 'R' (986 of 1005 in the recorded run)
display(
    twitter_df['party']
    .isin(['D','R'])
    .value_counts()
)

True     986
False     19
Name: party, dtype: int64

In [8]:
# Replace twitter_df with the manually updated copy of the csv
# (note this is the separate "-final" file, not the one written by this notebook)
twitter_df = pd.read_csv(
    '../data/twitter-account-with-party-final.csv'
)

In [9]:
# Largest followers_count seen for each screen_name across its tweets
followerCountsRDD = (
    tweetsRDD
    .map(lambda t: (t['user']['screen_name'], t['user']['followers_count']))
    .reduceByKey(max)
)

# Collect the per-account values into a pandas dataframe
follower_counts = (
    followerCountsRDD
    .map(lambda pair: (pair[0], pair[1]))
    .toDF(['twitter_account', 'followers'])
    .toPandas()
)

                                                                                

In [10]:
# Number of (filtered) tweets per screen_name: emit 1 per tweet and add them up
tweetCountsRDD = (
    tweetsRDD
    .map(lambda t: (t['user']['screen_name'], 1))
    .reduceByKey(lambda running, nxt: running + nxt)
)

# Collect the per-account counts into a pandas dataframe
tweet_counts = (
    tweetCountsRDD
    .map(lambda pair: (pair[0], pair[1]))
    .toDF(['twitter_account', 'tweet_count'])
    .toPandas()
)

                                                                                

In [11]:
# Sum of retweet_count over each screen_name's tweets
retweetCountsRDD = (
    tweetsRDD
    .map(lambda t: (t['user']['screen_name'], t['retweet_count']))
    .reduceByKey(lambda running, nxt: running + nxt)
)

# Collect the per-account totals into a pandas dataframe
retweet_counts = (
    retweetCountsRDD
    .map(lambda pair: (pair[0], pair[1]))
    .toDF(['twitter_account', 'retweets_total'])
    .toPandas()
)

                                                                                

In [12]:
# Sum of favorite_count over each screen_name's tweets
favoriteCountsRDD = (
    tweetsRDD
    .map(lambda t: (t['user']['screen_name'], t['favorite_count']))
    .reduceByKey(lambda running, nxt: running + nxt)
)

# Collect the per-account totals into a pandas dataframe
favorite_counts = (
    favoriteCountsRDD
    .map(lambda pair: (pair[0], pair[1]))
    .toDF(['twitter_account', 'favorites_total'])
    .toPandas()
)

                                                                                

In [13]:
# Join the party table with each per-account statistic on twitter_account
# (pandas' default merge, so only accounts present in every frame remain)
data_frames = [twitter_df, follower_counts, tweet_counts, retweet_counts, favorite_counts]
summary_df = data_frames[0]
for per_account_stats in data_frames[1:]:
    summary_df = summary_df.merge(per_account_stats, on = ['twitter_account'])

# Average retweets and favorites per tweet for each account
summary_df = summary_df.assign(
    retweets_per_tweet = summary_df['retweets_total'] / summary_df['tweet_count'],
    favorites_per_tweet = summary_df['favorites_total'] / summary_df['tweet_count'],
)

# Save the combined summary
summary_df.to_csv("../data/twitter-accounts-summary.csv", index = False)