In [None]:
import datetime as dt
import pandas as pd
import ujson as json
from pyspark.sql.types import *

from moztelemetry import get_pings, get_pings_properties

%pylab inline

Take the set of pings, drop any ping without a clientId, and remove duplicate pings (pings sharing the same documentId), keeping one copy of each unique ping.

In [None]:
def dedupe_pings(rdd):
    """Drop pings without a client id and keep one ping per document id.

    Args:
        rdd: RDD of ping dicts as produced by get_pings_properties; each dict
            must contain the "meta/clientId" and "meta/documentId" keys.

    Returns:
        An RDD of ping dicts with at most one ping per "meta/documentId".
        Which duplicate survives is unspecified (reduceByKey gives no
        ordering guarantee).
    """
    def has_client_id(ping):
        # Pings lacking a client id cannot be attributed to a client.
        return ping["meta/clientId"] is not None

    def keyed_by_document(ping):
        return (ping["meta/documentId"], ping)

    def keep_one(kept, _duplicate):
        return kept

    pings_with_client = rdd.filter(has_client_id)
    unique_by_document = pings_with_client.map(keyed_by_document).reduceByKey(keep_one)
    return unique_by_document.values()


Transform and sanitize each ping into a flat row (a list of column values) matching the output schema.

In [None]:
def _to_int(value):
    """Return int(value), or None if value is None or not integer-like.

    Keeps a single malformed ping from raising inside the Spark job and
    aborting the whole day's run.
    """
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def transform(ping):
    """Flatten one deduplicated ping into a row for the android_clients table.

    Args:
        ping: dict mapping the property paths requested via
            get_pings_properties (e.g. "meta/clientId") to their values.

    Returns:
        A list of 10 values in output-column order: clientid, profiledate,
        submissiondate, creationdate, appversion, osversion, locale,
        defaultsearch, device, arch. Unparseable numeric fields
        (profile creation day, OS version) become None instead of raising.
    """
    # Should not be None since dedupe_pings filters those out.
    clientId = ping["meta/clientId"]

    # The profile creation date is a day count since 1970-01-01; convert it
    # to an ISO date string.
    profileDate = None
    profileDaynum = _to_int(ping["environment/profile/creationDate"])
    if profileDaynum is not None:
        try:
            profileDate = (dt.date(1970, 1, 1) + dt.timedelta(profileDaynum)).isoformat()
        except OverflowError:
            # Day count outside the range datetime.date can represent.
            profileDate = None

    # Create date should already be in ISO format
    creationDate = ping["creationDate"]

    # Added via the ingestion process so should not be None.
    submissionDate = dt.datetime.strptime(ping["meta/submissionDate"], "%Y%m%d").date().isoformat()

    appVersion = ping["application/version"]
    # Stored as an integer column; non-numeric values become None.
    osVersion = _to_int(ping["environment/system/os/version"])
    locale = ping["environment/settings/locale"]

    # Truncate to 32 characters
    defaultSearch = ping["environment/settings/defaultSearchEngine"]
    if defaultSearch is not None:
        defaultSearch = defaultSearch[0:32]

    # Build up the device string, truncating like we do in 'core' ping.
    # If either part is missing, the manufacturer value is kept as-is.
    device = ping["environment/system/device/manufacturer"]
    model = ping["environment/system/device/model"]
    if device is not None and model is not None:
        device = device[0:12] + "-" + model[0:19]

    # Anything not reporting an x86 ABI (including a missing ABI) is
    # classified as "arm".
    xpcomABI = ping["application/xpcomAbi"]
    arch = "arm"
    if xpcomABI is not None and "x86" in xpcomABI:
        arch = "x86"

    return [clientId, profileDate, submissionDate, creationDate, appVersion, osVersion, locale, defaultSearch, device, arch]


Use "saved-session" pings to build a set of core client data, and write the result out as CSV or Parquet.

This script loops over a range of days and, for each day, writes a separate output for each of the given channels. Use an explicit date range for backfilling, or set both the start and end dates to yesterday (`dt.datetime.now() - dt.timedelta(1)`) for automated runs.

In [None]:
channels = ["nightly", "aurora", "beta", "release"]

# Inclusive range of submission days to process. Use explicit dates for
# backfills; for automated runs set both bounds to yesterday, i.e.
# dt.datetime.now() - dt.timedelta(1).
start = dt.datetime(2016, 1, 8)
end = dt.datetime(2016, 1, 21)

# Ping properties to extract; transform() reads exactly these keys.
ping_properties = ["meta/clientId",
                   "meta/documentId",
                   "meta/submissionDate",
                   "creationDate",
                   "application/version",
                   "environment/system/os/version",
                   "environment/profile/creationDate",
                   "environment/settings/locale",
                   "environment/settings/defaultSearchEngine",
                   "environment/system/device/model",
                   "environment/system/device/manufacturer",
                   "application/xpcomAbi"]

# Output schema; column order must match the list returned by transform().
# clientid is non-nullable because dedupe_pings drops pings without one;
# submissiondate is presumably always set by ingestion -- verify if the
# DataFrame creation ever rejects a null there.
schema = StructType([
    StructField("clientid", StringType(), False),
    StructField("profiledate", StringType(), True),
    StructField("submissiondate", StringType(), False),
    StructField("creationdate", StringType(), True),
    StructField("appversion", StringType(), True),
    StructField("osversion", IntegerType(), True),
    StructField("locale", StringType(), True),
    StructField("defaultsearch", StringType(), True),
    StructField("device", StringType(), True),
    StructField("arch", StringType(), True)
])

s3_base = "s3n://net-mozaws-prod-us-west-2-pipeline-analysis/mobile/android_clients/v1"

day = start
while day <= end:
    day_str = day.strftime("%Y%m%d")
    for channel in channels:
        print("channel: " + channel + ", date: " + day_str)

        pings = get_pings(sc, app="Fennec", channel=channel,
                          submission_date=(day_str, day_str),
                          build_id=("20100101000000", "99999999999999"),
                          fraction=1)

        subset = dedupe_pings(get_pings_properties(pings, ping_properties))
        transformed = subset.map(transform)

        # Alternative local CSV output (collects the whole day to the driver;
        # create ./output first):
        # grouped = pd.DataFrame(transformed.collect(), columns=[f.name for f in schema.fields])
        # grouped.to_csv("./output/android-clients-" + channel + "-" + day_str + ".csv", index=False, encoding="utf-8")

        s3_output = s3_base + "/channel=" + channel + "/submission=" + day_str
        grouped = sqlContext.createDataFrame(transformed, schema)
        # NOTE: this uses Spark's default save mode, which fails if s3_output
        # already exists; remove old output before re-running a day.
        grouped.write.parquet(s3_output)

    day += dt.timedelta(1)
