<h1>Combining Interventions to reduce the spread of misinformation online: Data</h1>

In [1]:
%%capture
!pip install fastparquet
!pip install pandarallel

In [2]:
#Set up environment
from tqdm.notebook import trange, tqdm
import pickle
import src.utils as srcu
import src.segmentation as srcseg
import pandas as pd
from pandarallel import pandarallel
import numpy as np
#Set up parallel processing
pandarallel.initialize(nb_workers=8,verbose=True,progress_bar=True)
tqdm.pandas()

#Make sure things reload
%load_ext autoreload
%autoreload 2

#Set up directories
root = './shared_data'
srcu.create_output_directories(root)

<h2>Pull data and segment events</h2>

<h3>Pull</h3>

In [3]:
# Gather the list of incidents from the database (the preview below shows one row
# per incident with a `count` column).
# The credentials path can be overridden with the VENUS_CRED_PATH environment
# variable; the default keeps the original location.
import os
import src.database as sdb
engine = sdb.get_engine(os.environ.get('VENUS_CRED_PATH', '/home/joebak/venus_cred.txt'))
incidents = sdb.list_incidents(engine)

In [4]:
# We need a name without spaces or '/' to refer to each incident.
fix_name = lambda name: name.replace(' ','_').replace('/','_')

# Dominion is high volume, very noisy, has daily patterns, is prolonged
# and doesn't conform to our notion of "events", so it is excluded.
# .assign returns a new frame, so the new column is no longer written into a
# filtered slice of the original (which triggered SettingWithCopyWarning).
incidents = (
    incidents[incidents['incident'] != 'Dominion1']
    .assign(incident_name=lambda frame: frame['incident'].apply(fix_name))
)

In [5]:
# Preview the first five incidents.
incidents.head(n=5)

Unnamed: 0,incident,count,incident_name
0,bad_statistics_1,185193,bad_statistics_1
1,bad statistics 3,60909,bad_statistics_3
2,bad statistics 4,78951,bad_statistics_4
3,bad statistics 5,101674,bad_statistics_5
4,bad statistics 6,4736,bad_statistics_6


In [6]:
# Cache the incident list (uncompressed) under the shared data directory.
incidents.to_parquet(f'{root}/data/incidents.parquet', compression=None)


<h3>Aggregate</h3>

In [7]:
# Reload the cached incident list so the aggregation can start from here.
incidents = pd.read_parquet(f'{root}/data/incidents.parquet')

In [8]:
# Number of incidents (rows) that remain after filtering.
incidents.shape[0]

152

In [None]:
import os
import src.database as sdb
# The credentials path can be overridden via VENUS_CRED_PATH; the default keeps the original location.
engine = sdb.get_engine(os.environ.get('VENUS_CRED_PATH', '/home/joebak/venus_cred.txt'))
# incidents.T turns each incident row into a column, so parallel_apply calls agg_save once per incident.
# NOTE(review): the engine is shared with pandarallel's worker processes; confirm the
# connection pool is safe to use after fork.
agg_save = lambda row: sdb.aggregate_and_save(row,engine,root=root,to_share=True,keep=False)
incidents.T.parallel_apply(agg_save)

<h3>Repeat Offenders</h3>

In [10]:
# Accounts in the pre-existing `removed` list (used below as the "Currently" policy);
# presumably screen names — confirm against the producer of removed.p.
# NOTE: pickle.load can execute arbitrary code; only load removed.p from a trusted source.
# The `with` block closes the file handle that the bare open() call left open.
with open('./data/removed.p', 'rb') as removed_file:
    removed = pickle.load(removed_file)


In [11]:
import src.database as sdb
engine = sdb.get_engine()

# Number each account's incidents chronologically. For every (user, incident) pair the
# inner query keeps the account's earliest tweet in that incident; the outer row_number()
# then orders those pairs by created_at, so row_number = k marks the k-th incident the
# account took part in.
# DISTINCT ON needs an ORDER BY whose leading columns match its expressions. Without one,
# Postgres keeps an arbitrary row per pair, which made created_at (and therefore
# row_number) non-deterministic. The unused inner row_number() has been dropped.
query_ro = '''SELECT user_screen_name,incident, user_followers_count,row_number() over (partition by user_screen_name order by created_at) 
            FROM (SELECT DISTINCT ON (user_screen_name, incident) user_screen_name, user_followers_count,
            incident, created_at
            FROM public.all_ticket_tweets WHERE user_followers_count > 10000 AND incident IS NOT NULL
            ORDER BY user_screen_name, incident, created_at) AS nested'''

# Same numbering, restricted to verified accounts (no follower threshold).
query_v = '''SELECT user_screen_name,incident, user_followers_count,row_number() over (partition by user_screen_name order by created_at) 
            FROM (SELECT DISTINCT ON (user_screen_name, incident) user_screen_name, user_followers_count,
            incident, created_at
            FROM public.all_ticket_tweets 
            WHERE incident IS NOT NULL AND user_verified
            ORDER BY user_screen_name, incident, created_at) AS nested'''

# Ad-hoc sanity-check query; not executed in this cell.
query_all = "SELECT * FROM all_ticket_tweets LIMIT 10;"
ro_all = pd.read_sql(query_ro, con=engine)
ro_verified = pd.read_sql(query_v, con=engine)


In [12]:
#We do not provide user names, so this data is not directly shared
# (it goes to ./data rather than the shared root).
# index=False keeps the RangeIndex out of the file, so reading it back does not
# create a spurious 'Unnamed: 0' column.
ro_all.to_csv('./data/ro_all.csv', index=False)
ro_verified.to_csv('./data/ro_verified.csv', index=False)

In [13]:
# Reload the saved query results so later cells can run without re-querying the database.
ro_all = pd.read_csv('./data/ro_all.csv')
ro_verified = pd.read_csv('./data/ro_verified.csv')

In [14]:
def get_repeat_offenders_dict(df, incidents, follower_thresh=1,strikes=3):
    """Map each incident to the accounts a strike-based policy would remove in it.

    An account is a repeat offender in an incident when that row's ``row_number``
    exceeds ``strikes`` (i.e. it is past its ``strikes``-th incident) and its
    follower count in that row exceeds ``follower_thresh``.

    Keyword arguments:
    df -- frame with 'incident', 'row_number', 'user_followers_count' and
          'user_screen_name' columns (e.g. ro_all / ro_verified)
    incidents -- iterable of incident names; every one becomes a key, even when
                 it has no offenders (its value is then an empty array)
    follower_thresh -- strict lower bound on follower count (default 1)
    strikes -- number of incidents tolerated before removal (default 3)

    Returns a dict {incident: numpy array of unique screen names in order of
    first appearance}.
    """
    # The strike and follower filters do not depend on the incident, so apply them
    # once instead of re-filtering the full frame on every loop iteration.
    eligible = df[(df['row_number'] > strikes) & (df['user_followers_count'] > follower_thresh)]
    return {
        incident: eligible.loc[eligible['incident'] == incident, 'user_screen_name'].unique()
        for incident in incidents
    }

# Repeat-offender maps at increasing follower thresholds, plus one for verified
# accounts that relies on the function's default threshold.
ro_dict_10k = get_repeat_offenders_dict(df=ro_all, incidents=incidents['incident'], follower_thresh=10000)
ro_dict_50k = get_repeat_offenders_dict(df=ro_all, incidents=incidents['incident'], follower_thresh=50000)
ro_dict_100k = get_repeat_offenders_dict(df=ro_all, incidents=incidents['incident'], follower_thresh=100000)
ro_dict_500k = get_repeat_offenders_dict(df=ro_all, incidents=incidents['incident'], follower_thresh=500000)
ro_dict_v = get_repeat_offenders_dict(df=ro_verified, incidents=incidents['incident'])

In [15]:
# "Modest" policy per incident: 100K-follower repeat offenders + verified repeat
# offenders + the accounts in `removed`, de-duplicated (np.unique also sorts).
ro_dict_modest = {
    incident: np.unique(np.hstack([ro_dict_100k[incident], ro_dict_v[incident], removed])).tolist()
    for incident in ro_dict_100k.keys()
}

# "Aggressive" policy: the same combination, but with the lower 50K-follower threshold.
ro_dict_aggressive = {
    incident: np.unique(np.hstack([ro_dict_50k[incident], ro_dict_v[incident], removed])).tolist()
    for incident in ro_dict_100k.keys()
}


In [16]:
import os
import src.database as sdb
import src.segmentation as srcseg
# The credentials path can be overridden via VENUS_CRED_PATH; the default keeps the original location.
engine = sdb.get_engine(os.environ.get('VENUS_CRED_PATH', '/home/joebak/venus_cred.txt'))

# Run the 10K policy on a single incident (row 45) as a quick check before the
# full parallel run.
agg_save = lambda row: sdb.aggregate_and_save(row,engine,removed=ro_dict_10k,root=root,keep=False,
                                              floc='/data/timeseries/10K/',to_share=True)
row = incidents.iloc[45]
agg_save(row)

True

In [17]:
import os
import src.database as sdb
# The credentials path can be overridden via VENUS_CRED_PATH; the default keeps the original location.
engine = sdb.get_engine(os.environ.get('VENUS_CRED_PATH', '/home/joebak/venus_cred.txt'))
# 10K policy: aggregate every incident in parallel with ro_dict_10k passed as `removed`.
agg_save = lambda row: sdb.aggregate_and_save(row,engine,removed=ro_dict_10k,root=root,keep=False,
                                              floc='/data/timeseries/10K/',to_share=True)
_ = incidents.T.parallel_apply(agg_save)

In [18]:
def agg_save(row):
    """Run sdb.aggregate_and_save for one incident under the 50K policy.

    removed=ro_dict_50k; floc='/data/timeseries/50K/' (presumably the output
    sub-folder under root).
    """
    return sdb.aggregate_and_save(row, engine, root=root, removed=ro_dict_50k,
                                  floc='/data/timeseries/50K/', keep=False, to_share=True)


_ = incidents.T.parallel_apply(agg_save)

In [20]:
def agg_save(row):
    """Run sdb.aggregate_and_save for one incident under the 100K policy.

    removed=ro_dict_100k; floc='/data/timeseries/100K/' (presumably the output
    sub-folder under root).
    """
    return sdb.aggregate_and_save(row, engine, removed=ro_dict_100k, root=root,
                                  to_share=True, keep=False,
                                  floc='/data/timeseries/100K/')


incidents.T.parallel_apply(agg_save)

index
0      True
1      True
2      True
3      True
4      True
       ... 
148    True
149    True
150    True
151    True
152    True
Length: 152, dtype: bool

In [21]:
def agg_save(row):
    """Run sdb.aggregate_and_save for one incident under the 500K policy.

    removed=ro_dict_500k; floc='/data/timeseries/500K/' (presumably the output
    sub-folder under root).
    """
    return sdb.aggregate_and_save(row, engine, root=root, removed=ro_dict_500k,
                                  to_share=True, keep=False,
                                  floc='/data/timeseries/500K/')


_ = incidents.T.parallel_apply(agg_save)

In [22]:
def agg_save(row):
    """Run sdb.aggregate_and_save for one incident under the verified-account policy.

    removed=ro_dict_v; floc='/data/timeseries/Verified/' (presumably the output
    sub-folder under root).
    """
    return sdb.aggregate_and_save(row, engine, root=root, removed=ro_dict_v,
                                  to_share=True, keep=False,
                                  floc='/data/timeseries/Verified/')


_ = incidents.T.parallel_apply(agg_save)

In [23]:
def agg_save(row):
    """Run sdb.aggregate_and_save for one incident with only the pre-existing `removed` list.

    floc='/data/timeseries/currently/' (presumably the output sub-folder under root).
    """
    return sdb.aggregate_and_save(row, engine, root=root, removed=removed,
                                  to_share=True, keep=False,
                                  floc='/data/timeseries/currently/')


_ = incidents.T.parallel_apply(agg_save)

In [24]:
def agg_save(row):
    """Run sdb.aggregate_and_save for one incident under the "modest" combined policy."""
    return sdb.aggregate_and_save(row, engine, root=root, removed=ro_dict_modest,
                                  to_share=True, keep=False,
                                  floc='/data/timeseries/modest/')


_ = incidents.T.parallel_apply(agg_save)


def agg_save(row):
    """Run sdb.aggregate_and_save for one incident under the "aggressive" combined policy."""
    return sdb.aggregate_and_save(row, engine, root=root, removed=ro_dict_aggressive,
                                  to_share=True, keep=False,
                                  floc='/data/timeseries/aggressive/')


_ = incidents.T.parallel_apply(agg_save)

In [25]:
#Get totals banned by policy: distinct accounts each policy removes across all incidents.
ro_dicts = {'10K': ro_dict_10k,
            '50K': ro_dict_50k,
            '100K': ro_dict_100k,
            '500K': ro_dict_500k,
            'Verified': ro_dict_v,
            'Modest': ro_dict_modest,
            'Aggressive': ro_dict_aggressive}

# DataFrame.append was deprecated in pandas 1.4 and removed in 2.0. Collect the rows as
# records and build the frame once, which also avoids quadratic re-copying.
ban_records = []
for policy, policy_dict in ro_dicts.items():
    # All policy dicts share the same incident keys, so iterate the 10K dict's keys.
    N_banned = np.unique(np.hstack([policy_dict[item] for item in ro_dicts['10K'].keys()])).size
    ban_records.append({'Total removed': N_banned, 'Policy': policy})
# Baseline: the pre-existing `removed` list, labelled 'Currently'.
ban_records.append({'Total removed': np.unique(removed).size, 'Policy': 'Currently'})
ban_df = pd.DataFrame(ban_records)

In [26]:
# Persist the per-policy removal totals (uncompressed) to the shared data directory.
ban_df.to_parquet(f'{root}/data/ban_df_counts.parquet', compression=None)

In [27]:
# Reload the incident list from where it was saved (root + '/data/incidents.parquet').
# No cell above writes './data/incidents.parquet', so that path could load a stale file or fail.
incidents = pd.read_parquet(root + '/data/incidents.parquet')

In [28]:
def get_incident_count(incident,engine):
    """Return the number of tweets in all_ticket_tweets recorded for an incident.

    Keyword arguments:
    incident -- the name of an incident, as identified in our database
    engine -- postgres engine created with src.database.get_engine

    Returns a one-row DataFrame with a single 'count' column.
    """
    # The incident name is sent as a bound (pyformat-style) parameter rather than
    # being formatted into the SQL text.
    query = "SELECT  count(*) FROM all_ticket_tweets WHERE incident=(%(incident)s)"
    incident_df = pd.read_sql(query, params={'incident':incident},con=engine)
    return incident_df

In [29]:
# TODO(review): `totals` is not used in the cells shown here.
totals = []
# .iloc[0] selects the first incident by position. [0] is a label lookup and would raise
# KeyError if the row labelled 0 had been filtered out (as Dominion1 was).
get_incident_count(incidents['incident'].iloc[0], engine)

Unnamed: 0,count
0,185193
