In [1]:
import sys

sys.path.append('/home/jovyan/work')
%load_ext autoreload
%autoreload 2

In [26]:
import pandas as pd
import numpy as np

In [66]:
from helpers.logger import Logger
from helpers.vars import DUMPS_PATH, DATA_PATH, PRE_PATH, RES_PATH

### Load general Data

In [28]:
column_names = np.loadtxt(f'{DATA_PATH}/mediawiki_history_columns.txt', dtype=str)

In [37]:
def process_lang_history(lang, column_names, dtypes, path=DUMPS_PATH, ending='tsv.bz2', years=[2018, 2019, 2020]):
    df_lang = pd.DataFrame()
    for year in years:
        start = time.time()
        if lang == 'en':
            start = time.time()
            try:
                for month in range(1, 13): # just throw exception when out of bounds here
                    df_lang = pd.concat([df_lang, pd.read_csv(f'{path}/{lang}-{year}-{month:02d}.{ending}', sep='\t', names=list(column_names), dtype=dtypes, warn_bad_lines=True, error_bad_lines=False)])
                    Logger.instance().info(f'Loaded {lang}-{year}-{month:02d} in {time.time() - start}')
            except:
                Logger.instance().info(f'Error when processing {lang}-{year}-{month}')
        else:
            try:
                df_lang = pd.concat([df_lang, pd.read_csv(f'{path}/{lang}-{year}.{ending}', sep='\t', names=list(column_names), dtype=dtypes, warn_bad_lines=True, error_bad_lines=False)])
                Logger.instance().info(f'Loaded {lang}-{year} in {time.time() - start}')
            except:
                Logger.instance().info(f'Error when processing {lang}-{year}')
    return df_lang

### User Kind, Datetime, Title normalization

In [87]:
import swifter
import pendulum
from mw.lib import title as mw_t
import time
import traceback
from pathlib import Path

In [20]:
path_lang_codes = f'{DATA_PATH}/lang_code.csv'

tz_ = dict(pd.read_csv(path_lang_codes).set_index("code").timezone)
tz = {code: pendulum.timezone(t) for code, t in tz_.items()}
codes = list(tz.keys())

In [44]:
# TEST PARAMS!
PRE_PATH = '../../data/coronawiki/testrun'
DUMPS_PATH = '../../data/coronawiki/mw-history'
#codes = ['da', 'fi'] # for testing!

In [45]:
df_dict = {}
for code in codes:
    df_dict[code] = process_lang_history(code, column_names, dtypes=str)

02-18 15:23 : INFO : Loaded da-2018 in 14.023335695266724
02-18 15:23 : INFO : Loaded da-2019 in 14.11446475982666
02-18 15:23 : INFO : Loaded da-2020 in 12.36544680595398
02-18 15:24 : INFO : Loaded fi-2018 in 25.182634830474854
02-18 15:24 : INFO : Loaded fi-2019 in 26.957128286361694
02-18 15:25 : INFO : Loaded fi-2020 in 25.262845516204834


In [48]:
Path(f'{PRE_PATH}').mkdir(parents=True, exist_ok=True)

for code in codes:
    start = time.time()
    try:
        # === user kind merged to hear
        df_dict[code]['user_kind'] = df_dict[code].apply(lambda row: 'anonymous' if pd.isna(row.event_user_id) else 'bot' if not pd.isna(row.event_user_is_bot_by) else 'account', axis=1)
        Logger.instance('pipeline').info(f'User-Kind assignment for {code} in {time.time() - start}')
        
        # === Convert timestamp
        df_dict[code]['event_timestamp_t'] = df_dict[code].event_timestamp.swifter.apply(pd.to_datetime) #pd.to_datetime(df_dict[code].event_timestamp)
        # Set UTC date
        df_dict[code]["event_timestamp_t"] = df_dict[code].event_timestamp_t.dt.tz_localize("UTC", ambiguous='NaT', nonexistent='NaT')
        df_dict[code]["date_utc"] = df_dict[code].event_timestamp_t.dt.strftime("%Y%m%d").astype(int)
        # Localize date
        df_dict[code]["event_timestamp_t"] = df_dict[code].event_timestamp_t.dt.tz_convert(tz_[code])
        df_dict[code]["date"] = df_dict[code].event_timestamp_t.dt.strftime("%Y%m%d").astype(int)
        Logger.instance('pipeline').info(f'Finished date assignment for {code} in {time.time() - start}')
        
        # === Normalize date
        df_dict[code]['page_title_norm'] = df_dict[code].page_title.swifter.apply(lambda title: mw_t.normalize(str(title)) if not pd.isna(title) else np.nan)
        df_dict[code]['page_title_historical_norm'] = df_dict[code].page_title_historical.swifter.apply(lambda title: mw_t.normalize(str(title)) if not pd.isna(title) else np.nan)
        Logger.instance('pipeline').info(f'Finished name normalization for {code} in {time.time() - start}')
    except Exception as e:
        traceback.print_exc()
        Logger.instance('pipeline').info(f'Error for {code}: {str(e)}')

02-18 15:26 : INFO : Created new singleton instance
02-18 15:26 : INFO : User-Kind assignment for da in 57.78879165649414


New logging instance for ../logging//pipeline.log


  "This pandas object has duplicate indices, and swifter may not be able to improve performance. Consider resetting the indices with `df.reset_index(drop=True)`."
02-18 15:27 : INFO : Finished date assignment for da in 86.69635105133057


HBox(children=(FloatProgress(value=0.0, description='Pandas Apply', max=1466616.0, style=ProgressStyle(descrip…




HBox(children=(FloatProgress(value=0.0, description='Pandas Apply', max=1466616.0, style=ProgressStyle(descrip…

02-18 15:27 : INFO : Finished name normalization for da in 98.7143816947937





02-18 15:29 : INFO : User-Kind assignment for fi in 112.8374559879303
02-18 15:30 : INFO : Finished date assignment for fi in 168.15449857711792


HBox(children=(FloatProgress(value=0.0, description='Pandas Apply', max=2824278.0, style=ProgressStyle(descrip…




HBox(children=(FloatProgress(value=0.0, description='Pandas Apply', max=2824278.0, style=ProgressStyle(descrip…

02-18 15:30 : INFO : Finished name normalization for fi in 191.20214939117432





In [49]:
# write to processed
for code in codes:
    try:
        start = time.time()
        df_dict[code].to_csv(f'{PRE_PATH}/{code}_mwh_processed.tsv.gz', header=True, index=False, sep="\t", compression='gzip')
        Logger.instance('pipeline').info(f'Dumping {code} done in {time.time() - start}')
    except Exception as e:
        traceback.print_exc()
        Logger.instance('pipeline').info(f'Error when saving {code}: {str(e)}')

02-18 15:32 : INFO : Dumping da done in 96.48346042633057
02-18 15:35 : INFO : Dumping fi done in 188.5431933403015


### Group By Date and User_Kind to get newcomers

In [51]:
from helpers.files import save_to_pickle

In [52]:
dict_creation = {}
for code in codes:
    start = time.time()
    try:
        df_code = df_dict[code]
        # define masks
        create_event_mask = (df_code.event_entity=='user') & (df_code.event_type == 'create')
        create_revision_mask = (df_code.event_entity=='revision') & (df_code.event_type == 'create')
        no_bot_mask = (df_code['event_user_is_bot_by'].isna() | df_code['event_user_is_bot_by_historical'].isna())
        self_creation_mask = (df_code['event_user_is_created_by_self'] == 'true')
        no_anon_mask = (df_code['event_user_is_anonymous'] != 'true')

        # === get users by registration
        group_creation = df_code[create_event_mask & no_anon_mask & no_bot_mask & self_creation_mask].groupby(['date'])['event_user_id'].size()

        # === get user by nth edit
        # n=1
        edit_count_mask = df_code.event_user_revision_count == '1'
        group_edit1 = df_code[create_revision_mask & no_anon_mask & no_bot_mask & edit_count_mask].groupby(['date'])['event_user_id'].size().rename('edit_1')
        # n=5
        edit_count_mask = df_code.event_user_revision_count == '5'
        group_edit5 = df_code[create_revision_mask & no_anon_mask & no_bot_mask & edit_count_mask].groupby(['date'])['event_user_id'].size().rename('edit_5')
        dict_creation[code] = pd.concat([group_creation, group_edit1, group_edit5], axis=1).fillna(0)
        Logger.instance('pipeline').info(f'User creation computation for {code} done in {time.time() - start}')
    except Exception as e:
        traceback.print_exc()
        Logger.instance('pipeline').info(f'Error for {code}: {str(e)}')
        
save_to_pickle(f'{PRE_PATH}/dict_newcomers_selfcreated.pkl', dict_creation)
Logger.instance('pipeline').info(f'Finished newcomers')

02-18 15:36 : INFO : User creation computation for da done in 4.889431715011597
02-18 15:36 : INFO : User creation computation for fi done in 9.111894607543945
02-18 15:36 : INFO : Finished newcomers


### Group By Date and Page and user_kind to get edits per day

In [53]:
# Group By Date and Page and user_kind to get edits per day
dict_edits_byid = {} # grouped by id
dict_edits_bytitle = {} # grouped by title
for code in codes:
    try:
        start = time.time()
        df_code = df_dict[code]
        create_revision_mask = (df_code.event_entity=='revision') & (df_code.event_type == 'create')
        ns_mask = df_code.page_namespace == '0'

        # group by date, page_id, user_kind
        df_code.revision_text_bytes = pd.to_numeric(df_code['revision_text_bytes'], errors='coerce').fillna(0)
        df_code_masked = df_code[create_revision_mask & ns_mask]
        
        dict_edits_byid[code] = df_code_masked.groupby(['date', 'page_id', 'user_kind']).agg(
            {'event_user_id': 'size', 'revision_text_bytes': 'sum', 'page_title': 'last', 
             'page_title_norm': 'last', 'page_title_historical_norm': 'last'})
        dict_edits_bytitle[code] = df_code_masked.groupby(['date', 'page_title_norm', 'user_kind']).agg(
            {'event_user_id': 'size', 'revision_text_bytes': 'sum', 'page_id': lambda x: set(x), 'page_title': lambda x: set(x),  'page_title_historical_norm': lambda x: set(x)})

        Logger.instance('pipeline').info(f'Grouped by user/user_kind/pageid for {code} done in {time.time() - start}')
    except Exception as e:
        traceback.print_exc()
        Logger.instance('pipeline').info(f'Error for {code}: {str(e)}')  
    
save_to_pickle(f'{PRE_PATH}/dict_edits_byid.pkl', dict_edits_byid)
save_to_pickle(f'{PRE_PATH}/dict_edits_bytitle.pkl', dict_edits_bytitle)
Logger.instance('pipeline').info(f'Finished edits')

02-18 15:36 : INFO : Grouped by user/user_kind/pageid for da done in 32.320977210998535
02-18 15:37 : INFO : Grouped by user/user_kind/pageid for fi done in 61.646052837371826
02-18 15:38 : INFO : Finished edits


### Group By Date and user_kind to get identity reverts per day (see above)

In [54]:
dict_reverts = {}
for code in codes:
    try:
        start = time.time()
        df_code = df_dict[code]
        create_revision_mask = (df_code.event_entity=='revision') & (df_code.event_type == 'create')
        ns_mask = df_code.page_namespace == '0'

        # get reverts per day as well as reverted
        df_reverted = df_code[create_revision_mask & ns_mask & (df_code.revision_is_identity_reverted == 'true')].groupby(['date', 'user_kind'])['revision_is_identity_reverted'].size()
        df_reverts = df_code[create_revision_mask & ns_mask & (df_code.revision_is_identity_revert == 'true')].groupby(['date', 'user_kind'])['revision_is_identity_revert'].size()
        
        # reindex so all dates are filled
        df_reverted = df_reverted.reindex(
            pd.MultiIndex.from_product([df_code.date.unique(), df_reverted.index.levels[1]], names=['date', 'user_kind']), fill_value=0)
        df_reverts = df_reverts.reindex(
            pd.MultiIndex.from_product([df_code.date.unique(), df_reverts.index.levels[1]], names=['date', 'user_kind']), fill_value=0)
    
        dict_reverts[code] = pd.concat([df_reverted, df_reverts], axis=1).fillna(0)
        Logger.instance('pipeline').info(f'Computed reverts by {code} done in {time.time() - start}')
    except Exception as e:
        traceback.print_exc()
        Logger.instance('pipeline').info(f'Error for {code}: {str(e)}')  
    
save_to_pickle(f'{PRE_PATH}/dict_reverts.pkl', dict_reverts)
Logger.instance('pipeline').info(f'Finished identity reverts')

02-18 15:38 : INFO : Computed reverts by da done in 1.6375210285186768
02-18 15:38 : INFO : Computed reverts by fi done in 3.4197893142700195
02-18 15:38 : INFO : Finished identity reverts


## Combine edit dictionary with covid info

In [55]:
# This file was pre-generated using the .json-list of COVID-articles from:
# https://covid-data.wmflabs.org/
path_covid = f'{DATA_PATH}/covid_linked.f'
df_covid = pd.read_feather(path_covid)
df_covid['covid'] = True
df_covid['index'] = df_covid['index'].apply(lambda t: mw_t.normalize(str(t)))

In [58]:
df_edits_covid = {}
for code, df_code in dict_edits_bytitle.items():
    df_edits_covid[code] = df_code.reset_index().merge(df_covid[df_covid.site == f'{code}wiki'], left_on=['page_title_norm'], right_on=['index'], how='left').fillna({'covid': False}).drop(['index', 'site', 'qid'], axis=1)

save_to_pickle(f'{PRE_PATH}/dict_edits_bytitle_covid.pkl', df_edits_covid)

# Generate Final Aggregation

In [78]:
from helpers.preprocessing import aggregate_preprocess_results

In [83]:
final_agg = aggregate_preprocess_results(codes, df_edits_covid, dict_creation, dict_reverts)
final_agg

reading and making sure all dates are filled


02-18 16:06 : INFO : Processing da took 1.8816471099853516


reading and making sure all dates are filled


02-18 16:06 : INFO : Processing fi took 4.191196441650391


Unnamed: 0,date,covid,user_kind,count,rev_len_sum,actor_user,edit_1,edit_5,revision_is_identity_reverted,revision_is_identity_revert,code
0,2018-01-01,False,account,466,4553249.0,7.0,8.0,4.0,9,41,da
1,2018-01-01,False,anonymous,99,728527.0,0.0,0.0,0.0,29,1,da
2,2018-01-01,False,bot,44,190496.0,0.0,0.0,0.0,0,2,da
3,2018-01-02,False,account,454,3108653.0,23.0,18.0,7.0,17,37,da
4,2018-01-02,False,anonymous,128,912934.0,0.0,0.0,0.0,23,3,da
...,...,...,...,...,...,...,...,...,...,...,...
3875,2020-12-01,False,account,771,10177872.0,19.0,15.0,2.0,6,45,fi
3876,2020-12-01,False,anonymous,334,5552864.0,0.0,0.0,0.0,36,2,fi
3877,2020-12-01,False,bot,14,764952.0,0.0,0.0,0.0,2,9,fi
3878,2020-12-01,True,account,1,52463.0,19.0,15.0,2.0,6,45,fi


In [86]:
Path(f'{RES_PATH}').mkdir(parents=True, exist_ok=True)
final_agg.to_csv(f'{RES_PATH}/aggregated.tsv.gz', index=False, sep="\t", compression="gzip")