In [19]:
import ujson as json
import datetime as dt
import gzip
import os.path
import boto # S3
import calendar

from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client

%pylab inline

Populating the interactive namespace from numpy and matplotlib


Pick the report to generate: set `operating_mode` to either "weekly" or "monthly".

In [26]:
# Report granularity: must be either "weekly" or "monthly".
operating_mode = "weekly"

In [52]:
def get_last_week_range():
    """
    Return the (start, end) dates of the most recent complete Sunday-to-Saturday week.

    Both values are ``datetime.date`` objects; ``end`` is always strictly before today.
    """
    today = dt.date.today()
    # We need weeks starting on "Sunday", not on "Monday": weekday() is 0 for Monday,
    # so |(today.weekday() + 1) % 7| is the number of days elapsed since the last Sunday
    # (0 when today is a Sunday). Going back one more week lands on the last full week.
    days_since_sunday = (today.weekday() + 1) % 7
    start_of_week = today - dt.timedelta(days=days_since_sunday, weeks=1)
    end_of_week = start_of_week + dt.timedelta(days=6)
    return (start_of_week, end_of_week)

def get_last_month_range():
    """
    Return the (start, end) dates of the previous calendar month.

    Both values are ``datetime.date`` objects: the first and the last day of that month.
    """
    today = dt.date.today()
    # The day before the 1st of the current month is the last day of the previous month.
    end_of_last_month = today.replace(day=1) - dt.timedelta(days=1)
    start_of_last_month = end_of_last_month.replace(day=1)
    return (start_of_last_month, end_of_last_month)

def snap_to_past_sunday(date):
    """
    Get the closest Sunday on or before ``date``.

    ``date`` is returned unchanged when it already is a Sunday. Works with both
    ``datetime.date`` and ``datetime.datetime`` values (the time of day is kept).
    """
    # weekday() is 0 for Monday ... 6 for Sunday, so (weekday() + 1) % 7 is the
    # number of days elapsed since the last Sunday.
    return date - dt.timedelta(days=((date.weekday() + 1) % 7))

def snap_to_beginning_of_month(date):
    """
    Move ``date`` back to the first day of its month.

    Works for ``datetime.date`` and ``datetime.datetime`` alike; a datetime keeps its time of day.
    """
    days_into_month = date.day - 1
    return date - dt.timedelta(days=days_into_month)

### Fetch the core pings

First, pick a submission range: either the last full week or the last full month. The pings submitted in that range are then fetched and deduplicated.

In [4]:
def fetch_deduped_pings(sub_range):
    """
    Load the Fennec core pings (source version 1) submitted within ``sub_range``, without duplicates.

    :param sub_range: (start, end) tuple of YYYYMMDD strings, forwarded as ``submission_date``.
    :return: an RDD of property dicts, keeping a single ping per ``meta/documentId``.
    """
    wanted_properties = [
        "clientId",
        "osversion",
        "profileDate",
        "meta/submissionDate",
        "meta/geoCountry",
        "meta/appUpdateChannel",
        "meta/Timestamp",
        "meta/documentId",
    ]

    raw_pings = get_pings(sc,
                          app="Fennec",
                          doc_type="core",
                          source_version="1",
                          submission_date=sub_range,
                          fraction=1.0)

    # The whole ping is not needed: only keep the properties the aggregation reads.
    slim_pings = get_pings_properties(raw_pings, wanted_properties)

    # The same document can (sadly) be submitted more than once: keep one copy per document id.
    return slim_pings.keyBy(lambda ping: ping["meta/documentId"])\
                     .reduceByKey(lambda first, _duplicate: first)\
                     .values()

### Aggregate the data

Fields documentation:
* *os_version* - core pings contain the Android API level; we output the corresponding codename and version range (unknown levels are reported as "Other")
* *geo* - the code of the country the pings originated from. Only a fixed set of countries is reported individually; all other countries are grouped as "Other"
* *channel* - the product channel
* *date* - the first day of the aggregated week/month (weeks start on Sunday)
* *actives* - the number of distinct clients (deduplicated by client id) that submitted at least one core ping during the aggregated week/month. This used to be based on 'org.mozilla.appSessions'; it is now computed from the core pings.
* *new_records* - profile creation date == submission date
  * This may not hold due to broken clocks, temporary loss of network, ...
* *d1* - how many clients were active on the day after the profile creation date (submission date == profile creation date + 1)?
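* *d7* - how many clients were active at least once in the 7 days following profile creation? Note: the code currently counts one ping per client per day with a submission date at most 7 days after profile creation. This includes the creation day itself and dates before it (e.g. broken clocks), so a client active on several of those days is counted several times.
* *d30* - how many clients were active at least once in the 30 days following profile creation? Same counting caveat as *d7*, with a 30-day window.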
* *hours* - session duration in hours. Currently 0.
* *google, yahoo, bing, other* - Currently 0

In [5]:
def get_country(original_country):
    """
    Collapse a country code into the set of countries the dashboard reports.

    :param original_country: the two-letter country code of the ping, possibly None.
    :return: the code itself if it is one of the countries of interest, 'Other' otherwise.
    """
    countries_of_interest = frozenset((
        'US', 'CA', 'BR', 'MX', 'FR', 'ES', 'IT', 'PL',
        'TR', 'RU', 'DE', 'IN', 'ID', 'CN', 'JP', 'GB',
    ))
    if original_country in countries_of_interest:
        return original_country
    return 'Other'

def android_api_level_to_version(api_level):
    """
    Map an Android API level to a human readable "codename (version range)" label.

    The core ping stores the API level, but we need to display the OS version/codename.
    The API level -> codename mapping is based on:
    https://source.android.com/source/build-numbers.html

    :param api_level: the API level; the lookup keys are strings, e.g. '23'.
    :return: the matching label, or 'Other' for levels that are not listed below.
    """
    releases = [
        ('Froyo (2.2 - 2.2.3)', ['8']),
        ('Gingerbread (2.3 - 2.3.7)', ['9', '10']),
        ('Honeycomb (3.0 - 3.2.6)', ['11', '12', '13']),
        ('Ice Cream Sandwich (4.0 - 4.0.4)', ['14', '15']),
        ('Jelly Bean (4.1 - 4.3.x)', ['16', '17', '18']),
        ('KitKat (4.4 - 4.4.4)', ['19']),
        ('Lollipop (5.0 - 5.1)', ['21', '22']),
        ('Marshmallow (6.0)', ['23']),
    ]
    label_by_level = {level: label for label, levels in releases for level in levels}
    return label_by_level.get(api_level, 'Other')

In [38]:
def parse_to_unix_days(s):
    """
    Convert a YYYYMMDD string into the number of days since the Unix epoch (1970-01-01).

    Dates before the epoch yield negative numbers.
    """
    epoch_ordinal = dt.date(1970, 1, 1).toordinal()
    return dt.datetime.strptime(s, "%Y%m%d").toordinal() - epoch_ordinal

def get_file_name(report_type, suffix=""):
    """
    Build the name of the CSV file holding a report.

    :param report_type: "weekly" or "monthly"; embedded in the file name. (This used to be
                        ignored in favour of the global ``operating_mode``.)
    :param suffix: optional string appended right after the report type, e.g. the first
                   submission date of a chunk for intermediate files.
    :return: e.g. "fennec-v4-weekly.csv" or "fennec-v4-weekly20160306.csv".
    """
    return "fennec-v4-" + report_type + suffix + ".csv"

def safe_get(p, key, default_value=0):
    """ Return p[key] when the key is present, default_value otherwise (p is left untouched). """
    return p.get(key, default_value)

def safe_increment(p, key):
    """ Add one to p[key] in place, treating a missing key as 0. """
    current = p.get(key, 0)
    p[key] = current + 1

def is_new_profile(submission_epoch, profile_epoch):
    """
    Tell whether a ping comes from a profile created on the day it was submitted.

    Both arguments are day numbers on the same scale; the profile counts as new when
    they are equal.
    """
    same_day = profile_epoch == submission_epoch
    return same_day

def get_key(p, segments, report_type):
    """
    Build the key tuple a ping is aggregated under.

    :param p: ping property dict (as produced by fetch_deduped_pings).
    :param segments: property names to segment by. A dimension whose property is not
                     listed is collapsed to 'all'.
    :param report_type: "monthly" buckets the date by calendar month; any other value
                        buckets it by Sunday-starting week.
    :return: (os_version, geo, channel, date), where date is a YYYYMMDD string holding the
             first day of the submission month (monthly) or week (otherwise).
    """
    # Translate the API level into a codename/version label.
    os_version = android_api_level_to_version(p.get('osversion')) if 'osversion' in segments else 'all'

    # Only some of the countries are reported individually; the rest become 'Other'.
    geo = get_country(p.get('meta/geoCountry')) if 'meta/geoCountry' in segments else 'all'

    # The update channel is reported as-is.
    channel = p.get('meta/appUpdateChannel') if 'meta/appUpdateChannel' in segments else 'all'

    # Append the date, at last. In the monthly mode, that's the first day of the month.
    # Otherwise, that's the first day (Sunday) of the submission week.
    submission_date = dt.datetime.strptime(p.get("meta/submissionDate"), "%Y%m%d")
    if report_type == "monthly":
        date_string = submission_date.strftime("%Y%m01")
    else:
        # weekday() is 0 for Monday, so (weekday() + 1) % 7 is the number of days since Sunday.
        first_day = submission_date - dt.timedelta(days=((submission_date.weekday() + 1) % 7))
        date_string = first_day.strftime("%Y%m%d")

    return (os_version, geo, channel, date_string)

def run_query(pings, segments, report_type):
    """
    Aggregate the pings over the dimensions in "segments".

    We start by generating a key for each ping by chaining the values of the dimensions of
    interest (see get_key). If we don't care about a particular dimension, its value is set
    to "all". All the pings belonging to a key are aggregated together.

    :param pings: RDD of ping property dicts, e.g. the output of fetch_deduped_pings.
    :param segments: list of ping properties to segment by (a subset of "osversion",
                     "meta/geoCountry" and "meta/appUpdateChannel").
    :param report_type: "weekly" or "monthly", forwarded to get_key to bucket the date.
    :return: RDD of (key, counts) pairs; counts is a dict with the "actives", "new_records",
             "d1", "d7" and "d30" counters. Every segment with at least one ping is reported,
             even when none of its pings carries a "profileDate".
    """
    # Segment the data by indexing them by dimensions.
    segmented_pings = pings.map(lambda p: (get_key(p, segments, report_type), p))

    # We require the profile age to measure the retention. Filter out those pings that don't have it.
    filtered = segmented_pings.filter(lambda p: p[1].get("profileDate") is not None)

    # For metrics like d1, d7, d30, and new_records we need only one core ping per client, per day.
    # Generate a new RDD containing only one ping per client, for each day, within the segment:
    # Step 1 - Append the client id and submission date to the index key
    # Step 2 - ReduceByKey so that we get only one ping per day per client
    # Step 3 - Strip off the client id/submission date from the index key
    one_per_day = filtered.map(lambda p: ((p[0], p[1].get("clientId"), p[1].get("meta/submissionDate")), p[1]))\
                          .reduceByKey(lambda a, b: a)\
                          .map(lambda p: (p[0][0], p[1]))

    def retention_seq(acc, v):
        """ Fold one (client, day) ping into the retention counters of its segment. """
        if not acc:
            acc = {}

        # Please note that retention *WILL* be broken by broken clocks.
        # NOTE(review): profileDate is compared with a days-since-epoch value, so it is
        # presumably expressed in days since the Unix epoch as well - confirm with the ping schema.
        submission_epoch = parse_to_unix_days(v['meta/submissionDate'])

        # Check if this ping is on a new profile. If so, increments "new_records".
        if is_new_profile(submission_epoch, v['profileDate']):
            safe_increment(acc, 'new_records')

        # Evaluate the d1, d7, d30 retention metrics. First, get the delta between
        # the submission date and the profile creation date.
        days_after_creation = submission_epoch - v['profileDate']

        # Is the user still engaged after 1 day (d1)?
        if days_after_creation == 1:
            safe_increment(acc, 'd1')

        # NOTE(review): d7/d30 also count the creation day itself (delta 0) and pings dated
        # before the profile creation (negative delta, e.g. broken clocks). They also count
        # a client once per active day rather than once overall. Confirm this matches the
        # intended dashboard definition.
        # And after 7 days (d7)?
        if days_after_creation <= 7:
            safe_increment(acc, 'd7')

        # And after 30 days (d30)?
        if days_after_creation <= 30:
            safe_increment(acc, 'd30')

        return acc

    def cmb(v1, v2):
        """ Sum two partial counter dicts key by key (a missing key counts as 0). """
        return {k: v1.get(k, 0) + v2.get(k, 0) for k in set(v1) | set(v2)}

    retention_defaults = {
        'new_records': 0,
        'actives': 0,
        'd1': 0,
        'd7': 0,
        'd30': 0,
    }
    aggregated_retention = one_per_day.aggregateByKey(retention_defaults, retention_seq, cmb)

    # For each segment, count how many active clients:
    def count_actives(acc, v):
        acc["actives"] = acc["actives"] + 1
        return acc

    # We aggregate the active user count in an object to ease joining. Actives are computed on
    # all the pings, including those without a "profileDate".
    actives_per_segment = segmented_pings.map(lambda r: ((r[0], r[1].get("clientId")), 1))\
                                         .reduceByKey(lambda x, y: x)\
                                         .map(lambda r: (r[0][0], 1))\
                                         .aggregateByKey({"actives": 0}, count_actives, cmb)

    def merge_counts(counts):
        """ Merge the (actives, retention-or-None) pair produced by the left outer join. """
        actives, retention = counts
        if retention is None:
            # No ping of this segment had a "profileDate": report zero retention counters.
            retention = retention_defaults
        return cmb(retention, actives)

    # Join from the actives side: an inner join would silently drop the segments that have
    # active clients but no retention entry (all their pings lacked "profileDate").
    merged = actives_per_segment.leftOuterJoin(aggregated_retention).mapValues(merge_counts)

    return merged

To build a single CSV file, we execute a series of queries and then serialize the output.

In [53]:
def run_queries(start_date=None, end_date=None):
    """
    This function has 3 operating modes:
     1. "weekly", which aggregates the data from the last full week and appends the results to the weekly CSV;
     2. "monthly", aggregates the last full month and appends the results to the monthly CSV;
     3. "backfill", given a start and end date, performs weekly or monthly aggregation over that period and
        appends to the CSV file.

    The weekly/monthly granularity comes from the global ``operating_mode``. Backfilling only
    happens when both ``start_date`` and ``end_date`` are given; the start is then snapped back
    to the beginning of its week (Sunday) or month.

    Each week/month chunk is also serialized to its own intermediate CSV as soon as it is computed.

    :return: an RDD with the (key, counts) pairs of every chunk and every query.
    """
    # Each entry represents a different query over a set of dimensions of interest.
    QUERIES = [
        ['osversion', 'meta/geoCountry', 'meta/appUpdateChannel'],
        ['osversion', 'meta/appUpdateChannel'],
        ['osversion', 'meta/geoCountry'],
        ['meta/geoCountry', 'meta/appUpdateChannel'],
        ['osversion'],
        ['meta/geoCountry'],
        ['meta/appUpdateChannel'],
        []
    ]

    # The cumulative RDD holding all the results.
    results = sc.emptyRDD()

    # Default to the last full week/month; use the given bounds when backfilling.
    # Strings are compared with == (not `is`, which tests object identity).
    date_range = get_last_month_range() if operating_mode == "monthly" else get_last_week_range()
    if start_date is not None and end_date is not None:
        if operating_mode == "weekly":
            snapped_start = snap_to_past_sunday(start_date)
        else:
            snapped_start = snap_to_beginning_of_month(start_date)
        date_range = (snapped_start, end_date)

    # Split the submission period in chunks, so we don't run out of resources while aggregating.
    chunk_start = date_range[0]

    while chunk_start <= date_range[1]:
        # Compute the end of this time chunk.
        if operating_mode == "monthly":
            last_day_of_month = calendar.monthrange(chunk_start.year, chunk_start.month)[1]
            chunk_end = chunk_start.replace(day=last_day_of_month)
        else:
            chunk_end = chunk_start + dt.timedelta(days=6)

        # Fetch the pings we need.
        submissions_range = (chunk_start.strftime("%Y%m%d"), chunk_end.strftime("%Y%m%d"))
        deduped = fetch_deduped_pings(submissions_range)

        chunk_results = sc.emptyRDD()
        for query in QUERIES:
            print("Running query over dimensions: %s" % ", ".join(query))
            query_result = run_query(deduped, query, operating_mode)
            # Append this RDD to the results for this chunk.
            chunk_results = chunk_results.union(query_result)

        # Keep the (small) aggregated chunk in memory: it is collected right below and will
        # be collected again when the returned RDD is serialized. Without caching, Spark
        # would re-fetch and re-aggregate this chunk's pings on every collection.
        chunk_results.cache()

        # Serialize intermediate results to file, so we don't start from scratch if the batch fails.
        # We append the week/month at the end of the file name.
        serialize_results(chunk_results, submissions_range[0])

        # Move on to the next chunk, just add one day to either last month or week.
        chunk_start = chunk_end + dt.timedelta(days=1)

        # Append this chunk results to the whole RDD. We assume the keys DO NOT collide.
        results = results.union(chunk_results)

    return results

### CSV and S3 utility functions.

Utility functions to read the report CSV from, and write it back to, the S3 store.

In [54]:
def fetch_previous_state():
    """
    Download the current report CSV from S3 into the working directory.

    The object s3://net-mozaws-prod-metrics-data/fennec-dashboard/<file name> is saved locally
    under the same file name. If it does not exist yet, a message is printed and nothing is
    downloaded.
    """
    file_name = get_file_name(operating_mode)

    bucket = boto.connect_s3(host="s3-us-west-2.amazonaws.com")\
                 .get_bucket("net-mozaws-prod-metrics-data")
    previous_csv = bucket.get_key('fennec-dashboard/' + file_name)

    if previous_csv is not None:
        previous_csv.get_contents_to_filename(file_name)
    else:
        print("No previous s3://net-mozaws-prod-metrics-data/fennec-dashboard/%s" % file_name)

def store_new_state():
    """
    Upload the local report CSV to s3://net-mozaws-prod-metrics-data/fennec-dashboard/.

    The local file named by ``get_file_name(operating_mode)`` is stored under the same name,
    replacing the object at that key.
    """
    file_name = get_file_name(operating_mode)

    # Upload the CSV to s3://net-mozaws-prod-metrics-data/fennec-dashboard
    conn = boto.connect_s3(host="s3-us-west-2.amazonaws.com")
    bucket = conn.get_bucket("net-mozaws-prod-metrics-data")

    # Bucket.new_key builds a key bound to this bucket without needing a separate
    # `from boto.s3.key import Key`, which the notebook's import cell does not have.
    k = bucket.new_key('fennec-dashboard/' + file_name)
    k.set_contents_from_filename(file_name)

Utility functions to map our data to CSV and then save it to file.

In [55]:
def to_csv(r):
    """
    Format one aggregated (key, counts) pair as a CSV line, without the trailing newline.

    The key is the tuple (os, geo, channel, date) with date as YYYYMMDD; it is written as
    YYYY-MM-DD. counts must contain "actives", "new_records", "d1", "d7" and "d30". The
    hours, google, yahoo, bing and other columns are not computed yet and are always "0".
    """
    key, counts = r[0], r[1]
    iso_date = dt.datetime.strptime(key[3], "%Y%m%d").strftime("%Y-%m-%d")

    columns = [key[0], key[1], key[2], iso_date]
    columns.extend(str(counts[metric]) for metric in ('actives', 'new_records', 'd1', 'd7', 'd30'))
    # hours, google, yahoo, bing, other: not computed yet.
    columns.extend(["0"] * 5)
    return ",".join(columns)
            
def serialize_results(results, file_suffix=""):
    """
    Append the aggregated ``results`` RDD to the report CSV for the current operating mode.

    The target file is ``get_file_name(operating_mode, file_suffix)``. The header line is only
    written when the file does not exist yet, so repeated calls keep appending to a single CSV.
    """
    file_name = get_file_name(operating_mode, file_suffix)
    write_header = not os.path.exists(file_name)
    if not write_header:
        # Appending to an existing file, which presumably already starts with the header.
        print("Omitting the CSV header")

    csv_lines = results.map(to_csv).collect()
    print("Writing %i new entries" % len(csv_lines))

    with open(file_name, "a") as csv_file:
        if write_header:
            header = "os_version,geo,channel,date,actives,new_records,d1,d7,d30,hours,google,yahoo,bing,other"
            csv_file.write(header.encode('utf8') + "\n")

        # Finally append the data lines.
        for line in csv_lines:
            csv_file.write(line.encode('utf8') + "\n")
    

### Execute our script

In [None]:
# Fail fast on a typo in the configuration cell, before doing any expensive work.
if operating_mode not in ("weekly", "monthly"):
    raise ValueError("Unknown operating mode: %s " % operating_mode)

# Backfilling: set both bounds, e.g. start_date = dt.datetime(2016,3,6) and
# end_date = dt.datetime(2016,3,19). Leave both as None to process the last full week/month.
start_date = None
end_date = None

# Compute the aggregates, then append them to the previous CSV pulled from S3
# and push the updated CSV back to S3.
result = run_queries(start_date, end_date)
fetch_previous_state()
serialize_results(result)
store_new_state()

Running query over dimensions: osversion, meta/geoCountry, meta/appUpdateChannel
Running query over dimensions: osversion, meta/appUpdateChannel
Running query over dimensions: osversion, meta/geoCountry
Running query over dimensions: meta/geoCountry, meta/appUpdateChannel
Running query over dimensions: osversion
Running query over dimensions: meta/geoCountry
Running query over dimensions: meta/appUpdateChannel
Running query over dimensions: 