In [2]:
import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import plotly.plotly as py
import operator
import json
from __future__ import division

from moztelemetry import get_pings, get_pings_properties, get_one_ping_per_client

# IPython magic: enables inline plotting and star-imports numpy/matplotlib
# (pylab) names into the interactive namespace (see the "Populating the
# interactive namespace" output below). NOTE(review): prefer explicit imports;
# confirm later cells import what they use rather than relying on this.
%pylab inline

# `sc` is not defined in this notebook; presumably the SparkContext injected by
# the PySpark kernel -- confirm. This value is used as the target partition
# count when repartitioning filtered subsets and in reduceByKey below.
MaxPartitions = sc.defaultParallelism * 4

Populating the interactive namespace from numpy and matplotlib


In [3]:
# Imported explicitly: this cell previously depended on `datetime` leaking into
# the namespace from `%pylab`, which breaks under Restart & Run All elsewhere.
import datetime

def fmt_date(d):
    """Format a date/datetime as a 'YYYYMMDD' string (the form passed as
    build_id bounds to get_pings below)."""
    return d.strftime("%Y%m%d")

TimeWindow = 14     # width of the build-id window, in days
PingFraction = 0.05 # fraction of matching pings to sample

# Get two weeks worth of pings across all channels. For testing, we currently
# select only a small sampled subset (PingFraction).
#
# Read the clock once so both ends of the window are computed from the same
# instant (two separate now() calls could straddle midnight).
run_time = datetime.datetime.now()
t1 = fmt_date(run_time - datetime.timedelta(TimeWindow + 2))  # go back TimeWindow + 2 days
t2 = fmt_date(run_time - datetime.timedelta(2))               # go back 2 days
all_pings = get_pings(sc, app="Firefox", build_id=(t1, t2), fraction=PingFraction)

In [4]:
# Ping fields used by the analysis below: client identity, Firefox build,
# OS details, graphics adapters, and the two histograms under study.
PingProperties = [
    "clientID",
    "environment/build/version",
    "environment/system/os/name",
    "environment/system/os/version",
    "environment/system/os/servicePackMajor",
    "environment/system/os/servicePackMinor",
    "environment/system/gfx/adapters",
    "histograms/DEVICE_RESET_REASON",
    "histograms/GRAPHICS_SANITY_TEST",
]

# Extract the properties, then reduce via moztelemetry's
# get_one_ping_per_client (presumably one ping per clientID).
pings = get_one_ping_per_client(get_pings_properties(all_pings, PingProperties))

# Transform each ping to make it easier to work with in later stages.
def Validate(p):
    """Annotate a ping dict with derived OS/GPU fields and mark it valid.

    |p| is updated in place and returned. Keys written:
      'OSName', 'OS', 'OSVersion'  -- only for Linux, Windows_NT and Darwin
      'FxVersion'                  -- first dot-separated part of the build version
      'vendorID', 'deviceID', 'driverVersion', 'driverShortVersion'
                                   -- from the first graphics adapter
      'valid'                      -- True, set only when everything above
                                      could be derived

    Telemetry data isn't guaranteed to be well-formed, so the function returns
    early (leaving 'valid' unset) as soon as a required field is missing or
    malformed; callers filter on p.get('valid') afterwards. Fields derived
    before an early return stay on the dict.
    """
    name = p.get("environment/system/os/name")
    version = p.get("environment/system/os/version") or '0'
    if name == 'Linux':
        p['OSVersion'] = None
        p['OS'] = 'Linux'
        p['OSName'] = 'Linux'
    elif name == 'Windows_NT':
        spmaj = p.get("environment/system/os/servicePackMajor") or '0'
        p['OSVersion'] = version + '.' + str(spmaj)
        p['OS'] = 'Windows-' + version + '.' + str(spmaj)
        p['OSName'] = 'Windows'
    elif name == 'Darwin':
        p['OSVersion'] = version
        p['OS'] = 'Darwin-' + version
        p['OSName'] = 'Darwin'
    else:
        # Missing or unsupported OS name.
        return p

    build_version = p.get("environment/build/version", None)
    if not build_version:
        return p
    p['FxVersion'] = build_version.split('.')[0]

    # Verify that we have at least one adapter. A missing/null adapter list,
    # an empty list, or a non-indexable value all mean "no adapter".
    try:
        adapter = p.get("environment/system/gfx/adapters")[0]
    except (IndexError, KeyError, TypeError):
        return p
    # Every adapter field below is read with .get(), so the adapter must be
    # dict-like. (A __getitem__ check let strings/lists through, which then
    # crashed in T() with AttributeError and failed the whole Spark stage.)
    if adapter is None or not hasattr(adapter, 'get'):
        return p

    # Fetch obj[key], mapping a missing or empty/null value to 'Unknown'.
    def T(obj, key):
        return obj.get(key, None) or 'Unknown'

    # We store the device ID as a vendor/device string, because the device ID
    # alone is not enough to determine whether the key is unique.
    #
    # We also merge 'Intel Open Source Technology Center' with the vendor ID
    # that should be reported, 0x8086, for simplicity.
    vendorID = T(adapter, 'vendorID')
    if vendorID == u'Intel Open Source Technology Center':
        p['vendorID'] = u'0x8086'
    else:
        p['vendorID'] = vendorID
    p['deviceID'] = u'{0}/{1}'.format(p['vendorID'], T(adapter, 'deviceID'))
    p['driverVersion'] = u'{0}/{1}'.format(p['vendorID'], T(adapter, 'driverVersion'))

    # Short driver version: first three dot-separated components. Read with
    # .get() -- indexing adapter['driverVersion'] raised KeyError for adapters
    # that omit the field. Unicode format strings match the fields above and
    # avoid Python 2's implicit ASCII encode of unicode args in str.format.
    driver_version = adapter.get('driverVersion')
    if driver_version:
        short = '.'.join(driver_version.split('.')[0:3])
        p['driverShortVersion'] = u'{0}/{1}'.format(p['vendorID'], short)
    else:
        p['driverShortVersion'] = u'{0}/Unknown'.format(p['vendorID'])

    p['valid'] = True
    return p

# Transform step: run Validate over every ping. Pings it could not fully
# interpret come out without the 'valid' key.
annotated_pings = pings.map(lambda ping: Validate(ping))

In [5]:
# Keep only the pings Validate fully understood; cached because every
# downstream breakdown starts from this set.
AllData = annotated_pings.filter(lambda ping: ping.get('valid', False) == True).cache()

# Windows gets some preferential breakdown treatment, so cache its subset too.
WindowsSubset = AllData.filter(lambda ping: ping['OSName'] == 'Windows').cache()

In [6]:
# Merge the counts in |b| into |a|: each key of |b| is added to |a|, summed
# with any count |a| already has for it. |a| is modified in place and is also
# the return value.
def combiner(a, b):
    for key, count in b.items():
        a[key] = a.get(key, 0) + count
    return a

# Return an aggregation based on combiner: |fn| maps each record to a
# (key, {value: count}) pair, and the per-key dicts are merged with combiner.
def aggregation(data, fn):
    return data.map(fn).reduceByKey(combiner)

# After reduceByKey(combiner), we get a mapping like:
#  key => { variable => value }
#
# This function caps each key's breakdown at its |max_items| largest
# 'variable' entries and folds the remainder into a catch-all 'Other'.
def coalesce_to_n_items(agg, max_items):
    """Return a list of (superkey, breakdown) pairs with capped breakdowns.

    A breakdown with at most max_items entries is passed through as-is (the
    same dict object). Otherwise the max_items entries with the largest counts
    are kept, and the summed count of all remaining entries is added to
    'Other' (only when that sum is non-zero). The result can therefore hold
    max_items + 1 entries.
    """
    def capped(breakdown):
        if len(breakdown) <= max_items:
            return breakdown
        ranked = sorted(breakdown.items(), key=lambda kv: kv[1], reverse=True)
        kept = dict(ranked[0:max_items])
        leftover = sum(count for _, count in ranked[max_items:])
        if leftover:
            kept['Other'] = kept.get('Other', 0) + leftover
        return kept

    return [(superkey, capped(breakdown)) for superkey, breakdown in agg]

# Union two pipelines with `+`, treating None as the empty starting value so a
# loop can accumulate: union_pipelines(None, b) is b itself.
def union_pipelines(a, b):
    return b if a is None else a + b

In [7]:
def map_x_to_y(data, sourceKey, destKey):
    """Cross-tabulate two ping fields.

    Returns aggregation's result: per p[sourceKey], a dict counting how many
    pings had each p[destKey] value.
    """
    return aggregation(data, lambda p: (p[sourceKey], {p[destKey]: 1}))

def map_x_to_count(data, sourceKey):
    """Count records per p[sourceKey] value.

    Each record is keyed by a 1-tuple so countByKey (an action) can tally it;
    the result is returned as countByKey returns it.
    """
    keyed = data.map(lambda p: (p[sourceKey],))
    return keyed.countByKey()

# *To* results come from map_x_to_y (per-key {value: count} breakdowns,
# collected at export time); *Share results come from map_x_to_count.

# Operating system, over all valid pings.
OSShare = map_x_to_count(AllData, 'OSName')
OSToVendor = map_x_to_y(AllData, 'OSName', 'vendorID')
OSToDevice = map_x_to_y(AllData, 'OSName', 'deviceID')

# Windows version (OSVersion includes the service pack), Windows pings only.
WindowsShare = map_x_to_count(WindowsSubset, 'OSVersion')
WindowsToVendor = map_x_to_y(WindowsSubset, 'OSVersion', 'vendorID')
WindowsToDevice = map_x_to_y(WindowsSubset, 'OSVersion', 'deviceID')
WindowsToDriver = map_x_to_y(WindowsSubset, 'OSVersion', 'driverVersion')
DriverShare = map_x_to_count(WindowsSubset, 'driverVersion')

# Firefox major version, over all valid pings.
FxShare = map_x_to_count(AllData, 'FxVersion')
FxToVendor = map_x_to_y(AllData, 'FxVersion', 'vendorID')
FxToDevice = map_x_to_y(AllData, 'FxVersion', 'deviceID')

# Top-level hardware shares.
VendorShare = map_x_to_count(AllData, 'vendorID')
DeviceShare = map_x_to_count(AllData, 'deviceID')

In [8]:
#############################
# Perform the TDR analysis. #
#############################
# Exclusive upper bound on the DEVICE_RESET_REASON buckets examined below
# (the loop visits buckets 1 .. NumTDRReasons - 1).
NumTDRReasons = 8

def ping_has_tdr_for(p, reason):
    """True when the ping's DEVICE_RESET_REASON histogram has a positive
    count in bucket |reason|."""
    reset_counts = p['histograms/DEVICE_RESET_REASON']
    return reset_counts[reason] > 0

# Specialized versions of map_x_to_y for TDRs. Values are cast to int because
# the raw values Spark returns do not serialize with JSON (presumably numpy
# scalars from the histogram -- unverified).
def map_reason_to_vendor(p, reason, destKey):
    """Return (reason, {p[destKey]: reset count for that reason})."""
    bucket = int(reason)
    group = p[destKey]
    resets = int(p['histograms/DEVICE_RESET_REASON'][reason])
    return (bucket, {group: resets})

def map_vendor_to_reason(p, reason, destKey):
    """Return (p[destKey], {reason: reset count for that reason})."""
    group = p[destKey]
    bucket = int(reason)
    resets = int(p['histograms/DEVICE_RESET_REASON'][reason])
    return (group, {bucket: resets})

# Filter out pings that do not have any TDR data. We expect this to be a huge
# reduction in the sample set, which leaves the partition count way off, so we
# repartition immediately for performance.
TDRSubset = WindowsSubset.filter(lambda p: p.get('histograms/DEVICE_RESET_REASON', None) is not None)
TDRSubset = TDRSubset.repartition(MaxPartitions)
TDRSubset = TDRSubset.cache()

# For each TDR reason, build (reason, {vendor: resetCount}) and
# (vendor, {reason: resetCount}) tuples, then union them into single series.
#
# Python lambdas look up free variables when they run, not when they are
# created, and Spark transformations are lazy, so a bare `reason` could be
# read after the loop has moved on. Binding it as a default argument freezes
# the value for each iteration.
reason_to_vendor_tuples = None
vendor_to_reason_tuples = None
for reason in xrange(1, NumTDRReasons):
    subset = TDRSubset.filter(lambda p, reason=reason: ping_has_tdr_for(p, reason))
    subset = subset.cache()

    tuples = subset.map(lambda p, reason=reason: map_reason_to_vendor(p, reason, 'vendorID'))
    reason_to_vendor_tuples = union_pipelines(reason_to_vendor_tuples, tuples)

    tuples = subset.map(lambda p, reason=reason: map_vendor_to_reason(p, reason, 'vendorID'))
    vendor_to_reason_tuples = union_pipelines(vendor_to_reason_tuples, tuples)

TDRReasonToVendor = reason_to_vendor_tuples.reduceByKey(combiner, MaxPartitions)
TDRVendorToReason = vendor_to_reason_tuples.reduceByKey(combiner, MaxPartitions)

In [9]:
#########################
# Sanity test analysis. #
#########################
# GRAPHICS_SANITY_TEST bucket indices as used below. SANITY_TEST_LAST_VALUE
# is one past the last of them and serves as an exclusive loop bound.
(SANITY_TEST_PASSED,
 SANITY_TEST_FAILED_RENDER,
 SANITY_TEST_FAILED_VIDEO,
 SANITY_TEST_CRASHED,
 SANITY_TEST_LAST_VALUE) = (0, 1, 2, 3, 4)

sanity_test_pings = WindowsSubset.filter(lambda p: p.get('histograms/GRAPHICS_SANITY_TEST', None) is not None)
sanity_test_pings = sanity_test_pings.repartition(MaxPartitions)
sanity_test_pings = sanity_test_pings.cache()

# Aggregate the sanity test data by summing every ping's GRAPHICS_SANITY_TEST
# histogram with `+` (presumably element-wise -- the export iterates it).
SanityTestResults = sanity_test_pings.map(lambda p: p['histograms/GRAPHICS_SANITY_TEST']).reduce(lambda x, y: x + y)

def sanity_test_tuples_by(subset, value, destKey):
    """Map each ping in |subset| to (value, {p[destKey]: count in bucket value}).

    |value| and |destKey| are parameters of this call, so the lambda captures
    this call's values rather than a loop variable that keeps changing.
    """
    return subset.map(lambda p: (value, {p[destKey]: int(p['histograms/GRAPHICS_SANITY_TEST'][value])}))

# For each failure bucket, break the failing pings down by vendor, OS, device
# and short driver version. Spark transformations are lazy, so the filter
# binds `value` as a default argument instead of reading the loop variable.
sanity_test_by_vendor = None
sanity_test_by_os = None
sanity_test_by_device = None
sanity_test_by_driver = None
for value in xrange(SANITY_TEST_FAILED_RENDER, SANITY_TEST_LAST_VALUE):
    subset = sanity_test_pings.filter(lambda p, value=value: p['histograms/GRAPHICS_SANITY_TEST'][value] > 0)
    subset = subset.cache()

    sanity_test_by_vendor = union_pipelines(sanity_test_by_vendor, sanity_test_tuples_by(subset, value, 'vendorID'))
    sanity_test_by_os = union_pipelines(sanity_test_by_os, sanity_test_tuples_by(subset, value, 'OS'))
    sanity_test_by_device = union_pipelines(sanity_test_by_device, sanity_test_tuples_by(subset, value, 'deviceID'))
    sanity_test_by_driver = union_pipelines(sanity_test_by_driver, sanity_test_tuples_by(subset, value, 'driverShortVersion'))

SanityTestByVendor = sanity_test_by_vendor.reduceByKey(combiner)
SanityTestByOS = sanity_test_by_os.reduceByKey(combiner)
SanityTestByDevice = sanity_test_by_device.reduceByKey(combiner)
SanityTestByDriver = sanity_test_by_driver.reduceByKey(combiner)

In [10]:
# Helper for writing files.
def Export(obj, filename):
    """Write |obj| to |filename| as JSON, replacing any existing file.

    The whole document is serialized before the file is opened. If |obj|
    contains something JSON cannot encode (json.dumps raises TypeError), the
    exception propagates and any previous file at |filename| is left intact,
    instead of being truncated and half-written as with a streaming json.dump.
    """
    payload = json.dumps(obj)
    with open(filename, 'w') as fp:
        fp.write(payload)

In [11]:
# Start writing stuff out.

# Per-OS vendor/device breakdowns.
Export(
    {'vendors': OSToVendor.collect(), 'devices': OSToDevice.collect()},
    'os-statistics.json',
)

# Per-Windows-version breakdowns plus overall driver share.
Export(
    {
        'vendors': WindowsToVendor.collect(),
        'devices': WindowsToDevice.collect(),
        'driversByVersion': WindowsToDriver.collect(),
        'driverShare': DriverShare,
    },
    'windows-statistics.json',
)

# Per-Firefox-version vendor/device breakdowns.
Export(
    {'vendors': FxToVendor.collect(), 'devices': FxToDevice.collect()},
    'fx-statistics.json',
)

# Top-level shares and sampling metadata.
Export(
    {
        'os': OSShare,
        'windows': WindowsShare,
        'firefox': FxShare,
        'vendors': VendorShare,
        'devices': DeviceShare,
        'totalPings': pings.count(),
        'validPings': AllData.count(),
        'timeWindow': TimeWindow,
        'pingFraction': PingFraction,
    },
    'general-statistics.json',
)


In [12]:
# Write TDR statistics: ping counts for context, then both cross-tabulations.
Export(
    {
        'tdrPings': TDRSubset.count(),
        'windowsPings': WindowsSubset.count(),
        'reasonToVendor': TDRReasonToVendor.collect(),
        'vendorToReason': TDRVendorToReason.collect(),
    },
    'tdr-statistics.json',
)

In [13]:
# Write Sanity Test statistics. Device and driver breakdowns are capped at
# their top 10 entries (plus 'Other') via coalesce_to_n_items.
Export(
    {
        'totalSessions': WindowsSubset.count(),
        'sanityTestPings': sanity_test_pings.count(),
        'results': [int(value) for value in SanityTestResults],
        'byVendor': SanityTestByVendor.collect(),
        'byOS': SanityTestByOS.collect(),
        'byDevice': coalesce_to_n_items(SanityTestByDevice.collect(), 10),
        'byDriver': coalesce_to_n_items(SanityTestByDriver.collect(), 10),
    },
    'sanity-test-statistics.json',
)