In [None]:
import ujson as json
import datetime as dt
import os.path
import boto3
import botocore
import calendar
import requests
import moztelemetry.standards as moz_std

%pylab inline

### Miscellaneous functions

In [None]:
def vendor_name_from_id(id):
    """ Map a GFX vendor id to a human readable vendor name.

    Args:
        id: A string containing the vendor id, e.g. '0x10de'.

    Returns:
        The vendor name for known ids, otherwise the string "Other (<ID>)"
        with the unknown id embedded.
    """

    # TODO: We need to make this an external resource for easier
    # future updates.
    known_vendors = {
        '0x1002': 'AMD',
        '0x1013': 'Cirrus Logic',
        '0x1039': 'SIS',
        '0x102b': 'Matrox',
        '0x10de': 'NVIDIA',
        '0x1106': 'VIA',
        '0x1414': 'Microsoft Basic',
        '0x15ad': 'VMWare',
        '0x5333': 'S3 Graphics',
        '0x8086': 'Intel',
        '0x80ee': 'Oracle VirtualBox',
    }

    name = known_vendors.get(id)
    if name is None:
        # Unknown vendors keep their id so they can still be told apart.
        name = "Other (" + id + ")"
    return name

### Functions to query the longitudinal dataset

In [None]:
# Markers returned instead of a hardware record when a client's data is discarded:
# the client did not submit a ping in the reference timeframe, or it sent
# partial/broken data.
REASON_INACTIVE = "inactive"
REASON_BROKEN_DATA = "broken"

def get_valid_client_record(r, data_index):
    """ Extract the hardware data of one sample of a client, if it is usable.

    Args:
        r: The client entry in the longitudinal dataset.
        data_index: The index of the sample within the client record.

    Returns:
        A dict with the keys os_name, os_version, memory_mb, gfx0_vendor_id,
        screen_width, screen_height, cpu_cores and cpu_vendor, or
        REASON_BROKEN_DATA if the sample has no primary GFX adapter or monitor,
        reports the '0x0000' vendor id, or has a None value for any of those keys.
    """
    def section(name):
        # The sample at |data_index| of the given top-level column.
        return r[name][data_index]

    gfx = section("system_gfx")
    adapters, displays = gfx["adapters"], gfx["monitors"]

    # Both the primary GFX adapter and the primary monitor (resolution) are
    # required; otherwise the sample is discarded.
    if not (adapters and adapters[0] and displays and displays[0]):
        return REASON_BROKEN_DATA

    primary_adapter, primary_display = adapters[0], displays[0]

    # 0x0000 seems to be an invalid vendor id.
    if primary_adapter["vendor_id"] == '0x0000':
        return REASON_BROKEN_DATA

    os_info = section("system_os")
    record = {
        'os_name': os_info["name"],
        'os_version': os_info["version"],
        'memory_mb': section("system")["memory_mb"],
        'gfx0_vendor_id': primary_adapter["vendor_id"],
        'screen_width': primary_display["screen_width"],
        'screen_height': primary_display["screen_height"],
        'cpu_cores': section("system_cpu")["cores"],
        'cpu_vendor': section("system_cpu")["vendor"],
    }

    # Partial records would skew the aggregates; reject them.
    if None in record.values():
        return REASON_BROKEN_DATA
    return record

def get_latest_valid_per_client(entry, time_start, time_end):
    """ Get the most recently submitted ping for a client within the given timeframe.

    The submission dates are scanned in stored order. The scan stops at the first
    date inside [time_start, time_end], or at the first date before time_start.
    This means the stored order is assumed to be newest-first
    (NOTE(review): confirm against the longitudinal dataset documentation).
    The index found is then used to look up the data from the other columns,
    whose lengths must match. Once we have the data, we make sure it's valid and
    return it.

    Args:
        entry: The record containing all the data for a single client.
        time_start: The beginning of the reference timeframe (a date).
        time_end: The end of the reference timeframe (a date, inclusive).

    Returns:
        An object containing the valid hardware data for the client or a string
        describing why the data is discarded. It returns REASON_INACTIVE if the
        client didn't submit a ping within the desired timeframe. It returns
        REASON_BROKEN_DATA if the client has no submission dates, is missing a
        whole section, or sent broken data.

    Raises:
        ValueError: if the selected sample is null or missing (index out of range)
        in one of the columns. This means the longitudinal dataset is corrupted.
    """
    submission_dates = entry["submission_date"]
    # Without submission dates we cannot even pick a sample; treat the client as
    # missing a section instead of failing on the iteration below.
    if submission_dates is None:
        return REASON_BROKEN_DATA

    latest_entry = None
    for index, pkt_date in enumerate(submission_dates):
        sub_date = dt.datetime.strptime(pkt_date, "%Y-%m-%dT%H:%M:%S.%fZ").date()
        # With newest-first ordering, the first date inside the timeframe is the
        # most recent one.
        if time_start <= sub_date <= time_end:
            latest_entry = index
            break

        # Older than the timeframe: every following date is older still.
        if sub_date < time_start:
            break

    # This client wasn't active in the reference timeframe.
    if latest_entry is None:
        return REASON_INACTIVE

    # Some clients might be missing entire sections. Skip them, we don't want
    # partial data.
    desired_sections = ["system_os", "submission_date", "system", "system_gfx", "system_cpu"]
    for field in desired_sections:
        column = entry[field]
        if column is None:
            # Filtered away in later passes.
            return REASON_BROKEN_DATA

        # All arrays in the longitudinal dataset should have the same length for a
        # single client. A shorter or null-filled column means the dataset is broken.
        if latest_entry >= len(column):
            raise ValueError("Missing " + field + " index: " + str(latest_entry))
        if column[latest_entry] is None:
            raise ValueError("Null " + field + " index: " + str(latest_entry))

    return get_valid_client_record(entry, latest_entry)

### Define how we transform the data

In [None]:
def prepare_data(p):
    """ Convert a valid client record into the values used for aggregation.

    Args:
        p: A hardware record dict as returned by |get_valid_client_record|.

    Returns:
        A dict containing:
        - cpu_cores and cpu_vendor, copied as-is;
        - the primary GFX vendor id translated to a name;
        - the resolution as "<width>x<height>";
        - the memory rounded to whole gigabytes;
        - the OS as "<name>-<version>".
    """
    return dict(
        cpu_cores=p['cpu_cores'],
        cpu_vendor=p['cpu_vendor'],
        gfx0_vendor_name=vendor_name_from_id(p['gfx0_vendor_id']),
        resolution='x'.join((str(p['screen_width']), str(p['screen_height']))),
        # MB -> GB, rounded to the nearest whole gigabyte.
        memory_gb=int(round(p['memory_mb'] / 1024.0)),
        os='-'.join((p['os_name'], p['os_version'])),
    )

def aggregate_data(processed_data):
    """ Count how many samples share each value of each reported dimension.

    Args:
        processed_data: An RDD of dicts as produced by |prepare_data|.

    Returns:
        A dict mapping (dimension, value) tuples, e.g. ('memory_gb', 8), to the
        number of samples having that value.
    """
    # The dimensions over which we want to aggregate the different values.
    dimensions = ('cpu_cores', 'cpu_vendor', 'gfx0_vendor_name', 'resolution', 'memory_gb', 'os')

    def add_sample(counts, sample):
        # Count one (dimension, value) occurrence per dimension; updates |counts|
        # in place and returns it.
        for dimension in dimensions:
            bucket = (dimension, sample[dimension])
            counts[bucket] = counts.get(bucket, 0) + 1
        return counts

    def merge_counts(left, right):
        # Sum two partial count dicts into a new dict.
        merged = dict(left)
        for bucket in right:
            merged[bucket] = merged.get(bucket, 0) + right[bucket]
        return merged

    return processed_data.aggregate({}, add_sample, merge_counts)

def _generic_group_key(key_type, value):
    """Return the coarser group name a too-small configuration is folded into."""
    if key_type == 'resolution':
        # Round both dimensions to the nearest hundred, e.g. 1366x768 -> ~1400x800.
        width, height = value.split('x')
        return '~' + str(int(round(int(width), -2))) + 'x' + str(int(round(int(height), -2)))
    if key_type == 'os':
        # Keep the OS name, drop the version, e.g. Windows_NT-6.1 -> Windows_NT-Other.
        os_name = value.split('-', 1)[0]
        return os_name + '-' + 'Other'
    return 'Other'


def _fold_small_groups(groups, count_threshold, group_key_for):
    """Keep groups above |count_threshold|; sum the others into (dimension, group_key_for(...))."""
    folded = {}
    for key, count in groups.items():
        if count > count_threshold:
            target = key
        else:
            target = (key[0], group_key_for(key[0], key[1]))
        # Always accumulate: a genuine value may share its key with a generic
        # bucket (e.g. 'Other'), and assigning would drop the counts already merged.
        folded[target] = folded.get(target, 0) + count
    return folded


def collapse_buckets(aggregated_data, count_threshold):
    """ Collapse uncommon configurations in generic groups to preserve privacy.

    This takes the dictionary of aggregated results from |aggregate_data| and
    folds every group whose count is not above |count_threshold| (i.e. less than
    or equal to it) into a coarser group, in two passes:

      1. Resolutions are rounded to the nearest hundred ("~1400x800"), operating
         systems are grouped by name ("<os name>-Other"), and anything else goes
         to the dimension's "Other" group.
      2. Groups that are still too small after the first pass go to the
         dimension's "Other" group.

    Counts that land on the same key are summed, so no samples are lost.

    Args:
        aggregated_data: A dict mapping (dimension, value) tuples to counts.
        count_threshold: Groups (or "configurations") whose count is less than or
            equal to this value are collapsed in a generic bucket.

    Returns:
        A new dict mapping (dimension, value) tuples to counts.
    """
    collapsed_groups = _fold_small_groups(aggregated_data, count_threshold, _generic_group_key)
    # The previous grouping might have created additional small groups.
    return _fold_small_groups(collapsed_groups, count_threshold,
                              lambda key_type, value: 'Other')

def finalize_data(data, sample_count, discarded_count, report_date):
    """ Finalize the aggregated data.

    Translate raw sample counts to fractions of |sample_count|. Add the date of
    the reported week and the share of samples discarded due to broken data.
    Rename the keys to more human friendly names.

    Args:
        data: Aggregated counts keyed by (dimension, value) tuples, as produced by
            |collapse_buckets|.
        sample_count: The number of samples the aggregates were generated from.
        discarded_count: Stored as-is under the 'discarded' key. Despite its name,
            generate_report passes the *fraction* of samples discarded due to
            broken data, not a count.
        report_date: The starting day for the reported week.

    Returns:
        A dict with 'date' (ISO format), 'discarded' and one '<prefix><value>' key
        per aggregated group (e.g. 'ram_8'), mapped to count / sample_count.

    Raises:
        KeyError: if |data| contains a dimension with no known key prefix.
    """
    denom = float(sample_count)

    aggregated_percentages = {
        'date': report_date.isoformat(),
        'discarded': discarded_count,
    }

    keys_translation = {
        'cpu_cores': 'cores_',
        'cpu_vendor': 'cpu_',
        'gfx0_vendor_name': 'gpu_',
        'resolution': 'display_',
        'memory_gb': 'ram_',
        'os': 'os_',
    }

    # Compute the fractions from the raw numbers.
    for (dimension, value), count in data.items():
        # Translate the dimension to its prefix and append the value as text.
        # u'%s' yields a unicode key on Python 2, like unicode() did, and also
        # works on Python 3 where unicode() does not exist.
        new_key = keys_translation[dimension] + u'%s' % (value,)
        aggregated_percentages[new_key] = count / denom

    return aggregated_percentages

### File and S3 serialization functions.

In [None]:
# Bucket and key prefix used to fetch and publish the survey state on S3.
S3_PUBLIC_BUCKET = "telemetry-public-analysis-2"
S3_DATA_PATH = "game-hardware-survey/data/"

def get_file_name(suffix=""):
    """ Build the name of a hardware survey JSON file.

    Args:
        suffix: Text inserted between the common prefix and the extension.

    Returns:
        The string "hwsurvey-weekly<suffix>.json".
    """
    return "".join(("hwsurvey-weekly", suffix, ".json"))

def serialize_results(aggregated_data, week_start, week_end):
    """ Write the report for one week to a local JSON file.

    The file name carries the week bounds as YYYYMMDD, e.g.
    "hwsurvey-weekly-20160207-20160213.json". The file contains a JSON array
    whose only element is |aggregated_data|. An existing file with the same name
    is overwritten.

    Args:
        aggregated_data: The finalized report for the week (JSON serializable).
        week_start: The first day of the reported week.
        week_end: The last day of the reported week.
    """
    # Year-month-day (not %Y%d%m) so names are unambiguous and sort
    # chronologically when the weekly files are concatenated.
    suffix = "-" + week_start.strftime("%Y%m%d") + "-" + week_end.strftime("%Y%m%d")
    file_name = get_file_name(suffix)

    if os.path.exists(file_name):
        print("{} exists, we will overwrite it.".format(file_name))

    # Our aggregated data is a JSON object.
    json_entry = json.dumps(aggregated_data)

    with open(file_name, "w") as json_file:
        json_file.write("[" + json_entry.encode('utf8') + "]\n")

def fetch_previous_state(s3_source_file_name, local_file_name):
    """
    Download the previously published state from S3 and store it locally.

    A missing object (error code "404") is not an error: it only means there is
    no previous state yet, so a message is printed and nothing is downloaded.

    Args:
        s3_source_file_name: The name of the file on S3, relative to S3_DATA_PATH.
        local_file_name: The name of the file to save to, locally.

    Raises:
        botocore.exceptions.ClientError: for any S3 error other than a 404.
    """

    # Fetch the previous state.
    client = boto3.client('s3', 'us-west-2')
    transfer = boto3.s3.transfer.S3Transfer(client)
    key_path = S3_DATA_PATH + s3_source_file_name

    try:
        transfer.download_file(S3_PUBLIC_BUCKET, key_path, local_file_name)
    except botocore.exceptions.ClientError as e:
        # .get() so a response without error details re-raises the original
        # ClientError instead of masking it with a KeyError.
        error_code = e.response.get('Error', {}).get('Code')
        if error_code != "404":
            # Bare raise keeps the original traceback (`raise e` loses it on Python 2).
            raise
        print("Did not find an existing file at '{}'".format(key_path))

def store_new_state(source_file_name, s3_dest_file_name):
    """
    Upload a local state file to the public analysis bucket on S3.

    The object is written to S3_PUBLIC_BUCKET under the S3_DATA_PATH prefix,
    using an S3 client for the us-west-2 region.

    Args:
        source_file_name: The name of the local source file.
        s3_dest_file_name: The name of the destination file on S3, relative to
            S3_DATA_PATH.
    """
    uploader = boto3.s3.transfer.S3Transfer(boto3.client('s3', 'us-west-2'))
    destination_key = S3_DATA_PATH + s3_dest_file_name
    uploader.upload_file(source_file_name, S3_PUBLIC_BUCKET, destination_key)

### The main logic, wiring all the things together.

In [None]:
def _report_date_range(start_date, end_date):
    """Return the (start, end) dates generate_report covers, defaulting to last week."""
    last_week = moz_std.get_last_week_range()
    if start_date is None:
        # Without an explicit start, |end_date| is ignored and last week is reported.
        return (last_week[0], last_week[1])
    range_start = moz_std.snap_to_beginning_of_week(start_date, "Sunday")
    range_end = end_date if end_date is not None else last_week[1]
    return (range_start, range_end)


def _report_week(frame, chunk_start, chunk_end):
    """Build the report for [chunk_start, chunk_end] and serialize it to a local file."""
    # Bind the chunk bounds as default arguments so the closure shipped to Spark
    # always carries the dates of this chunk (no late binding of loop variables).
    data = frame.rdd.map(
        lambda r, start=chunk_start, end=chunk_end: get_latest_valid_per_client(r, start, end))
    # |data| feeds three Spark actions below (two counts and the aggregation);
    # cache it so the longitudinal dataset is not re-read for each of them.
    data.cache()

    # Filter out broken data and inactive clients.
    filtered_data = data.filter(lambda r: r not in [REASON_BROKEN_DATA, REASON_INACTIVE])

    # Count the broken records. Compare by value: the marker strings are produced
    # on the Spark workers, so an identity check ("is") is not reliable.
    broken_count = data.filter(lambda r: r == REASON_BROKEN_DATA).count()

    # Process the data, transforming it in the form we desire.
    processed_data = filtered_data.map(prepare_data)

    print("Aggregating entries...")
    aggregated_pings = aggregate_data(processed_data)

    # Get the sample count, we need it to compute the percentages instead of raw numbers.
    # Since we're getting only the newest ping for each client, we can simply count the
    # number of pings. THIS MAY NOT BE CONSTANT ACROSS WEEKS!
    sample_count = filtered_data.count()

    # Collapse together groups holding at most 1% (rounded down) of our samples.
    threshold_to_collapse = int(sample_count * 0.01)

    print("Collapsing smaller groups into the other bucket (threshold {th})".format(th=threshold_to_collapse))
    collapsed_aggregates = collapse_buckets(aggregated_pings, threshold_to_collapse)

    print("Post-processing raw values...")
    # Fraction of the active clients whose data was discarded as broken. It is 0
    # when no client was active in this chunk, instead of a ZeroDivisionError.
    active_count = sample_count + broken_count
    discarded_ratio = broken_count / float(active_count) if active_count else 0.0
    processed_aggregates = finalize_data(collapsed_aggregates, sample_count, discarded_ratio, chunk_start)

    print("Serializing results locally...")
    # This overwrites any existing local file for the same week.
    serialize_results(processed_aggregates, chunk_start, chunk_end)

    data.unpersist()


def generate_report(start_date=None, end_date=None):
    """ Generates the hardware survey dataset for the reference timeframe.

    If the timeframe is longer than a week, split it in weekly chunks and process
    each chunk individually (eases backfilling). Every chunk spans seven days
    starting at its first day, so the last chunk may extend past |end_date|.

    The report for each week is saved in a local JSON file.

    Args:
        start_date: The date from which we start generating the report, snapped to
           the beginning of its week (Sunday). If None, the report covers the past
           week.
        end_date: The date that marks the end of the reporting period. This is only
           used if a |start_date| was provided. If None, this defaults to the end of
           the past week.
    """
    date_range = _report_date_range(start_date, end_date)

    # Connect to the longitudinal dataset. geo_country is selected but not used by
    # the current report.
    sqlQuery = ("SELECT "
                "client_id, "
                "system_os,"
                "submission_date,"
                "system,"
                "geo_country,"
                "system_gfx,"
                "system_cpu "
                "FROM longitudinal WHERE normalized_channel = 'release'")
    frame = sqlContext.sql(sqlQuery)

    # Split the submission period in chunks, so we don't run out of resources while
    # aggregating if we want to backfill.
    chunk_start = date_range[0]
    while chunk_start < date_range[1]:
        chunk_end = chunk_start + dt.timedelta(days=6)
        _report_week(frame, chunk_start, chunk_end)
        # Move on to the next chunk, starting the day after the last one ended.
        chunk_start = chunk_end + dt.timedelta(days=1)

### Execute the Job

In [None]:
start_date = None # Only use this when backfilling, e.g. dt.date(2016,2,1)
end_date = None # Only use this when backfilling, e.g. dt.date(2016,3,26)

# Fetch the previous data from S3 and save it locally.
fetch_previous_state("hwsurvey-weekly.json", "hwsurvey-weekly-prev.json")
# Generate the report for the desired period.
generate_report(start_date, end_date)
# Concat the json files into the output.
print "Joining JSON files..."
!jq -s "[.[]|.[]]" *.json > "hwsurvey-weekly.json"
# Store the new state to S3. Since S3 doesn't support symlinks, make two copy
# of the file: one will always contain the latest data, the other for archiving.
archived_file_copy = "hwsurvey-weekly-" + datetime.date.today().strftime("%Y%d%m") + ".json"
store_new_state("hwsurvey-weekly.json", archived_file_copy)
store_new_state("hwsurvey-weekly.json", "hwsurvey-weekly.json")