This repository was archived by the owner on Feb 14, 2025. It is now read-only.
Merged
41 changes: 23 additions & 18 deletions README.md
@@ -7,39 +7,44 @@ This is a Scala "framework" to build derived datasets, also known as [batch view

Raw JSON [pings](https://ci.mozilla.org/job/mozilla-central-docs/Tree_Documentation/toolkit/components/telemetry/telemetry/pings.html) are stored on S3 within files containing [framed Heka records](https://hekad.readthedocs.org/en/latest/message/index.html#stream-framing). Reading the raw data through e.g. Spark can be slow, as a given analysis typically uses only a few fields, not to mention the cost of parsing the JSON blobs. Furthermore, under certain circumstances Heka files might contain only a handful of records.

Defining a derived [Parquet](https://parquet.apache.org/) dataset, which uses a columnar layout optimized for analytics workloads, can drastically improve the performance of analysis jobs while reducing the space requirements. A derived dataset might, and should, also perform heavy-duty operations common to all analyses that are going to read from that dataset (e.g., parsing dates into normalized timestamps).
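As a minimal sketch of such an operation (assuming joda-time, which the project already depends on), a raw `yyyyMMdd` date string can be normalized into an ISO 8601 UTC timestamp much like the helper in `src/main/scala/utils/Utils.scala` does:
```scala
import org.joda.time.DateTimeZone
import org.joda.time.format.{DateTimeFormat, ISODateTimeFormat}

// Normalize a raw "yyyyMMdd" string (e.g. "20151028") into an ISO 8601 UTC timestamp.
def normalizeDate(yyyymmdd: String): String = {
  val parsed = DateTimeFormat.forPattern("yyyyMMdd")
    .withZone(DateTimeZone.UTC)
    .parseDateTime(yyyymmdd)
  ISODateTimeFormat.dateTime().withZone(DateTimeZone.UTC).print(parsed)
}
```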

The converted datasets are stored in the bucket specified in [*application.conf*](https://github.com/vitillo/aws-lambda-parquet/blob/master/src/main/resources/application.conf#L2).

### Adding a new derived dataset

See the [streams](https://github.com/mozilla/telemetry-batch-view/tree/master/src/main/scala/streams) folder for `DerivedStream`-based datasets.

See the [views](https://github.com/mozilla/telemetry-batch-view/tree/master/src/main/scala/views) folder for view-based datasets.

See the [docs](https://github.com/mozilla/telemetry-batch-view/tree/master/docs) folder for more information about the derived datasets.

### Development

To set up a development environment, install [SBT](http://www.scala-sbt.org/) and [Spark](http://spark.apache.org/).

To run tests:
```bash
sbt test
```

### Generating Datasets

See the [documentation for specific streams](https://github.com/mozilla/telemetry-batch-view/tree/master/docs) for details about running/generating them.

To generate a `DerivedStream`-based dataset `MyStream` for October 28, 2015 to October 29, 2015:
```bash
sbt "run-main telemetry.DerivedStream --from-date 20151028 --to-date 20151029 MyStream"
```

For distributed execution, we pack all of the classes together into a single JAR, and then submit it to be run with Spark:
```bash
sbt assembly
spark-submit --master yarn-client --class telemetry.DerivedStream target/scala-2.10/telemetry-batch-view-*.jar --from-date 20151028 --to-date 20151029 MyStream
```

### Caveats
If you run into memory issues at compile time, set the following environment variable before running sbt:
```bash
export JAVA_OPTIONS="-Xss128M -Xmx2048M"
```
2 changes: 1 addition & 1 deletion build.sbt
@@ -20,7 +20,7 @@ lazy val root = (project in file(".")).
libraryDependencies += "com.typesafe" % "config" % "1.2.1",
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.4",
libraryDependencies += "org.xerial.snappy" % "snappy-java" % "1.1.2",
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11",
libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.10",
libraryDependencies += "joda-time" % "joda-time" % "2.9.1",
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0" excludeAll(ExclusionRule(organization = "javax.servlet")),
libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0" excludeAll(ExclusionRule(organization = "javax.servlet")),
126 changes: 126 additions & 0 deletions docs/CrashAggregateView.md
@@ -0,0 +1,126 @@
The Crash Aggregate View
========================

The crash aggregate view is generated by `src/main/scala/views/CrashAggregateView.scala`. It contains stats used for computing crash rates and other stability-related metrics, aggregated by a variety of criteria.

Essentially, each row of the view represents the stats for a particular population. For example, one row might represent crash stats and usage hours for pings from Firefox Beta 46 on 64-bit Windows 7, and so on.
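As an illustration only, a hypothetical Scala shape for one such row (the field names follow the schema described below; this is not the actual class used by the view) might look like:
```scala
// Hypothetical shape of a single aggregate row; names follow the schema documented below.
case class CrashAggregateRow(
  submissionDate: String,           // e.g. "2016-04-10"
  activityDate: String,             // e.g. "2016-04-08"
  dimensions: Map[String, String],  // e.g. Map("channel" -> "beta", "os_name" -> "Windows_NT")
  stats: Map[String, Double]        // e.g. Map("usage_hours" -> 1234.5, "main_crashes" -> 3.0)
)
```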

To generate the dataset for April 10, 2016 to April 11, 2016:
```bash
sbt "run-main telemetry.views.CrashAggregateView --from 20160410 --to 20160411"
```

**Note:** Currently, due to [avro-parquet issues](https://issues.apache.org/jira/browse/HIVE-12828), Parquet writing only works under `spark-submit`, so the above example will fail. This will be fixed when avro-parquet is updated or removed.

For distributed execution, we can build a self-contained JAR file, then run it with Spark. To generate the dataset for April 10, 2016 to April 11, 2016:
```bash
sbt assembly
spark-submit --master yarn-client --class telemetry.views.CrashAggregateView target/scala-2.10/telemetry-batch-view-*.jar --from 20160410 --to 20160411
```

Notes:

* A Spark analysis runs daily using a [scheduled analysis job](https://analysis.telemetry.mozilla.org/cluster/schedule).
* The job, `crash-aggregates`, runs the `run_crash_aggregator.ipynb` Jupyter notebook, which downloads, installs, and runs the crash-rate-aggregates on the cluster.
* Currently, this job runs under :azhang's account every day at 1am UTC, with the default settings for everything else.
* The job uploads the resulting data to S3 as [Parquet](https://parquet.apache.org/)-serialized [DataFrames](https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/DataFrame.html), under prefixes of the form `crash-aggregates/v1/submission_date=(YYYY-MM-DD SUBMISSION DATE)/` in the `telemetry-parquet` bucket (see the sketch after this list for reading the data back).
* Each of these prefixes is a partition; there is one partition per submission date.
* We have [Presto](https://prestodb.io/) set up to query the data on S3 using Presto's SQL query engine.
* Currently, this instance is available at `ec2-54-218-5-112.us-west-2.compute.amazonaws.com`. The [provisioning files for the Presto instance](https://github.com/vitillo/emr-bootstrap-presto) are available as well.
* At the moment, new partitions must be imported using [parquet2hive](https://github.com/vitillo/parquet2hive). There's a temporary cron job set up on the instance to do the importing, which will [eventually be replaced with something better](https://bugzilla.mozilla.org/show_bug.cgi?id=1251648).
* The [re:dash](https://sql.telemetry.mozilla.org/dashboard/general) setup connects to Presto and allows users to make SQL queries, build dashboards, etc. with the crash aggregates data.
* A watchdog notebook, `crash-rate-aggregates-watchdog.ipynb`, can be run regularly to send out email alerts if the crash aggregator fails to output its results to S3.
* Currently, this is running every day at 1am as a [scheduled analysis job](https://analysis.telemetry.mozilla.org/cluster/schedule) under :azhang's account.
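As a minimal sketch of reading that output back (assuming a Spark 1.6 cluster with S3 access already configured; the path below simply combines the bucket and prefix named above):
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Load the Parquet output and restrict the scan to a single submission_date partition.
val sc = new SparkContext(new SparkConf().setAppName("crash-aggregates-example"))
val sqlContext = new SQLContext(sc)
val aggregates = sqlContext.read.parquet("s3n://telemetry-parquet/crash-aggregates/v1/")
aggregates.filter(aggregates("submission_date") === "2016-04-10").show()
```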

Schemas and Making Queries
--------------------------

In the re:dash interface, make a new query, and set the data source to "Presto" if it isn't already. Now, we can make queries against the `crash_aggregates` table, which contains all of our crash data.

A query that computes crash rates for each channel (sorted by number of usage hours) would look like this:

```sql
SELECT dimensions['channel'] AS channel,
sum(stats['usage_hours']) AS usage_hours,
1000 * sum(stats['main_crashes']) / sum(stats['usage_hours']) AS main_crash_rate,
1000 * sum(stats['content_crashes']) / sum(stats['usage_hours']) AS content_crash_rate,
1000 * sum(stats['plugin_crashes']) / sum(stats['usage_hours']) AS plugin_crash_rate,
1000 * sum(stats['gmplugin_crashes']) / sum(stats['usage_hours']) AS gmplugin_crash_rate
FROM crash_aggregates
GROUP BY dimensions['channel']
ORDER BY -sum(stats['usage_hours'])
```

An aggregate in this context is the combined collection of all Telemetry pings that meet a given set of criteria. For example, the aggregate for Windows 7 64-bit Firefox on March 15, 2016 represents the stats for all pings that originate from Windows 7 64-bit Firefox on that day. The subpopulations represented by individual aggregates are always disjoint.

Presto has its own SQL engine, and therefore its own extensions to standard SQL. The SQL that Presto uses is documented [here](https://prestodb.io/docs/current/).

The `crash_aggregates` table has 4 commonly-used columns:

* `submission_date` is the date pings were submitted for a particular aggregate.
* For example, `select sum(stats['usage_hours']) from crash_aggregates where submission_date = '2016-03-15'` will give the total number of user hours represented by pings submitted on March 15, 2016.
* The dataset is partitioned by this field. Queries that limit the possible values of `submission_date` can run significantly faster.
* `activity_date` is the date pings were generated on the client for a particular aggregate.
* For example, `select sum(stats['usage_hours']) from crash_aggregates where activity_date = '2016-03-15'` will give the total number of user hours represented by pings generated on March 15, 2016.
* This can be several days before the pings are actually submitted, so it will always be before or on its corresponding `submission_date`.
* Therefore, queries that are sensitive to when measurements were taken on the client should prefer this field over `submission_date`.
* `dimensions` is a map of all the other dimensions that we currently care about. These fields include:
* `dimensions['build_version']` is the program version, like `46.0a1`.
* `dimensions['build_id']` is the YYYYMMDDhhmmss timestamp the program was built, like `20160123180541`. This is also known as the "build ID" or "buildid".
* `dimensions['channel']` is the channel, like `release` or `beta`.
* `dimensions['application']` is the program name, like `Firefox` or `Fennec`.
* `dimensions['os_name']` is the name of the OS the program is running on, like `Darwin` or `Windows_NT`.
* `dimensions['os_version']` is the version of the OS the program is running on.
* `dimensions['architecture']` is the architecture that the program was built for (not necessarily the one it is running on).
* `dimensions['country']` is the country code for the user (determined using geoIP), like `US` or `UK`.
* `dimensions['experiment_id']` is the identifier of the experiment being participated in, such as `e10s-beta46-noapz@experiments.mozilla.org`, or null if no experiment.
* `dimensions['experiment_branch']` is the branch of the experiment being participated in, such as `control` or `experiment`, or null if no experiment.
* `dimensions['e10s_enabled']` is whether E10S is enabled.
* `dimensions['e10s_cohort']` is the E10S cohort the user is part of, such as `control`, `test`, or `disqualified`.
* All of the above fields can potentially be blank, which means "not present". That means that in the actual pings, the corresponding fields were null.
* `stats` contains the aggregate values that we care about:
* `stats['usage_hours']` is the number of user-hours represented by the aggregate.
* `stats['main_crashes']` is the number of main process crashes represented by the aggregate (or just program crashes, in the non-E10S case).
* `stats['content_crashes']` is the number of content process crashes represented by the aggregate.
* `stats['plugin_crashes']` is the number of plugin process crashes represented by the aggregate.
* `stats['gmplugin_crashes']` is the number of Gecko media plugin (often abbreviated GMPlugin) process crashes represented by the aggregate.

Plugin process crashes per usage hour on Nightly for March 14, 2016:

```sql
SELECT sum(stats['plugin_crashes']) / sum(stats['usage_hours']) FROM crash_aggregates
WHERE dimensions['channel'] = 'nightly' AND activity_date = '2016-03-14'
```

Main process crashes by build date and E10S cohort:

```sql
WITH channel_rates AS (
SELECT dimensions['build_id'] AS build_id,
SUM(stats['main_crashes']) AS main_crashes, -- total number of crashes
SUM(stats['usage_hours']) / 1000 AS usage_kilohours, -- thousand hours of usage
dimensions['e10s_cohort'] AS e10s_cohort -- e10s cohort
FROM crash_aggregates
WHERE dimensions['experiment_id'] is null -- not in an experiment
AND regexp_like(dimensions['build_id'], '^\d{14}$') -- validate build IDs
AND dimensions['build_id'] > '20160201000000' -- only in the date range that we care about
GROUP BY dimensions['build_id'], dimensions['e10s_cohort']
)
SELECT cast(parse_datetime(build_id, 'yyyyMMddHHmmss') as date) as build_id, -- program build date
usage_kilohours, -- thousands of usage hours
e10s_cohort, -- e10s cohort
main_crashes / usage_kilohours AS main_crash_rate -- crash rate being defined as crashes per thousand usage hours
FROM channel_rates
WHERE usage_kilohours > 100 -- only aggregates that have statistically significant usage hours
ORDER BY build_id ASC
```

Technical Details
-----------------

We ignore invalid pings in our processing. A ping is considered invalid if:

* its submission date or activity date is invalid or missing,
* its build ID is malformed,
* its `docType` field is missing or unknown, or
* it is a main ping without usage hours or a crash ping with usage hours.
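A minimal sketch of these rules as a filter (a hypothetical helper, not the actual implementation; it assumes the relevant fields have already been extracted from the ping):
```scala
// Hypothetical validity check mirroring the rules listed above.
def isValidPing(
    submissionDate: Option[String],
    activityDate: Option[String],
    buildId: Option[String],
    docType: Option[String],
    usageHours: Option[Double]): Boolean = {
  // Assumption: the doc types this view cares about.
  val knownDocTypes = Set("main", "crash")
  val datesOk = submissionDate.isDefined && activityDate.isDefined  // plus real date validation
  val buildIdOk = buildId.exists(_.matches("""\d{14}"""))           // well-formed YYYYMMDDhhmmss
  val docTypeOk = docType.exists(knownDocTypes.contains)
  // Main pings must carry usage hours; crash pings must not.
  val usageOk = docType match {
    case Some("main")  => usageHours.isDefined
    case Some("crash") => usageHours.isEmpty
    case _             => false
  }
  datesOk && buildIdOk && docTypeOk && usageOk
}
```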
4 changes: 2 additions & 2 deletions src/main/scala/utils/Utils.scala
@@ -66,8 +66,8 @@ object Utils{
val formatISO = org.joda.time.format.ISODateTimeFormat.dateTime()
formatISO.withZone(org.joda.time.DateTimeZone.UTC).print(
format.DateTimeFormat.forPattern("yyyyMMdd")
.withZone(org.joda.time.DateTimeZone.UTC)
.parseDateTime(YYYYMMDD.asInstanceOf[String])
)
}
