# Partial block measurements

This work is tracked in [T209403](https://phabricator.wikimedia.org/T209403).

Originally, these measurements were geared towards being run on a monthly basis, using the Data Lake as the source. Currently, partial blocks are deployed on a small number of wikis and have not been deployed for long, meaning that monthly measurements make little sense. Furthermore, data on partial blocks is not yet available in the Data Lake.

Until further notice, we'll grab the data from the replicated MediaWiki databases and explore whether daily measurements make sense.

Our master document defines the following measurements, all on a monthly basis:

* Number of partial blocks for users/IPs where the block has a start timestamp within the month of interest.
* Number of partial blocks for users/IPs where the duration of the partial block intersects with the month of interest and the user/IP makes ≥1 non-reverted edits during the intersection.
* Number of partial blocks of users/IPs where:
   * The partial block has a duration intersecting with the month of interest, and the block does not have pages added during the intersection.
   * The partial block has a duration intersecting with the month of interest, and the block is not replaced by a sitewide block during the intersection.
   * The partial block has an expiry timestamp within the month of interest and the expiration timestamp is not altered to increase the duration of the partial block.

The first part of this notebook explores the first measurement, making plots of number of partial blocks set (for both registered users and IPs) per day for all wikis. Once we have data for partial blocks in the Data Lake, we'll turn these into monthly metrics.
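
To make the two recurring conditions in the definitions above concrete ("start timestamp within the month" and "duration intersects with the month"), here is a minimal standard-library sketch. The helper names `month_bounds` and `intersects`, and the sample blocks, are hypothetical and only serve as illustration; `None` as an expiry is an assumed way of representing an infinite block.

```python
import datetime as dt
from collections import Counter

def month_bounds(year, month):
    """Return the half-open interval [start, end) covering one calendar month."""
    start = dt.datetime(year, month, 1)
    # First day of the following month (rolls over to January of next year).
    end = dt.datetime(year + (month == 12), month % 12 + 1, 1)
    return start, end

def intersects(block_start, block_expiry, period_start, period_end):
    """
    True if the block [block_start, block_expiry) overlaps the period
    [period_start, period_end). An expiry of None means "infinite" (assumption).
    """
    if block_start >= period_end:
        return False
    return block_expiry is None or block_expiry > period_start

# Hypothetical sample data: (wiki, block start, block expiry)
blocks = [
    ('itwiki', dt.datetime(2019, 1, 16, 12, 15), None),
    ('itwiki', dt.datetime(2019, 1, 16, 12, 52), dt.datetime(2019, 1, 16, 14, 52)),
    ('arwiki', dt.datetime(2019, 2, 3, 8, 0), dt.datetime(2019, 3, 3, 8, 0)),
]

feb_start, feb_end = month_bounds(2019, 2)

# Measurement 1: blocks that start within the month of interest.
started = Counter(w for w, s, e in blocks if feb_start <= s < feb_end)

# Building block for the other measurements: blocks active at some point in the month.
active = Counter(w for w, s, e in blocks if intersects(s, e, feb_start, feb_end))

print(started)
print(active)
```

Using half-open intervals avoids counting a block that starts exactly at midnight on the first of the next month in two different months.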

In [3]:
## Load Python libraries

import pymysql

import datetime as dt
import pandas as pd

import phpserialize as ps

from wmfdata import hive

import mwreverts

In [4]:
## Load the rpy2 IPython extension so we can use R for graphs

%load_ext rpy2.ipython

In [5]:
%%R
library(ggplot2);
library(data.table);
library(zoo);
library(tidyr);
library(RColorBrewer);



In [38]:
## Some configuration variables.
## Start and end timestamps allow us to speed up database queries.

start_time = dt.datetime(2019, 1, 1, 0, 0, 0)
end_time = dt.datetime(2019, 4, 23, 0, 0, 0)

## The wikis that we are interested in studying
wikis = ['itwiki', 'arwiki', 'fawiki', 'metawiki', 'mediawikiwiki']

## Mapping from wiki DB name to host/port information
dbhost_map = dict()

## Mapping from wiki DB name to database connection
dbconn_map = dict()

## Format strings:
## MediaWiki database timestamp format
mw_format = "%Y%m%d%H%M%S"
hive_format = "%Y-%m-%dT%H:%M:%S"
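
As a small illustration of the two format strings, the sketch below converts a MediaWiki database timestamp into the second format. The helper name `mw_to_hive` is hypothetical, and the reading of `hive_format` as the string form wanted on the Hive side is an assumption based on its name.

```python
import datetime as dt

mw_format = "%Y%m%d%H%M%S"         # 14-digit timestamps as stored in MediaWiki tables
hive_format = "%Y-%m-%dT%H:%M:%S"  # ISO-8601-style string (assumed use: Hive queries)

def mw_to_hive(mw_timestamp):
    """Re-format a MediaWiki timestamp string using hive_format (hypothetical helper)."""
    return dt.datetime.strptime(mw_timestamp, mw_format).strftime(hive_format)

converted = mw_to_hive("20190116121511")
print(converted)
```

Because MediaWiki timestamps are fixed-width with the most significant field first, comparing them as strings gives the same order as comparing them as dates, which is why the SQL queries in this notebook can use plain `>=` and `<` on `log_timestamp`.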


In [7]:
# The second function needs dnspython to work
import dns.resolver
import glob

def get_mediawiki_section_dbname_mapping(mw_config_path, use_x1):
    db_mapping = {}
    if use_x1:
        dblist_section_paths = [mw_config_path.rstrip('/') + '/dblists/all.dblist']
    else:
        dblist_section_paths = glob.glob(mw_config_path.rstrip('/') + '/dblists/s[0-9]*.dblist')
    for dblist_section_path in dblist_section_paths:
        with open(dblist_section_path, 'r') as f:
            for db in f.readlines():
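                # Take the section name from the file name (e.g. 's1' from 's1.dblist').
                # str.rstrip('.dblist') strips a set of characters rather than the suffix,
                # which would turn 'all.dblist' into 'a'.
                db_mapping[db.strip()] = dblist_section_path.strip().split('/')[-1].split('.')[0]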

    return db_mapping


def get_dbstore_host_port(db_mapping, use_x1, dbname):
    if dbname == 'staging':
        shard = 'staging'
    elif use_x1:
        shard = 'x1'
    else:
        try:
            shard = db_mapping[dbname]
        except KeyError:
            raise RuntimeError("The database {} is not listed among the dblist files of the supported sections."
                               .format(dbname))
    answers = dns.resolver.query('_' + shard + '-analytics._tcp.eqiad.wmnet', 'SRV')
    host, port = str(answers[0].target), answers[0].port
    return (host,port)

wikidb_map = get_mediawiki_section_dbname_mapping('/srv/mediawiki-config', False)

In [40]:
for wiki in wikis:
    dbhost_map[wiki] = get_dbstore_host_port(wikidb_map, False, wiki)


In [41]:
for wiki in wikis:
    dbconn_map[wiki] = pymysql.connect(
        host = dbhost_map[wiki][0],
        port = dbhost_map[wiki][1],
        database = wiki,
        read_default_file = '/etc/mysql/conf.d/research-client.cnf',
        charset = 'utf8'
    )

In [10]:
## Code from wmfdata to decode bytestrings returned from the database into UTF-8 strings

def try_decode(cell):
    try:
        return cell.decode(encoding = "utf-8")
    except AttributeError:
        return cell


In [11]:
def extract_params(row):
    '''
    Extract relevant block parameters from the given row of partial block log
    data, and return a new `pandas.Series` that can be used to update a data frame
    with columns for those parameters.
    '''
    params = ps.loads(row['log_params'].encode('utf-8'), decode_strings=True)
    
    try:
        duration = params['5::duration']
    except KeyError:
        duration = None
        
    try:
        flags = params['6::flags']
    except KeyError:
        flags = None
        
    try:
        num_pages = len(params['7::restrictions'])
    except KeyError:
        num_pages = None
    
    return(pd.Series([duration, flags, num_pages]))

In [42]:
## SQL query to get data on partial blocks, adapted from
## https://github.com/dayllanmaza/wikireplicas-reports/blob/master/generators/partial_blocks.py

def get_partial_blocks(wikis, dbconns, start_timestamp, end_timestamp):
    pb_query = '''
    SELECT DATABASE() AS wiki,
           log_timestamp,
           log_params,
           log_user_text AS blocker,
           log_title AS blockee,
           comment_text AS reason
    FROM {wiki}.logging
    LEFT JOIN {wiki}.comment
    ON log_comment_id=comment_id
    WHERE log_timestamp >= "{start_timestamp}"
    AND log_timestamp < "{end_timestamp}"
    AND log_type = "block"
    AND log_action = "block" -- only interested in initial blocks created
    AND log_params LIKE '%"sitewide";b:0;%'
    '''
    
    pbs = []
    for wiki in wikis:
        df = pd.read_sql_query(
            pb_query.format(
                wiki = wiki,
                start_timestamp = start_timestamp.strftime(mw_format),
                end_timestamp = end_timestamp.strftime(mw_format)),
            dbconns[wiki])
        df = df.applymap(try_decode).rename(columns = try_decode)
        
        ## Turn the timestamps into datetime objects, and add a log_date string for convenience
        df['log_timestamp'] = pd.to_datetime(df['log_timestamp'], format=mw_format, utc=True)
        df['log_date'] = df['log_timestamp'].apply(lambda x: str(x.date()))

        df[['block_duration', 'block_flags', 'block_num_pages']] = df.apply(extract_params, axis=1)
        
        pbs.append(df)
    
    return(pd.concat(pbs))

partial_blocks = get_partial_blocks(wikis, dbconn_map, start_time, end_time)


In [13]:
partial_blocks.head()

Unnamed: 0,wiki,log_timestamp,log_params,blocker,blockee,reason,log_date,block_duration,block_flags,block_num_pages
0,itwiki,2019-01-16 12:15:11+00:00,"a:4:{s:11:""5::duration"";s:8:""infinite"";s:8:""6:...",Daimona Eaytoy,Eaytoy_Daimona,Test blocchi parziali,2019-01-16,infinite,nocreate,1.0
1,itwiki,2019-01-16 12:36:56+00:00,"a:4:{s:11:""5::duration"";s:8:""infinite"";s:8:""6:...",Daimona Eaytoy,Eaytoy_Daimona,,2019-01-16,infinite,nocreate,1.0
2,itwiki,2019-01-16 12:52:53+00:00,"a:4:{s:11:""5::duration"";s:7:""2 hours"";s:8:""6::...",Ruthven,151.20.139.29,[[WP:Vandalismo|Vandalismi]],2019-01-16,2 hours,"anononly,nocreate",1.0
3,itwiki,2019-01-16 13:01:55+00:00,"a:4:{s:11:""5::duration"";s:7:""8 hours"";s:8:""6::...",Ruthven,93.55.168.167,[[WP:Vandalismo|Vandalismi]],2019-01-16,8 hours,"anononly,nocreate",1.0
4,itwiki,2019-01-16 13:26:10+00:00,"a:3:{s:11:""5::duration"";s:9:""5 minutes"";s:8:""6...",Buggia,Wolframio,,2019-01-16,5 minutes,,


In [43]:
%%R -i partial_blocks

partial_blocks = data.table(partial_blocks);
partial_blocks[, log_date := as.Date(log_date)];

## Some configuration variables
graph_dir = 'graphs/';
pb_graph_prefix = 'partial_blocks_per_day_';
pb_graph_suffix = '.png';

make_pb_graphs = function(pbs, graph_dir, prefix, suffix) {
    wikis = unique(pbs$wiki);
    
    for(w in wikis) {
        ## Grab the subset for this wiki, as we have different date ranges for each
        wiki_blocks = pbs[wiki == w];
        
        ## Make a date sequence from the first to the last date, and left join against
        ## the data to fill in any dates with 0 blocks.

        dates = seq.Date(min(wiki_blocks$log_date), max(wiki_blocks$log_date), by='day');
        dates = data.table(log_date = dates);

        blocks_per_day = wiki_blocks[, list(num_blocks=.N), by=log_date];
        blocks_per_day = blocks_per_day[dates, on = 'log_date'];
        blocks_per_day[is.na(num_blocks), num_blocks := 0];

        ## Add the moving averages
        blocks_per_day[
            , num_blocks_1wma := rollapply(
                num_blocks,
                width = 7,
                FUN = mean,
                na.rm = TRUE,
                fill = 0,
                align = 'right')];
        blocks_per_day[
            , num_blocks_2wma := rollapply(
                num_blocks,
                width = 14,
                FUN = mean,
                na.rm = TRUE,
                fill = 0,
                align = 'right')];
        
        ## Tidy up and make the plot
        blocks_per_day_long = blocks_per_day %>% gather(measure, num_blocks, 2:4);
        blocks_per_day_long = data.table(blocks_per_day_long);
        blocks_per_day_long[measure == 'num_blocks', measure := 'raw data'];
        blocks_per_day_long[measure == 'num_blocks_1wma', measure := '1-week MA'];
        blocks_per_day_long[measure == 'num_blocks_2wma', measure := '2-week MA'];
        blocks_per_day_long[
            , measure := ordered(measure, rev(c('raw data', '1-week MA', '2-week MA')))];

        ## Choose blues with some contrast, with the raw data getting the strongest color
        b_palette = brewer.pal('Blues', n = 7)[c(3,5,7)];
        
        block_day_plot = ggplot(blocks_per_day_long,
                                aes(x=log_date, y=num_blocks, color=measure)) +
        scale_x_date(date_breaks = "1 week", date_labels = "%d %b") +
        scale_y_continuous() +
        scale_colour_manual(values = b_palette) +
        expand_limits(y = 0) +
        labs(title = paste0('Partial blocks created per day - ', w),
             x = 'Date',
             y = 'Number of blocks') +
        theme_light(base_size = 14) +
        geom_line();

        ggsave(paste0(graph_dir, prefix, w, suffix),
           plot = block_day_plot, width = 30, height = 20, units = "cm", dpi = "screen");
    }
}

make_pb_graphs(partial_blocks, graph_dir, pb_graph_prefix, pb_graph_suffix);

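
For readers who prefer Python, the zero-filling and right-aligned moving average used in the R cell above can be sketched with the standard library alone. The function names `daily_counts` and `trailing_mean` and the sample dates are hypothetical; the sketch mirrors `rollapply(..., fill = 0, align = 'right')` in that positions without a full window are set to 0.

```python
import datetime as dt
from collections import Counter

def daily_counts(dates):
    """Count events per day from the first to the last date, with 0 for empty days."""
    counts = Counter(dates)
    first, last = min(counts), max(counts)
    n_days = (last - first).days + 1
    days = [first + dt.timedelta(days=i) for i in range(n_days)]
    return [(day, counts.get(day, 0)) for day in days]

def trailing_mean(values, width):
    """Right-aligned moving average; positions lacking a full window are filled with 0."""
    out = []
    for i in range(len(values)):
        if i + 1 < width:
            out.append(0)
        else:
            window = values[i + 1 - width:i + 1]
            out.append(sum(window) / width)
    return out

# Hypothetical block dates: three blocks on the 16th, none on the 17th, and so on.
block_dates = ([dt.date(2019, 1, 16)] * 3
               + [dt.date(2019, 1, 18)]
               + [dt.date(2019, 1, 19)] * 2)

series = daily_counts(block_dates)
counts = [n for _, n in series]
ma3 = trailing_mean(counts, 3)
print(counts)
print(ma3)
```

Note that filling the leading positions with 0 makes the averaged lines start at zero in the plots; filling with a missing value instead would leave those positions blank.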


# Partially blocked user/IP makes constructive edits

The second measurement we have defined is:

* Number of partial blocks for users/IPs where the duration of the partial block intersects with the month of interest and the user/IP makes ≥1 non-reverted edits during the intersection.

At this point, we do not have monthly data, so we'll instead look at the daily number of active partial blocks where the blocked user/IP makes ≥1 non-reverted edits that day.

In [35]:
def was_reverted(db_conn, rev_id, rev_page, rev_timestamp, rev_sha1,
                 is_archive = False, radius = 15, hours = 48):
    '''
    Use the mwreverts library to determine if the given edit is reverted within `radius`
    number of edits or `hours` number of hours. `is_archive` means the archive table will
    be used to identify the edit.
    '''
    
    ## We'll have to grab radius - 1 no. of edits from before the given edit,
    ## then radius no. of edits (within the given time) from after the given edit
    ## and then search through those.
    
    table_name = 'revision'
    id_column = 'rev_id'
    ts_column = 'rev_timestamp'
    sha1_column = 'rev_sha1'
    page_column = 'rev_page'
    if is_archive:
        table_name = 'archive'
        id_column = 'ar_rev_id'
        ts_column = 'ar_timestamp'
        sha1_column = 'ar_sha1'
        page_column = 'ar_page_id'
        
    pre_query = '''
    SELECT rev_id, rev_timestamp, rev_sha1
    FROM (
        SELECT {id_col} AS rev_id, {ts_col} AS rev_timestamp,
               {sha1_col} AS rev_sha1
        FROM {table}
        WHERE {page_col} = {rev_page}
        AND {ts_col} <= "{rev_timestamp}"
        ORDER BY {ts_col} DESC
        LIMIT {radius}
    ) AS pr
    ORDER BY rev_timestamp ASC'''
    
    post_query = '''
    SELECT {id_col} AS rev_id, {ts_col} AS rev_timestamp,
           {sha1_col} AS rev_sha1
    FROM {table}
    WHERE {page_col} = {rev_page}
    AND {ts_col} > "{rev_timestamp}"
    AND {ts_col} < "{max_timestamp}"
    ORDER BY {ts_col} ASC
    LIMIT {radius}'''
    
    ## Use the `hours` parameter rather than a hard-coded 48, so callers can change the window
    max_timestamp = (dt.datetime.strptime(rev_timestamp, mw_format)
                     + dt.timedelta(hours = hours)).strftime(mw_format)
    
    rev_detector = mwreverts.Detector()
    
    was_reverted = False
    
    with db_conn.cursor() as db_cursor:
        db_cursor.execute(pre_query.format(
            table = table_name, id_col = id_column, ts_col = ts_column,
            sha1_col = sha1_column, page_col = page_column, rev_page = rev_page,
            rev_timestamp = rev_timestamp, radius = radius))
        for row in db_cursor:
            r_id = row[0]
            r_timestamp = row[1].decode('utf-8')
            r_sha1 = row[2].decode('utf-8')
            
            rev_detector.process(r_sha1, {'rev_id': r_id,
                                            'rev_timestamp': r_timestamp})
        
        db_cursor.execute(post_query.format(
            table = table_name, id_col = id_column, ts_col = ts_column,
            sha1_col = sha1_column, page_col = page_column, rev_page = rev_page,
            rev_timestamp = rev_timestamp, max_timestamp = max_timestamp,
            radius = radius))
        for row in db_cursor:
            r_id = row[0]
            r_timestamp = row[1].decode('utf-8')
            r_sha1 = row[2].decode('utf-8')
            
            revert = rev_detector.process(r_sha1, {'rev_id': r_id,
                                                     'rev_timestamp': r_timestamp})
            
            if revert:
                for revision in revert.reverteds:
                    if revision['rev_id'] == rev_id:
                        was_reverted = True
                        
    return(was_reverted)

def test_revert():
    enwiki = get_dbstore_host_port(wikidb_map, False, 'enwiki')
    en_conn = pymysql.connect(
        host = enwiki[0],
        port = enwiki[1],
        database = 'enwiki',
        read_default_file = '/etc/mysql/conf.d/research-client.cnf',
        charset = 'utf8'
    )
    print(was_reverted(en_conn, 794446073, 24911978,
                       '20170808022628', 'mp9mjock323d1tfi5likm1yf0lfa9j5'))
    print(was_reverted(en_conn, 793784102, 24911978,
                       '20170803225004', 'qw89z7d27jodyb26v35fmj33hirhko1'))
    en_conn.close()
    
test_revert()

True
False
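
The idea behind the revert check is identity reverts: an edit counts as reverted if a later revision restores the page to a checksum seen shortly before that edit. The sketch below is a simplified stand-in that illustrates this idea with the standard library; it is not the implementation in `mwreverts`, whose handling of edge cases may differ. The names `find_reverts` and `was_reverted_in` and the sample history are hypothetical.

```python
def find_reverts(revisions, radius=15):
    """
    revisions: list of (rev_id, sha1) tuples in chronological order.
    Returns a list of (reverting_id, reverted_ids, reverted_to_id) tuples.
    """
    reverts = []
    for i, (rev_id, sha1) in enumerate(revisions):
        # Look back at most `radius` revisions for the most recent identical checksum.
        lowest = max(0, i - radius)
        for j in range(i - 1, lowest - 1, -1):
            if revisions[j][1] == sha1:
                # Everything between the matching revision and this one was undone.
                reverted = [r for r, _ in revisions[j + 1:i]]
                if reverted:
                    reverts.append((rev_id, reverted, revisions[j][0]))
                break
    return reverts

def was_reverted_in(revisions, target_id, radius=15):
    """True if target_id is among the revisions undone by any identity revert."""
    return any(target_id in reverted
               for _, reverted, _ in find_reverts(revisions, radius))

# Hypothetical page history: revision 3 restores the content of revision 1.
history = [(1, 'aaa'), (2, 'bbb'), (3, 'aaa'), (4, 'ccc')]

print(find_reverts(history))
print(was_reverted_in(history, 2))
print(was_reverted_in(history, 4))
```

This sketch has no time limit; in `was_reverted` above, the window is enforced by the `max_timestamp` bound in the second query, which limits which later revisions are considered at all.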


In [37]:
## Note: we'll have to grab log data for _all_ partial blocks, so we know when they start,
## when they expire, and if they had any reblock/unblock events affecting their duration.
## This also means we'll need to figure out when they expire #sadface

## For each wiki...
##   For each block...
##     Grab any user edits during that timespan (from both revision and archive tables).
##     Investigate if any of those edits were reverted within k edits or 48 hours.
##     For any edits that were not reverted, increase the count for that day.

## So, we're looking at having a data structure with counts for every day
## (defaultdict of ints with day as the key).
## At the end, turn that into a pandas.DataFrame for the days and counts,
## then graph that.

## The range in the dataset needs to reflect the range in the block data. Meaning that
## the first and last dates in this measurement should be the first and last days for which
## we have at least one block set. "last day" will be the minimum of the configured end date
## and the last day we have a block set for.

def constructive_blocked_edits(df, start_timestamp, end_timestamp):
    '''
    For the given dataframe with data on partial blocks, calculate the number of blocks
    for each day between start_timestamp and end_timestamp where the blocked user/IP
    made at least one constructive edit.
    '''
    
    ## actual start date is maximum of the start_timestamp and the first block in the dataset
    ## actual last date is the minimum of the end_timestamp and the expiration of the last
    ## block in the dataset
    
    return()
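
The counting plan outlined in the comments above could look roughly like the following sketch. It is not the finished `constructive_blocked_edits`: it assumes block expiry times and the non-reverted edits of each blocked user/IP have already been determined, and it works on plain Python structures rather than the data frame. The function name, argument shapes, and sample data are all hypothetical.

```python
import datetime as dt
from collections import defaultdict

def count_constructive_block_days(blocks, good_edits, first_day, last_day):
    """
    blocks: list of (blockee, start, expiry) datetimes; expiry None = infinite block.
    good_edits: dict mapping blockee to datetimes of their non-reverted edits.
    Returns {date: number of blocks whose blockee made at least one
    non-reverted edit on that date while the block was active}.
    """
    counts = defaultdict(int)
    for blockee, start, expiry in blocks:
        days_with_edit = set()
        for ts in good_edits.get(blockee, []):
            in_block = ts >= start and (expiry is None or ts < expiry)
            if in_block and first_day <= ts.date() <= last_day:
                days_with_edit.add(ts.date())
        # Each block counts at most once per day, however many edits were made.
        for day in days_with_edit:
            counts[day] += 1

    # Emit every day in the range so days without such blocks show up as 0.
    result = {}
    for i in range((last_day - first_day).days + 1):
        day = first_day + dt.timedelta(days=i)
        result[day] = counts.get(day, 0)
    return result

# Hypothetical sample data.
blocks = [
    ('UserA', dt.datetime(2019, 1, 16, 12, 0), None),
    ('10.0.0.1', dt.datetime(2019, 1, 16, 13, 0), dt.datetime(2019, 1, 16, 21, 0)),
]
good_edits = {
    'UserA': [dt.datetime(2019, 1, 16, 15, 0),
              dt.datetime(2019, 1, 16, 16, 0),
              dt.datetime(2019, 1, 18, 9, 0)],
    '10.0.0.1': [dt.datetime(2019, 1, 16, 22, 0)],  # after the block expired
}

per_day = count_constructive_block_days(
    blocks, good_edits, dt.date(2019, 1, 16), dt.date(2019, 1, 18))
print(per_day)
```

The resulting dictionary can be turned into a `pandas.DataFrame` of days and counts for graphing, as the comments above suggest.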