In [81]:
import pandas as pd
import numpy as np

import pyspark.sql.functions as f
from pyspark.sql.types import *
from pyspark.sql import Window
from functools import reduce
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.functions import udf


In [82]:
spark.sparkContext.addPyFile("../.local/lib/python3.5/site-packages/mwcomments-0.2.0-py3.5.egg")
spark.sparkContext.addPyFile("../.local/lib/python3.5/site-packages/sortedcontainers-2.1.0-py3.5.egg")
spark.sparkContext.addPyFile("../.local/lib/python3.5/site-packages/python_dateutil-2.8.0-py3.5.egg")
spark.sparkContext.addPyFile("./spark_functions.py")

In [83]:
import mwcomments

In [84]:
cutoffs = pd.read_csv("ores_bias_data/ores_rcfilters_cutoffs.csv")

In [85]:
# compare 14 days before and after the cutoff
# unless there's another cutoff less than 28 days away, in which case split the difference
by_wiki = cutoffs.groupby('wiki_db')

In [86]:
cutoffs['date'] = pd.to_datetime(cutoffs.deploy_dt)
cutoffs = cutoffs.drop("deploy_gap",1)
cutoffs = cutoffs.drop("deploy_dt",1)
cutoffs = cutoffs.drop("commit_dt",1)


In [87]:
def set_cutoff_period(df):
    df = df.sort_values(by=['date'])
    next_cutoff = df.shift(1)
    df['time_since_last_cutoff']  = df.date - df.shift(1).date
    df['time_till_next_cutoff']  = df.shift(-1).date - df.date
    df = df.reset_index()
    return df

In [88]:
cutoffs = cutoffs.groupby("wiki_db").apply(set_cutoff_period)

cutoffs = cutoffs.drop('wiki_db',1).reset_index()
cutoffs = cutoffs.drop("level_1",1)

select =[ 'wiki_db','has_ores','has_rcfilters','has_rcfilters_watchlist','time_since_last_cutoff','time_till_next_cutoff','date']

In [89]:
# We special case wikis where some issues lead to changes and deployments that we don't want to analyze. 
# fawiki: bug leads to cutoff disabling ores for 2 days. These won't show up in any other interval, so ignore them. 
cutoffs = cutoffs.loc[~((cutoffs.wiki_db == 'fawiki') & ( (cutoffs.date == pd.to_datetime("2017-12-09 11:19:00")) | (cutoffs.date == pd.to_datetime("2017-12-11 18:56:00"))))]




In [90]:
# etwiki, frwiki, and hewiki apparently turned on rcfilters 50 days after enabling ORES. This is OK. The periods overlap.

#frwiki and ruwiki experienced a bug on the deployment of rcfilters to watchlist. So let's ignore them for those messages.   

cutoffs = cutoffs.loc[~((cutoffs.wiki_db == 'frwiki') & (cutoffs.date >= pd.to_datetime("2017-11-09 14:35:00")))]

cutoffs = cutoffs.loc[~((cutoffs.wiki_db == 'ruwiki') & (cutoffs.date >= pd.to_datetime("2017-11-20 19:22:00") ))]

In [91]:
cutoffs.loc[(cutoffs.time_since_last_cutoff<=pd.Timedelta(60,'D')) | (cutoffs.time_till_next_cutoff<=pd.Timedelta(60,'D')), select]

Unnamed: 0,wiki_db,has_ores,has_rcfilters,has_rcfilters_watchlist,time_since_last_cutoff,time_till_next_cutoff,date
12,etwiki,True,False,False,NaT,50 days 05:26:00,2017-03-20 18:28:00
13,etwiki,True,True,False,50 days 05:26:00,308 days 17:24:00,2017-05-09 23:54:00
22,frwiki,False,False,False,NaT,55 days 07:02:00,2017-04-11 11:09:00
23,frwiki,True,True,False,55 days 07:02:00,156 days 20:24:00,2017-06-05 18:11:00
27,hewiki,True,False,False,NaT,29 days 10:38:00,2017-04-10 13:16:00
28,hewiki,True,True,False,29 days 10:38:00,308 days 17:24:00,2017-05-09 23:54:00
32,kowiki,False,True,True,NaT,0 days 00:00:00,2019-03-04 16:27:00
33,kowiki,True,True,True,0 days 00:00:00,NaT,2019-03-04 16:27:00
65,wikidatawiki,False,False,True,169 days 13:27:00,3 days 23:58:00,2017-10-26 13:21:00
66,wikidatawiki,True,True,True,3 days 23:58:00,28 days 05:52:00,2017-10-30 13:19:00


In [92]:
# kowiki was enabled over two commits with the same deploy time
cutoffs = cutoffs.drop(32)

In [93]:
cutoffs.loc[cutoffs.wiki_db == 'wikidatawiki',select]

Unnamed: 0,wiki_db,has_ores,has_rcfilters,has_rcfilters_watchlist,time_since_last_cutoff,time_till_next_cutoff,date
63,wikidatawiki,True,False,False,NaT,321 days 08:41:00,2016-06-22 15:13:00
64,wikidatawiki,True,True,False,321 days 08:41:00,169 days 13:27:00,2017-05-09 23:54:00
65,wikidatawiki,False,False,True,169 days 13:27:00,3 days 23:58:00,2017-10-26 13:21:00
66,wikidatawiki,True,True,True,3 days 23:58:00,28 days 05:52:00,2017-10-30 13:19:00
67,wikidatawiki,False,False,True,28 days 05:52:00,0 days 00:00:00,2017-11-27 19:11:00
68,wikidatawiki,True,True,True,0 days 00:00:00,NaT,2017-11-27 19:11:00


In [94]:
# wikidatawiki had an issue on 2017-10-30 and 2017-11-27 with the move to default on watchlist so we'll ignore that cutoff

In [95]:
cutoffs = cutoffs.loc[~((cutoffs.wiki_db == 'wikidatawiki') & (cutoffs.date >= pd.to_datetime("2017-10-26 13:21:00")))]


In [96]:
cutoffs.loc[(cutoffs.time_since_last_cutoff<=pd.Timedelta(60,'D')) | (cutoffs.time_till_next_cutoff<=pd.Timedelta(60,'D')), select]

Unnamed: 0,wiki_db,has_ores,has_rcfilters,has_rcfilters_watchlist,time_since_last_cutoff,time_till_next_cutoff,date
12,etwiki,True,False,False,NaT,50 days 05:26:00,2017-03-20 18:28:00
13,etwiki,True,True,False,50 days 05:26:00,308 days 17:24:00,2017-05-09 23:54:00
22,frwiki,False,False,False,NaT,55 days 07:02:00,2017-04-11 11:09:00
23,frwiki,True,True,False,55 days 07:02:00,156 days 20:24:00,2017-06-05 18:11:00
27,hewiki,True,False,False,NaT,29 days 10:38:00,2017-04-10 13:16:00
28,hewiki,True,True,False,29 days 10:38:00,308 days 17:24:00,2017-05-09 23:54:00
33,kowiki,True,True,True,0 days 00:00:00,NaT,2019-03-04 16:27:00


In [97]:
# build a table of date intervals before and after cutoffs

In [98]:
cutoffs['period_start'] = cutoffs.date - pd.Timedelta(14,'d')
cutoffs['period_end'] = cutoffs.date + pd.Timedelta(14,'d')

In [99]:
# take a stratified sample of edits in the cutoffs
# stratify by wiki_db, is_newcomer, is_anon, is_reverted, and revert_tool
wmhist = spark.read.table("wmf.mediawiki_history")

wmhist = wmhist.filter(f.col("snapshot") == "2019-07")
# ok we're ready to fire up spark and make a stratified sample
# we only need the latest snapshot

wmhist = wmhist.filter((f.col("event_entity") == "revision"))

In [100]:
from spark_functions import build_wmhist_step1, process_reverts, broadcast_match_comment, add_revert_types

In [101]:
broadcast_match_comment(spark.sparkContext)

<function spark_functions.broadcast_match_comment.<locals>.my_match_comment>

In [102]:
wmhist = wmhist.filter(wmhist.page_namespace == 0)

In [103]:
wmhist = build_wmhist_step1(wmhist)

In [104]:
#wmhist.show()

In [105]:
reverts = process_reverts(wmhist)

In [106]:
# select only the columns we need from reverts
reverts = reverts.select(['wiki_db_l','revert_timestamp','reverted_revision_id',f.col('role_type').alias("revert_role_type"),f.col('anon_new_established').alias('reverted_anon_new_established'),'is_damage','time_to_revert','revert_comment'])


In [107]:
# for the time to revert analysis, we only want damaging ones, but for is_reverted we want all reverts
wmhist = wmhist.join(reverts, on =[wmhist.wiki_db == reverts.wiki_db_l, wmhist.revision_id == reverts.reverted_revision_id],how='left_outer')

In [108]:
#wmhist = wmhist.join(cutoffs, on=[wmhist.wiki_db==cutoffs.wiki_db_l, f.unix_timestamp(wmhist.event_timestamp) >= f.unix_timestamp(cutoffs.period_start), f.unix_timestamp(wmhist.event_timestamp) <= f.unix_timestamp(cutoffs.period_end)],how='right_outer')

In [109]:
#wmhist.show()

In [110]:
#wmhist = wmhist.withColumn("sec_to_cutoff", (f.unix_timestamp(f.col("event_timestamp")) - f.unix_timestamp(f.col("date"))) / 1000)

In [111]:
wmhist = add_revert_types(wmhist, comment_column='revert_comment')

In [112]:
wmhist = wmhist.cache()

In [113]:
cutoffs_df = spark.createDataFrame(cutoffs[['wiki_db','period_start','period_end','date']])
cutoffs_df = cutoffs_df.withColumnRenamed('wiki_db','wiki_db_l')
cutoffs_df = f.broadcast(cutoffs_df)

join_cond = (wmhist.wiki_db == cutoffs_df.wiki_db_l) & f.unix_timestamp(wmhist.event_timestamp).between(f.unix_timestamp(cutoffs_df.period_start),f.unix_timestamp(cutoffs_df.period_end))

In [117]:
wmhist = wmhist.repartition(10000)

In [122]:
wmhist = wmhist.join(cutoffs_df,on=join_cond, how='inner')

In [121]:
wmhist_joined.count()

36129911

In [35]:

cond = None
for _, cutoff in cutoffs.iterrows():
    cond_part = ((wmhist.wiki_db==cutoff.wiki_db) & (f.unix_timestamp(wmhist.event_timestamp) >= cutoff.period_start.timestamp()) & (f.unix_timestamp(wmhist.event_timestamp) <= cutoff.period_end.timestamp()))
    
    if cond is None:
        cond = cond_part
    else:
        cond = cond | cond_part
        
#     part = wmhist.filter( (wmhist.wiki_db==cutoff.wiki_db) & (f.unix_timestamp(wmhist.event_timestamp) >= cutoff.period_start.timestamp()) & (f.unix_timestamp(wmhist.event_timestamp) <= cutoff.period_start.timestamp()))
#     if wmhist_filtered is None:
#         wmhist_filtered = part
#     else:
#         wmhist_filtered = wmhist_filtered.union(part)

In [36]:
wmhist_filtered = wmhist_filtered.filter(cond)

In [37]:
wmhist_filtered.count()

13

In [290]:
# only keep the variables we need in the threshhold analysis and our strata variables
wmhist = wmhist.select(['wiki_db','event_timestamp','event_user_id','revision_is_identity_reverted','reverted_anon_new_established','is_damage','time_to_revert','revert_role_type','revert_tool'])

In [291]:
pre_cutoff_cond = None
for _, cutoff in cutoffs.iterrows():
    cond = ((wmhist.wiki_db == cutoff.wiki_db) & (f.unix_timestamp(wmhist.event_timestamp) >= cutoff.period_start.timestamp()) & (f.unix_timestamp(wmhist.event_timestamp) <= cutoff.period_end.timestamp()) & (f.unix_timestamp(wmhist.event_timestamp) <= cutoff.date.timestamp()))
    if pre_cutoff_cond is None:
        pre_cutoff_cond = cond
    else:
        pre_cutoff_cond = pre_cutoff_cond | cond


In [292]:
wmhist = wmhist.withColumn("pre_cutoff",pre_cutoff_cond)

In [293]:
#wmhist = wmhist.withColumn('pre_cutoff',wmhist.event_timestamp <= wmhist.date)

In [294]:
# ttr_before_after = wmhist.filter(f.col("revision_is_identity_reverted"))

# ttr_before_after = ttr_before_after.withColumn("strata",f.concat_ws('_',f.col("wiki_db"),f.col("date")))
# ttr_before_after_counts = ttr_before_after.groupby(f.col("strata")).count()

In [295]:
#ttr_before_after_counts.show()

In [296]:
def is_ores_cutoff(df):
    df = df.sort_values(by=['wiki_db','date'])
    df['ores_cutoff'] = (df.has_ores == True) & ( (df.has_ores.shift(1) == False) | (df.has_ores.shift(1).isnull()))
    return df
    
cutoffs = cutoffs.groupby(cutoffs.wiki_db).apply(is_ores_cutoff)

In [297]:
def is_rcfilters_cutoff(df):
    df = df.sort_values(by=['wiki_db','date'])
    df['rcfilters_cutoff'] = (df.has_rcfilters == True) & ( (df.has_rcfilters.shift(1) == False) | 
(df.has_rcfilters.shift(1).isnull()))
    return df


In [298]:
cutoffs = cutoffs.groupby('wiki_db').apply(is_rcfilters_cutoff)

In [299]:
def is_watchlist_cutoff(df):
    df = df.sort_values(by=['wiki_db','date'])
    df['watchlist_cutoff'] = (df.has_rcfilters_watchlist == True) & ( (df.has_rcfilters_watchlist.shift(1) == False) | (df.has_rcfilters_watchlist.shift(1).isnull()))
    return df


In [300]:
cutoffs = cutoffs.groupby('wiki_db').apply(is_watchlist_cutoff)

In [301]:
ores_cutoff_cond = None
rcfilters_cutoff_cond = None
rcfilters_watchlist_cutoff_cond = None
for _, cutoff in cutoffs.iterrows():
    cond = ((wmhist.wiki_db == cutoff.wiki_db) & (f.unix_timestamp(wmhist.event_timestamp) >= cutoff.period_start.timestamp()) & (f.unix_timestamp(wmhist.event_timestamp) <= cutoff.period_end.timestamp()))
            
    if ores_cutoff_cond is None:
        ores_cutoff_cond = (cond & f.lit(cutoff.ores_cutoff == True))
        rcfilters_cutoff_cond = (cond & f.lit(cutoff.rcfilters_cutoff == True))
        rcfilters_watchlist_cutoff_cond = (cond & f.lit(cutoff.watchlist_cutoff == True))
    else:
        ores_cutoff_cond = ores_cutoff_cond | (cond & f.lit(cutoff.ores_cutoff == True))
        rcfilters_cutoff_cond = rcfilters_cutoff_cond | (cond & f.lit(cutoff.rcfilters_cutoff == True))
        rcfilters_watchlist_cutoff_cond = rcfilters_watchlist_cutoff_cond | (cond & f.lit(cutoff.watchlist_cutoff == True))


In [302]:
wmhist = wmhist.withColumn("cutoff_type", f.when(ores_cutoff_cond,'has_ores').otherwise(f.when(rcfilters_cutoff_cond,'has_rcfilters').otherwise(f.when(rcfilters_watchlist_cutoff_cond,'has_rcfilters_watchlist').otherwise(None))))

In [125]:
wmhist = wmhist.withColumn("pre_cutoff",f.unix_timestamp(wmhist.event_timestamp) <= f.unix_timestamp(wmhist.date))

In [126]:
wmhist = wmhist.withColumn('strata',f.concat_ws('_',wmhist.wiki_db,wmhist.date,wmhist.pre_cutoff,wmhist.revert_tool,wmhist.reverted_anon_new_established,wmhist.revert_role_type,wmhist.revision_is_identity_reverted))

In [304]:
#wmhist = wmhist.cache()

In [305]:
wmhist = wmhist.repartition(1200,f.col("strata"))

In [137]:
wmhist_out = wmhist.select(['wiki_db','event_timestamp','page_id','page_title','user_id','user_text','event_user_isbot1','event_user_isbot2','revision_id','revision_is_identity_reverted','anon_new_established','event_user_is_newcomer','revert_tool','period_start','period_end','date','pre_cutoff'])

In [139]:
wmhist_out = wmhist_out.repartition(1)

In [141]:
wmhist_out.write.csv("/user/nathante/ores_bias_data/cutoff_revisions.csv",mode='overwrite',compression="None",header=True)

In [143]:
# get the proportion of observations in each strata
strata_count = wmhist.groupby(f.col('strata')).count()
#all_count = wmhist.count()


In [128]:
strata_count = strata_count.collect()

In [144]:
strata_count = strata_count.withColumn("fraction", f.when( 5000 < f.col("count"),5000/f.col("count")).otherwise(1))
strata_count = strata_count.withColumn("weight",1/strata_count.fraction)


In [145]:
samp_design = strata_count.collect()

In [150]:
#fractions = samp_design.locj['strata','fraction']
fractions = {r.strata:r.fraction for r in samp_design}

In [156]:
sample = wmhist.sampleBy("strata",fractions=fractions)

In [157]:
sample = sample.join(strata_count,on='strata')

In [159]:
sample = sample.select(['wiki_db','event_timestamp','page_id','page_title','user_id','user_text','event_user_isbot1','event_user_isbot2','revision_id','revision_is_identity_reverted','anon_new_established','event_user_is_newcomer','revert_tool','period_start','period_end','date','pre_cutoff','fraction','weight'])

In [161]:
sample = sample.repartition(1)

In [162]:
sample.write.csv("/user/nathante/ores_bias_data/cutoff_revisions_sample.csv",mode='overwrite',compression='None',header=True)

In [None]:
strata_count.write.csv("/user/nathante/ores_bias_data/threshhold_strata_counts.csv",mode='overwrite',compression="None",header=True)

In [None]:
conf = spark.sparkContext.getConf()

In [None]:
conf.getAll()

In [None]:
sc.

In [None]:
edits2 = edits.select(['wiki_db','event_timestamp','event_user_is_anonymous','event_user_is_anonymous','revision_id','revision_is_identity_reverted','rcfilters_cutoff','week','sec_to_cutoff'])

In [None]:
edits2.show()

In [None]:
spark.catalog.listDatabases()

In [None]:
ores_scores = spark.read.table("ores.revision_score_public")

In [None]:
ores_scores = ores_scores.filter((f.col("model")=="damaging") & f.col("model_version") == "0.3.2")

In [None]:
edits = edits.join(ores_scores,on=[edits.wiki_db == ores_scores.wiki, edits.revision_id == ores_scores.rev_id])

In [None]:
edits2 = edits2.withColumn("wikiweek",f.concat_ws(' ', edits.wiki_db, f.date_format(edits.week,'MM-dd-yyyy')))
by_wiki_week = edits.groupby('wikiweek')

In [None]:
by_wiki_week = edits2.groupby(['wikiweek'])


In [None]:
# take a sample stratified of N = 5000 by wiki and week

samp_design = by_wiki_week.count()
samp_design = samp_design.withColumn("fraction", f.when( 5000 < f.col("count"),5000/f.col("count")).otherwise(1))
samp_design = samp_design.withColumn("weight",1/samp_design.fraction)


In [None]:
samp_design

In [None]:
fractions = samp_design.select(['wikiweek','fraction']).collect()

In [None]:
fractions = {r.wikiweek:r.fraction for r in fractions}

In [None]:
fractions

In [None]:
sample = edits2.sampleBy("wikiweek",fractions=fractions)

In [None]:
pddf_sample = sample.toPandas()