### Longitudinal Dataset Tutorial

The longitudinal dataset is logically organized as a table where rows represent profiles and columns the various metrics (e.g. startup time). Each field of the table contains a list of values, one per Telemetry submission received for that profile.
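The plain-Python sketch below illustrates this layout. It models a single row as a dict that maps column names to per-submission lists. The profile, its values, and the helper names `num_submissions` and `latest` are hypothetical; only the column names `build` and `fxa_configured` come from this notebook. It also assumes that each list is sorted chronologically, as described later.

```python
# Hypothetical row for a single profile: every column holds one entry
# per Telemetry submission, oldest submission first.
profile_row = {
    "build": [
        {"build_id": "20151201030226"},
        {"build_id": "20151208030226"},
        {"build_id": "20151215030226"},
    ],
    "fxa_configured": [False, False, True],
}


def num_submissions(row, column):
    """Number of submissions recorded for a column of this profile."""
    return len(row[column])


def latest(row, column):
    """Value from the most recent submission (lists are in chronological order)."""
    return row[column][-1]


print(num_submissions(profile_row, "build"))  # 3
print(latest(profile_row, "fxa_configured"))  # True
```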

The dataset will be regenerated from scratch every week. This allows us to apply non-backward-compatible changes to the schema without worrying about merging procedures.

The current version of the longitudinal dataset has been built from all main pings received since mid-November (shortly after Unified Telemetry landed) from 1% of profiles across all channels. Future versions will store up to 180 days of data.

In [None]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

%matplotlib inline

In [None]:
sc.defaultParallelism

The longitudinal dataset can be accessed as a Spark [DataFrame](https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame), which is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python.

In [None]:
frame = sqlContext.sql("SELECT * FROM longitudinal")

Number of profiles:

In [None]:
frame.count()

The dataset contains all histograms, but it doesn't yet include all the metrics stored in the various sections of the pings. See the [code](https://github.com/vitillo/telemetry-batch-view/blob/longitudinal/src/main/scala/streams/Longitudinal.scala#L68) that generates the dataset for the complete list of supported metrics. More metrics will be included in future versions of the dataset; the inclusion of specific metrics can be prioritized by filing a bug.

### Scalar metrics

A Spark bug currently slows down the *first* and *take* methods on a DataFrame. As a workaround, convert the DataFrame to an RDD first and then invoke *first* or *take*, e.g.:

In [None]:
first = frame.filter("normalized_channel = 'release'")\
    .select("build",
            "system", 
            "gc_ms",
            "fxa_configured",
            "browser_set_default_always_check",
            "browser_set_default_dialog_prompt_rawcount")\
    .rdd.first()

As mentioned earlier, each field of the DataFrame is an array containing one value per submission of the profile. The submissions are sorted chronologically.

In [None]:
len(first.build)

In [None]:
first.build[:5]

Different sections of the ping are stored in different fields of the dataframe. Refer to the schema of the dataframe for a complete layout.

In [None]:
first.system[0]

DataFrames support fields that can contain structs, maps, arrays, scalars and combinations thereof. Note that in the previous example the system field is an array of [Rows](https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#pyspark.sql.Row). You can think of a Row as a struct that allows each field to be accessed individually.

In [None]:
first.system[0].memory_mb

### Histograms

Not all profiles have all histograms. If a certain histogram, say GC_MS, is missing from all submissions of a profile, then the corresponding field in the DataFrame will be null (None in Python).

In [None]:
first.gc_ms is None

If a histogram is present in at least one submission of a profile, then all other submissions that do not have that histogram will be initialized with an empty histogram.
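The null-versus-empty rule can be sketched in plain Python. This illustrates the behaviour described above and is not the code that builds the dataset. The helper name `fill_histogram_column` is hypothetical, and representing an "empty histogram" as a zero-filled bucket list is an assumption.

```python
from typing import List, Optional

Histogram = List[int]


def fill_histogram_column(per_submission: List[Optional[Histogram]],
                          n_buckets: int) -> Optional[List[Histogram]]:
    """Hypothetical helper mirroring the rule described above.

    If the histogram is missing from every submission, the whole field is None.
    Otherwise each missing entry is replaced by an empty (all-zero) histogram.
    """
    if all(h is None for h in per_submission):
        return None
    return [h if h is not None else [0] * n_buckets for h in per_submission]


print(fill_histogram_column([None, None], 3))       # None
print(fill_histogram_column([None, [1, 2, 0]], 3))  # [[0, 0, 0], [1, 2, 0]]
```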

Flag and count "histograms" are represented as scalars.

In [None]:
first.fxa_configured[:5]

Boolean histograms are represented with an array of two integers. Similarly, enumerated histograms are represented with an array of N integers.

In [None]:
first.browser_set_default_always_check[:5]
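Because boolean and enumerated histograms are plain integer arrays, they can be combined bucket by bucket. The sketch below sums one profile's per-submission arrays and splits a boolean histogram into its two counts. The helper names and the data are hypothetical. It also assumes that bucket 0 of a boolean histogram counts `false` samples and bucket 1 counts `true` samples.

```python
def boolean_counts(hist):
    """Split a two-bucket boolean histogram into (false_count, true_count).

    Assumes bucket 0 counts `false` samples and bucket 1 counts `true` ones.
    """
    if len(hist) != 2:
        raise ValueError("expected exactly two buckets, got %d" % len(hist))
    return hist[0], hist[1]


def total_over_submissions(hists):
    """Element-wise sum of same-length bucket arrays, one per submission."""
    return [sum(bucket) for bucket in zip(*hists)]


# Hypothetical per-submission boolean histograms for one profile.
submissions = [[3, 1], [0, 2], [1, 0]]
print(total_over_submissions(submissions))                  # [4, 3]
print(boolean_counts(total_over_submissions(submissions)))  # (4, 3)
```

The same `total_over_submissions` helper applies unchanged to enumerated histograms, since those are also fixed-length arrays.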

Exponential and linear histograms are represented as a struct containing an array of N integers (*values* field) and the sum of the entries (*sum* field).

In [None]:
first.browser_set_default_dialog_prompt_rawcount[:5]
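With the *values*/*sum* layout, a mean can be estimated by dividing *sum* by the total number of samples in *values*. The sketch below uses a `namedtuple` as a stand-in for the Spark struct. It assumes that *sum* is the sum of all recorded sample values, as in Telemetry histograms. The data is made up.

```python
from collections import namedtuple

# Stand-in for the struct described above; field names follow the text.
Hist = namedtuple("Hist", ["values", "sum"])


def mean_sample(hist):
    """Mean recorded value, or None for an empty histogram."""
    count = sum(hist.values)  # builtin sum over the bucket counts
    if count == 0:
        return None
    return hist.sum / count


h = Hist(values=[0, 2, 1, 1], sum=40)
print(mean_sample(h))  # 10.0
```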

Keyed histograms are stored in a map from strings to values, where the values depend on the histogram type and have the same structure as described above.

In [None]:
frame.select("search_counts").rdd.take(2)
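For a keyed count histogram such as *search_counts*, each key maps to a list of per-submission counts. The later `search_counts.`yahoo.urlbar`` query relies on that shape. The plain-Python sketch below assumes that shape, with a plain dict standing in for the Spark map. The key `google.searchbar` and all the numbers are hypothetical.

```python
# Hypothetical keyed count histogram for one profile:
# key -> one count per submission.
search_counts = {
    "yahoo.urlbar": [2, 0, 5],
    "google.searchbar": [1, 4, 0],
}


def total_for_key(keyed, key):
    """Total count for one key across all submissions (0 if the key is absent)."""
    values = keyed.get(key)
    return sum(values) if values else 0


def totals_by_key(keyed):
    """Total count per key across all submissions."""
    return {key: sum(values) for key, values in keyed.items()}


print(total_for_key(search_counts, "yahoo.urlbar"))  # 7
print(total_for_key(search_counts, "bing.urlbar"))   # 0
```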

### Queries

Note that the following queries are run on a single machine and have been annotated with their run-time.

##### Project a column with [select](https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.select):

In [None]:
%time frame.select("system").rdd.first().system[:2]

###### Project a nested field:

In [None]:
%time frame.select("system.memory_mb").rdd.first()

###### Project a set of sql expressions with [selectExpr](https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#pyspark.sql.DataFrame.selectExpr):

In [None]:
%time frame.selectExpr("size(system.memory_mb) as num_submissions").rdd.take(5)

In [None]:
%time frame.selectExpr("system_os.name[0] as os_name").rdd.take(5)

##### Filter profiles with where:

In [None]:
%time frame.selectExpr("system_os.name[0] as os_name").where("os_name = 'Darwin'").count()

Note that metrics that don't tend to change often can be "uplifted" from their nested structure for faster selection. One such metric is the operating system name. More metrics can be uplifted on request.

In [None]:
%time frame.select("os").where("os = 'Darwin'").count()
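The idea behind uplifting can be sketched in plain Python. A value that would otherwise be dug out of a per-submission array is copied into a top-level scalar field, so that filtering on it becomes a simple comparison. The data and the helper name `uplift_os` are hypothetical. Taking the value from the first submission mirrors the `system_os.name[0]` query above, but it is an assumption about how the real `os` column is derived.

```python
# Hypothetical profiles with a nested, per-submission OS name.
profiles = [
    {"system_os": {"name": ["Darwin", "Darwin"]}},
    {"system_os": {"name": ["Windows_NT"]}},
    {"system_os": {"name": ["Darwin"]}},
]


def uplift_os(profile):
    """Return a copy of the profile with a top-level scalar `os` field.

    Assumption: the uplifted value is taken from the first submission.
    """
    names = profile["system_os"]["name"]
    return dict(profile, os=names[0] if names else None)


uplifted = [uplift_os(p) for p in profiles]

# Filtering on the nested field means indexing into every profile's array...
nested_count = sum(1 for p in profiles if p["system_os"]["name"][:1] == ["Darwin"])
# ...while the uplifted field is a plain scalar comparison.
uplifted_count = sum(1 for p in uplifted if p["os"] == "Darwin")
print(nested_count, uplifted_count)  # 2 2
```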

##### Transform to RDD

DataFrames can be converted to RDDs, which make it easy to apply user-defined functions. In general, though, it's worth spending some time learning the DataFrame API: its operations are optimized and run entirely in the JVM, which can make queries faster.

In [None]:
rdd = frame.rdd

In [None]:
out = rdd.map(lambda x: x.search_counts).take(2)

##### Window functions

Select the earliest build-id with which a profile was seen using [window functions](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html):

In [None]:
from pyspark.sql.window import Window
from pyspark.sql import Row
import pyspark.sql.functions as func

In [None]:
subset = frame.selectExpr("client_id", "explode(build.build_id) as build_id")

The [explode](https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#pyspark.sql.functions.explode) function returns a new row for each element in the given array or map. See the [documentation](https://spark.apache.org/docs/1.5.2/api/python/pyspark.sql.html#module-pyspark.sql.functions) for the complete list of functions supported by DataFrames.
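The semantics of `explode` can be mimicked in plain Python: each input row yields one output row per array element, and the key column is carried along. The helper below is a hypothetical analogue of `selectExpr("client_id", "explode(build.build_id) as build_id")`, not Spark's implementation. As with Spark's `explode`, rows whose array is null or empty produce no output. The data is made up.

```python
def explode(rows, key_column, array_column, out_column):
    """Emit one output row per array element, carrying the key along.

    Rows whose array is None or empty produce no output, as with explode().
    """
    for row in rows:
        for element in row[array_column] or []:
            yield {key_column: row[key_column], out_column: element}


# Hypothetical input: one row per profile with an array of build ids.
rows = [
    {"client_id": "a", "build_id": ["20151215030226", "20151201030226"]},
    {"client_id": "b", "build_id": ["20151208030226"]},
    {"client_id": "c", "build_id": None},
]
exploded = list(explode(rows, "client_id", "build_id", "build_id"))
print(len(exploded))  # 3
```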

In [None]:
window_spec = Window.partitionBy(subset["client_id"]).orderBy(subset["build_id"])

In [None]:
min_buildid = func.min(subset["build_id"]).over(window_spec)

In [None]:
%time subset.select("client_id", "build_id", min_buildid.alias("first_build_id")).count()
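A window function adds a column to every row without collapsing rows, so the count above equals the number of exploded rows rather than the number of profiles. Because the window is ordered by `build_id` ascending, Spark's default frame runs from the start of the partition to the current row. The running minimum therefore equals the partition minimum. The plain-Python sketch below reproduces that result on hypothetical data. It assumes build ids are fixed-width timestamp strings, so that string order matches chronological order.

```python
def add_first_build_id(rows):
    """Attach each client's minimum build_id to every one of its rows.

    Like a window function, this adds a column without collapsing rows.
    Assumes build ids are fixed-width strings, so lexicographic order
    matches chronological order.
    """
    earliest = {}
    for row in rows:
        cid, bid = row["client_id"], row["build_id"]
        if cid not in earliest or bid < earliest[cid]:
            earliest[cid] = bid
    return [dict(row, first_build_id=earliest[row["client_id"]]) for row in rows]


exploded = [
    {"client_id": "a", "build_id": "20151215030226"},
    {"client_id": "a", "build_id": "20151201030226"},
    {"client_id": "b", "build_id": "20151208030226"},
]
result = add_first_build_id(exploded)
print(len(result))                  # 3, same as the input
print(result[0]["first_build_id"])  # 20151201030226
```

If one row per client is enough, an ordinary aggregation such as `subset.groupBy("client_id").agg(func.min("build_id"))` is a simpler alternative to the window.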

##### Count the number of searches performed with Yahoo from the URL bar

Note how individual keys of the map can be accessed without any custom Python code. The backticks are needed because the key `yahoo.urlbar` itself contains a dot.

In [None]:
%time sensitive = frame.select("search_counts.`yahoo.urlbar`").rdd.map(lambda x: np.sum(x[0]) if x[0] else 0).sum()

And the same operation without custom Python:

In [None]:
%time sensitive = frame.selectExpr("explode(search_counts.`yahoo.urlbar`) as searches").agg({"searches": "sum"}).collect()
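Both cells compute the same total. The first sums each profile's array in Python and then adds up the per-profile sums. The second explodes every array element into its own row and lets Spark sum them. The plain-Python sketch below checks this equivalence on hypothetical per-profile arrays, where `None` stands for a profile without the key. One difference: Spark's sum aggregate returns null rather than 0 when there are no rows to sum.

```python
from itertools import chain

# Hypothetical values of search_counts.`yahoo.urlbar`, one entry per profile.
yahoo_urlbar = [[2, 0, 5], None, [1], []]

# Approach 1: per-profile sum, as in the lambda passed to map().
per_profile_total = sum(sum(x) if x else 0 for x in yahoo_urlbar)

# Approach 2: flatten ("explode") every array into individual values, then sum.
exploded_total = sum(chain.from_iterable(x for x in yahoo_urlbar if x))

print(per_profile_total, exploded_total)  # 8 8
```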

Exploding arrays does not seem to be any faster than the custom Python code here. That said, RDD-based analyses are unlikely to get faster with new Spark releases, whereas DataFrame-based ones are likely to benefit from improvements to Spark's query optimizer.

##### Aggregate GC_MS histograms for all users with extended Telemetry enabled

In [None]:
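%%time

def sum_array(x, y):
    # Element-wise sum of two bucket-count arrays; the shorter one (e.g. an
    # empty histogram) is padded with zeros so no buckets are dropped.
    n = max(len(x), len(y))
    x = list(x) + [0] * (n - len(x))
    y = list(y) + [0] * (n - len(y))
    return [a + b for a, b in zip(x, y)]

histogram = frame.select("gc_ms", "settings.telemetry_enabled")\
    .where("telemetry_enabled[0] = True")\
    .rdd\
    .flatMap(lambda x: [v.values for v in x.gc_ms] if x.gc_ms else [])\
    .reduce(sum_array)

histogram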

In [None]:
pd.Series(histogram).plot(kind="bar")

### Schema

In [None]:
frame.printSchema()