In [1]:
import os
import re
import json
import pickle
import numpy as np
import pandas as pd
import datetime as dt
import multiprocessing as mp
from tqdm import tqdm
from multiprocessing import Pool
from datetime import timezone
from datetime import timedelta
from pprint import pprint

In [2]:
import config

{'data': {'dates': ['2018-03-11', '2018-03-12', '2018-03-13'],
          'phrases': ['givenchy%20death', 'givenchy%20passed%20away'],
          'time': {'end': 'Mar 13 00:00:00 -0500 2018',
                   'start': 'Mar 12 00:00:00 -0500 2018'}},
 'path': {'cwd': '/Users/lzhou/git/github/uclresearchanalysis/data/givenchy',
          'friends_dictionary': '/Users/lzhou/git/github/uclresearchanalysis/data/friends_dictionary.dat',
          'pickle': '/Users/lzhou/git/github/uclresearchanalysis/data/givenchy/pickle',
          'result': '/Users/lzhou/git/github/uclresearchanalysis/data/givenchy/result',
          'twitter': '/Users/lzhou/git/github/uclresearchanalysis/data/givenchy/twitter'}}


In [3]:
from config import load_us_city_state_files
# Lookup tables consumed by find_state_name; each maps a lookup key to the
# state name that find_state_name returns.
(city_to_state_dict,
 abbrev_to_state_dict,
 state_to_state_dict) = load_us_city_state_files()

Load city and state dictionaries from dat files
loaded 102 states abbrev, loaded 51 states, loaded 2361 cities


In [4]:
# Inputs/outputs and search parameters for this event, all read from config.settings.
_path_settings = config.settings['path']
_data_settings = config.settings['data']

file_input_path = _path_settings['twitter']   # directory of raw JSON-lines tweet files
file_output_path = _path_settings['pickle']   # directory the final DataFrame is pickled into
dates = _data_settings['dates']
search_phrases = _data_settings['phrases']
start_time = _data_settings['time']['start']
end_time = _data_settings['time']['end']

In [5]:
def find_unique_tweets_in_project(input_path=None, phrases=None, date_list=None):
    """Load every collected tweet for this project and de-duplicate by tweet id.

    Reads one JSON-lines file per (date, search phrase) pair, named
    ``{input_path}/{phrase}_{date}.json``. Files that do not exist are skipped.

    Parameters
    ----------
    input_path : str, optional
        Directory holding the raw files; defaults to ``file_input_path``.
    phrases : list of str, optional
        Search phrases used in the file names; defaults to ``search_phrases``.
    date_list : list of str, optional
        Dates used in the file names; defaults to ``dates``.

    Returns
    -------
    list of dict
        One decoded tweet per distinct ``id``. When an id occurs more than
        once, the copy read last is kept, at the position of its first occurrence.
    """
    if input_path is None:
        input_path = file_input_path
    if phrases is None:
        phrases = search_phrases
    if date_list is None:
        date_list = dates

    # De-duplicate while reading instead of buffering every tweet in a list first.
    tweets_by_id = {}
    for date in date_list:
        tweet_counter_date = 0
        print('Processing files for date: {}'.format(date))
        for phrase in phrases:
            file_path = '{}/{}_{}.json'.format(input_path, phrase, date)
            tweet_counter_file = 0
            if os.path.isfile(file_path):
                # Explicit UTF-8 so decoding does not depend on the platform's default encoding.
                with open(file_path, 'r', encoding='utf-8') as file:
                    for line in file:  # stream lines rather than readlines()
                        if not line.strip():
                            # Tolerate blank lines (e.g. trailing newlines); json.loads('') raises.
                            continue
                        tweet = json.loads(line)
                        tweets_by_id[tweet['id']] = tweet
                        tweet_counter_file += 1
                print('Finished processing file: {}'.format(file_path))
            else:
                print('File not found, skipping: {}'.format(file_path))
            tweet_counter_date += tweet_counter_file
            print('Found {} tweets'.format(tweet_counter_file))
        print('In total, found {} tweets on date {}'.format(tweet_counter_date, date))
    all_unique_tweets = list(tweets_by_id.values())
    print('In total, found {} unique tweets'.format(len(all_unique_tweets)))
    return all_unique_tweets

In [6]:
def convert_utc_to_est(tweet_created_at, offset_hours=-5):
    """Parse a Twitter ``created_at`` string and express it at a fixed UTC offset.

    Parameters
    ----------
    tweet_created_at : str
        Timestamp such as 'Mon Mar 12 13:20:58 +0000 2018'.
    offset_hours : int or float, optional
        Target offset from UTC in hours; defaults to -5 (EST). This is a fixed
        offset, so daylight saving time is never applied.

    Returns
    -------
    datetime.datetime
        Timezone-aware datetime at the target offset.
    """
    # %z already attaches the string's own offset, so convert from that directly.
    # Overwriting tzinfo with UTC (as before) shifted any non-'+0000' input by its offset.
    parsed = dt.datetime.strptime(tweet_created_at, '%a %b %d %H:%M:%S %z %Y')
    return parsed.astimezone(timezone(timedelta(hours=offset_hours)))

def is_retweet(tweet):
    """Return the retweeted user's handle if ``tweet`` is an old-style 'RT' retweet.

    A tweet counts as a retweet when the first word of its text is 'RT'
    (as in 'RT @someone: ...'). The following word is taken as the handle,
    with a leading '@' and a trailing ':' removed.

    Returns
    -------
    str or None
        The handle, or None when the text does not start with 'RT' or has no
        word after it (including empty or whitespace-only text).
    """
    tokens = tweet['text'].split()
    # Guard the indexing: empty text or a bare 'RT' used to raise IndexError.
    if len(tokens) < 2 or tokens[0] != 'RT':
        return None
    handle = tokens[1]
    # Strip '@' and ':' explicitly instead of blindly dropping the first and last
    # characters, which truncated handles written without a colon ('RT @bob hi').
    if handle.startswith('@'):
        handle = handle[1:]
    if handle.endswith(':'):
        handle = handle[:-1]
    return handle

def is_reply(tweet):
    """Return True when the tweet has an ``in_reply_to_user_id_str`` value, else False."""
    reply_target = tweet['in_reply_to_user_id_str']
    return reply_target is not None

def get_source(tweet):
    """Classify how a tweet relates to another one.

    Returns
    -------
    str
        'RT' if the first word of the text is 'RT'; otherwise 'RP' if
        ``in_reply_to_user_id_str`` is set; otherwise ''.
    """
    tokens = tweet['text'].split()
    # `tokens and ...` avoids IndexError on empty / whitespace-only text.
    if tokens and tokens[0] == 'RT':
        return 'RT'
    if tweet['in_reply_to_user_id_str'] is not None:
        return 'RP'
    return ''

def get_source_user(tweet):
    """Return the retweeted user's handle for an 'RT' tweet, or '' otherwise.

    Delegates to is_retweet, so the 'RT @handle:' parsing lives in one place
    instead of being duplicated here. is_retweet returns None for non-retweets,
    and that None is mapped to '' to keep this function's string-only contract.
    """
    return is_retweet(tweet) or ''

def get_source_user_id(tweet):
    """Return the replied-to user id (as a string) for tweets classified 'RP', else ''."""
    if get_source(tweet) != 'RP':
        return ''
    return str(tweet['in_reply_to_user_id'])

def get_user_mentions(tweet):
    """Return the screen names listed in ``tweet['entities']['user_mentions']``, in order."""
    return [entry['screen_name'] for entry in tweet['entities']['user_mentions']]

def populate_tweet_df(tweets):
    """Flatten decoded tweet dicts into a DataFrame with one row per tweet.

    Parameters
    ----------
    tweets : list of dict
        Tweets as decoded from the JSON-lines files. The list is iterated once
        per column, so pass a list, not a generator.

    Returns
    -------
    pandas.DataFrame
        Columns, in order: user, user_id, created_at (converted with
        convert_utc_to_est), source_type, source_user, source_user_id,
        seed_user, seed_user_id, followers_count, friends_count, text,
        location, country_code, long, latt, state, mentions.
        seed_user, seed_user_id and state start as '' placeholders.
        country_code is '' when the tweet has no place.
        long/latt are NaN when the tweet has no coordinates.
    """
    def _coordinate(tweet, axis):
        # Index 0 of coordinates['coordinates'] is stored as 'long', index 1 as 'latt'.
        # A real NaN (previously the string 'NaN') keeps these columns numeric,
        # so pd.isna and arithmetic work downstream.
        point = tweet['coordinates']
        return point['coordinates'][axis] if point is not None else np.nan

    df = pd.DataFrame()
    df['user'] = [tweet['user']['screen_name'] for tweet in tweets]
    df['user_id'] = [tweet['user']['id_str'] for tweet in tweets]
    df['created_at'] = [convert_utc_to_est(tweet['created_at']) for tweet in tweets]
    df['source_type'] = [get_source(tweet) for tweet in tweets]
    df['source_user'] = [get_source_user(tweet) for tweet in tweets]
    df['source_user_id'] = [get_source_user_id(tweet) for tweet in tweets]
    df['seed_user'] = ''
    df['seed_user_id'] = ''
    df['followers_count'] = [tweet['user']['followers_count'] for tweet in tweets]
    df['friends_count'] = [tweet['user']['friends_count'] for tweet in tweets]
    df['text'] = [tweet['text'] for tweet in tweets]
    df['location'] = [tweet['user']['location'] for tweet in tweets]
    df['country_code'] = [tweet['place']['country_code'] if tweet['place'] is not None else ''
                          for tweet in tweets]
    df['long'] = [_coordinate(tweet, 0) for tweet in tweets]
    df['latt'] = [_coordinate(tweet, 1) for tweet in tweets]
    df['state'] = ''
    df['mentions'] = [get_user_mentions(tweet) for tweet in tweets]
    return df

In [7]:
def find_word(w):
    """Compile a case-insensitive regex that matches ``w`` as a whole word.

    ``w`` is passed through re.escape so names containing regex
    metacharacters (e.g. the '.' in an abbreviated name like 'St. Louis')
    match literally instead of acting as patterns or failing to compile.
    """
    return re.compile(r'\b({0})\b'.format(re.escape(w)), flags=re.IGNORECASE)
def find_word_case(w):
    """Compile a case-sensitive regex that matches ``w`` as a whole word.

    ``w`` is passed through re.escape so regex metacharacters in it match
    literally instead of acting as patterns or failing to compile.
    """
    return re.compile(r'\b({0})\b'.format(re.escape(w)))

# Pre-compile one whole-word regex per lookup key, so find_state_name does not
# recompile them for every tweet. Abbreviations are matched case-sensitively;
# full state names and city names are matched case-insensitively.
abbrev_to_state_regex_dict = {abbrev: find_word_case(abbrev) for abbrev in abbrev_to_state_dict}
state_to_state_regex_dict = {state: find_word(state) for state in state_to_state_dict}
city_to_state_regex_dict = {city: find_word(city) for city in city_to_state_dict}

def find_state_name(tweet_location):
    """Map a free-text user location to a state name.

    Lookups run in priority order:
    1. state abbreviations (case-sensitive);
    2. full state names (case-insensitive);
    3. city names (case-insensitive).
    The state mapped by the first whole-word match is returned.

    Parameters
    ----------
    tweet_location : str or None
        The user's self-reported location text.

    Returns
    -------
    str
        The matched state name, or '' when nothing matches or the location
        is missing or not a string.
    """
    # A missing location (None, or NaN once stored in a DataFrame) would make
    # regex.search raise TypeError, aborting the whole partition.
    if not isinstance(tweet_location, str) or not tweet_location:
        return ''
    lookups = (
        (abbrev_to_state_regex_dict, abbrev_to_state_dict),
        (state_to_state_regex_dict, state_to_state_dict),
        (city_to_state_regex_dict, city_to_state_dict),
    )
    for regex_dict, state_dict in lookups:
        for key, regex in regex_dict.items():
            if regex.search(tweet_location):
                return state_dict[key]
    return ''

def parallelize_dataframe(df, func, num_cores=None):
    """Apply ``func`` to row-chunks of ``df`` in a process pool and concatenate the results.

    Parameters
    ----------
    df : pandas.DataFrame
        Frame to split into ``num_cores`` roughly equal row chunks.
    func : callable
        Takes a DataFrame chunk and returns a DataFrame. Must be picklable
        (e.g. a module-level function) when more than one core is used.
    num_cores : int, optional
        Number of worker processes and chunks; defaults to ``mp.cpu_count()``.

    Returns
    -------
    pandas.DataFrame
        ``pd.concat`` of the per-chunk results, in chunk order.
    """
    if num_cores is None:
        num_cores = mp.cpu_count()
    num_partitions = num_cores
    print('Found {} CPUs, will split tasks into {} partitions'.format(num_cores, num_partitions))
    # np.array_split applied directly to a DataFrame is deprecated in recent pandas,
    # so split row positions instead and slice with iloc. .copy() gives each chunk
    # its own data, so func cannot mutate the caller's frame through a view.
    chunks = [df.iloc[positions].copy()
              for positions in np.array_split(np.arange(len(df)), num_partitions)]
    if num_cores == 1:
        # A single worker gains nothing from a process pool; run in-process.
        results = [func(chunk) for chunk in chunks]
    else:
        # The context manager tears the pool down even if func raises
        # (the old close()/join() calls were skipped on error).
        with Pool(num_cores) as pool:
            results = pool.map(func, chunks)
    return pd.concat(results)

def adding_state(data):
    """Fill ``data['state']`` in place from ``data['location']`` and return ``data``.

    Each location is resolved with find_state_name. The (mutated) frame is
    returned so this can be passed to parallelize_dataframe.
    """
    data['state'] = [find_state_name(location) for location in data['location']]
    return data

In [8]:
def order_by_dates_and_reindex(df, start=None, end=None):
    """Keep tweets inside the analysis time window, sort them, and renumber rows.

    Parameters
    ----------
    df : pandas.DataFrame
        Must have a timezone-aware ``created_at`` column; it is compared
        against offset-aware bounds.
    start, end : str, optional
        Inclusive window bounds in the form 'Mar 12 00:00:00 -0500 2018'.
        They default to the notebook's ``start_time`` / ``end_time`` settings.

    Returns
    -------
    pandas.DataFrame
        Rows with start <= created_at <= end, in chronological order, with a
        fresh 0..n-1 index.
    """
    if start is None:
        start = start_time
    if end is None:
        end = end_time
    timeline_start = dt.datetime.strptime(start, '%b %d %H:%M:%S %z %Y')
    timeline_end = dt.datetime.strptime(end, '%b %d %H:%M:%S %z %Y')
    in_window = (df.created_at >= timeline_start) & (df.created_at <= timeline_end)
    # A stable sort keeps tweets with identical timestamps in their existing order.
    # Repeated calls therefore give the same row order, which the duplicate-user
    # pass ("first tweet wins") depends on.
    return (df[in_window]
            .sort_values(by=['created_at'], kind='mergesort')
            .reset_index(drop=True))

In [10]:
def _tag_mentions_and_duplicates(tweets_df):
    """Tag 'AT' and 'Duplicated' tweets in one chronological pass (rows 0..n-1).

    - A tweet whose author (``user``) was mentioned by any *earlier* row gets
      source_type 'AT' and source_user set to that author. This overwrites
      any earlier 'RT'/'RP' classification, as the original loop did.
    - A tweet whose ``user_id`` already appeared in an earlier row gets
      source_type 'Duplicated', which takes precedence over 'AT'.

    Equivalent to the previous pairwise loop (a full-frame mask per mention per
    row, plus list membership), but O(n) instead of O(n^2). Mutates and
    returns ``tweets_df``.
    """
    mentioned_so_far = set()
    seen_user_ids = set()
    source_types = tweets_df['source_type'].tolist()
    source_users = tweets_df['source_user'].tolist()
    rows = zip(tweets_df['user'], tweets_df['user_id'], tweets_df['mentions'])
    for pos, (user, user_id, mentions) in enumerate(rows):
        if user in mentioned_so_far:
            source_types[pos] = 'AT'
            # NOTE(review): this records the tweet's own author, not the earlier
            # tweet(s) that mentioned them (preserved from the original) - confirm intent.
            source_users[pos] = user
        if user_id in seen_user_ids:
            source_types[pos] = 'Duplicated'
        seen_user_ids.add(user_id)
        # Add this row's mentions only after the checks: a row never tags itself.
        mentioned_so_far.update(mentions)
    tweets_df['source_type'] = source_types
    tweets_df['source_user'] = source_users
    return tweets_df


all_unique_tweets = find_unique_tweets_in_project()
df = populate_tweet_df(all_unique_tweets)
df = parallelize_dataframe(df, adding_state)
df = order_by_dates_and_reindex(df)
# The tagging pass relies on chronological order and a 0..n-1 index.
# It neither drops nor reorders rows, so no re-sort is needed afterwards.
df = _tag_mentions_and_duplicates(df)
print('Found duplicate users: {}'.format(len(df[df.source_type == 'Duplicated'])))
df = df[df.source_type != 'Duplicated']
df = order_by_dates_and_reindex(df)
print('{} users left after removing duplicated users'.format(len(df)))
with open(os.path.join(file_output_path, 'user_unique.dat'), 'wb') as data_file:
    pickle.dump(df, data_file)

Processing files for date: 2018-03-11
Finished processing file: /Users/lzhou/git/github/uclresearchanalysis/data/givenchy/twitter/givenchy%20death_2018-03-11.json
Found 0 tweets
Finished processing file: /Users/lzhou/git/github/uclresearchanalysis/data/givenchy/twitter/givenchy%20passed%20away_2018-03-11.json
Found 0 tweets
In total, found 0 tweets on date 2018-03-11
Processing files for date: 2018-03-12
Finished processing file: /Users/lzhou/git/github/uclresearchanalysis/data/givenchy/twitter/givenchy%20death_2018-03-12.json
Found 216 tweets
Finished processing file: /Users/lzhou/git/github/uclresearchanalysis/data/givenchy/twitter/givenchy%20passed%20away_2018-03-12.json
Found 4263 tweets
In total, found 4479 tweets on date 2018-03-12
Processing files for date: 2018-03-13
Finished processing file: /Users/lzhou/git/github/uclresearchanalysis/data/givenchy/twitter/givenchy%20death_2018-03-13.json
Found 68 tweets
Finished processing file: /Users/lzhou/git/github/uclresearchanalysis/dat

100%|██████████| 4.80k/4.80k [00:46<00:00, 106Tweets/s]


Found duplicate users: 158
4754 users left after removing duplicated users


In [11]:
df.head(5)

Unnamed: 0,user,user_id,created_at,source_type,source_user,source_user_id,seed_user,seed_user_id,followers_count,friends_count,text,location,country_code,long,latt,state,mentions
0,davelackie,100766356,2018-03-12 08:20:58-05:00,,,,,,143119,4643,So sad to hear that fashion designer Hubert de...,Canada,,,,,[]
1,alexanderskhan,3186545203,2018-03-12 08:21:07-05:00,RT,davelackie,,,,753,3428,RT @davelackie: So sad to hear that fashion de...,"New Orleans, LA",,,,Louisiana,[davelackie]
2,consiglierela,4134992843,2018-03-12 08:21:12-05:00,RT,davelackie,,,,2161,396,RT @davelackie: So sad to hear that fashion de...,⭐,,,,,[davelackie]
3,ImpactPrincess,1955472014,2018-03-12 08:21:36-05:00,RT,davelackie,,,,242,642,RT @davelackie: So sad to hear that fashion de...,"David Jones, Probably",,,,,[davelackie]
4,Damek0Masca,406301175,2018-03-12 08:22:06-05:00,,,,,,116,239,Today we mourn the death of a #fashion legend;...,New York,,,,New York,[]
