# Welcome Survey Aggregation

We aggregate Welcome Survey responses from all wikis where the survey is deployed and store the data in two tables in Hive.

Note that this notebook makes the following assumptions:

1. The MediaWiki database is the authoritative source of welcome survey data.
2. We identify responses by when they were made, not when the user registered.
3. Users must have registered either in the month we are aggregating or in the previous month.
4. We use the ServerSideAccountCreation schema to identify whether the user registered on the desktop or mobile site. If the user is not found in this schema, they are assumed to have registered on the desktop site.
5. Known test accounts from Growth Team members are not counted.
6. We are measuring *responses* to the Welcome Survey, not how accounts were created. That means that a user who responded to the survey is assumed to have seen it and been able to respond to it in a normal way (i.e. through the normal account creation process).
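Assumption 3 defines a two-month registration window. The sketch below makes it concrete. It assumes the month is given as its first day. The window runs from the first day of the previous month up to, but not including, the first day of the next month, and both ends are formatted as 14-digit MediaWiki timestamps. The function name `registration_window` is hypothetical. It restates the date arithmetic in `get_responses()`.

```python
import datetime as dt

def registration_window(month):
    """Return (start, end) MediaWiki timestamps for users counted in `month`.

    Assumes `month` is the first day of a month. The window is half-open:
    start <= user_registration < end.
    """
    # Step back one day into the previous month, then snap to its first day
    prev_month = (month - dt.timedelta(days=1)).replace(day=1)
    # Roll over the year when aggregating December
    if month.month == 12:
        next_month = month.replace(year=month.year + 1, month=1)
    else:
        next_month = month.replace(month=month.month + 1)
    fmt = '%Y%m%d000000'
    return prev_month.strftime(fmt), next_month.strftime(fmt)

print(registration_window(dt.date(2021, 1, 1)))
# ('20201201000000', '20210201000000')
```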

As the Growth team expands coverage to more wikis, this notebook automatically discovers those wikis and grabs their data. It does this by fetching the wiki configuration from WMF's servers and checking every Wikipedia wiki for stored Welcome Survey data. If a wiki has any, that data is processed and stored in the aggregate tables. Because this notebook runs once a month, the overhead of checking every Wikipedia is not a problem.
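The discovery step keeps wikis whose site group is `wikipedia` and that appear in neither the closed nor the private dblist. Here is a minimal stdlib sketch. It assumes the dblist contents have already been fetched and split into lines. The function name and the database codes are hypothetical toy inputs, not real dblist contents.

```python
def open_public_wikipedias(site_groups, closed, private):
    """Return sorted database codes of open, public Wikipedia wikis.

    site_groups: mapping of database code -> site group (e.g. 'wikipedia')
    closed, private: lists of database codes from the closed/private dblists
    """
    # If a file ends with a newline, split('\n') leaves an empty string; drop it
    excluded = {code for code in [*closed, *private] if code}
    return sorted(code for code, group in site_groups.items()
                  if group == 'wikipedia' and code not in excluded)

# Toy inputs with hypothetical database codes
site_groups = {'xawiki': 'wikipedia', 'xbwiki': 'wikipedia', 'xcwiki': 'wikipedia',
               'xdwiki': 'wikipedia', 'xawiktionary': 'wiktionary'}
closed = 'xawiki\n'.split('\n')    # ['xawiki', '']
private = 'xdwiki\n'.split('\n')   # ['xdwiki', '']
print(open_public_wikipedias(site_groups, closed, private))  # ['xbwiki', 'xcwiki']
```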

At the end of the data update, we also coalesce each aggregate table into a single file directly through Spark. This is done to speed up subsequent queries of the data.

## FIXME

Now that the Growth features are on all the wikis, it would be a lot faster to search through `ServerSideAccountCreation` *once* and then reuse that dataset, rather than query it for every wiki. The per-wiki approach worked well when we were on only a few wikis, and it was designed for those conditions. Those conditions have changed, and we should update the approach. We could also create a view of the last two months of registrations, cache that view, and then query it.
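The sketch below shows the "scan once, reuse" idea. It makes a single pass over the registrations and indexes them by wiki, so each per-wiki lookup becomes a dictionary access instead of a new query or filter. The tuples stand in for `(wiki_db, user_id, reg_on_mobile)` rows from ServerSideAccountCreation. The function name and the data are hypothetical.

```python
from collections import defaultdict

def index_registrations(rows):
    """Group (wiki_db, user_id, reg_on_mobile) rows into {wiki: {user_id: flag}}.

    One pass over the rows; afterwards each per-wiki lookup is a dict access.
    """
    by_wiki = defaultdict(dict)
    for wiki_db, user_id, reg_on_mobile in rows:
        by_wiki[wiki_db][user_id] = int(reg_on_mobile)
    return by_wiki

rows = [('arwiki', 1, True), ('arwiki', 2, False), ('cswiki', 7, True)]
index = index_registrations(rows)
print(index['arwiki'])          # {1: 1, 2: 0}
print(index.get('kowiki', {}))  # {} (no registrations seen for this wiki)
```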

Note that even with ServerSideAccountCreation cached in Spark, this still takes about forty minutes of wall time. There are more than 200 wikis in the list, so that's around 10–12 seconds per wiki (which seems reasonable). I think the caching is useful, but I also think it's important to change the notebook's timeout to 2 hours.
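Here is the estimate above worked out with the approximate figures from the text. The value 240 is only an illustrative upper count for "more than 200" wikis.

```python
wall_time_s = 40 * 60        # about forty minutes of wall time
for n_wikis in (200, 240):   # "more than 200" wikis; 240 is an illustrative upper value
    print(n_wikis, wall_time_s / n_wikis)
# 200 12.0
# 240 10.0

timeout_s = 2 * 60 * 60      # proposed 2-hour notebook timeout
print(timeout_s / wall_time_s)  # 3.0, i.e. three times the current runtime
```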

## Libraries

In [1]:
import re
import sys
import json
import logging
import datetime as dt
import pandas as pd

from collections import defaultdict

import os.path
import requests

from wmfdata import spark, mariadb

## Configuration variables

Let's define some configuration variables. First, the names of control and experiment groups we've used in Welcome Survey experiments.

In [2]:
## Definition of survey groups and control groups
control_groups = set(['exp1_group2', 'NONE'])
survey_groups = set(['exp2_target_popup', 'exp1_group1', 'exp2_target_specialpage'])

Second, definitions of the questions in the survey. This dictionary maps the name of a question to either a collection (such as a set) of valid responses, or an empty set. If valid responses are defined, matching responses are aggregated individually and all non-matching responses are aggregated as "other". Use a reusable collection rather than a one-shot iterator, because membership is checked once per response. If an empty set is defined, any value is accepted as a valid response and aggregated as-is. In either case, a response that is a list (for example, several languages) is expanded so that each item is aggregated separately. A question missing from a saved response is recorded as "N/A".
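The function below sketches these rules. It is a simplified restatement of the per-question logic in `get_q()`, and the names `classify` and `_MISSING` are hypothetical. `reason_spec` is a subset of the real `reason` spec, chosen for illustration.

```python
_MISSING = object()  # sentinel distinguishing "question absent" from a falsy answer

def classify(response, q_name, valid):
    """Return the list of labels one saved survey response contributes for `q_name`."""
    value = response.get(q_name, _MISSING)
    if value is _MISSING:
        return ['N/A']            # question not present in the response
    if isinstance(value, list):
        return value              # multi-valued answers: one label per item
    if valid:
        return [value] if value in valid else ['other']
    return [value]                # empty spec: accept any value as-is

reason_spec = {'read', 'edit-typo', 'new-page'}
print(classify({'reason': 'read'}, 'reason', reason_spec))        # ['read']
print(classify({'reason': 'spam'}, 'reason', reason_spec))        # ['other']
print(classify({'languages': ['cs', 'en']}, 'languages', set()))  # ['cs', 'en']
print(classify({}, 'edited', {'yes-few'}))                        # ['N/A']
```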

In [3]:
question_specs = {
    # "Why did you create your account today?"
    'reason' : set([
        'add-image',
        'program-participant',
        'placeholder',
        'other',
        'edit-info-add-change',
        'read',
        'edit-info',
        'edit-typo',
        'new-page'
    ]),
    # "Have you ever edited Wikipedia?"
    'edited' : set([
        'placeholder',
        'yes-many',
        'dont-remember',
        'yes-few',
        'no-other',
        'dunno'
    ]),
    # "Wikipedia is available in nearly 300 languages. Are there other languages you read and write in?"
    'languages' : set()
}

Next, the Hive tables: we store overview counts (save/skip/abandon) in one table, and aggregates of the responses in the other.

In [4]:
overview_table = 'growth_welcomesurvey.monthly_overview'
response_table = 'growth_welcomesurvey.survey_response_aggregates'

The name of a temporary view of ServerSideAccountCreation, intended for looking up whether a registration happened on desktop or mobile. (The current code caches a Spark DataFrame for this instead and does not reference this name.)

In [5]:
ssac_temp_view = 'ssac_temp_view'

The PHP files where MediaWiki language codes and language names are stored.

In [6]:
lang_urls = [
    "https://raw.githubusercontent.com/wikimedia/mediawiki-extensions-cldr/master/CldrNames/CldrNamesEn.php",
    "https://raw.githubusercontent.com/wikimedia/mediawiki-extensions-cldr/master/LocalNames/LocalNamesEn.php"
]

For reference, here are the `CREATE TABLE` statements used to create the tables:

In [7]:
create_overview_table_statement = '''
CREATE TABLE {overview_table}
(
    wiki STRING COMMENT "the wiki we gathered data for",
    log_month DATE COMMENT "the month surveys were responded to (or rendered, if skipped)",
    platform STRING COMMENT "the platform the user registered on (desktop or mobile)",
    user_group STRING COMMENT "the user group the user was in (control or survey)",
    survey_response STRING COMMENT "the response to the survey (save/skip/abandon)",
    num_responses INT COMMENT "the number of responses"
)
'''

create_table_statement = '''
CREATE TABLE {aggregate_table}
(
    wiki STRING COMMENT "the wiki we gathered data for",
    log_month DATE COMMENT "the month surveys were responded to (or rendered, if skipped)",
    platform STRING COMMENT "the platform the user registered on (desktop or mobile)",
    q_name STRING COMMENT "the name of the question (as used in the HTML form)",
    q_response STRING COMMENT "the response to the question",
    num_responses INT COMMENT "the number of responses"
)
'''

## Functions for Finding All Wikipedia Wikis

In [8]:
# Helper functions for retrieving wikis

def get_dblist(list_name):
    list_url = ("https://noc.wikimedia.org/conf/dblists/" + list_name + ".dblist")
    list_content = requests.get(list_url).text.split("\n")
    return(pd.Series(list_content))

def get_lang_names(url):
    r = requests.get(url)
    m = re.search(r"languageNames = (\[[\s\S]+?\])", r.text)
    php_ln = m.group(1)
    
    json_ln = php_ln
    repl = [
        # Convert from PHP array format to JSON
        (" =>", ":"),
        (r"\[", "{"),
        (r"\]", "}"),
        # Trailing commas will cause problems
        (",\n}", "\n}"),
        # ...so will single quotes
        ("'", '"'),
        # ...and comments
        (r"/\*[\s\S]*?\*/", ""),
        (r"#(.*?)\n", ""),
        # One hack to deal with a single quote in a language name
        ('O"odham', "O'odham")
    ]
    for old, new in repl:
        json_ln = re.sub(old, new, json_ln)
    
    py_ln = json.loads(json_ln)
    return(py_ln)

def apply_to_index(df, true_list, true_label, false_label):
    idx_ser = df.index.to_series()
    return(idx_ser.isin(true_list).apply(lambda x: true_label if x else false_label))
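The following self-contained run shows what the substitutions in `get_lang_names()` do. It applies the same rewrite rules, in the same order and written as raw strings, to a small hand-made PHP array. The sample input and the function name `php_array_to_dict` are illustrative and not taken from the CLDR files.

```python
import json
import re

# Same substitution rules as get_lang_names(), applied in order
REPL = [
    (r" =>", ":"),
    (r"\[", "{"),
    (r"\]", "}"),
    (r",\n}", "\n}"),              # trailing comma before the closing brace
    (r"'", '"'),                   # single quotes to JSON double quotes
    (r"/\*[\s\S]*?\*/", ""),       # block comments
    (r"#(.*?)\n", ""),             # line comments
    (r'O"odham', "O'odham"),
]

def php_array_to_dict(php_array):
    """Convert a simple PHP short-array literal of string pairs into a dict."""
    text = php_array
    for old, new in REPL:
        text = re.sub(old, new, text)
    return json.loads(text)

sample = "[\n\t'cs' => 'Czech', /* comment */\n\t'de' => 'German',\n]"
print(php_array_to_dict(sample))  # {'cs': 'Czech', 'de': 'German'}
```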


In [9]:
def get_wikis():
    '''
    Returns a `pandas.DataFrame` with an overview of all currently defined wikis,
    their status (open/closed), and visibility (public/private).
    '''
    
    wiki_query = '''
SELECT
    site_global_key AS database_code,
    CONCAT(TRIM(LEADING "." FROM REVERSE(site_domain))) AS domain_name,
    site_group AS database_group,
    site_language AS language_code
FROM enwiki.sites'''
    
    wikis = mariadb.run(wiki_query, 'enwiki').sort_values("database_code").set_index("database_code")
    
    langs = {}
    for url in lang_urls:
        langs.update(get_lang_names(url))

    # Add languages not included in the CLDR files
    langs.update({
        "als": "Alsatian",
        "atj": "Atikamekw",
        "diq": "Zazaki",
        "fiu-vro": "Võro",
        "map-bms": "Banyumasan",
        "nah": "Nahuatl",
        "pih": "Norfuk-Pitkern",
        "rmy": "Vlax Romani",
        "simple": "Simple English"
    })

    wikis["language_name"] = wikis["language_code"].apply(langs.get)
    
    closed = get_dblist("closed")
    private = get_dblist("private")
    
    wikis = (
        wikis
        .assign(
            status=lambda df: apply_to_index(df, closed, "closed", "open"),
            visbility=lambda df: apply_to_index(df, private, "private", "public")
        )
    )
    
    return(wikis)

## Defining Known Users to Exclude

In [10]:
def get_known_users(wiki, username_patterns):
    '''
    For the given wiki, get the user IDs of accounts with a user name
    matching any of the username patterns.
    '''
    
    known_user_query = '''
SELECT user_id
FROM user
WHERE user_name LIKE "{name_pattern}%"
'''

    known_users = set()
    for u_pattern in username_patterns:
        new_known = mariadb.run(known_user_query.format(
            name_pattern = u_pattern), wiki)
        known_users = known_users | set(new_known['user_id'])

    # ok, done
    return(known_users)

In [11]:
## Username patterns of known test accounts, mainly Growth team members
known_user_patterns = ["MMiller", "Zilant", "Roan", "KHarlan", "MWang", "SBtest",
                       "Cloud", "Rho2019", "KacemMhenni", "Test"]

## Set of known users, can be initialized with known accounts you want to make sure are ignored
known_users = defaultdict(set)

## Here are some known users from our four initial target wikis that we'll skip 
known_users['arwiki'].update([237660, 272774, 775023, 1175449, 1186377, 1506091, 1515147, 1538902,
                      1568858, 1681813, 1683215, 1699418, 1699419, 1699425])
known_users['cswiki'].update([303170, 342147, 349875, 44133, 100304, 307410, 439792, 444907, 454862,
                      456272, 454003, 454846, 92295, 387915, 398470, 416764, 44751, 132801,
                      137787, 138342, 268033, 275298, 317739, 320225, 328302, 339583, 341191,
                      357559, 392634, 398626, 404765, 420805, 429109, 443890, 448195, 448438,
                      453220, 453628, 453645, 453662, 453663, 453664, 440694, 427497, 272273,
                      458025, 458487, 458049, 59563, 118067, 188859, 191908, 314640, 390445,
                      451069, 459434, 460802, 460885])
known_users['kowiki'].update([303170, 342147, 349875, 189097, 362732, 384066, 416362, 38759, 495265,
                      515553, 537326, 566963, 567409, 416360, 414929, 470932, 472019, 485036,
                      532123, 558423, 571587, 575553, 576758])
known_users['viwiki'].update([451842, 628512, 628513, 680081, 680083, 680084, 680085, 680086, 355424,
                      387563, 443216, 682713])
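When merging pattern-matched IDs into these per-wiki sets, remember that set union with `|` returns a new set and leaves the original untouched. The result therefore has to be assigned back, or merged in place with `|=` or `update()`. The illustration below uses two arwiki IDs from above plus a hypothetical pattern-lookup result.

```python
from collections import defaultdict

known = defaultdict(set)
known['arwiki'].update([237660, 272774])
pattern_matches = {272774, 999001}   # hypothetical IDs from a LIKE-pattern lookup

known['arwiki'] | pattern_matches    # builds a new set, which is discarded
print(sorted(known['arwiki']))       # [237660, 272774]

known['arwiki'] |= pattern_matches   # in-place union keeps the result
print(sorted(known['arwiki']))       # [237660, 272774, 999001]

# Wikis without hard-coded IDs start from an empty set thanks to defaultdict
known['kowiki'] |= {5}
print(known['kowiki'])               # {5}
```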

## Functions to Get and Process Survey Responses

In [12]:
def get_responses(month, wiki, known_user_set):
    '''
    Get all survey responses from the given wiki that were either submitted
    or rendered (for abandoned responses) in the given month, excluding any
    response from a user with a user ID in `known_user_set`
    
    :param month: (the date of the first day of) the month we are getting data for
    :type month: datetime.date
    
    :param wiki: the database code of the wiki we are getting data for
    :type wiki: str
    
    :param known_user_set: set of user IDs of known users that we exclude
    :type known_user_set: set
    
    Returns a pandas.DataFrame with non-decoded JSON data.
    '''

    ## Get all Welcome Survey data where the response (skip/save)
    ## was made in the given month, or the survey was rendered
    ## in the given month (survey abandoned, or control group members).
    ##
    ## User has to have registered either in the same month or the previous month.
    
    ## Grabbing the first day of the previous month
    prev_month = month - dt.timedelta(days = 1)
    prev_month = prev_month.replace(day = 1)
    
    ## Getting the first day of the next month
    if month.month == 12:
        next_month = month.replace(year = month.year + 1, month = 1)
    else:
        next_month = month.replace(month = month.month + 1)

    ws_data_query = '''
SELECT
    up_user AS user_id,
    up_value AS survey_data
FROM user_properties
JOIN user
ON up_user = user_id
WHERE user_registration >= "{prev_month_start}"
AND user_registration < "{next_month_start}"
AND up_property = "welcomesurvey-responses"
AND (
        (
         json_value(up_value, '$._submit_date') IS NULL -- not responded
         AND CAST(json_value(up_value, '$._render_date') AS CHAR CHARACTER SET utf8)
             REGEXP "^{month}") -- month of rendering matches
    OR
        (
         CAST(json_value(up_value, '$._submit_date') AS CHAR CHARACTER SET utf8)
             REGEXP "^{month}" -- month of submission matches
        )
    )
'''
    
    if known_user_set:
        ws_data_query = '''{query}
        AND up_user NOT IN ({id_list})'''.format(
            query = ws_data_query, id_list = ','.join([str(uid) for uid in known_user_set]))

    responses = mariadb.run(
        ws_data_query.format(
            month = month.strftime('%Y%m'),
            prev_month_start = prev_month.strftime('%Y%m%d000000'),
            next_month_start = next_month.strftime('%Y%m%d000000')
        ), wiki)
    
    return(responses)
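The `WHERE` clause in `get_responses()` keeps a response if it was submitted in the given month. If it was never submitted, it keeps the response if the survey was rendered in that month. Below is the same predicate in plain Python, as a sketch for checking the logic against a `welcomesurvey-responses` value. The function name `in_month` and the sample values are hypothetical.

```python
import json

def in_month(survey_json, month_prefix):
    """Mirror of the SQL filter; `month_prefix` is 'YYYYMM', as from month.strftime('%Y%m')."""
    data = json.loads(survey_json)
    submit = data.get('_submit_date')
    if submit is None:
        # Not submitted (abandoned, or control group): use the render date instead
        render = str(data.get('_render_date') or '')
        return render.startswith(month_prefix)
    return str(submit).startswith(month_prefix)

print(in_month('{"_render_date": "20210115120000"}', '202101'))  # True
print(in_month('{"_render_date": "20201231235959", '
               '"_submit_date": "20210101000010"}', '202101'))   # True
print(in_month('{"_render_date": "20201215000000"}', '202101'))  # False
```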

In [13]:
def get_cached_mobile_flags(month, spark_session):
    '''
    Sets up a query of ServerSideAccountCreation to get registration data for the
    month we're getting data for as well as the month prior, then caches the result.
    Returns a cached Spark DataFrame that can later be queried for a specific wiki.
    
    :param month: (the date of the first day of) the month we are getting data for
    :type month: datetime.date

    :param spark_session: the PySpark session we're working with
    :type spark_session: pyspark.SparkSession
    '''
    
    ssac_query = '''
SELECT
    wiki AS wiki_db,
    event.userid AS user_id,
    CAST(event.displaymobile AS INT) AS reg_on_mobile
FROM event_sanitized.serversideaccountcreation
WHERE ((year = {prev_year} AND month = {prev_month})
    OR (year = {cur_year} AND month = {cur_month}))
    '''
    
    ## This expects `month` to be the first day of the month. It should be.
    last_month = month - dt.timedelta(days = 1)
    
    ssac_df = spark_session.sql(
        ssac_query.format(
            prev_year = last_month.year,
            prev_month = last_month.month,
            cur_year = month.year,
            cur_month = month.month
        )
    ).cache()
    
    return(ssac_df)
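The partition predicate in `get_cached_mobile_flags()` needs the (year, month) pairs of both the aggregation month and the month before it, and the year has to roll back in January. The stdlib sketch below covers just that computation. The function name `partition_months` is hypothetical.

```python
import datetime as dt

def partition_months(month):
    """Return [(prev_year, prev_month), (cur_year, cur_month)] for a first-of-month date."""
    last_month = month - dt.timedelta(days=1)  # last day of the previous month
    return [(last_month.year, last_month.month), (month.year, month.month)]

print(partition_months(dt.date(2021, 1, 1)))  # [(2020, 12), (2021, 1)]
print(partition_months(dt.date(2021, 3, 1)))  # [(2021, 2), (2021, 3)]
```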

In [14]:
def get_mobile_flags(df, wiki_name):
    '''
    For all users who registered on the given wiki in the current or previous month,
    get the displayMobile flag of their registration from the cached search of registrations.

    `get_cached_mobile_flags()` sets up the caching.
    
    Returns only the user ID and whether the user registered on desktop or mobile,
    as those are the two columns `store_data()` expects when it iterates over
    each wiki.
    
    :param df: the cached DataFrame of ServerSideAccountCreation registration data
    :type df: pyspark.sql.DataFrame

    :param wiki_name: the database name of the wiki we are getting data for
    :type wiki_name: str
    '''

    return(df.filter(df.wiki_db == wiki_name).select('user_id', 'reg_on_mobile').toPandas())
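`store_data()` left-joins the responses with these flags and fills missing values with 0. As a result, a user absent from ServerSideAccountCreation is counted as a desktop registration (assumption 4). The sketch below shows the same defaulting rule without pandas. The function name and user IDs are hypothetical.

```python
def platform_for(user_id, mobile_flags):
    """Return 'mobile' or 'desktop'; users missing from `mobile_flags` default to desktop."""
    return 'mobile' if mobile_flags.get(user_id, 0) else 'desktop'

flags = {101: 1, 102: 0}  # user_id -> reg_on_mobile, as in the cached SSAC data
print([platform_for(uid, flags) for uid in (101, 102, 103)])
# ['mobile', 'desktop', 'desktop']
```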

In [15]:
def process_responses(df):
    '''
    Process the survey responses found in the given DataFrame `df` and determine what
    group (control or survey) the user was in, and whether they saved, skipped, or
    abandoned the survey.
    
    Returns a `pandas.DataFrame` with five columns: user group (`group`),
    user ID (`user_id`), response (`response`: save/skip/abandon),
    render timestamp (`render_ts`), and submit timestamp (`submit_ts`).
    '''
    
    groups = []
    userids = []
    responses = []
    render_timestamps = []
    submit_timestamps = []
    
    for row in df.itertuples():
        ## Column names as returned by get_responses()
        user_id = row.user_id
        response = json.loads(row.survey_data)
        
        userids.append(user_id)
        
        if response['_group'] == 'exp2_target_popup':
            groups.append('C')
        elif response['_group'] == 'exp2_target_specialpage':
            groups.append('treatment')
        elif response['_group'] == 'exp1_group1':
            groups.append('target')
        elif response['_group'] == 'exp1_group2':
            groups.append('control')
        elif response['_group'] == 'NONE':
            groups.append('control')
        else:
            ## Keep the output columns aligned for unrecognized groups
            groups.append('unknown')
            
        if not '_render_date' in response \
            or not response['_render_date']:
            render_timestamps.append(None)
        else:
            render_timestamps.append(dt.datetime.strptime(response['_render_date'],
                                                          '%Y%m%d%H%M%S'))
            
        if not '_submit_date' in response:
            responses.append('abandon')
            submit_timestamps.append(None)
            continue
        else:
            submit_timestamps.append(dt.datetime.strptime(response['_submit_date'],
                                                          '%Y%m%d%H%M%S'))
            if '_skip' in response and response['_skip'] == True:
                responses.append('skip')
            else:
                responses.append('save')
            
    return(pd.DataFrame(
        {'group': groups,
         'user_id': userids,
         'response': responses,
         'render_ts' : render_timestamps,
         'submit_ts' : submit_timestamps}))

In [16]:
def get_toplines(rdf, c_groups, s_groups, wiki, month):
    '''
    Summarize the responses (which should be from a specific wiki and month)
    and return the summaries as a dictionary that can be used in a query
    to be inserted into Hive (or some other type of storage).
    
    :param rdf: survey responses to process
    :type rdf: pandas.DataFrame
    
    :param c_groups: set of names of the control group(s) used
    :type c_groups: set
    
    :param s_groups: set of names of the survey group(s) used
    :type s_groups: set
    
    :param wiki: name of the wiki we're summarizing for
    :type wiki: str
    
    :param month: first day of the month we're summarizing for
    :type month: datetime.date
    '''
    
    ## Columns in the output pandas.DataFrame
    ## 1. wiki (str)
    ## 2. month (datetime.date)
    ## 3. platform (str, desktop/mobile)
    ## 4. user group (str, survey/control)
    ## 5. survey response (str, save/skip/abandon)
    ## 6. number of responses (int)

    ## Define the preferred order of the columns in the returned pandas.DataFrame
    column_order = ['wiki', 'log_month', 'platform',
                    'user_group', 'survey_response', 'num_responses']

    user_ids = list() # used for counting
    platforms = list()
    user_groups = list()
    responses = list()
    
    for row in rdf.itertuples():
        user_ids.append(row.user_id)
        
        platform = 'desktop'
        if(row.reg_on_mobile):
            platform = 'mobile'
        platforms.append(platform)
        
        response = json.loads(row.survey_data)
        
        if response['_group'] in c_groups:
            user_groups.append('control')
            responses.append('N/A')
        elif response['_group'] in s_groups:
            user_groups.append('survey')
            if not '_submit_date' in response:
                responses.append('abandon')
            else:
                if '_skip' in response and response['_skip'] == True:
                    responses.append('skip')
                else:
                    responses.append('save')
        else:
            user_groups.append('unknown')
            responses.append('N/A')
        
    res = pd.DataFrame({
        'user_id': user_ids,
        'platform' : platforms,
        'user_group' : user_groups,
        'survey_response' : responses
    })
    
    res_agg = (res.groupby(['platform', 'user_group', 'survey_response'])
               .agg({'user_id': 'count'})
               .reset_index()
               .rename(columns = {'user_id': 'num_responses'}))
    
    ## Add wiki and month columns
    res_agg['wiki'] = wiki
    res_agg['log_month'] = month
    
    ## Now, return the aggregate with the columns in the preferred order
    return(res_agg[column_order])
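Conceptually, `get_toplines()` labels each respondent and counts rows per (platform, user group, survey response). The stdlib sketch below does the labeling and counting with `collections.Counter`. It uses the group names from the configuration above and made-up responses. The helper `label` is hypothetical.

```python
import json
from collections import Counter

CONTROL = {'exp1_group2', 'NONE'}
SURVEY = {'exp2_target_popup', 'exp1_group1', 'exp2_target_specialpage'}

def label(survey_data):
    """Return (user_group, survey_response) for one JSON survey value."""
    r = json.loads(survey_data)
    if r['_group'] in CONTROL:
        return 'control', 'N/A'
    if r['_group'] not in SURVEY:
        return 'unknown', 'N/A'
    if '_submit_date' not in r:
        return 'survey', 'abandon'
    # JSON true decodes to Python True
    return 'survey', 'skip' if r.get('_skip') is True else 'save'

rows = [
    ('desktop', '{"_group": "NONE"}'),
    ('desktop', '{"_group": "exp2_target_popup", "_submit_date": "20210105000000"}'),
    ('mobile', '{"_group": "exp2_target_popup", "_submit_date": "20210105000000", "_skip": true}'),
    ('desktop', '{"_group": "exp2_target_popup"}'),
    ('desktop', '{"_group": "NONE"}'),
]
# One count per (platform, user_group, survey_response) combination
counts = Counter((platform, *label(data)) for platform, data in rows)
for key, n in sorted(counts.items()):
    print(key, n)
```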

In [17]:
def get_q(rdf, s_groups, q_name, q_responses, wiki, month):
    '''
    Process responses to a given question given by users in one of the survey groups.
    If a user has entered a response that is invalid, we capture that and label
    the response as 'other'.
    
    :param rdf: survey responses
    :type rdf: pandas.DataFrame
    
    :param s_groups: set of names of the survey group(s) used, users not in those groups are skipped
    :type s_groups: set
    
    :param q_name: name of the question (the key in the response dictionary)
    :type q_name: str
    
    :param q_responses: valid responses to the question
    :type q_responses: iterator
    
    :param wiki: name of the wiki we're summarizing for
    :type wiki: str
    
    :param month: first day of the month we're summarizing for
    :type month: datetime.date
    '''
    
    ## Columns in the output pandas.DataFrame
    ## 1. wiki (str)
    ## 2. month (datetime.date)
    ## 3. platform (str, desktop/mobile)
    ## 4. question name
    ## 5. question response
    ## 6. number of responses (int)
    
    ## Define the preferred order of the columns in the returned pandas.DataFrame
    column_order = ['wiki', 'log_month', 'platform',
                    'q_name', 'q_response', 'num_responses']
   
    user_ids = list() # used for counting
    platforms = list()
    names = list()
    responses = list()
    
    for row in rdf.itertuples():
        response = json.loads(row.survey_data)
        
        ## only counting responses from survey group users
        if response['_group'] not in s_groups:
            continue
        
        ## only counting users who saved the survey
        if '_skip' in response or not '_submit_date' in response:
            continue
        
        ## We have four possibilities here:
        ## 1: the question name isn't in the response: we record a "N/A"
        ## 2: the response is a list: we ignore the config and record the entire list of answers
        ## 3: we have a set of defined responses: we check against it and record "other" if no match
        ## 4: otherwise, we record the response and keep going
        
        try:
            res = response[q_name]
            if isinstance(res, list):
                cur_response = res
            elif q_responses:
                if res in q_responses:
                    cur_response = [res]
                else:
                    cur_response = ['other']
            else:
                cur_response = [res]
        except KeyError:
            cur_response = ['N/A']

        ## Note: we make all these into a list of length 1, because we can then multiply them
        ## to make their length match the length of the response:

        cur_user_id = [row.user_id]
        cur_platform = ['desktop']
        if(row.reg_on_mobile):
            cur_platform = ['mobile']
        cur_q_name = [q_name]

        ## Et voila!
        user_ids.extend(cur_user_id * len(cur_response))
        platforms.extend(cur_platform * len(cur_response))
        names.extend(cur_q_name * len(cur_response))
        responses.extend(cur_response)
        
    res = pd.DataFrame({
        'user_id': user_ids,
        'platform' : platforms,
        'q_name' : names,
        'q_response' : responses
    })
    
    res_agg = (res.groupby(['platform', 'q_name', 'q_response'])
               .agg({'user_id': 'count'})
               .reset_index()
               .rename(columns = {'user_id': 'num_responses'}))
    
    ## The survey responses are defined to be strings, so we enforce that data type
    res_agg['q_response'] = res_agg['q_response'].astype('string')
    
    ## Add wiki and month columns
    res_agg['wiki'] = wiki
    res_agg['log_month'] = month
    
    ## Now, return the aggregate with the columns in the preferred order
    return(res_agg[column_order])
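For list-valued questions such as `languages`, `get_q()` repeats a user's row once per selected item before counting. As a consequence, `num_responses` counts selections, not respondents. The sketch below shows that effect with `Counter` and made-up answers.

```python
from collections import Counter

# (platform, answers) per respondent; each list is one user's selections
answers = [
    ('desktop', ['en', 'de']),
    ('desktop', ['en']),
    ('mobile', ['fr']),
]

counts = Counter(
    (platform, lang)
    for platform, langs in answers
    for lang in langs  # one count per selected item, like the list expansion in get_q()
)
print(counts[('desktop', 'en')])  # 2
print(sum(counts.values()))       # 4 selections from 3 respondents
```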

In [19]:
## Main function to store data for all wikis for a given month

def store_data(month, wikis, known_users,
               control_groups, survey_groups, q_config, overview_table, aggregate_response_table):
    '''
    Process survey responses from the given `month` on every wiki in `wikis`, excluding
    responses from test accounts (and other non-relevant respondents). Test accounts can
    be defined by wiki and user ID in `known_users` (a defaultdict of set).
    
    :param month: the first day of the month we're getting data for (e.g. 2019-11-01)
    :type month: datetime.date
    
    :param wikis: database codes of the wikis to process
    :type wikis: iterable of str
    
    :param known_users: dictionary mapping wiki to a set of known users by user ID
    :type known_users: collections.defaultdict
    
    :param control_groups: names of the control groups used in the data
    :type control_groups: set
    
    :param survey_groups: names of the survey groups used in the data
    :type survey_groups: set
    
    :param q_config: mapping of survey questions to valid responses
                    (see previous documentation earlier in the notebook)
    :type q_config: dict
    
    :param overview_table: name of the database table in which to store the monthly toplines
    :type overview_table: str
    
    :param aggregate_response_table: name of the database table in which to store the
                                     per-question aggregated responses
    :type aggregate_response_table: str
    '''
    
    ## Grab the PySpark session that we'll use for querying ServerSideAccountCreation
    ## and for writing data to the Data Lake
    spark_session = spark.get_session()

    ## Set up the cached query of ServerSideAccountCreation
    ssac_cached_df = get_cached_mobile_flags(month, spark_session)

    for wiki in wikis:
        try:
            ## Merge pattern-matched test accounts into the known users for this wiki
            known_users[wiki] |= get_known_users(wiki, known_user_patterns)
        except Exception:
            print('Unable to grab a list of known users from {}: {}'.format(wiki, sys.exc_info()[0]))
            continue
        
        try:
            response_data = get_responses(month, wiki, known_users[wiki])
        except Exception:
            print('Unable to retrieve responses from {}: {}'.format(wiki, sys.exc_info()[0]))
            continue
        
        ## Skip this wiki if we had no responses
        if response_data.empty:
            continue

        ## Get mobile flags, merge, those who are not found are assumed to be
        ## on the desktop site since we fill with 0.
        response_data = response_data.merge(
            get_mobile_flags(ssac_cached_df, wiki), how = 'left', on = 'user_id').fillna(0)
        
        ## Because of the left join reg_on_mobile becomes a float, so let's force it to be int
        response_data['reg_on_mobile'] = response_data['reg_on_mobile'].astype(int)
        
        ## Get the toplines
        toplines = get_toplines(response_data, control_groups, survey_groups, wiki, month)
        
        ## This takes the toplines DF, turns it into a Spark DataFrame
        ## then does an INSERT INTO the overview table of the data in that DataFrame.
        toplines_sdf = spark_session.createDataFrame(toplines)
        toplines_sdf.write.insertInto(overview_table)

        ## Iterate over the configuration and extract responses
        aggregated_responses = pd.DataFrame()
        for q_name, q_responses in q_config.items():
            aggregated_responses = pd.concat(
                [aggregated_responses,
                 get_q(response_data, survey_groups, q_name, q_responses, wiki, month)
                ])
        
        ## Similarly as for toplines, convert into a Spark DataFrame and then
        ## insert the data into the table with aggregated responses.
        if not aggregated_responses.empty:
            aggregated_responses_sdf = spark_session.createDataFrame(aggregated_responses)
            aggregated_responses_sdf.write.insertInto(aggregate_response_table)
        
        print('aggregated responses for {}'.format(wiki))
          
    # ok, done
    return()

In [20]:
def coalesce_table(table_name, n_partitions = 1):
    '''
    This function takes the given table and changes the number of Hive partitions
    it uses. It does this by reading the table contents, calling pyspark's coalesce() to
    change the number of partitions, and writing the result out into to a temporary table.
    It will then delete and overwrite the original table. It should be used with
    caution because the optimal number of partitions depends on the size and
    structure of your dataset.
    '''
    
    # make a temp table name based on today's date
    today = dt.datetime.now(dt.timezone.utc).date()
    temp_table_name = "{}_temp_{}".format(table_name, today.strftime("%Y%m%d"))
    
    # grab a spark session
    spark_session = spark.get_session()
    # read in the data table, coalesce it, then write it to the temp table
    spark_session.read.table(table_name).coalesce(n_partitions).write.saveAsTable(temp_table_name)

    # We refresh the table, as otherwise Spark thinks it's stored in a different file and
    # throws a FileNotFoundException when we try to read it and overwrite the original table
    spark_session.catalog.refreshTable(temp_table_name)

    try:
        # Now we can overwrite the original table and refresh that too, so hopefully nothing goes wrong
        spark_session.read.table(temp_table_name).write.saveAsTable(table_name, mode = 'overwrite')
        spark_session.catalog.refreshTable(table_name)
    except Exception:
        logging.error(f'unable to overwrite table "{table_name}", keeping "{temp_table_name}" as backup')
    else:
        try:
            spark.run(f'DROP TABLE {temp_table_name}')
        except UnboundLocalError:
            # wmfdata currently (late Feb 2021) has an issue with DDL/DML SQL queries,
            # and so we ignore that error
            pass

**NOTE:** The date range in the code below is _inclusive_, so the last month will also be run. Therefore, no overlap between a current run and an earlier run is necessary. A bunch of old data pulls are kept here for reference so it's clear when we pulled data, which wikis we ran it for, and so on.

In [21]:
## We expect this notebook to be run once a month through a cron job (or equivalent)
## during the month _after_ the one we're grabbing data for. We take today's date,
## change it to the first of the month, and walk back one day to get into the previous month.
## We then set the date to the first of that month because that's what we use as `log_month`.

today = dt.datetime.now(dt.timezone.utc).date()
first_of_this_month = today.replace(day = 1)
last_of_previous_month = first_of_this_month - dt.timedelta(days = 1)
first_of_previous_month = last_of_previous_month.replace(day = 1)

In [26]:
## Grab an updated list of all wikis, then limit it to all open and public
## Wikipedia wikis:

all_wikis = get_wikis()
all_wikipedia_wikis = all_wikis.loc[(all_wikis['database_group'] == 'wikipedia') &
                                    (all_wikis['status'] == 'open') &
                                    (all_wikis['visbility'] == 'public')].reset_index()

In [27]:
store_data(first_of_previous_month,
           all_wikipedia_wikis['database_code'],
           known_users,
           control_groups,
           survey_groups,
           question_specs,
           overview_table,
           response_table)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


aggregated responses for afwiki
aggregated responses for akwiki
aggregated responses for alswiki
aggregated responses for altwiki
aggregated responses for amwiki
aggregated responses for anwiki
aggregated responses for arwiki
aggregated responses for arywiki
aggregated responses for arzwiki
aggregated responses for astwiki
aggregated responses for aswiki
aggregated responses for atjwiki
aggregated responses for avkwiki
aggregated responses for aywiki
aggregated responses for azbwiki
aggregated responses for azwiki
aggregated responses for barwiki
aggregated responses for bawiki
aggregated responses for bclwiki
aggregated responses for be_x_oldwiki
aggregated responses for bewiki
aggregated responses for bgwiki
aggregated responses for bhwiki
aggregated responses for biwiki
aggregated responses for bmwiki
aggregated responses for bnwiki
aggregated responses for bowiki
aggregated responses for bpywiki
aggregated responses for brwiki
aggregated responses for bswiki
aggregated responses fo

()

In [28]:
coalesce_table(overview_table)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.
PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.


In [29]:
coalesce_table(response_table)

PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.
PySpark executors will use /usr/lib/anaconda-wmf/bin/python3.
