In [1]:
import ujson as json
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

from moztelemetry.dataset import Dataset

%matplotlib inline

### Basics

We will use the Dataset API to fetch data.  Documentation can be found at: https://python-moztelemetry.readthedocs.io/en/stable/api.html#dataset

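The goal of this notebook is to look at WebRender qualification (the `wrQualified` gfx feature status) on the release channel, and to estimate the share of clients running Windows 10 with an NVIDIA GPU whose device ID is at least 0x6c0 and whose chipset name (per gpu-db) does not end in "M" (i.e. presumably not a mobile part). Let's first see how many parallel workers we have at our disposal: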

In [2]:
# Default parallelism of the SparkContext. `sc` is not defined in this notebook;
# presumably it is injected by the Spark-enabled kernel.
sc.defaultParallelism

32

We can look at the schema of the dataset we are interested in:

In [3]:
# Fields of the 'telemetry' source; several of them (docType, submissionDate,
# appUpdateChannel) are used as .where() filters below.
Dataset.from_source('telemetry').schema

[u'submissionDate',
 u'sourceName',
 u'sourceVersion',
 u'docType',
 u'appName',
 u'appUpdateChannel',
 u'appVersion',
 u'appBuildId']

Let's create a Dataset of Telemetry submissions for a given submission date:

In [4]:
# Which submissions to analyse. To restrict to a single build, add e.g.
# .where(appBuildId='20180721100146') to the chain below.
SUBMISSION_DATE = '20180828'
CHANNEL = "release"

source = Dataset.from_source('telemetry')
pings_dataset = (
    source
    .where(docType='main')
    .where(submissionDate=SUBMISSION_DATE)
    .where(appUpdateChannel=CHANNEL)
)

Select only the properties we need and then take a 1% sample (`sample=0.01`):

In [5]:
# Fraction of the dataset's S3 files to read (0.01 -> about 1%). File-based
# sampling can be heavily skewed, so use it only to speed up prototyping.
SAMPLE_FRACTION = 0.01

selected = pings_dataset.select(
    'clientId',
    buildId='application.buildId',
    experiments='environment.experiments',
    os='environment.system.os',
    gfx='environment.system.gfx',
)
pings = selected.records(sc, sample=SAMPLE_FRACTION)

This 'sampling' is based on s3 files and is highly
susceptible to skew. Use only for quicker performance
while prototyping.
fetching 9648.72877MB in 1698 files...


In [6]:
# Disabled debugging cell: loads pings without .select() (presumably returning
# the full ping documents) so the raw record structure can be inspected.
# Note that re-enabling it would overwrite the `pings` RDD used below.
#pings = (
#    pings_dataset
#    .records(sc, sample=0.01)
#)
#pings.take(1)

In [7]:
# Number of sampled pings, before any filtering or per-client deduplication.
pings.count()

2448364

In [8]:
# Peek at a few records to check the shape of the selected fields.
pings.take(4)

[{'buildId': u'20180807170231',
  'clientId': u'8f2bc685-96cb-4a44-b0b5-fb06b518ec8d',
  'experiments': {u'rollout-release-61-tls-fallback-1-3': {u'branch': u'active',
    u'type': u'normandy-prefrollout'},
   u'searchCohort': {u'branch': u'nov17-2'}},
  'gfx': {u'ContentBackend': u'Skia',
   u'D2DEnabled': None,
   u'DWriteEnabled': None,
   u'adapters': [{u'GPUActive': True,
     u'RAM': None,
     u'description': None,
     u'deviceID': u'0x1180',
     u'driver': None,
     u'driverDate': None,
     u'driverVersion': None,
     u'subsysID': None,
     u'vendorID': u'0x10de'}],
   u'features': {u'compositor': u'opengl',
    u'gpuProcess': {u'status': u'unused'}},
   u'monitors': [{u'scale': 1, u'screenHeight': 1440, u'screenWidth': 2560},
    {u'scale': 2, u'screenHeight': 1920, u'screenWidth': 1080}]},
  'os': {u'locale': u'en-US', u'name': u'Darwin', u'version': u'17.7.0'}},
 {'buildId': u'20180807170231',
  'clientId': u'3a9028f7-e8c1-4b09-a4f3-3e7d922fd651',
  'experiments': {u'r

In [9]:
# We add two extra steps. The first rewrites the ping to have some
# information more easily accessible (like the primary adapter),
# and the second step removes any pings that don't have adapter
# information.
def rewrite_ping(p):
    """Expose the first GPU adapter of ping `p` as a top-level 'adapter' key.

    Returns a shallow copy of `p` with 'adapter' set to p['gfx']['adapters'][0].
    If the ping has no gfx section (missing or null) or no adapters, `p` is
    returned unchanged and without an 'adapter' key, so filter_ping drops it.
    The input dict is not modified.
    """
    # 'gfx' may be missing or explicitly null in a selected record; mapping
    # such pings to None (or calling .get on None) used to crash the pipeline.
    gfx = p.get('gfx') or {}
    adapters = gfx.get('adapters')
    if not adapters:
        return p

    # NOTE(review): assumes adapters[0] is the primary adapter (the sample
    # ping shows GPUActive=True on it), but this is not checked here.
    rewritten = dict(p)
    rewritten['adapter'] = adapters[0]

    # TODO: optionally parse adapter['driverVersion'] into a tuple of ints
    # for driver-version comparisons.
    return rewritten

def filter_ping(p):
    """Return True if `p` is a ping carrying an 'adapter' entry (set by rewrite_ping).

    A None record is rejected rather than raising TypeError from the `in` test.
    """
    return p is not None and 'adapter' in p
# Apply both steps and cache the result, since every later cell builds on it.
rpings = (
    pings
    .map(rewrite_ping)
    .filter(filter_ping)
    .cache()
)
rpings.count()

2448364

To prevent pseudoreplication, let's consider only a single submission for each client. As this step requires a distributed shuffle, it should always be run only after extracting the attributes of interest with *Dataset.select()*.

In [10]:
# Keep a single ping per clientId. The reducer keeps whichever ping it is
# handed first, so which ping survives depends on Spark's merge order.
subset = (
    rpings
    .keyBy(lambda ping: ping['clientId'])
    .reduceByKey(lambda kept, _other: kept)
    .values()
)

Caching is fundamental as it allows for an iterative, real-time development workflow:

In [11]:
# Cache the one-ping-per-client RDD: it is counted and filtered repeatedly below,
# and recomputing it would redo the reduceByKey shuffle each time.
cached = subset.cache()

How many clients (one ping per client) are we looking at?

In [12]:
# Number of distinct clients, i.e. pings remaining after per-client deduplication.
cached.count()

2281814

In [13]:
# Keep only clients whose gfx features include a 'wrQualified' entry.
wrQualified = cached.filter(
    lambda p: "features" in p["gfx"] and "wrQualified" in p["gfx"]["features"]
)
wrQualified.count()

2

In [14]:
# Distribution of the wrQualified status among those clients.
wrQualified.map(lambda p: p["gfx"]["features"]["wrQualified"]["status"]).countByValue()

defaultdict(int, {u'blocked': 2})

In [15]:
# Adapter vendors of the wrQualified clients.
# NOTE(review): for these (Linux) clients vendorID holds vendor name strings
# rather than hex PCI IDs, as the output below shows.
wrQualified.map(lambda p: p['adapter']['vendorID']).countByValue()

defaultdict(int, {u'Intel Open Source Technology Center': 1, u'X.Org': 1})

In [16]:
# NOTE(review): identical to the earlier wrQualified status countByValue cell;
# consider removing this duplicate.
wrQualified.map(lambda p: p["gfx"]["features"]["wrQualified"]["status"]).countByValue()

defaultdict(int, {u'blocked': 2})

In [17]:
# Operating systems of the wrQualified clients.
wrQualified.map(lambda p: p["os"]["name"]).countByValue()

defaultdict(int, {u'Linux': 2})

In [18]:
def _is_wr_available(p):
    """True when the ping's wrQualified status is 'available'."""
    status = p["gfx"]["features"]["wrQualified"]["status"]
    return status == "available"

wrAvailable = wrQualified.filter(_is_wr_available)
wrAvailable.count()

0

In [19]:
def _is_wr_blocked(p):
    """True when the ping's wrQualified status is 'blocked'."""
    status = p["gfx"]["features"]["wrQualified"]["status"]
    return status == "blocked"

wrBlocked = wrQualified.filter(_is_wr_blocked)
wrBlocked.count()

2

In [20]:
# Percentage of deduplicated clients whose wrQualified status is 'available'.
wr_available_pct = 100. * wrAvailable.count() / cached.count()
wr_available_pct

0.0

In [21]:
# Build `devices`: NVIDIA PCI device ID (int) -> chipset name, from jrmuizel's gpu-db.
# NOTE(review): the URL tracks the `master` branch, so results may change
# between runs; pin a commit hash for reproducibility.
from contextlib import closing
# Alias so this cell does not rebind the notebook-wide `json` name (ujson).
import json as stdlib_json
import urllib2

GPU_DB_URL = 'https://raw.githubusercontent.com/jrmuizel/gpu-db/master/nvidia.json'

# closing() guarantees the HTTP response is released even if parsing fails.
with closing(urllib2.urlopen(GPU_DB_URL)) as response:
    gpu_db = stdlib_json.load(response)

devices = {}
# gpu_db['10de'] is nested as {group (presumably GPU generation): {chipset: [hex device id, ...]}}.
for _generation, chipsets in gpu_db['10de'].items():
    for chipset_name, device_ids in chipsets.items():
        for dev in device_ids:
            devices[int(dev, 16)] = chipset_name

In [23]:
# Clients whose primary adapter is an NVIDIA GPU (PCI vendor ID 0x10de).
NVIDIA_VENDOR_ID = "0x10de"
nv = cached.filter(lambda p: p["adapter"]["vendorID"] == NVIDIA_VENDOR_ID)
nv.count()

360307

In [24]:
def _is_windows_10(p):
    """True for pings reporting Windows_NT version 10.0."""
    os_info = p["os"]
    return os_info["name"] == "Windows_NT" and os_info["version"] == "10.0"

nv10 = nv.filter(_is_windows_10)
nv10.count()

156249

In [25]:
# Keep devices whose PCI device ID is at least MIN_DEVICE_ID.
# NOTE(review): presumably a GPU-generation cutoff; NVIDIA device IDs are not
# strictly ordered by generation, so confirm this threshold's intent.
MIN_DEVICE_ID = 0x6c0

def _device_id_at_least_min(p):
    device_id = int(p["adapter"]["deviceID"], 16)
    return device_id >= MIN_DEVICE_ID

nv10tesla = nv10.filter(_device_id_at_least_min)
nv10tesla.count()

147484

In [26]:
def _is_known_non_m_chipset(p):
    """True if the device is in gpu-db and its chipset name does not end in 'M'."""
    chipset = devices.get(int(p["adapter"]["deviceID"], 16))
    return chipset is not None and not chipset.endswith("M")

nv10teslanom = nv10tesla.filter(_is_known_non_m_chipset)
nv10teslanom.count()

133872

In [27]:
# Device IDs that passed the ID threshold but are missing from gpu-db.
unknown_devices = nv10tesla.filter(
    lambda p: int(p["adapter"]["deviceID"], 16) not in devices
)
unknown_devices.map(lambda p: p['adapter']['deviceID']).countByValue()

defaultdict(int, {u'0x0f03': 1})

In [28]:
# (matching Windows 10 NVIDIA clients, all deduplicated clients)
(nv10teslanom.count(),cached.count())

(133872, 2281814)

In [29]:
# Percentage of deduplicated clients on Windows 10 with a matching NVIDIA GPU.
nv10teslanom_pct = 100. * nv10teslanom.count() / cached.count()
nv10teslanom_pct

5.8669111505144595