# Data-Gathering

## Imports

In [1]:
import pandas as pd
import numpy as np
import wmfdata as wmf

pd.options.display.max_columns = None
pd.options.display.max_rows = 250

from IPython.display import display_html
from IPython.display import display, HTML
from IPython.display import clear_output

import os
import requests
import warnings

## spark_session

In [4]:
spark_session = wmf.spark.get_active_session()

if type(spark_session) != type(None):
    spark_session.stop()
else:
    print('no active session')

no active session


In [3]:
spark_session = wmf.spark.create_custom_session(
    master="yarn",
    app_name='content-moderation-backlogs',
    spark_config={
        "spark.driver.memory": "6g",
        "spark.dynamicAllocation.maxExecutors": 64,
        "spark.executor.memory": "16g",
        "spark.executor.cores": 4,
        "spark.sql.shuffle.partitions": 256,
        "spark.driver.maxResultSize": "2g"
        
    }
)

clear_output()

spark_session.sparkContext.setLogLevel("ERROR")
spark_session

## functions

In [79]:
# prints a string at center of the output, bold if needed
def pr_centered(content, bold=False):
    if bold:
        content = f"<b>{content}</b>"
    
    centered_html = f"<div style='text-align:center'>{content}</div>"
    
    display(HTML(centered_html))


# display dataframes horizontally with title for each
def display_h(frames, space=100):
    html = ""
    
    for key in frames.keys():
        html_df =f'<div>{key} {frames[key]._repr_html_()}</div>'
        html += html_df
        
    html = f"""
    <div style="display:flex; justify-content: space-evenly;">
    {html}
    </div>"""
    
    display_html(html, raw=True)
    
# applies cell color to a given nth percentile
def style_percentile(i, percentile='50th'):
    return ['background-color: Aquamarine' if i.name == percentile else '' for _ in i]

In [74]:
# return quatiles for a given series (dataframe and column name)
def quantiles(frame, col='diff_sec', style_median=False):    
    qdict = {
        '10th': frame[col].quantile(0.1),
        '25th': frame[col].quantile(0.25),
        '50th': frame[col].quantile(0.5),
        '75th': frame[col].quantile(0.7),
        '90th': frame[col].quantile(0.9),
        '99th': frame[col].quantile(0.99)
    }
    
    df = pd.DataFrame(qdict.values(),
                      index=qdict.keys(),
                      columns=['seconds'])
    
    df['minutes'] = round(df['seconds'] / 60, 2)
    
    df = df.astype({'seconds': int})
    df.index.name = 'percentile'
    
    if style_median:
        df = df.style.apply(style_percentile, axis=1).format("{:.1f}")
        # df = df.astype({'seconds': int})
        return df
    else:
        return df

## query: flagged revisions

In [30]:
mwh_snapshot = '2023-11'

lang_list = ['en', 'es', 'ja', 'de', 'fr', 'ru', 'zh', 'it', 'pt', 'fa', 'id']
# the following languages do not have FlaggedRevisions enabled
exclude_langs = ['es', 'ja', 'fr', 'zh', 'it', 'pt', 'fa']

wikis_list = [f'{lang}wiki' for lang in lang_list if lang not in exclude_langs]
wikis_sql = wmf.utils.sql_tuple(wikis_list)

In [141]:
%%time

warnings.filterwarnings('ignore')

flagged_revs = pd.DataFrame()

for wiki in wikis_list:
    
    fr_query = """
    SELECT 
        fr_rev_id AS rev_id,
        page_namespace,
        fr_timestamp AS review_ts,
        MONTH(fr_timestamp) AS review_month,
        fr_rev_timestamp AS rev_ts,
        CASE
            WHEN user_editcount < 100 THEN '0-99'
            WHEN user_editcount BETWEEN 100 AND 999 THEN '100-999'
            WHEN user_editcount BETWEEN 1000 AND 4999 THEN '1000-4999'
            WHEN user_editcount >= 5000 THEN '5000+'
        END AS reviewer_edit_bucket,
        user_id AS reviewer_id    
    FROM 
        flaggedrevs fr
    JOIN 
        user u 
        ON fr.fr_user = u.user_id
    JOIN
        page p
        ON fr.fr_page_id = p.page_id
    WHERE
        fr_flags NOT LIKE '%auto%'
        AND user_name NOT LIKE '%bot%'
        AND YEAR(fr_rev_timestamp) = 2023
    ORDER BY 
        fr_timestamp DESC
    """

    flagged_revs_by_wiki = wmf.mariadb.run(fr_query, dbs=wiki)

    flagged_revs_by_wiki = (
        flagged_revs_by_wiki
        .assign(
            review_ts=pd.to_datetime(flagged_revs_by_wiki['review_ts']),
            rev_ts=pd.to_datetime(flagged_revs_by_wiki['rev_ts']),
            reviewer_edit_bucket=pd.Categorical(flagged_revs_by_wiki['reviewer_edit_bucket'])
        )
    )
    
    flagged_revs_by_wiki = (
        flagged_revs_by_wiki
        .assign(
            diff_sec=round((flagged_revs_by_wiki['review_ts'] - flagged_revs_by_wiki['rev_ts']) / np.timedelta64(1, 's')),
            diff_min=round((flagged_revs_by_wiki['review_ts'] - flagged_revs_by_wiki['rev_ts']) / np.timedelta64(1, 'm'), 2),
            wiki_db=wiki
        )
    )
    
    flagged_revs = pd.concat([flagged_revs, flagged_revs_by_wiki], ignore_index=True)
    
flagged_revs.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 897226 entries, 0 to 897225
Data columns (total 10 columns):
 #   Column                Non-Null Count   Dtype         
---  ------                --------------   -----         
 0   rev_id                897226 non-null  int64         
 1   page_namespace        897226 non-null  int64         
 2   review_ts             897226 non-null  datetime64[ns]
 3   review_month          897226 non-null  int64         
 4   rev_ts                897226 non-null  datetime64[ns]
 5   reviewer_edit_bucket  897226 non-null  object        
 6   reviewer_id           897226 non-null  int64         
 7   diff_sec              897226 non-null  float64       
 8   diff_min              897226 non-null  float64       
 9   wiki_db               897226 non-null  object        
dtypes: datetime64[ns](2), float64(2), int64(4), object(2)
memory usage: 68.5+ MB
CPU times: user 2min 9s, sys: 556 ms, total: 2min 10s
Wall time: 4min 35s


In [105]:
avg_monthly_fr_reviewers = (
    flagged_revs
    .groupby(['wiki_db', 'review_month'])['reviewer_id']
    .nunique()
    .reset_index()
    .groupby('wiki_db')
    .reviewer_id
    .mean()
    .reset_index()
    .set_index('wiki_db')
    .astype(int)
    .rename({
        'reviewer_id': '# Unique Reviewers'
    }, axis=1)
)

reviews_per_reviewer = (
    flagged_revs
    .groupby(['wiki_db', 'reviewer_id'])['rev_id']
    .nunique()
    .reset_index()
    .groupby('wiki_db')['rev_id']
    .median()
    .reset_index()
    .set_index('wiki_db')
    .astype(int)
    .rename({
        'rev_id': '# Reviews'
    }, axis=1)
)    

reviews_per_reviewer_by_bucket = (
    pd.merge(
        flagged_revs
        .groupby(['wiki_db', 'reviewer_edit_bucket'])['reviewer_id']
        .nunique()
        .reset_index()
        .rename({ 
            'reviewer_id': 'n_unique_reviewers' 
        }, axis=1),
        flagged_revs
        .groupby(['wiki_db', 'reviewer_edit_bucket'])['rev_id']
        .nunique()
        .reset_index()
        .rename({
            'rev_id': 'n_edits' 
        }, axis=1),
        on=['wiki_db', 'reviewer_edit_bucket'])
)

reviews_per_reviewer_by_bucket['edits_per_reviewer'] = round(reviews_per_reviewer_by_bucket['n_edits'] / reviews_per_reviewer_by_bucket['n_unique_reviewers']).astype(int)

rename_cols = {
    'reviewer_edit_bucket': 'Reviewer Edit Bucket',
    'n_unique_reviewers': '# Unique Reviewers',
    'edits_per_reviewer': '# Reviews per Reviewer'
}

reviews_per_reviewer_by_bucket = (
    reviews_per_reviewer_by_bucket
    .rename(rename_cols, axis=1)
    .set_index(['wiki_db', 'Reviewer Edit Bucket'], verify_integrity=True)
)

In [164]:
pr_centered('Median Time for a Flagged Revision to be Reviewed', True)
display_h({wiki:quantiles(flagged_revs.query(f"""wiki_db == '{wiki}'"""), style_median=True) for wiki in wikis_list})

Unnamed: 0_level_0,seconds,minutes
percentile,Unnamed: 1_level_1,Unnamed: 2_level_1
10th,2.0,0.0
25th,56.0,0.9
50th,790.0,13.2
75th,2637.0,44.0
90th,8541.0,142.4
99th,25873.0,431.2

Unnamed: 0_level_0,seconds,minutes
percentile,Unnamed: 1_level_1,Unnamed: 2_level_1
10th,10.0,0.2
25th,346.0,5.8
50th,13358.0,222.6
75th,134967.0,2249.5
90th,1827769.0,30462.8
99th,5338227.0,88970.5

Unnamed: 0_level_0,seconds,minutes
percentile,Unnamed: 1_level_1,Unnamed: 2_level_1
10th,0.0,0.0
25th,44.0,0.7
50th,13049.0,217.5
75th,170257.0,2837.6
90th,4034150.0,67235.8
99th,18728238.0,312137.3

Unnamed: 0_level_0,seconds,minutes
percentile,Unnamed: 1_level_1,Unnamed: 2_level_1
10th,15.0,0.2
25th,281.0,4.7
50th,7272.0,121.2
75th,35097.0,585.0
90th,825148.0,13752.5
99th,6775111.0,112918.5


In [160]:
display_h({
    'Average Monthly Unique Reviewers Reviewing Flagged Revs (2023)': avg_monthly_fr_reviewers,
    'Median Number of Reviews by Each Reviewer (2023)': reviews_per_reviewer
})

Unnamed: 0_level_0,# Unique Reviewers
wiki_db,Unnamed: 1_level_1
dewiki,2455
enwiki,231
idwiki,21
ruwiki,737

Unnamed: 0_level_0,# Reviews
wiki_db,Unnamed: 1_level_1
dewiki,7
enwiki,3
idwiki,5
ruwiki,32


In [165]:
pr_centered('Number of Reviews by Each Reviewer by Edit bucket', True)
display_h({
    '': reviews_per_reviewer_by_bucket
})

Unnamed: 0_level_0,Unnamed: 1_level_0,# Unique Reviewers,n_edits,# Reviews per Reviewer
wiki_db,Reviewer Edit Bucket,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
dewiki,0-99,1,1,1
dewiki,100-999,1526,20133,13
dewiki,1000-4999,1948,61347,31
dewiki,5000+,2359,396962,168
enwiki,100-999,16,249,16
enwiki,1000-4999,154,2741,18
enwiki,5000+,758,18967,25
idwiki,1000-4999,14,190,14
idwiki,5000+,57,2285,40
ruwiki,0-99,5,41,8


## query: recent changes revisions

### enwiki

In [3]:
def get_rc_min_timestamp(wiki):

    query = """
        SELECT
            MIN(rc_timestamp) AS ts
        FROM
            recentchanges
    """
    
    result = wmf.mariadb.run(query, wiki)
    return int(result.ts.values[0])

In [None]:
%%time

warnings.filterwarnings('ignore')

unpatrolled_rc = pd.DataFrame()

for wiki in ['enwiki', 'idwiki']:
    unpatrolled_rc_query = f"""
    SELECT
        rc_id,
        rc_timestamp AS rc_ts,
        rc_title,
        rc_namespace,
        rc_new,
        rc_this_oldid,
        rc_type,
        rc_deleted,
        actor_name AS rc_user_name
    FROM
        recentchanges rc
    JOIN
        actor a ON rc.rc_actor = a.actor_id 
    WHERE
        rc_bot = 0
        AND rc_patrolled = 0
        AND actor_name NOT LIKE '%bot%'
    """

unpatrolled_rc = wmf.mariadb.run(unpatrolled_rc_query, dbs='enwiki')
unpatrolled_rc_query.info()

In [16]:
def get_recent_changes(wiki):
    
    rc_start = get_rc_min_timestamp(wiki)
    
    rc_query = f"""
    WITH
        recent_changes AS (
            SELECT
                rc_id,
                rc_timestamp AS rc_ts,
                rc_title,
                rc_namespace,
                rc_new,
                rc_this_oldid,
                rc_type,
                rc_deleted,
                rc_patrolled,
                actor_name AS rc_user_name
            FROM
                recentchanges rc
            JOIN
                actor a ON rc.rc_actor = a.actor_id 
            WHERE
                rc_bot = 0
                AND rc_patrolled IN (0, 1)
                AND actor_name NOT LIKE '%bot%'
        ),

        patrol_logs AS (
            SELECT
                log_id,
                log_timestamp,
                log_title,
    CAST(
        SUBSTRING(
            log_params, 
            LOCATE('"4::curid";s:', log_params) + CHAR_LENGTH('"4::curid";s:') + LOCATE(':', SUBSTRING(log_params, LOCATE('"4::curid";s:', log_params) + CHAR_LENGTH('"4::curid";s:'))) + 1,
            LOCATE('"', SUBSTRING(log_params FROM LOCATE('"4::curid";s:', log_params) + CHAR_LENGTH('"4::curid";s:') + LOCATE(':', SUBSTRING(log_params, LOCATE('"4::curid";s:', log_params) + CHAR_LENGTH('"4::curid";s:'))) + 1)) - 1
        ) AS UNSIGNED) AS log_rev_id,
                log_page,
                u.user_name AS patroller_user_name,
            CASE
                WHEN user_editcount < 100 THEN '0-99'
                WHEN user_editcount BETWEEN 100 AND 999 THEN '100-999'
                WHEN user_editcount BETWEEN 1000 AND 4999 THEN '1000-4999'
                WHEN user_editcount >= 5000 THEN '5000+'
            END AS reviewer_edit_bucket,
            CASE
                WHEN is_bot THEN TRUE
                ELSE FALSE
            END AS is_patroller_bot
            FROM
                logging log
            JOIN
                actor a
                ON log.log_actor = a.actor_id
            JOIN
                user u
                ON a.actor_user = u.user_id
            LEFT JOIN
                (
                    SELECT
                        ug_user AS user_id,
                        TRUE AS is_bot
                    FROM
                        user_groups
                    WHERE
                        ug_group = 'bot'
                ) bots
                ON bots.user_id = u.user_id
            WHERE
                log_type = 'patrol'
                AND log_timestamp >= {rc_start}
        ),

        rc_patrols AS (
            SELECT
                *
            FROM
                recent_changes rc
            LEFT JOIN
                patrol_logs pl
                ON rc.rc_this_oldid = pl.log_rev_id)

    SELECT 
        * 
    FROM
        rc_patrols
    """

    return wmf.mariadb.run(rc_query, dbs=wiki)

In [17]:
%%time

warnings.filterwarnings('ignore')

idwiki_rc = get_recent_changes('idwiki')

IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out
IOStream.flush timed out


CPU times: user 33.6 s, sys: 51.6 s, total: 1min 25s
Wall time: 4h 59s
