# Add a Link: Leading Indicators

The Phabricator task for this analysis is [T277355](https://phabricator.wikimedia.org/T277355).

We have the following three leading indicators, plus a fourth measure noted below the list:

1. Revert rate: compare the revert rate of Add a Link edits to that of unstructured link task edits.
2. User rejection rate: do users reject more than 30% of links?
3. Task completion rate: what is the proportion of users who start the Add a Link task and complete it? If it is below 75%, we investigate.

With regard to rejection rate, we also want to calculate the "proportion of users who accept all links". We then want to compare the rejection rate when excluding these users from the dataset.
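
As a rough illustration of that comparison, the sketch below computes the rejection rate with and without the users who accepted every link. The record layout (`accepted`, `rejected`, `skipped` per user) and the sample numbers are made up for this example. It assumes the rate is the number of rejected links divided by the number of recommended links, and that a user with no rejected or skipped links accepted everything.

```python
def rejection_rate(users):
    '''Percentage of recommended links that were rejected, across all users.'''
    rejected = sum(u['rejected'] for u in users)
    recommended = sum(u['accepted'] + u['rejected'] + u['skipped'] for u in users)
    return 100.0 * rejected / recommended if recommended else float('nan')


def accepted_everything(user):
    '''True if the user accepted every link that was recommended to them.'''
    return user['accepted'] > 0 and user['rejected'] == 0 and user['skipped'] == 0


# Hypothetical per-user totals, for illustration only
sample_users = [
    {'accepted': 10, 'rejected': 0, 'skipped': 0},  # accepts everything
    {'accepted': 6, 'rejected': 3, 'skipped': 1},
    {'accepted': 2, 'rejected': 6, 'skipped': 2},
]

rate_all = rejection_rate(sample_users)
rate_excl = rejection_rate([u for u in sample_users if not accepted_everything(u)])

# Compare both rates against the 30% threshold from the list of indicators
print(f'all users: {rate_all:.1f}%, above threshold: {rate_all > 30}')
print(f'excluding accept-all users: {rate_excl:.1f}%, above threshold: {rate_excl > 30}')
```

Excluding the accept-all users can only raise the rate or leave it unchanged, because they contribute recommended links but no rejections.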

We distinguish between users who registered before and after the feature was deployed, because the two groups were treated differently. We decided prior to deployment that all existing users would get the unstructured task replaced by Add a Link so that we could show it to a wider user base. As a result, we put it in front of a large group of retained users who are likely prolific contributors.

Users who registered after deployment are randomly assigned (with 50% chance either way) to Add a Link or the unstructured task, and do not have an opportunity to switch. This is part of our experiment plan. As a result, a limited number of users are getting the unstructured link task. Splitting the analysis based on when users registered allows comparing the two types of tasks directly for the group who signed up after deployment where it makes sense (e.g. revert rate).

# Libraries and Configuration

In [1]:
import datetime as dt

import pandas as pd
import numpy as np

from collections import defaultdict

from wmfdata import spark, mariadb

from scipy import stats

In [2]:
## Start timestamp of the experiment (https://phabricator.wikimedia.org/T277356#7120922)
exp_start = '2021-05-27T19:12:03'

exp_start_ts = dt.datetime.strptime(exp_start, '%Y-%m-%dT%H:%M:%S')

## We'll limit data gathering to midnight June 14, the day we're gathering data
exp_end_ts = dt.datetime(2021, 6, 14, 0, 0, 0)

## List of wikis that we deployed to:
wikis = ['arwiki','bnwiki','cswiki', 'viwiki']

## Lists of known users to ignore (e.g. test accounts and experienced users)
known_users = defaultdict(set)
known_users['cswiki'].update([14, 127629, 303170, 342147, 349875, 44133, 100304, 307410, 439792, 444907,
                              454862, 456272, 454003, 454846, 92295, 387915, 398470, 416764, 44751, 132801,
                              137787, 138342, 268033, 275298, 317739, 320225, 328302, 339583, 341191,
                              357559, 392634, 398626, 404765, 420805, 429109, 443890, 448195, 448438,
                              453220, 453628, 453645, 453662, 453663, 453664, 440694, 427497, 272273,
                              458025, 458487, 458049, 59563, 118067, 188859, 191908, 314640, 390445,
                              451069, 459434, 460802, 460885, 79895, 448735, 453176, 467557, 467745,
                              468502, 468583, 468603, 474052, 475184, 475185, 475187, 475188, 294174,
                              402906, 298011])

known_users['kowiki'].update([303170, 342147, 349875, 189097, 362732, 384066, 416362, 38759, 495265,
                              515553, 537326, 566963, 567409, 416360, 414929, 470932, 472019, 485036,
                              532123, 558423, 571587, 575553, 576758, 360703, 561281, 595100, 595105,
                              595610, 596025, 596651, 596652, 596653, 596654, 596655, 596993, 942,
                              13810, 536529])

known_users['viwiki'].update([451842, 628512, 628513, 680081, 680083, 680084, 680085, 680086, 355424,
                              387563, 443216, 682713, 659235, 700934, 705406, 707272, 707303, 707681, 585762])

known_users['arwiki'].update([237660, 272774, 775023, 1175449, 1186377, 1506091, 1515147, 1538902,
                              1568858, 1681813, 1683215, 1699418, 1699419, 1699425, 1740419, 1759328, 1763990])

## Grab the user IDs of known test accounts so they can be added to the exclusion list

def get_known_users(wiki):
    '''
    Get user IDs of known test accounts and return a set of them.
    '''
    
    username_patterns = ["MMiller", "Zilant", "Roan", "KHarlan", "MWang", "SBtest",
                         "Cloud", "Rho2019", "Test"]

    known_user_query = '''
SELECT user_id
FROM user
WHERE user_name LIKE "{name_pattern}%"
    '''
    
    known_users = set()
    
    for u_pattern in username_patterns:
        new_known = mariadb.run(known_user_query.format(
            name_pattern = u_pattern), wiki)
        known_users = known_users | set(new_known['user_id'])

    return(known_users)
        
for wiki in wikis:
    known_users[wiki] = known_users[wiki] | get_known_users(wiki)

In [3]:
## Controlling the maximum number of rows, columns, and output width used
## by Pandas. Update it with larger values if tables start getting truncated.

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 150)

## Helper functions

In [4]:
def make_known_users_sql(kd, wiki_column, user_column):
    '''
    Based on the dictionary `kd` mapping wiki names to sets of user IDs of known users,
    create a SQL expression to exclude users based on the name of the wiki matching `wiki_column`
    and the user ID not matching `user_column`
    '''
    
    wiki_exp = '''({w_column} = '{wiki}' AND {u_column} NOT IN ({id_list}))'''
    
    expressions = list()

    ## Iteratively build the expression for each wiki
    for wiki_name, wiki_users in kd.items():
        expressions.append(wiki_exp.format(
            w_column = wiki_column,
            wiki = wiki_name,
            u_column = user_column,
            id_list = ','.join([str(u) for u in wiki_users])
        ))
    
    ## We then join all the expressions with an OR, and we're done.
    return(' OR '.join(expressions))
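
One thing to watch for with this helper: a wiki with an empty set of known users produces `NOT IN ()`, which as far as I know is not valid in MariaDB or Spark SQL. Below is a sketch of a variant that falls back to a plain wiki match in that case. The name `make_known_users_sql_safe` is made up for this example; it is not part of the notebook.

```python
def make_known_users_sql_safe(kd, wiki_column, user_column):
    '''
    Hypothetical variant of `make_known_users_sql`: wikis with an empty set of
    known users get a plain wiki match instead of an empty `NOT IN ()` list.
    '''
    expressions = []
    for wiki_name, wiki_users in kd.items():
        if wiki_users:
            # sort the IDs so the generated SQL is identical on every run
            id_list = ','.join(str(u) for u in sorted(wiki_users))
            expressions.append(
                f"({wiki_column} = '{wiki_name}' AND {user_column} NOT IN ({id_list}))")
        else:
            # nobody to exclude on this wiki, so only match on the wiki name
            expressions.append(f"({wiki_column} = '{wiki_name}')")
    return ' OR '.join(expressions)


example_sql = make_known_users_sql_safe({'cswiki': {14, 127629}, 'bnwiki': set()},
                                        'wiki', 'event.user_id')
print(example_sql)
```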
    

In [5]:
def make_when_then(wiki_list, wiki_column):
    '''
    Take the ordered list of wiki names and turn it into a string
    of "WHEN wiki_column = '{wiki}' THEN '{k}'" where `k` is the index
    of the wiki in the list, so it can be used for ordering results.
    '''

    whens = list()
    
    for k, wiki in enumerate(wiki_list):
        whens.append(f'WHEN {wiki_column} = "{wiki}" THEN "{k:02}"')
    
    ## Join them with line breaks to create the list
    return('\n'.join(whens))
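
To show what the helper produces, here is a small usage sketch. The function is repeated in condensed form so the example runs on its own, and the surrounding `ORDER BY CASE ... END` fragment is an assumption about how the output would be embedded in a query.

```python
def make_when_then(wiki_list, wiki_column):
    # condensed copy of the helper above, so this example runs on its own
    return '\n'.join(f'WHEN {wiki_column} = "{wiki}" THEN "{k:02}"'
                     for k, wiki in enumerate(wiki_list))


whens = make_when_then(['arwiki', 'bnwiki', 'cswiki', 'viwiki'], 'wiki')

# Hypothetical query fragment: sort rows by each wiki's position in the list
order_clause = f'ORDER BY\nCASE\n{whens}\nEND'
print(order_clause)
```

Because the index is zero-padded to two digits, the string comparison in the `CASE` expression keeps the list order for up to 100 wikis.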


In [6]:
def make_partition_statement(start_ts, end_ts, prefix = ''):
    '''
    This takes the two timestamps and creates a statement that selects
    partitions based on `year`, `month`, and `day` in order to make our
    data gathering not use excessive amounts of data. It assumes that
    `start_ts` and `end_ts` are not more than a month apart.
    This assumption simplifies the code and output a lot.
    
    An optional prefix can be set to enable selecting partitions for
    multiple tables with different aliases.
    
    :param start_ts: start timestamp
    :type start_ts: datetime.datetime
    
    :param end_ts: end timestamp
    :type end_ts: datetime.datetime
    
    :param prefix: prefix to use in front of partition clauses, "." is added automatically
    :type prefix: str
    '''
    
    if prefix:
        prefix = f'{prefix}.' # adds "." after the prefix
    
    # there are three cases:
    # 1: month and year are the same, output a "BETWEEN" statement with the days
    # 2: months differ, but the years the same.
    # 3: years differ too.
    # Case #2 and #3 can be combined, because it doesn't really matter
    # if the years are the same in the month-selection or not.
    
    if start_ts.year == end_ts.year and start_ts.month == end_ts.month:
        return(f'''{prefix}year = {start_ts.year}
AND {prefix}month = {start_ts.month}
AND {prefix}day BETWEEN {start_ts.day} AND {end_ts.day}''')
    else:
        return(f'''
(
    ({prefix}year = {start_ts.year}
     AND {prefix}month = {start_ts.month}
     AND {prefix}day >= {start_ts.day})
 OR ({prefix}year = {end_ts.year}
     AND {prefix}month = {end_ts.month}
     AND {prefix}day <= {end_ts.day})
)''')
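
The docstring notes the assumption that the two timestamps are at most a month apart. With a longer window, the second branch would only select the first and last month and drop the months in between. If that is ever needed, a generalisation could look like the sketch below, which emits one clause per calendar month. The function name is made up for this example, and the output format differs slightly from the helper above.

```python
import datetime as dt


def make_partition_statement_by_month(start_ts, end_ts, prefix=''):
    '''Hypothetical generalisation: one partition clause per calendar month.'''
    if start_ts > end_ts:
        raise ValueError('start_ts must not be later than end_ts')
    if prefix:
        prefix = f'{prefix}.'

    first = (start_ts.year, start_ts.month)
    last = (end_ts.year, end_ts.month)
    clauses = []
    year, month = first
    while (year, month) <= last:
        parts = [f'{prefix}year = {year}', f'{prefix}month = {month}']
        if (year, month) == first:
            parts.append(f'{prefix}day >= {start_ts.day}')  # lower bound in first month
        if (year, month) == last:
            parts.append(f'{prefix}day <= {end_ts.day}')    # upper bound in last month
        clauses.append('(' + ' AND '.join(parts) + ')')
        # step to the next calendar month, rolling over the year after December
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)

    return '(' + '\n OR '.join(clauses) + ')'


print(make_partition_statement_by_month(dt.datetime(2021, 4, 20), dt.datetime(2021, 6, 14)))
```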

In [7]:
def round_cell(x, num_decimals = 1):
    '''
    Try converting the value of `x` into a float, then rounding to the specified
    number of decimal places. Used when outputting `pandas.DataFrame` that contain
    columns full of `object` data types. If the value cannot be parsed as a float,
    the value is returned as-is.
    
    :param x: whatever we want to try to round
    :type x: obj
    
    :param num_decimals: the number of decimal places to round to
    :type num_decimals: int
    '''
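    try:
        return(round(float(x), num_decimals))
    except (ValueError, TypeError):
        ## float() raises ValueError for unparseable strings and TypeError for
        ## values such as None; in both cases we return the value as-is
        return(x)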

In [8]:
def add_and_sort_wiki_df(wiki_list, df, name_column = 'wiki', value_column = None):
    '''
    Takes the given list of wikis and compares them with the named column
    in the dataframe.
    
    If no value column is defined, it adds one empty row to the dataframe
    for each missing wiki.
    
    If a value column is defined, it identifies the unique values in that column
    and adds them for each wiki.
    
    Once the full new dataframe is completed, all NAs are replaced with 0,
    and the dataframe is sorted by the name and value columns.
    
    :param wiki_list: list of all the wikis we're expecting to have data for
    :type wiki_list: list
    
    :param df: dataframe with data, possibly missing some wikis.
    :type df: pandas.DataFrame
    
    :param name_column: column in the dataframe that contains wiki names
    :type name_column: str
    
    :param value_column: name of a column that holds values we should generate rows for.
    :type value_column: str
    '''

    ## We make a series out of the list of wikis, then use that to create a dataframe
    ## containing the name of any wiki not already in the named column. Then we set
    ## all the values to 0, and sort the resulting dataframe by the named column.

    wikis_s = pd.Series(wiki_list)
    
    if value_column is None:
        return(
            pd.concat([
                df,
                pd.DataFrame({name_column : wikis_s.loc[~wikis_s.isin(df[name_column])]})
            ]).fillna(0).sort_values(name_column))
    else:
        # Identify what wikis we're missing, and if we're not missing any just return
        # the existing dataframe
        missing_wikis = wikis_s.loc[~wikis_s.isin(df[name_column])]
        if missing_wikis.empty:
            return(df)
        
        ## From https://stackoverflow.com/a/26977495
        unique_values = pd.unique(df[value_column])

        new_df = pd.concat([
            df,
            pd.concat( # combine the results of the list comprehension
                [
                    pd.DataFrame( # for each element in the list, create a pandas.DataFrame
                        {name_column : [w] * len(unique_values), # repeat the wiki name to match the values
                         value_column : unique_values}) # add all the unique values of the value column
                    for w in missing_wikis # do this for every missing wiki
                ]
            )
        ])
        return(new_df.fillna(0).sort_values([name_column, value_column]))

In [45]:
def get_user_registrations(wiki, user_ids, slice_size = 1000):
    '''
    Query and return a `pandas.DataFrame` with columns `wiki`, `user_id`, and `user_registration`
    for all `user_ids` on the given `wiki`

    :param wiki: database code of the wiki we're querying
    :type wiki: str
    
    :param user_ids: the user IDs we're getting registration timestamps for
    :type user_ids: list
    
    :param slice_size: the number of users we'll query for on each iteration
    :type slice_size: int
    '''

    user_id_query = '''
    SELECT
        "{wiki}" AS wiki,
        user_id,
        user_registration
    FROM user
    WHERE user_id IN ({id_list})
    '''

    reg_df = pd.DataFrame()
    
    i = 0
    while i < len(user_ids):
        user_registrations = mariadb.run(user_id_query.format(
            wiki = wiki,
            id_list = ','.join([str(uid) for uid in user_ids[i:i+slice_size]])
        ), wiki)
        
        reg_df = pd.concat([reg_df, user_registrations])
        
        i += slice_size
    
    return(reg_df)
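
The slicing loop in this function can also be written as a small reusable generator. The sketch below is stand-alone: `chunked` is a made-up name, the user IDs are invented, and the generated `WHERE` clause is only printed rather than sent to a database.

```python
def chunked(items, size):
    '''Yield consecutive lists of at most `size` elements from `items`.'''
    items = list(items)  # accepts any iterable, e.g. a list or a pandas.Series
    for start in range(0, len(items), size):
        yield items[start:start + size]


user_ids = [101, 102, 103, 104, 105]  # hypothetical user IDs
for batch in chunked(user_ids, 2):
    id_list = ','.join(str(uid) for uid in batch)
    print(f'WHERE user_id IN ({id_list})')
```

With this pattern, the per-batch results can be collected in a list and passed to `pd.concat` once at the end, which avoids copying the growing dataframe on every iteration.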

In [None]:
def make_user_page_list(df):
    '''
    Take a data frame with wikis, user IDs, and page IDs, and create a suitable SQL expression
    to filter on all three.
    '''
    
    df_subset = df.loc[(df['task_type'] == 'links') &
                       (df['num_edits'] == 0)]
    
    page_expressions = list()
    
    for wiki, user_id, page_id in zip(df_subset['wiki'], df_subset['user_id'], df_subset['page_id']):
        page_expressions.append(f'(wiki = "{wiki}" AND user_id = {user_id} AND page_id = {page_id})\n')
        
    return('OR\n'.join(page_expressions))

# Revert Rate

The way we've done this previously is to only look at user activity within 24 hours of registration, because that's when most of the visits to the Newcomer Homepage take place. In this case, we want to identify all users who visited the Homepage, clicked on either an Add a Link or unstructured link task, and saved an edit to that page. We'll then want to look at the revert rate overall for each type of task. As mentioned in the introduction, we split users based on when they registered.

Update on July 22: I removed the restriction on task type so that we can gather data across all task types. We can later filter down to only Add a Link and the unstructured link task if needed. I also modified the query to create a user registration category (pre/post) based on whether the user registered before or after deployment.

In [9]:
revert_query = '''
WITH hp_visits AS (
    SELECT
        hpv.wiki,
        hpv.event.user_id,
        hpv.event.homepage_pageview_token,
        hpv.dt AS event_dt
    FROM event.homepagevisit AS hpv
    WHERE {partition_statement}
    AND wiki IN ({wiki_list})
    AND ({known_user_id_expression})
    AND dt >= "{start_ts}" AND dt < "{end_ts}"
),
newcomer_tasks AS (
-- grab unique task token/task type data from newcomer tasks
    SELECT
        DISTINCT event.newcomer_task_token, event.task_type, event.page_id
    FROM event.newcomertask
    WHERE {partition_statement}
),
homepage_task_clicks AS (
    -- clicks to tasks in sessions found in hp_visits
    SELECT
        hpm.wiki,
        hpm.event.user_id,
        hpm.dt AS event_dt,
        str_to_map(hpm.event.action_data, ";", "=") AS action_data
    FROM event.homepagemodule AS hpm
    JOIN hp_visits
    ON hpm.event.homepage_pageview_token = hp_visits.homepage_pageview_token
    WHERE {partition_statement}
    AND event.action = "se-task-click"
    AND dt >= "{start_ts}" AND dt < "{end_ts}"
    AND dt > hp_visits.event_dt
),
postedit_task_clicks AS (
    -- clicks to tasks done after saving an edit
    SELECT
        hp.wiki,
        hp.event.user_id,
        hp.dt AS event_dt,
        str_to_map(hp.event.action_data, ";", "=") AS action_data
    FROM event.helppanel AS hp
    WHERE {partition_statement}
    AND wiki IN ({wiki_list})
    AND ({known_user_id_expression})
    AND event.action = "postedit-task-click"
    AND dt >= "{start_ts}" AND dt < "{end_ts}"

),
link_task_clicks AS (
-- filter task_clicks down to those on links and link recommendations
    SELECT
        task_clicks.wiki,
        task_clicks.user_id,
        task_clicks.event_dt,
        newcomer_tasks.page_id,
        newcomer_tasks.task_type,
        LEAD(task_clicks.event_dt, 1) OVER
            (PARTITION BY task_clicks.wiki, task_clicks.user_id, newcomer_tasks.page_id
             ORDER BY task_clicks.event_dt) AS next_click_dt
    FROM (
        SELECT
            *
        FROM homepage_task_clicks
        UNION ALL
        SELECT
            *
        FROM postedit_task_clicks) AS task_clicks
    JOIN newcomer_tasks
    ON task_clicks.action_data["newcomerTaskToken"] = newcomer_tasks.newcomer_task_token
),
edits AS (
-- edits and reverts (within 48 hours) of newcomer tasks
    SELECT
        `database` AS wiki,
        rev_id,
        FIRST_VALUE(page_id) AS page_id,
        FIRST_VALUE(performer.user_id) AS user_id,
        FIRST_VALUE(rev_timestamp) AS rev_timestamp,
        IF(FIRST_VALUE(performer.user_registration_dt) > "{start_ts}", "post", "pre")
            AS user_registration_cat,
        MAX(IF(array_contains(tags, 'mw-reverted') AND
               (unix_timestamp(meta.dt, "yyyy-MM-dd'T'HH:mm:ss'Z'") -
                unix_timestamp(rev_timestamp, "yyyy-MM-dd'T'HH:mm:ss'Z'") < 60*60*48), 1, 0)) AS was_reverted
    FROM event_sanitized.mediawiki_revision_tags_change
    WHERE {partition_statement}
    AND `database` IN ({wiki_list})
    AND ({known_user_database_expression})
    AND array_contains(tags, "newcomer task")
    GROUP BY wiki, rev_id
)
SELECT
    link_task_clicks.wiki,
    link_task_clicks.user_id,
    edits.user_registration_cat,
    link_task_clicks.task_type,
    COUNT(1) AS num_edits,
    SUM(edits.was_reverted) AS num_reverts
FROM link_task_clicks
JOIN edits
ON link_task_clicks.wiki = edits.wiki
AND link_task_clicks.user_id = edits.user_id
AND link_task_clicks.page_id = edits.page_id
WHERE (link_task_clicks.next_click_dt IS NULL
       OR link_task_clicks.event_dt != link_task_clicks.next_click_dt) -- removing duplicates
AND edits.rev_timestamp > link_task_clicks.event_dt
AND (
        (link_task_clicks.next_click_dt IS NOT NULL
         AND unix_timestamp(edits.rev_timestamp, "yyyy-MM-dd'T'HH:mm:ss'Z'") <
             unix_timestamp(link_task_clicks.next_click_dt, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"))
    OR
        (unix_timestamp(edits.rev_timestamp, "yyyy-MM-dd'T'HH:mm:ss'Z'") -
         unix_timestamp(link_task_clicks.event_dt, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") < 60*60*24*7)
    )
GROUP BY link_task_clicks.wiki, link_task_clicks.user_id,
         edits.user_registration_cat, link_task_clicks.task_type
'''
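
The rules the query uses to attribute an edit to a task click, and to decide whether an edit counts as reverted, can be sketched in plain Python. This is a simplified mirror of the SQL conditions: it uses `datetime` objects instead of string and `unix_timestamp()` comparisons, it leaves out the duplicate-click filter, and the function names and sample timestamps are made up for this example.

```python
import datetime as dt

WEEK = dt.timedelta(days=7)
REVERT_WINDOW = dt.timedelta(hours=48)


def edit_matches_click(click_dt, next_click_dt, edit_dt):
    '''
    Mirror of the final WHERE clause: the edit must come after the click, and
    either precede the next click on the same page or fall within seven days.
    '''
    if edit_dt <= click_dt:
        return False
    before_next_click = next_click_dt is not None and edit_dt < next_click_dt
    within_week = (edit_dt - click_dt) < WEEK
    return before_next_click or within_week


def reverted_within_48h(edit_dt, tag_change_dt, tags):
    '''Mirror of `was_reverted`: reverted tag seen less than 48 hours after the edit.'''
    return 'mw-reverted' in tags and (tag_change_dt - edit_dt) < REVERT_WINDOW


click = dt.datetime(2021, 6, 1, 12, 0, 0)  # hypothetical click time
print(edit_matches_click(click, None, click + dt.timedelta(minutes=5)))
print(edit_matches_click(click, None, click + dt.timedelta(days=8)))
```

Note that the two conditions are joined with `OR`, so an edit made after the next click on the same page is still attributed to the earlier click as long as it falls within seven days.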

In [10]:
link_task_edits_data = spark.run(
    revert_query.format(
        start_ts = exp_start_ts.strftime('%Y-%m-%dT%H:%M:%S'),
        end_ts = exp_end_ts.strftime('%Y-%m-%dT%H:%M:%S'),
        wiki_list = ','.join(['"{}"'.format(w) for w in wikis]),
        known_user_id_expression = make_known_users_sql(known_users, 'wiki', 'event.user_id'),
        known_user_database_expression = make_known_users_sql(known_users,
                                                              '`database`', 'performer.user_id'),
        partition_statement = make_partition_statement(exp_start_ts, exp_end_ts)
    )
)



In [None]:
link_task_edits_data.head(50)

Number of task edits, reverts, and revert rate in our dataset:

In [33]:
link_tasks_agg = (link_task_edits_data.groupby(['user_registration_cat', 'task_type'])
                  .agg({'num_edits' : 'sum', 'num_reverts' : 'sum'}))
link_tasks_agg['revert_rate'] = 100 * link_tasks_agg['num_reverts'] / link_tasks_agg['num_edits']
link_tasks_agg.round(1)

| user_registration_cat | task_type | num_edits | num_reverts | revert_rate |
|---|---|---|---|---|
| post | copyedit | 160 | 45 | 28.1 |
| post | expand | 13 | 3 | 23.1 |
| post | link-recommendation | 290 | 28 | 9.7 |
| post | links | 63 | 22 | 34.9 |
| post | references | 5 | 2 | 40.0 |
| post | update | 6 | 0 | 0.0 |
| pre | copyedit | 115 | 11 | 9.6 |
| pre | expand | 15 | 0 | 0.0 |
| pre | link-recommendation | 958 | 49 | 5.1 |
| pre | links | 25 | 1 | 4.0 |





In [None]:
link_task_edits_data.sort_values('num_edits', ascending = False).head(24)

In [29]:
# For future reference: .loc[] operates on the index if there is one, meaning there's no need
# to define what we're matching on.
stats.chi2_contingency(
    link_tasks_agg.loc['post'].loc[['link-recommendation', 'links'], ['num_edits', 'num_reverts']]
)

(16.461800456213275,
 4.964022930903756e-05,
 1,
 array([[278.54590571,  39.45409429],
        [ 74.45409429,  10.54590571]]))
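
One caveat about the table passed to `chi2_contingency` above: `num_edits` includes the reverted edits, so the two columns overlap. A chi-squared test of independence is normally run on mutually exclusive cells, here non-reverted versus reverted edits. The sketch below is a standard-library implementation of the 2×2 test with Yates' continuity correction (which, to my understanding, `chi2_contingency` applies by default to 2×2 tables), fed with the post-deployment counts from the revert rate table. The function name is made up for this example, and the result has not been checked against the notebook's data pipeline.

```python
import math


def chi2_2x2_yates(table):
    '''
    Pearson chi-squared test for a 2x2 table with Yates' continuity correction.
    Returns (statistic, p_value); the p-value uses one degree of freedom.
    '''
    row_sums = [sum(row) for row in table]
    col_sums = [sum(col) for col in zip(*table)]
    total = sum(row_sums)

    stat = 0.0
    for i in range(2):
        for j in range(2):
            expected = row_sums[i] * col_sums[j] / total
            # shrink each deviation by 0.5, but never past zero
            deviation = max(abs(table[i][j] - expected) - 0.5, 0.0)
            stat += deviation ** 2 / expected

    # survival function of the chi-squared distribution with 1 degree of freedom
    p_value = math.erfc(math.sqrt(stat / 2))
    return stat, p_value


# (num_edits, num_reverts) for users registered after deployment
counts = {'link-recommendation': (290, 28), 'links': (63, 22)}

# rows: task type; columns: non-reverted edits, reverted edits
table = [[edits - reverts, reverts] for edits, reverts in counts.values()]
stat, p_value = chi2_2x2_yates(table)
print(f'chi2 = {stat:.2f}, p = {p_value:.2g}')
```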

## Overall statistics

For the slide deck, for each wiki, aggregate the number of edits and editors.

In [34]:
overall_agg = (link_task_edits_data.groupby(['wiki', 'task_type'])
               .agg({'num_edits' : 'sum', 'num_reverts' : 'sum', 'user_id' : 'count'})
               .rename(columns = {'user_id' : 'num_editors'}))
overall_agg['revert_rate'] = 100 * overall_agg['num_reverts'] / overall_agg['num_edits']
overall_agg.round(1)

| wiki | task_type | num_edits | num_reverts | num_editors | revert_rate |
|---|---|---|---|---|---|
| arwiki | copyedit | 178 | 35 | 60 | 19.7 |
| arwiki | expand | 16 | 3 | 8 | 18.8 |
| arwiki | link-recommendation | 884 | 71 | 94 | 8.0 |
| arwiki | links | 51 | 13 | 17 | 25.5 |
| arwiki | references | 23 | 0 | 5 | 0.0 |
| arwiki | update | 47 | 0 | 6 | 0.0 |
| bnwiki | copyedit | 27 | 5 | 13 | 18.5 |
| bnwiki | expand | 1 | 0 | 1 | 0.0 |
| bnwiki | link-recommendation | 83 | 2 | 18 | 2.4 |
| bnwiki | links | 10 | 1 | 6 | 10.0 |


# Rejection Rate

Q: Do we count all rejections, or only those that are part of saved edits? Do we count all edit sessions, or only those made by new accounts?

A: We'll count edit sessions from all users, because in this case experienced users are going to be better able to determine if a suggested link is appropriate. We will, however, only count saved edits, because we want to ignore users loading up the editor and testing out Add a Link.

To make things easier, we'll use the structured task schema as our source of data: it has an `editsummary_save` event, so we can rely on instrumented events to identify saved edits.

The number of accepted, rejected, and skipped links is also saved in the edit summary of each edit. That data is, however, localized to each wiki, so I'm not going to spend time digging those numbers out.

### Notes

There is the possibility that a user gets to the edit summary screen and then chooses to go back and make changes. We'll treat this as two separate end states and count both. We do this for two reasons: we expect this to happen relatively rarely, and there isn't anything wrong with going back and changing your mind.

For the `skipall_dialog`, we'll need to go fetch their initial impression of the interface, because that's where the number of suggestions is stored.

To get registration categories into this and subsequent datasets, I grab `user_registration` from the `user` table in MariaDB. There are only four wikis and a limited number of users in the dataset, so that's not a costly operation.

In [39]:
rejection_rate_query = '''
WITH saved_edits AS (
    SELECT
        coalesce(lsi.dt, lsi.meta.dt) AS event_dt,
        lsi.homepage_pageview_token,
        hpv.wiki,
        hpv.event.user_id
    FROM event.mediawiki_structured_task_article_link_suggestion_interaction AS lsi
    JOIN event.homepagevisit AS hpv
    ON lsi.homepage_pageview_token = hpv.event.homepage_pageview_token
    WHERE {lsi_partition_statement}
    AND {hpv_partition_statement}
    AND hpv.wiki IN ({wiki_list})
    AND ({known_user_id_expression})
    AND lsi.action = "editsummary_save"
    AND coalesce(lsi.dt, lsi.meta.dt) >= "{start_ts}" AND coalesce(lsi.dt, lsi.meta.dt) < "{end_ts}"
),
saved_rates AS ( -- grab accept/reject/skips for saved edits
    -- str_to_map() first splits on ";" (the pair delimiter),
    -- then splits each key/value pair on "=" (the key/value delimiter)
    SELECT
        lsi.homepage_pageview_token,
        saved_edits.user_id,
        saved_edits.wiki,
        coalesce(lsi.dt, lsi.meta.dt) as event_dt,
        str_to_map(lsi.action_data, ";", "=") AS rate_map
    FROM saved_edits
    JOIN event.mediawiki_structured_task_article_link_suggestion_interaction AS lsi
    ON saved_edits.homepage_pageview_token = lsi.homepage_pageview_token
    WHERE {lsi_partition_statement}
    AND lsi.action = "impression"
    AND lsi.active_interface = "editsummary_dialog"
    AND coalesce(lsi.dt, lsi.meta.dt) >= "{start_ts}"
    AND coalesce(lsi.dt, lsi.meta.dt) < "{end_ts}"
    AND coalesce(lsi.dt, lsi.meta.dt) < saved_edits.event_dt
),
skip_alls AS (
    SELECT
        coalesce(lsi.dt, lsi.meta.dt) AS event_dt,
        lsi.homepage_pageview_token,
        hpv.wiki,
        hpv.event.user_id
    FROM event.mediawiki_structured_task_article_link_suggestion_interaction AS lsi
    JOIN event.homepagevisit AS hpv
    ON lsi.homepage_pageview_token = hpv.event.homepage_pageview_token
    WHERE {lsi_partition_statement}
    AND {hpv_partition_statement}
    AND hpv.wiki IN ({wiki_list})
    AND ({known_user_id_expression})
    AND lsi.active_interface = "skipall_dialog"
    AND lsi.action = "confirm_skip_all_suggestions"
    AND coalesce(lsi.dt, lsi.meta.dt) >= "{start_ts}" AND coalesce(lsi.dt, lsi.meta.dt) < "{end_ts}"
),
skip_all_rates AS ( -- grab the suggestion count from start of the sessions
    SELECT
        lsi.homepage_pageview_token,
        skip_alls.user_id,
        skip_alls.wiki,
        coalesce(lsi.dt, lsi.meta.dt) as event_dt,
        -- building a map similar to what we have for saved edits
        map("accepted_count", "0", "rejected_count", "0",
            "skipped_count",
            str_to_map(lsi.action_data, ";", "=")["number_phrases_found"]) AS rate_map
    FROM skip_alls
    JOIN event.mediawiki_structured_task_article_link_suggestion_interaction AS lsi
    ON skip_alls.homepage_pageview_token = lsi.homepage_pageview_token
    WHERE {lsi_partition_statement}
    AND lsi.action = "impression"
    AND lsi.active_interface = "machinesuggestions_mode"
    AND coalesce(lsi.dt, lsi.meta.dt) >= "{start_ts}"
    AND coalesce(lsi.dt, lsi.meta.dt) < "{end_ts}"
    AND coalesce(lsi.dt, lsi.meta.dt) < skip_alls.event_dt
),
rates_cast AS (
    SELECT
        wiki,
        user_id,
        homepage_pageview_token,
        CAST(rate_map['accepted_count'] AS INT) AS accepted_count,
        CAST(rate_map['rejected_count'] AS INT) AS rejected_count,
        CAST(rate_map['skipped_count'] AS INT) AS skipped_count
    FROM
        (SELECT
             *
         FROM saved_rates
         UNION ALL
         SELECT
             *
         FROM skip_all_rates) AS r
)
SELECT
    wiki,
    user_id,
    SUM(accepted_count) AS num_links_accepted,
    SUM(rejected_count) AS num_links_rejected,
    SUM(skipped_count) AS num_links_skipped,
    SUM(accepted_count + rejected_count + skipped_count) AS num_links_recommended,
    COUNT(1) AS num_edit_sessions,
    COUNT(IF(accepted_count = accepted_count + rejected_count + skipped_count, 1, NULL)) AS num_all_accepted,
    COUNT(IF(rejected_count = accepted_count + rejected_count + skipped_count, 1, NULL)) AS num_all_rejected,
    COUNT(IF(skipped_count = accepted_count + rejected_count + skipped_count, 1, NULL)) AS num_all_skipped
FROM rates_cast
GROUP BY wiki, user_id
'''
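
To make the `str_to_map()` step and the "all accepted / rejected / skipped" counts more concrete, here is a plain Python sketch of the same logic. The key names come from the query above; the sample `action_data` string and the function names are made up for this example, and the real payload may contain additional keys.

```python
def parse_action_data(action_data):
    '''Rough Python counterpart of str_to_map(action_data, ";", "=").'''
    result = {}
    for item in action_data.split(';'):
        if item:
            # split on the first "=" only, so values may contain "=" themselves
            key, _, value = item.partition('=')
            result[key] = value
    return result


def classify_session(rate_map):
    '''Flag sessions where every recommended link got the same decision.'''
    accepted = int(rate_map['accepted_count'])
    rejected = int(rate_map['rejected_count'])
    skipped = int(rate_map['skipped_count'])
    total = accepted + rejected + skipped
    # Like the SQL, a session with zero links satisfies all three comparisons
    return {
        'all_accepted': accepted == total,
        'all_rejected': rejected == total,
        'all_skipped': skipped == total,
    }


# Hypothetical payload, for illustration only
session = parse_action_data('accepted_count=3;rejected_count=0;skipped_count=0')
print(session)
print(classify_session(session))
```

The zero-link edge case matters for interpretation: such a session would be counted in `num_all_accepted`, `num_all_rejected`, and `num_all_skipped` at the same time.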

In [40]:
rate_data = spark.run(
    rejection_rate_query.format(
        start_ts = exp_start_ts.strftime('%Y-%m-%dT%H:%M:%S'),
        end_ts = exp_end_ts.strftime('%Y-%m-%dT%H:%M:%S'),
        wiki_list = ','.join(['"{}"'.format(w) for w in wikis]),
        known_user_id_expression = make_known_users_sql(known_users, 'wiki', 'hpv.event.user_id'),
        lsi_partition_statement = make_partition_statement(exp_start_ts, exp_end_ts, prefix = 'lsi'),
        hpv_partition_statement = make_partition_statement(exp_start_ts, exp_end_ts, prefix = 'hpv')
    )
)



In [None]:
rate_data.loc[rate_data['wiki'] == 'arwiki'].sort_values('num_edit_sessions')

Get registration timestamps for these users and join with the rate data.

In [46]:
rate_registrations = pd.concat(
    [get_user_registrations(wiki, rate_data.loc[rate_data['wiki'] == wiki, 'user_id']) for wiki in wikis]
)

In [49]:
rate_registrations['user_registration_ts'] = pd.to_datetime(rate_registrations['user_registration'],
                                                            format = '%Y%m%d%H%M%S')

In [51]:
rate_registrations['user_registration_cat'] = rate_registrations['user_registration_ts'].apply(
    lambda x: 'post' if x > exp_start_ts else 'pre')
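
For reference, the same categorisation can be sketched without pandas. The timestamp format matches the `%Y%m%d%H%M%S` format used above and the cut-off is the experiment start from the configuration cell. Treating a missing registration timestamp as "pre" is an assumption of this sketch, and the function name is made up.

```python
import datetime as dt

# exp_start from the configuration cell
EXP_START = dt.datetime(2021, 5, 27, 19, 12, 3)


def registration_category(user_registration, exp_start=EXP_START):
    '''Return "post" for accounts registered after `exp_start`, otherwise "pre".'''
    if user_registration is None:
        # assumption: accounts without a registration timestamp predate the experiment
        return 'pre'
    registered = dt.datetime.strptime(user_registration, '%Y%m%d%H%M%S')
    return 'post' if registered > exp_start else 'pre'


print(registration_category('20210601083000'))  # hypothetical registration timestamps
print(registration_category('20190315120000'))
```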

In [54]:
len(rate_data)

160

In [61]:
rate_data = rate_data.merge(rate_registrations, on = ['wiki', 'user_id'])

In [62]:
len(rate_data)

160

Now, let's aggregate to get rates:

In [None]:
rate_data.head()

In [72]:
rate_data_agg = (rate_data.groupby('user_registration_cat')
                 .agg({'user_id' : 'count', 'num_links_accepted' : 'sum', 'num_links_rejected' : 'sum',
                       'num_links_skipped' : 'sum', 'num_links_recommended' : 'sum'}))
rate_data_agg['perc_links_accepted'] = (100.0 * rate_data_agg['num_links_accepted'] /
                                        rate_data_agg['num_links_recommended'])
rate_data_agg['perc_links_rejected'] = (100.0 * rate_data_agg['num_links_rejected'] /
                                        rate_data_agg['num_links_recommended'])
rate_data_agg['perc_links_skipped'] = (100.0 * rate_data_agg['num_links_skipped'] /
                                        rate_data_agg['num_links_recommended'])
rate_data_agg.round(1)

| user_registration_cat | user_id | num_links_accepted | num_links_rejected | num_links_skipped | num_links_recommended | perc_links_accepted | perc_links_rejected | perc_links_skipped |
|---|---|---|---|---|---|---|---|---|
| post | 96 | 597 | 125 | 103 | 825 | 72.4 | 15.2 | 12.5 |
| pre | 64 | 1464 | 595 | 189 | 2248 | 65.1 | 26.5 | 8.4 |





We also want to calculate how many users accepted all recommended links in every one of their edit sessions:

In [73]:
len(rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
                  (rate_data['num_edit_sessions'] == rate_data['num_all_accepted'])])

10

In [74]:
len(rate_data.loc[(rate_data['user_registration_cat'] == 'pre')])

64

In [76]:
round(100 *
     len(rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
                       (rate_data['num_edit_sessions'] == rate_data['num_all_accepted'])]) /
     len(rate_data.loc[(rate_data['user_registration_cat'] == 'pre')]), 1)

15.6

So 15.6% of users who registered prior to the experiment have only sessions where they've accepted all the recommended links. We can check how many edit sessions these users have:

In [79]:
rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
              (rate_data['num_edit_sessions'] == rate_data['num_all_accepted']), 'num_edit_sessions'].sum()

22

In [77]:
rate_data.loc[(rate_data['user_registration_cat'] == 'pre'), 'num_edit_sessions'].sum()

477

In [80]:
round(100 *
     rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
              (rate_data['num_edit_sessions'] == rate_data['num_all_accepted']), 'num_edit_sessions'].sum() /
     rate_data.loc[(rate_data['user_registration_cat'] == 'pre'), 'num_edit_sessions'].sum(), 1)


4.6

They only account for 22 out of 477 edit sessions, or 4.6% of this data.

We then do a similar analysis for users registered after deployment:

In [84]:
len(rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
                  (rate_data['num_edit_sessions'] == rate_data['num_all_accepted'])])

31

In [85]:
len(rate_data.loc[(rate_data['user_registration_cat'] == 'post')])

96

In [86]:
round(100 *
     len(rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
                       (rate_data['num_edit_sessions'] == rate_data['num_all_accepted'])]) /
     len(rate_data.loc[(rate_data['user_registration_cat'] == 'post')]), 1)

32.3

In [81]:
rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
              (rate_data['num_edit_sessions'] == rate_data['num_all_accepted']), 'num_edit_sessions'].sum()

36

In [82]:
rate_data.loc[(rate_data['user_registration_cat'] == 'post'), 'num_edit_sessions'].sum()

199

In [83]:
round(100 *
     rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
              (rate_data['num_edit_sessions'] == rate_data['num_all_accepted']), 'num_edit_sessions'].sum() /
     rate_data.loc[(rate_data['user_registration_cat'] == 'post'), 'num_edit_sessions'].sum(), 1)


18.1

So 31 out of 96 users registered after deployment (32.3%) have only edit sessions where they've accepted all suggested links. These do not edit as much as the other users, though, since they only account for 36 out of 199 edit sessions (18.1%).

In [None]:
round(100 *
     len(rate_data.loc[rate_data['num_edit_sessions'] == rate_data['num_all_accepted']]) /
     len(rate_data), 1)

Combining the two groups from the counts above, 41 of 160 users (25.6%) have only edit sessions where they've accepted all the recommended links.
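The per-group cells above repeat the same two ratios (share of users and share of edit sessions). A minimal sketch of how they could be computed in one pass is shown below. It assumes a data frame with the same three columns as `rate_data`; the toy values and the helper name `summarize_all_accept` are made up for illustration and are not part of the original analysis.

```python
import pandas as pd

# Toy stand-in for rate_data; the values are invented for illustration only.
toy_rate_data = pd.DataFrame({
    'user_registration_cat': ['pre', 'pre', 'pre', 'post', 'post'],
    'num_edit_sessions':     [10, 2, 1, 3, 1],
    'num_all_accepted':      [4, 2, 0, 3, 0],
})

def summarize_all_accept(df):
    """Per registration category: how many users accepted all links in every
    edit session, and what share of edit sessions those users account for."""
    is_all_accept = df['num_edit_sessions'] == df['num_all_accepted']
    flagged = df.assign(
        all_accept_user=is_all_accept.astype(int),
        # Sessions belonging to all-accept users, 0 for everyone else
        all_accept_sessions=df['num_edit_sessions'].where(is_all_accept, 0),
    )
    agg = flagged.groupby('user_registration_cat').agg(
        num_users=('all_accept_user', 'size'),
        num_all_accept_users=('all_accept_user', 'sum'),
        num_sessions=('num_edit_sessions', 'sum'),
        num_all_accept_sessions=('all_accept_sessions', 'sum'),
    )
    agg['perc_users'] = (100 * agg['num_all_accept_users'] / agg['num_users']).round(1)
    agg['perc_sessions'] = (100 * agg['num_all_accept_sessions'] / agg['num_sessions']).round(1)
    return agg

summary = summarize_all_accept(toy_rate_data)
print(summary)
```

Applied to the real `rate_data`, the `perc_users` and `perc_sessions` columns should correspond to the percentages computed cell by cell above.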

## Restricting it to >= 5 Edit Sessions

Because most of the users who only accepted links had only a few sessions, what happens when we restrict the analysis to those who had at least 5 sessions?

In [87]:
len(rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
                  (rate_data['num_edit_sessions'] >= 5) &
                  (rate_data['num_edit_sessions'] == rate_data['num_all_accepted'])])

0

In [88]:
len(rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
                 (rate_data['num_edit_sessions'] >= 5)])

6

In [90]:
round(100 *
     len(rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
                       (rate_data['num_edit_sessions'] >= 5) &
                       (rate_data['num_edit_sessions'] == rate_data['num_all_accepted'])]) /
     len(rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
                      (rate_data['num_edit_sessions'] >= 5)]), 1)

0.0

In [91]:
rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
              (rate_data['num_edit_sessions'] >= 5) &
              (rate_data['num_edit_sessions'] == rate_data['num_all_accepted']), 'num_edit_sessions'].sum()

0

In [92]:
rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
              (rate_data['num_edit_sessions'] >= 5), 'num_edit_sessions'].sum()

76

In [93]:
round(100 *
     rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
                   (rate_data['num_edit_sessions'] >= 5) &
              (rate_data['num_edit_sessions'] == rate_data['num_all_accepted']), 'num_edit_sessions'].sum() /
     rate_data.loc[(rate_data['user_registration_cat'] == 'post') &
                   (rate_data['num_edit_sessions'] >= 5), 'num_edit_sessions'].sum(), 1)


0.0

For users registered after deployment, none of the 6 users with 5 or more edit sessions accepted all links in every one of their sessions.

How does this play out for pre-deployment?

In [94]:
len(rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
                  (rate_data['num_edit_sessions'] >= 5) &
                  (rate_data['num_edit_sessions'] == rate_data['num_all_accepted'])])

1

In [95]:
len(rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
                 (rate_data['num_edit_sessions'] >= 5)])

19

In [96]:
round(100 *
     len(rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
                       (rate_data['num_edit_sessions'] >= 5) &
                       (rate_data['num_edit_sessions'] == rate_data['num_all_accepted'])]) /
     len(rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
                      (rate_data['num_edit_sessions'] >= 5)]), 1)

5.3

In [97]:
rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
              (rate_data['num_edit_sessions'] >= 5) &
              (rate_data['num_edit_sessions'] == rate_data['num_all_accepted']), 'num_edit_sessions'].sum()

7

In [98]:
rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
              (rate_data['num_edit_sessions'] >= 5), 'num_edit_sessions'].sum()

382

In [99]:
round(100 *
     rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
                   (rate_data['num_edit_sessions'] >= 5) &
              (rate_data['num_edit_sessions'] == rate_data['num_all_accepted']), 'num_edit_sessions'].sum() /
     rate_data.loc[(rate_data['user_registration_cat'] == 'pre') &
                   (rate_data['num_edit_sessions'] >= 5), 'num_edit_sessions'].sum(), 1)


1.8

On the pre-deployment side, 1 of the 19 users with 5 or more edit sessions (5.3%) accepted all links in every session. This user had 7 edit sessions, which is 1.8% of the 382 edit sessions made by users in this restricted group.
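The choice of 5 sessions as the cutoff is somewhat arbitrary, so it can be useful to see how the number of all-accept users changes with the minimum session count. The sketch below does this on invented data; the helper `all_accept_by_min_sessions` is hypothetical and only assumes the `num_edit_sessions` and `num_all_accepted` columns used above.

```python
import pandas as pd

# Toy data with the same two columns as rate_data; values are invented.
toy_rate_data = pd.DataFrame({
    'num_edit_sessions': [1, 1, 2, 5, 7, 12],
    'num_all_accepted':  [1, 0, 2, 1, 7, 3],
})

def all_accept_by_min_sessions(df, thresholds):
    """For each minimum session count, count eligible users and how many
    of them accepted all links in every one of their sessions."""
    is_all_accept = df['num_edit_sessions'] == df['num_all_accepted']
    rows = []
    for k in thresholds:
        eligible = df['num_edit_sessions'] >= k
        rows.append({
            'min_sessions': k,
            'num_users': int(eligible.sum()),
            'num_all_accept_users': int((eligible & is_all_accept).sum()),
        })
    return pd.DataFrame(rows)

sweep = all_accept_by_min_sessions(toy_rate_data, thresholds=[1, 2, 5])
print(sweep)
```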

# Task Completion Rate

Out of all users who start an Add a Link task, what proportion completes it?

We define the start of an Add a Link task session as the impression of the "machine suggestions" mode in VisualEditor. Completion is defined as either clicking to save the edit at the edit summary dialog, as before, or confirming that they've skipped all suggestions. We've confirmed that if a user rejects all suggestions and clicks "Submit" on the edit summary, the event logged is the same as if they accepted any recommended links.

In [100]:
task_completion_query = '''
WITH task_init AS (
    -- select sessions where the user started an Add a Link task
    SELECT
        coalesce(lsi.dt, lsi.meta.dt) AS event_dt,
        lsi.homepage_pageview_token,
        hpv.wiki,
        hpv.event.user_id
    FROM event.mediawiki_structured_task_article_link_suggestion_interaction AS lsi
    JOIN event.homepagevisit AS hpv
    ON lsi.homepage_pageview_token = hpv.event.homepage_pageview_token
    WHERE {lsi_partition_statement}
    AND {hpv_partition_statement}
    AND hpv.wiki IN ({wiki_list})
    AND ({known_user_id_expression})
    AND lsi.active_interface = "machinesuggestions_mode"
    AND lsi.action = "impression"
    AND coalesce(lsi.dt, lsi.meta.dt) >= "{start_ts}" AND coalesce(lsi.dt, lsi.meta.dt) < "{end_ts}"
),
task_completion AS (
    -- sessions where the user clicked "save" or confirmed skipping all suggestions
    SELECT
        coalesce(lsi.dt, lsi.meta.dt) AS event_dt,
        lsi.homepage_pageview_token,
        1 AS saved_edit
    FROM event.mediawiki_structured_task_article_link_suggestion_interaction AS lsi
    JOIN task_init
    ON lsi.homepage_pageview_token = task_init.homepage_pageview_token
    WHERE {lsi_partition_statement}
    AND (
            (lsi.active_interface = "editsummary_dialog"
             AND lsi.action = "editsummary_save")
        OR
            (lsi.active_interface = "skipall_dialog"
             AND lsi.action = "confirm_skip_all_suggestions")
        )
    AND coalesce(lsi.dt, lsi.meta.dt) > task_init.event_dt
)
SELECT
    task_init.wiki,
    task_init.user_id,
    1 AS started_task,
    MAX(coalesce(task_completion.saved_edit, 0)) AS completed_task
FROM task_init
LEFT JOIN task_completion
ON task_init.homepage_pageview_token = task_completion.homepage_pageview_token
GROUP BY task_init.wiki, task_init.user_id
'''
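To make the logic of the query easier to follow, here is a minimal pandas sketch of the same start/completion definition applied to a toy event log. The interface and action names are taken from the query; the tokens, user IDs and timestamps are invented, and this is an illustration rather than the code used for the analysis.

```python
import pandas as pd

# Toy event log; tokens, users and timestamps are made up.
events = pd.DataFrame({
    'homepage_pageview_token': ['a', 'a', 'b', 'c', 'c', 'd', 'e'],
    'user_id': [1, 1, 1, 2, 2, 3, 4],
    'active_interface': [
        'machinesuggestions_mode', 'editsummary_dialog',
        'machinesuggestions_mode',
        'machinesuggestions_mode', 'skipall_dialog',
        'machinesuggestions_mode',
        'editsummary_dialog'],
    'action': [
        'impression', 'editsummary_save',
        'impression',
        'impression', 'confirm_skip_all_suggestions',
        'impression',
        'editsummary_save'],
    'event_dt': pd.to_datetime([
        '2021-06-01T10:00:00', '2021-06-01T10:05:00',
        '2021-06-02T09:00:00',
        '2021-06-03T12:00:00', '2021-06-03T12:01:00',
        '2021-06-04T08:00:00',
        '2021-06-05T08:00:00']),
})

# Task start: impression of the machine suggestions mode
is_start = ((events['active_interface'] == 'machinesuggestions_mode') &
            (events['action'] == 'impression'))
starts = events.loc[is_start, ['homepage_pageview_token', 'user_id', 'event_dt']]

# Task completion: saving at the edit summary, or confirming "skip all"
is_save = ((events['active_interface'] == 'editsummary_dialog') &
           (events['action'] == 'editsummary_save'))
is_skip_all = ((events['active_interface'] == 'skipall_dialog') &
               (events['action'] == 'confirm_skip_all_suggestions'))
completions = events.loc[is_save | is_skip_all,
                         ['homepage_pageview_token', 'event_dt']]

# Left join so that started-but-not-completed sessions are kept
joined = starts.merge(completions, on='homepage_pageview_token',
                      how='left', suffixes=('_start', '_done'))
# A missing completion (NaT) compares as False, giving 0
joined['completed_task'] = (joined['event_dt_done'] >
                            joined['event_dt_start']).astype(int)

# One row per user: did they complete at least one started task?
per_user = joined.groupby('user_id')['completed_task'].max().reset_index()
print(per_user)
```

In this toy log, user 4 has a save event without a preceding impression and therefore does not show up at all, which mirrors how the query only counts users found in `task_init`.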

In [101]:
task_completion_data = spark.run(
    task_completion_query.format(
        start_ts = exp_start_ts.strftime('%Y-%m-%dT%H:%M:%S'),
        end_ts = exp_end_ts.strftime('%Y-%m-%dT%H:%M:%S'),
        wiki_list = ','.join(['"{}"'.format(w) for w in wikis]),
        known_user_id_expression = make_known_users_sql(known_users, 'wiki', 'hpv.event.user_id'),
        lsi_partition_statement = make_partition_statement(exp_start_ts, exp_end_ts, prefix = 'lsi'),
        hpv_partition_statement = make_partition_statement(exp_start_ts, exp_end_ts, prefix = 'hpv')
    )
)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


In [None]:
task_completion_data.head()

We'll get registrations for these users in the same way as we did for the rate data:

In [103]:
task_registrations = pd.concat(
    [get_user_registrations(
        wiki,
        task_completion_data.loc[task_completion_data['wiki'] == wiki, 'user_id']) for wiki in wikis]
)

In [105]:
task_registrations['user_registration_ts'] = pd.to_datetime(task_registrations['user_registration'],
                                                            format = '%Y%m%d%H%M%S')

In [106]:
task_registrations['user_registration_cat'] = task_registrations['user_registration_ts'].apply(
    lambda x: 'post' if x > exp_start_ts else 'pre')
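As a small worked example of the categorisation above, the sketch below applies the same parsing and comparison to a few invented registration timestamps. Note two edge cases under this rule: a user registered exactly at the experiment start is labelled 'pre' because the comparison is strict, and a missing registration timestamp (parsed as `NaT`) also ends up as 'pre'. The user IDs and timestamps are made up.

```python
import datetime as dt
import pandas as pd

exp_start_ts = dt.datetime.strptime('2021-05-27T19:12:03', '%Y-%m-%dT%H:%M:%S')

# Invented registration timestamps in MediaWiki's YYYYMMDDHHMMSS format
toy_registrations = pd.DataFrame({
    'user_id': [1, 2, 3, 4],
    'user_registration': ['20190314121500', '20210527191203',
                          '20210601080000', None],
})

toy_registrations['user_registration_ts'] = pd.to_datetime(
    toy_registrations['user_registration'], format='%Y%m%d%H%M%S')

# Same rule as in the notebook: strictly after the start means 'post'
toy_registrations['user_registration_cat'] = (
    toy_registrations['user_registration_ts'].apply(
        lambda x: 'post' if x > exp_start_ts else 'pre'))

print(toy_registrations[['user_id', 'user_registration_cat']])
```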

In [107]:
len(task_completion_data)

279

In [108]:
task_completion_data = task_completion_data.merge(task_registrations, on = ['wiki', 'user_id'])

In [109]:
len(task_completion_data)

279
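Comparing `len()` before and after the merge confirms that no rows were lost or duplicated. As a sketch of an alternative, pandas can enforce this during the merge itself through the `validate` and `indicator` parameters. The data frames below are invented stand-ins, and the wiki names are only examples.

```python
import pandas as pd

# Invented stand-ins for task_completion_data and task_registrations
toy_completion = pd.DataFrame({
    'wiki': ['arwiki', 'arwiki', 'cswiki'],
    'user_id': [1, 2, 1],
    'completed_task': [1, 0, 1],
})
toy_registrations = pd.DataFrame({
    'wiki': ['arwiki', 'arwiki', 'cswiki'],
    'user_id': [1, 2, 1],
    'user_registration_cat': ['pre', 'post', 'post'],
})

merged = toy_completion.merge(
    toy_registrations,
    on=['wiki', 'user_id'],
    how='left',               # keep every user who started a task
    validate='one_to_one',    # raises MergeError if keys are duplicated
    indicator=True,           # adds a _merge column: both / left_only
)

# Users without a registration record would show up as 'left_only'
unmatched = merged.loc[merged['_merge'] != 'both']
print(len(merged), len(unmatched))
```

With `how='left'`, a user lacking a registration row would be kept and flagged instead of silently dropped, which is what the default inner merge would do.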

We can now aggregate by registration category and get the numbers. Since the dataset is on a per-user basis, we know that 279 users started a task overall.

In [114]:
task_completion_agg = (task_completion_data.groupby('user_registration_cat')
                       .agg({'started_task' : 'sum', 'completed_task' : 'sum'}))
task_completion_agg['perc_completed'] = (100 * task_completion_agg['completed_task'] /
                                         task_completion_agg['started_task'])
task_completion_agg.round(1)

Unnamed: 0_level_0,started_task,completed_task,perc_completed
user_registration_cat,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
post,178,96,53.9
pre,101,64,63.4





We note that the number of users who completed a task in each category (96 and 64) is the same as the number of users in the previous step (task acceptance).

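In both cases, we see that the proportion of users who completed a task relative to the number of users who started one (53.9% for users registered after deployment, 63.4% for those registered before) is lower than the 75% threshold listed in the measurement plan.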
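Because the groups are fairly small, one way to gauge how firmly the rates sit below the threshold is to put an interval around each proportion. The sketch below uses a 95% Wilson score interval computed from the counts in the table above. This interval is an addition for illustration and was not part of the measurement plan; the helper name `wilson_interval` is hypothetical.

```python
import math

def wilson_interval(successes, n, z=1.96):
    """Wilson score interval for a binomial proportion (z=1.96 ~ 95%)."""
    p = successes / n
    denom = 1 + z ** 2 / n
    center = (p + z ** 2 / (2 * n)) / denom
    half_width = z * math.sqrt(p * (1 - p) / n + z ** 2 / (4 * n ** 2)) / denom
    return center - half_width, center + half_width

THRESHOLD = 0.75  # completion rate below which we investigate

# (completed, started) per registration category, from the table above
counts = {'post': (96, 178), 'pre': (64, 101)}

results = {}
for cat, (completed, started) in counts.items():
    low, high = wilson_interval(completed, started)
    results[cat] = {
        'rate': completed / started,
        'low': low,
        'high': high,
        # True if even the upper bound is below the threshold
        'below_threshold': high < THRESHOLD,
    }
    print(cat, round(100 * results[cat]['rate'], 1),
          round(100 * low, 1), round(100 * high, 1),
          results[cat]['below_threshold'])
```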

# Clarifying Questions

What's the number of Add a Link edits in total?

In [None]:
add_link_tag_query = '''
SELECT
    `database` AS wiki,
    COUNT(DISTINCT rev_id) AS num_edits
    FROM event_sanitized.mediawiki_revision_tags_change
    WHERE {partition_statement}
    AND `database` IN ({wiki_list})
    AND ({known_user_database_expression})
    AND array_contains(tags, "newcomer task add link")
    AND rev_timestamp >= "{start_ts}" AND rev_timestamp < "{end_ts}"
    GROUP BY `database`
'''

In [None]:
tagged_edit_counts = spark.run(
    add_link_tag_query.format(
        start_ts = exp_start_ts.strftime('%Y-%m-%dT%H:%M:%S'),
        end_ts = exp_end_ts.strftime('%Y-%m-%dT%H:%M:%S'),
        wiki_list = ','.join(['"{}"'.format(w) for w in wikis]),
        known_user_database_expression = make_known_users_sql(known_users, '`database`', 'performer.user_id'),
        partition_statement = make_partition_statement(exp_start_ts, exp_end_ts)
    )
)        

In [None]:
tagged_edit_counts

In [None]:
tagged_edit_counts['num_edits'].sum()