diff --git a/README.md b/README.md index a0f00309..fc893744 100644 --- a/README.md +++ b/README.md @@ -7,39 +7,44 @@ This is a Scala "framework" to build derived datasets, also known as [batch view Raw JSON [pings](https://ci.mozilla.org/job/mozilla-central-docs/Tree_Documentation/toolkit/components/telemetry/telemetry/pings.html) are stored on S3 within files containing [framed Heka records](https://hekad.readthedocs.org/en/latest/message/index.html#stream-framing). Reading the raw data in through e.g. Spark can be slow as for a given analysis only a few fields are typically used; not to mention the cost of parsing the JSON blobs. Furthermore, Heka files might contain only a handful of records under certain circumstances. -Defining a derived [Parquet](https://parquet.apache.org/) dataset, which uses a columnar layout optimized for analytics workloads, can drastically improve the performance of analysis jobs while reducing the space requirements. A derived dataset might, and should, also perform heavy duty operations common to all analysis that are going to read from that dataset. +Defining a derived [Parquet](https://parquet.apache.org/) dataset, which uses a columnar layout optimized for analytics workloads, can drastically improve the performance of analysis jobs while reducing the space requirements. A derived dataset might, and should, also perform heavy duty operations common to all analysis that are going to read from that dataset (e.g., parsing dates into normalized timestamps). The converted datasets are stored in the bucket specified in [*application.conf*](https://github.com/vitillo/aws-lambda-parquet/blob/master/src/main/resources/application.conf#L2). ### Adding a new derived dataset -See the [streams](https://github.com/mozilla/telemetry-batch-view/tree/master/src/main/scala/streams) folder for the currently defined datasets. +See the [streams](https://github.com/mozilla/telemetry-batch-view/tree/master/src/main/scala/streams) folder for `DerivedStream`-based datasets. + +See the [views](https://github.com/mozilla/telemetry-batch-view/tree/master/src/main/scala/views) folder for view-based datasets. See the [docs](https://github.com/mozilla/telemetry-batch-view/tree/master/docs) folder for more information about the derived datasets. -### Local execution -Given a subtype of `DerivedStream` of type `MyStream`, a dataset for e.g. the 28th of October can be generated with: -``` -sbt "run-main telemetry.DerivedStream --from-date 20151028 --to-date 20151028 MyStream" -``` +### Development -### Distributed execution -Build an uber-jar with: -``` -sbt assembly -``` -then, submit the job with: +To set up a development environment, install [SBT](http://www.scala-sbt.org/) and [Spark](http://spark.apache.org/). + +To run tests: +```bash +sbt test ``` -spark-submit --master yarn-client --class telemetry.DerivedStream target/scala-2.10/telemetry-batch-view-X.Y.jar --from-date 20151028 --to-date 20151028 MyStream -``` -### Running the test suite +### Generating Datasets + +See the [documentation for specific streams](https://github.com/mozilla/telemetry-batch-view/tree/master/docs) for details about running/generating them. 
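+
+For example, the view-based crash aggregate dataset (see `docs/CrashAggregateView.md` in this repository for details and for caveats about local execution versus `spark-submit`) can be generated for April 10, 2016 to April 11, 2016 with:
+```bash
+sbt "run-main telemetry.views.CrashAggregateView --from 20160410 --to 20160411"
+```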
+ +To generate a `DerivedStream`-based dataset `MyStream` for October 28, 2015 to October 29, 2015: +```bash +sbt "run-main telemetry.DerivedStream --from-date 20151028 --to-date 20151029 MyStream" ``` -sbt test + +For distributed execution, we pack all of the classes together into a single JAR, and then submit it to be run with Spark: +```bash +sbt assembly +spark-submit --master yarn-client --class telemetry.DerivedStream target/scala-2.10/telemetry-batch-view-*.jar --from-date 20151028 --to-date 20151029 MyStream ``` ### Caveats If you run into memory issues during compilation time issue the following command before running sbt: -``` +```bash export JAVA_OPTIONS="-Xss128M -Xmx2048M" ``` diff --git a/build.sbt b/build.sbt index 6bd3834b..cf159f9e 100644 --- a/build.sbt +++ b/build.sbt @@ -20,7 +20,7 @@ lazy val root = (project in file(".")). libraryDependencies += "com.typesafe" % "config" % "1.2.1", libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.4", libraryDependencies += "org.xerial.snappy" % "snappy-java" % "1.1.2", - libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.11", + libraryDependencies += "org.json4s" %% "json4s-jackson" % "3.2.10", libraryDependencies += "joda-time" % "joda-time" % "2.9.1", libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.6.0" excludeAll(ExclusionRule(organization = "javax.servlet")), libraryDependencies += "org.apache.hadoop" % "hadoop-aws" % "2.6.0" excludeAll(ExclusionRule(organization = "javax.servlet")), diff --git a/docs/CrashAggregateView.md b/docs/CrashAggregateView.md new file mode 100644 index 00000000..168aab54 --- /dev/null +++ b/docs/CrashAggregateView.md @@ -0,0 +1,126 @@ +The Crash Aggregate View +======================== + +The crash aggregate view is generated by `src/main/scala/views/CrashAggregateView.scala`. It contains stats used for computing crash rates and other stability-related metrics, aggregated by a variety of criteria. + +Essentially, each row of the view represents the stats for a particular population. For example, one row might represent crash stats and usage hours for pings from Firefox Beta 46 on 64-bit Windows 7, and so on. + +To generate the dataset for April 10, 2016 to April 11, 2016: +```bash +sbt "run-main telemetry.views.CrashAggregateView --from 20160410 --to 20160411" +``` + +**Note:** Currently, due to [avro-parquet issues](https://issues.apache.org/jira/browse/HIVE-12828), Parquet writing only works under the `spark-submit` commands - the above example will fail. This will be fixed when avro-parquet updates or is removed. + +For distributed execution, we can build a self-contained JAR file, then run it with Spark. To generate the dataset for April 10, 2016 to April 11, 2016: +```bash +sbt assembly +spark-submit --master yarn-client --class telemetry.views.CrashAggregateView target/scala-2.10/telemetry-batch-view-*.jar --from 20160410 --to 20160411 +``` + +Notes: + +* A Spark analysis runs daily using a [scheduled analysis job](https://analysis.telemetry.mozilla.org/cluster/schedule). + * The job, `crash-aggregates`, runs the `run_crash_aggregator.ipynb` Jupyter notebook, which downloads, installs, and runs the crash-rate-aggregates on the cluster. + * Currently, this job is running under :azhang's account every day at 1am UTC, with the default settings for everything else. The job is named `crash-aggregates`. 
+* The job uploads the resulting data to S3 as [Parquet](https://parquet.apache.org/)-serialized [DataFrames](https://spark.apache.org/docs/1.6.0/api/java/org/apache/spark/sql/DataFrame.html), under prefixes of the form `crash-aggregates/v1/submission_date=(YYYY-MM-DD SUBMISSION DATE)/` in the `telemetry-parquet` bucket.
+    * Each of these prefixes is a partition; there is one partition per submission date.
+* We have [Presto](https://prestodb.io/) set up to query the data on S3 using Presto's SQL query engine.
+    * Currently, this instance is available at `ec2-54-218-5-112.us-west-2.compute.amazonaws.com`. The [provisioning files for the Presto instance](https://github.com/vitillo/emr-bootstrap-presto) are available as well.
+    * At the moment, new partitions must be imported using [parquet2hive](https://github.com/vitillo/parquet2hive). There is a temporary cron job set up on the instance to do the importing, which will [eventually be replaced with something better](https://bugzilla.mozilla.org/show_bug.cgi?id=1251648).
+* The [re:dash](https://sql.telemetry.mozilla.org/dashboard/general) setup connects to Presto and allows users to make SQL queries, build dashboards, etc. with the crash aggregates data.
+* A watchdog notebook, `crash-rate-aggregates-watchdog.ipynb`, can be run regularly to send out email alerts if the crash aggregator fails to output its results to S3.
+    * Currently, this is running every day at 1am as a [scheduled analysis job](https://analysis.telemetry.mozilla.org/cluster/schedule) under :azhang's account.
+
+Schemas and Making Queries
+--------------------------
+
+In the re:dash interface, make a new query, and set the data source to "Presto" if it isn't already. Now, we can make queries against the `crash_aggregates` table, which contains all of our crash data.
+
+A query that computes crash rates for each channel (sorted by number of usage hours) would look like this:
+
+```sql
+SELECT dimensions['channel'] AS channel,
+       sum(stats['usage_hours']) AS usage_hours,
+       1000 * sum(stats['main_crashes']) / sum(stats['usage_hours']) AS main_crash_rate,
+       1000 * sum(stats['content_crashes']) / sum(stats['usage_hours']) AS content_crash_rate,
+       1000 * sum(stats['plugin_crashes']) / sum(stats['usage_hours']) AS plugin_crash_rate,
+       1000 * sum(stats['gmplugin_crashes']) / sum(stats['usage_hours']) AS gmplugin_crash_rate
+FROM crash_aggregates
+GROUP BY dimensions['channel']
+ORDER BY -sum(stats['usage_hours'])
+```
+
+An aggregate in this context is a combined collection of Telemetry pings. For example, the aggregate for Windows 7 64-bit Firefox on March 15, 2016 represents the stats for all pings that originate from Windows 7 64-bit Firefox on March 15, 2016. Each aggregate represents all the pings that meet its criteria, and the subpopulations represented by individual aggregates are always disjoint.
+
+Presto has its own SQL engine, and therefore its own extensions to standard SQL. The SQL that Presto uses is documented [here](https://prestodb.io/docs/current/).
+
+The `crash_aggregates` table has 4 commonly-used columns:
+
+* `submission_date` is the date pings were submitted for a particular aggregate.
+    * For example, `select sum(stats['usage_hours']) from crash_aggregates where submission_date = '2016-03-15'` will give the total number of user hours represented by pings submitted on March 15, 2016.
+    * The dataset is partitioned by this field. Queries that limit the possible values of `submission_date` can run significantly faster.
+* `activity_date` is the date pings were generated on the client for a particular aggregate.
+    * For example, `select sum(stats['usage_hours']) from crash_aggregates where activity_date = '2016-03-15'` will give the total number of user hours represented by pings generated on March 15, 2016.
+    * This can be several days before the pings are actually submitted, so it will always be on or before its corresponding `submission_date`.
+    * Therefore, queries that are sensitive to when measurements were taken on the client should prefer this field over `submission_date`.
+* `dimensions` is a map of all the other dimensions that we currently care about. These fields include:
+    * `dimensions['build_version']` is the program version, like `46.0a1`.
+    * `dimensions['build_id']` is the YYYYMMDDhhmmss timestamp the program was built, like `20160123180541`. This is also known as the "build ID" or "buildid".
+    * `dimensions['channel']` is the channel, like `release` or `beta`.
+    * `dimensions['application']` is the program name, like `Firefox` or `Fennec`.
+    * `dimensions['os_name']` is the name of the OS the program is running on, like `Darwin` or `Windows_NT`.
+    * `dimensions['os_version']` is the version of the OS the program is running on.
+    * `dimensions['architecture']` is the architecture that the program was built for (not necessarily the one it is running on).
+    * `dimensions['country']` is the country code for the user (determined using geoIP), like `US` or `UK`.
+    * `dimensions['experiment_id']` is the identifier of the experiment being participated in, such as `e10s-beta46-noapz@experiments.mozilla.org`, or null if no experiment.
+    * `dimensions['experiment_branch']` is the branch of the experiment being participated in, such as `control` or `experiment`, or null if no experiment.
+    * `dimensions['e10s_enabled']` is whether E10S is enabled.
+    * `dimensions['e10s_cohort']` is the E10S cohort the user is part of, such as `control`, `test`, or `disqualified`.
+    * All of the above fields can potentially be blank, which means "not present"; that is, the corresponding fields were null in the actual pings.
+* `stats` contains the aggregate values that we care about:
+    * `stats['usage_hours']` is the number of user-hours represented by the aggregate.
+    * `stats['main_crashes']` is the number of main process crashes represented by the aggregate (or just program crashes, in the non-E10S case).
+    * `stats['content_crashes']` is the number of content process crashes represented by the aggregate.
+    * `stats['plugin_crashes']` is the number of plugin process crashes represented by the aggregate.
+    * `stats['gmplugin_crashes']` is the number of Gecko media plugin (often abbreviated GMPlugin) process crashes represented by the aggregate.
+
+Plugin process crashes per usage hour on Nightly for March 14:
+
+```sql
+SELECT sum(stats['plugin_crashes']) / sum(stats['usage_hours']) FROM crash_aggregates
+WHERE dimensions['channel'] = 'nightly' AND activity_date = '2016-03-14'
+```
+
+Main process crashes by build date and E10S cohort:
+ +```sql +WITH channel_rates AS ( + SELECT dimensions['build_id'] AS build_id, + SUM(stats['main_crashes']) AS main_crashes, -- total number of crashes + SUM(stats['usage_hours']) / 1000 AS usage_kilohours, -- thousand hours of usage + dimensions['e10s_cohort'] AS e10s_cohort -- e10s cohort + FROM crash_aggregates + WHERE dimensions['experiment_id'] is null -- not in an experiment + AND regexp_like(dimensions['build_id'], '^\d{14}$') -- validate build IDs + AND dimensions['build_id'] > '20160201000000' -- only in the date range that we care about + GROUP BY dimensions['build_id'], dimensions['e10s_cohort'] +) +SELECT cast(parse_datetime(build_id, 'yyyyMMddHHmmss') as date) as build_id, -- program build date + usage_kilohours, -- thousands of usage hours + e10s_cohort, -- e10s cohort + main_crashes / usage_kilohours AS main_crash_rate -- crash rate being defined as crashes per thousand usage hours +FROM channel_rates +WHERE usage_kilohours > 100 -- only aggregates that have statistically significant usage hours +ORDER BY build_id ASC +``` + +Technical Details +----------------- + +We ignore invalid pings in our processing. Invalid pings are defined as those that: + +* The submission dates or activity dates are invalid or missing. +* The build ID is malformed. +* The `docType` field is missing or unknown. +* The ping is a main ping without usage hours or a crash ping with usage hours. diff --git a/src/main/scala/utils/Utils.scala b/src/main/scala/utils/Utils.scala index 4142fb37..426e415f 100644 --- a/src/main/scala/utils/Utils.scala +++ b/src/main/scala/utils/Utils.scala @@ -66,8 +66,8 @@ object Utils{ val formatISO = org.joda.time.format.ISODateTimeFormat.dateTime() formatISO.withZone(org.joda.time.DateTimeZone.UTC).print( format.DateTimeFormat.forPattern("yyyyMMdd") - .withZone(org.joda.time.DateTimeZone.UTC) - .parseDateTime(YYYYMMDD.asInstanceOf[String]) + .withZone(org.joda.time.DateTimeZone.UTC) + .parseDateTime(YYYYMMDD.asInstanceOf[String]) ) } diff --git a/src/main/scala/views/CrashAggregateView.scala b/src/main/scala/views/CrashAggregateView.scala new file mode 100644 index 00000000..5d0709a1 --- /dev/null +++ b/src/main/scala/views/CrashAggregateView.scala @@ -0,0 +1,333 @@ +package telemetry.views + +import awscala.s3._ +import com.typesafe.config._ +import org.apache.spark.sql.{Row, SQLContext, SaveMode} +import org.apache.spark.sql.types._ +import org.apache.spark.{SparkConf, SparkContext, Accumulator} +import org.apache.spark.rdd.RDD +import scala.io.Source +import org.json4s._ +import org.json4s.jackson.JsonMethods._ +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import scala.math.{max, abs} +import telemetry.heka.{HekaFrame, Message} +import telemetry.utils.Utils +import org.joda.time._ +import org.rogach.scallop._ + +object CrashAggregateView { + class Conf(args: Array[String]) extends ScallopConf(args) { + val from = opt[String]("from", descr = "From submission date", required = false) + val to = opt[String]("to", descr = "To submission date", required = false) + verify() + } + + def main(args: Array[String]) { + // load configuration for the time range + val conf = new Conf(args) + val fmt = format.DateTimeFormat.forPattern("yyyyMMdd") + val to = conf.to.get match { + case Some(t) => fmt.parseDateTime(t) + case _ => DateTime.now.minusDays(1) + } + val from = conf.from.get match { + case Some(f) => fmt.parseDateTime(f) + case _ => DateTime.now.minusDays(1) + } + + // set up Spark + val sparkConf = new 
SparkConf().setAppName("CrashAggregateVie") + sparkConf.setMaster(sparkConf.get("spark.master", "local[*]")) + val sc = new SparkContext(sparkConf) + val sqlContext = new SQLContext(sc) + val hadoopConf = sc.hadoopConfiguration + hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") + val appConf = ConfigFactory.load() + val parquetBucket = appConf.getString("app.parquetBucket") + + for (offset <- 0 to Days.daysBetween(from, to).getDays()) { + val currentDate = from.plusDays(offset) + val currentDateString = currentDate.toString("yyyy-MM-dd") + + // obtain the crash aggregates from telemetry ping data + val messages = getRecords(sc, currentDate, "crash").union(getRecords(sc, currentDate, "main")) + val (rowRDD, main_processed, main_ignored, crash_processed, crash_ignored) = compareCrashes(sc, messages) + + // create a dataframe containing all the crash aggregates + val schema = buildSchema() + val records = sqlContext.createDataFrame(rowRDD.coalesce(1), schema) + + // upload the resulting aggregate Spark records to S3 + records.write.mode(SaveMode.Overwrite).parquet(s"s3://$parquetBucket/crash_aggregates/v1/submission_date=$currentDateString") + + println("=======================================================================================") + println(s"JOB COMPLETED SUCCESSFULLY FOR $currentDate") + println(s"${main_processed.value} main pings processed, ${main_ignored.value} pings ignored") + println(s"${crash_processed.value} crash pings processed, ${crash_ignored.value} pings ignored") + println("=======================================================================================") + } + } + + implicit lazy val s3: S3 = S3() + private def listS3Keys(bucket: Bucket, prefix: String, delimiter: String = "/"): Stream[String] = { + import com.amazonaws.services.s3.model.{ ListObjectsRequest, ObjectListing } + + val request = new ListObjectsRequest().withBucketName(bucket.getName).withPrefix(prefix).withDelimiter(delimiter) + val firstListing = s3.listObjects(request) + + def completeStream(listing: ObjectListing): Stream[String] = { + val prefixes = listing.getCommonPrefixes.asScala.toStream + prefixes #::: (if (listing.isTruncated) completeStream(s3.listNextBatchOfObjects(listing)) else Stream.empty) + } + + completeStream(firstListing) + } + private def matchingPrefixes(bucket: Bucket, seenPrefixes: Stream[String], pattern: List[String]): Stream[String] = { + if (pattern.isEmpty) { + seenPrefixes + } else { + val matching = seenPrefixes + .flatMap(prefix => listS3Keys(bucket, prefix)) + .filter(prefix => (pattern.head == "*" || prefix.endsWith(pattern.head + "/"))) + matchingPrefixes(bucket, matching, pattern.tail) + } + } + private def getRecords(sc: SparkContext, submissionDate: DateTime, docType: String): RDD[Map[String, Any]] = { + // obtain the prefix of the telemetry data source + val metadataBucket = Bucket("net-mozaws-prod-us-west-2-pipeline-metadata") + val Some(sourcesObj) = metadataBucket.get("sources.json") + val metaSources = parse(Source.fromInputStream(sourcesObj.getObjectContent()).getLines().mkString("\n")) + val JString(telemetryPrefix) = metaSources \\ "telemetry" \\ "prefix" + + // get a stream of object summaries that match the desired criteria + val bucket = Bucket("net-mozaws-prod-us-west-2-pipeline-data") + val summaries = matchingPrefixes( + bucket, + List("").toStream, + List(telemetryPrefix, submissionDate.toString("yyyyMMdd"), "telemetry", "4", docType) + ).flatMap(prefix => s3.objectSummaries(bucket, prefix)) + + // output the 
messages as heka ping maps + sc.parallelize(summaries).flatMap(summary => { + val key = summary.getKey() + val hekaFile = bucket.getObject(key).getOrElse(throw new Exception(s"Key is missing on S3: $key")) + for (message <- HekaFrame.parse(hekaFile.getObjectContent(), hekaFile.getKey())) + yield HekaFrame.fields(message) + }) + } + + // paths/dimensions within the ping to compare by + // if the path only has a single element, then the field is interpreted as a literal string rather than a JSON string + val comparableDimensions = List( + List("environment.build", "version"), + List("environment.build", "buildId"), + List("normalizedChannel"), + List("appName"), + List("environment.system", "os", "name"), + List("environment.system", "os", "version"), + List("environment.build", "architecture"), + List("geoCountry"), + List("environment.addons", "activeExperiment", "id"), + List("environment.addons", "activeExperiment", "branch"), + List("environment.settings", "e10sEnabled"), + List("environment.settings", "e10sCohort") + ) + + // names of the comparable dimensions above, used as dimension names in the database + val dimensionNames = List( + "build_version", + "build_id", + "channel", + "application", + "os_name", + "os_version", + "architecture", + "country", + "experiment_id", + "experiment_branch", + "e10s_enabled", + "e10s_cohort" + ) + + val statsNames = List( + "ping_count", + "usage_hours", "main_crashes", "content_crashes", + "plugin_crashes", "gmplugin_crashes", + "usage_hours_squared", "main_crashes_squared", "content_crashes_squared", + "plugin_crashes_squared", "gmplugin_crashes_squared" + ) + + private def getCountHistogramValue(histogram: JValue): Double = { + try { + histogram \ "values" \ "0" match { + case JInt(count) => count.toDouble + case _ => 0 + } + } catch { case _: Throwable => 0 } + } + + def compareCrashes(sc: SparkContext, messages: RDD[Map[String, Any]]): (RDD[Row], Accumulator[Int], Accumulator[Int], Accumulator[Int], Accumulator[Int]) = { + // get the crash pairs for all of the pings, keeping track of how many we see + val mainProcessedAccumulator = sc.accumulator(0, "Number of processed main pings") + val mainIgnoredAccumulator = sc.accumulator(0, "Number of ignored main pings") + val crashProcessedAccumulator = sc.accumulator(0, "Number of processed crash pings") + val crashIgnoredAccumulator = sc.accumulator(0, "Number of ignored crash pings") + val crashPairs = messages.flatMap((pingFields) => { + getCrashPair(pingFields) match { + case Some(crashPair) => { + pingFields.get("docType") match { + case Some("crash") => crashProcessedAccumulator += 1 + case Some("main") => mainProcessedAccumulator += 1 + case _ => null + } + List(crashPair) + } + case None => { + pingFields.get("docType") match { + case Some("crash") => crashIgnoredAccumulator += 1 + case Some("main") => mainIgnoredAccumulator += 1 + case _ => null + } + List() + } + } + }) + + // aggregate crash pairs by their keys + val aggregates = crashPairs.reduceByKey( + (crashStatsA: List[Double], crashStatsB: List[Double]) => + (crashStatsA, crashStatsB).zipped.map(_ + _) + ) + + val records = aggregates.map((aggregatedCrashPair: (List[Any], List[Double])) => { + // extract and compute the record fields + val (uniqueKey, stats) = aggregatedCrashPair + val (activityDate, dimensions) = (uniqueKey.head.asInstanceOf[String], uniqueKey.tail.asInstanceOf[List[Option[String]]]) + val dimensionsMap: Map[String, String] = (dimensionNames, dimensions).zipped.flatMap((key, value) => + (key, value) match { // 
remove dimensions that don't have values + case (key, Some(value)) => Some(key, value) + case (key, None) => None + } + ).toMap + val statsMap = (statsNames, stats).zipped.toMap + + Row(activityDate, mapAsJavaMap(dimensionsMap), mapAsJavaMap(statsMap)) + }) + + (records, mainProcessedAccumulator, mainIgnoredAccumulator, crashProcessedAccumulator, crashIgnoredAccumulator) + } + + private def getCrashPair(pingFields: Map[String, Any]): Option[(List[java.io.Serializable], List[Double])] = { + val build = pingFields.get("environment.build") match { + case Some(value: String) => parse(value) + case _ => JObject() + } + val info = pingFields.get("payload.info") match { + case Some(value: String) => parse(value) + case _ => JObject() + } + val keyedHistograms = pingFields.get("payload.keyedHistograms") match { + case Some(value: String) => parse(value) + case _ => JObject() + } + + // obtain the activity date clamped to a reasonable time range + val submissionDate = pingFields.get("submissionDate") match { + case Some(date: String) => + // convert YYYYMMDD timestamp to a real date + try { + format.DateTimeFormat.forPattern("yyyyMMdd").withZone(org.joda.time.DateTimeZone.UTC).parseDateTime(date) + } catch { + case _: Throwable => return None + } + case _ => return None + } + val activityDateRaw = pingFields.get("creationTimestamp") match { + case Some(date: Double) => { + // convert nanosecond timestamp to a second timestamp to a real date + try { + new DateTime((date / 1e6).toLong).withZone(org.joda.time.DateTimeZone.UTC).withMillisOfDay(0) // only keep the date part of the timestamp + } catch { + case _: Throwable => return None + } + } + case _ => return None + } + val activityDate = if (activityDateRaw.isBefore(submissionDate.minusDays(7))) { // clamp activity date to a good range + submissionDate.minusDays(7) + } else if (activityDateRaw.isAfter(submissionDate)) { + submissionDate + } else { + activityDateRaw + } + val activityDateString = format.DateTimeFormat.forPattern("yyyy-MM-dd").print(activityDate) // format activity date as YYYY-MM-DD + + // obtain the unique key of the aggregate that this ping belongs to + val uniqueKey = activityDateString :: ( + for (path <- comparableDimensions) yield { + pingFields.get(path.head) match { + case Some(topLevelField: String) => + if (path.tail == List.empty) { // list of length 1, interpret field as string rather than JSON + Some(topLevelField) + } else { // JSON field, the rest of the path tells us where to look in the JSON + val dimensionValue = path.tail.foldLeft(parse(topLevelField))((value, fieldName) => value \ fieldName) // retrieve the value at the given path + dimensionValue match { + case JString(value) => Some(value) + case JBool(value) => Some(if (value) "True" else "False") + case JInt(value) => Some(value.toString) + case _ => None + } + } + case _ => None + } + } + ) + + // validate build IDs + val buildId = uniqueKey(dimensionNames.indexOf("build_id") + 1) // we add 1 because the first element is taken by activityDateString + buildId match { + case Some(value: String) if value.matches("\\d{14}") => null + case _ => return None + } + + // obtain the relevant stats for the ping + val isMainPing = pingFields.get("docType") match { + case Some("main") => true + case Some("crash") => false + case _ => return None + } + val usageHours: Double = info \ "subsessionLength" match { + case JInt(subsessionLength) if isMainPing => // main ping, which should always have the subsession length field + Math.min(25, Math.max(0, 
subsessionLength.toDouble / 3600)) + case JNothing if !isMainPing => 0 // crash ping, which shouldn't have the subsession length field + case _ => return None // invalid ping - main ping without subsession length or crash ping with subsession length + } + val mainCrashes = if (isMainPing) 0 else 1 // if this is a crash ping, it represents one main process crash + val contentCrashes: Double = getCountHistogramValue(keyedHistograms \ "SUBPROCESS_CRASHES_WITH_DUMP" \ "content") + val pluginCrashes: Double = getCountHistogramValue(keyedHistograms \ "SUBPROCESS_CRASHES_WITH_DUMP" \ "plugin") + val geckoMediaPluginCrashes: Double = getCountHistogramValue(keyedHistograms \ "SUBPROCESS_CRASHES_WITH_DUMP" \ "gmplugin") + val stats = List( + if (isMainPing) 1 else 0, // number of pings represented by the aggregate + usageHours, mainCrashes, contentCrashes, + pluginCrashes, geckoMediaPluginCrashes, + + // squared versions in order to compute stddev (with $$\sigma = \sqrt{\frac{\sum X^2}{N} - \mu^2}$$) + usageHours * usageHours, mainCrashes * mainCrashes, contentCrashes * contentCrashes, + pluginCrashes * pluginCrashes, geckoMediaPluginCrashes * geckoMediaPluginCrashes + ) + + // return a pair so we can use PairRDD operations on this data later + Some((uniqueKey, stats)) + } + + def buildSchema(): StructType = { + StructType( + StructField("activity_date", StringType, false) :: + StructField("dimensions", MapType(StringType, StringType, true), false) :: + StructField("stats", MapType(StringType, DoubleType, true), false) :: + Nil + ) + } +} diff --git a/src/test/scala/ClientCountViewTest.scala b/src/test/scala/ClientCountViewTest.scala index eed05005..84067f6b 100644 --- a/src/test/scala/ClientCountViewTest.scala +++ b/src/test/scala/ClientCountViewTest.scala @@ -99,5 +99,7 @@ class ClientCountViewTest extends FlatSpec with Matchers{ count.size should be (1) count(0)(0) should be (Submission.dimensions("client_id").filter(x => x != null).size) + + sc.stop() } } diff --git a/src/test/scala/CrashAggregateViewTest.scala b/src/test/scala/CrashAggregateViewTest.scala new file mode 100644 index 00000000..3e8ba345 --- /dev/null +++ b/src/test/scala/CrashAggregateViewTest.scala @@ -0,0 +1,198 @@ +package telemetry.test + +import org.json4s._ +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ +import org.apache.spark.{SparkConf, SparkContext, Accumulator} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.types._ +import org.scalatest.{FlatSpec, Matchers, PrivateMethodTester, BeforeAndAfterAll} +import telemetry.views.CrashAggregateView +import org.apache.avro.{Schema, SchemaBuilder} +import org.apache.avro.generic.{GenericRecord, GenericData, GenericRecordBuilder} +import org.apache.avro.generic.GenericData.Record +import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ +import telemetry.parquet.ParquetFile + +class CrashAggregateViewTest extends FlatSpec with Matchers with BeforeAndAfterAll { + val pingDimensions = List( + ("submission_date", List("20160305", "20160607")), + ("activity_date", List(1456906203503000000.0, 1464768617492000000.0)), + ("application", List("Firefox", "Fennec")), + ("doc_type", List("main", "crash")), + ("channel", List("nightly", "aurora")), + ("build_version", List("45.0a1", "45")), + ("build_id", List("20160301000000", "20160302000000")), + ("os_name", List("Linux", "Windows_NT")), + ("os_version", List("6.1", "3.1.12")), + ("architecture", List("x86", "x86-64")), + ("country", 
List("US", "UK")), + ("experiment_id", List(null, "displayport-tuning-nightly@experiments.mozilla.org")), + ("experiment_branch", List("control", "experiment")), + ("e10s", List(true, false)), + ("e10s_cohort", List("control", "test")) + ) + + var sc: Option[SparkContext] = None + var sqlContext: Option[SQLContext] = None + + override def beforeAll(configMap: org.scalatest.ConfigMap) { + // set up and configure Spark + val sparkConf = new SparkConf().setAppName("KPI") + sparkConf.setMaster(sparkConf.get("spark.master", "local[1]")) + sc = Some(new SparkContext(sparkConf)) + sqlContext = Some(new SQLContext(sc.get)) + } + + override def afterAll(configMap: org.scalatest.ConfigMap) { + sc.get.stop() + } + + def fixture = { + def cartesianProduct(dimensions: List[(String, List[Any])]): Iterable[Map[String, Any]] = { + dimensions match { + case Nil => List(Map[String, Any]()) + case (dimensionName, dimensionValues) :: rest => + dimensionValues.flatMap(dimensionValue => + cartesianProduct(dimensions.tail).map( + configuration => configuration + (dimensionName -> dimensionValue) + ) + ) + } + } + + def createPing(dimensions: Map[String, Any]): Map[String, Any] = { + val SCALAR_VALUE = 42 + val keyedHistograms = + ("SUBPROCESS_CRASHES_WITH_DUMP" -> + ("content" -> + ("bucket_count" -> 3) ~ + ("histogram_type" -> 4) ~ + ("range" -> List(1, 2)) ~ + ("sum" -> SCALAR_VALUE) ~ + ("values" -> Map("0" -> SCALAR_VALUE, "1" -> 0)) + ) ~ + ("plugin" -> + ("bucket_count" -> 3) ~ + ("histogram_type" -> 4) ~ + ("range" -> List(1, 2)) ~ + ("sum" -> SCALAR_VALUE) ~ + ("values" -> Map("0" -> SCALAR_VALUE, "1" -> 0)) + ) ~ + ("gmplugin" -> + ("bucket_count" -> 3) ~ + ("histogram_type" -> 4) ~ + ("range" -> List(1, 2)) ~ + ("sum" -> SCALAR_VALUE) ~ + ("values" -> Map("0" -> SCALAR_VALUE, "1" -> 0)) + ) + ) + val info = + JObject(if (dimensions("doc_type") == "main") List("subsessionLength" -> JInt(SCALAR_VALUE)) else List[JField]()) // only include the subsession length in main pings + val system = + ("os" -> + ("name" -> dimensions("os_name").asInstanceOf[String]) ~ + ("version" -> dimensions("os_version").asInstanceOf[String]) + ) + val settings = + ("e10sEnabled" -> dimensions("e10s").asInstanceOf[Boolean]) ~ + ("e10sCohort" -> dimensions("e10s_cohort").asInstanceOf[String]) + val build = + ("version" -> dimensions("build_version").asInstanceOf[String]) ~ + ("buildId" -> dimensions("build_id").asInstanceOf[String]) ~ + ("architecture" -> dimensions("architecture").asInstanceOf[String]) + val addons = + ("activeExperiment" -> + ("id" -> dimensions("experiment_id").asInstanceOf[String]) ~ + ("branch" -> dimensions("experiment_branch").asInstanceOf[String]) + ) + implicit val formats = DefaultFormats + Map( + "creationTimestamp" -> dimensions("activity_date").asInstanceOf[Double], + "submissionDate" -> dimensions("submission_date").asInstanceOf[String], + "docType" -> dimensions("doc_type").asInstanceOf[String], + "geoCountry" -> dimensions("country").asInstanceOf[String], + "normalizedChannel" -> dimensions("channel").asInstanceOf[String], + "appName" -> dimensions("application").asInstanceOf[String], + "payload.keyedHistograms" -> compact(render(keyedHistograms)), + "payload.info" -> compact(render(info)), + "environment.system" -> compact(render(system)), + "environment.settings" -> compact(render(settings)), + "environment.build" -> compact(render(build)), + "environment.addons" -> compact(render(addons)) + ) + } + + new { + val pings: List[Map[String, Any]] = (for (configuration <- 
cartesianProduct(pingDimensions)) yield createPing(configuration)).toList + + val ( + rowRDD, + mainProcessedAccumulator, mainIgnoredAccumulator, + crashProcessedAccumulator, crashIgnoredAccumulator + ) = CrashAggregateView.compareCrashes(sc.get, sc.get.parallelize(pings)) + val schema = CrashAggregateView.buildSchema() + val records = sqlContext.get.createDataFrame(rowRDD, schema) + records.count() // Spark is pretty lazy; kick it so it'll update our accumulators properly + } + } + + "Records" must "have the correct lengths" in { + // the number of aggregates is half of the number of pings originally - this is because pings vary all their dimensions, including the doc type + // when the doc type is "crash", the ping gets treated the same as if it was a "main" ping that also contains a main process crash + // we basically "fold" the doc type dimension into the aggregates + assert(fixture.records.count() == fixture.pings.length / 2) + assert(fixture.mainProcessedAccumulator.value == fixture.pings.length / 2) + assert(fixture.mainIgnoredAccumulator.value == 0) + assert(fixture.crashProcessedAccumulator.value == fixture.pings.length / 2) + assert(fixture.crashIgnoredAccumulator.value == 0) + } + + "activity date" must "be in a fixed set of dates" in { + val validValues = List( + "2016-03-02", "2016-06-01", // these are directly from the dataset + "2016-03-05", "2016-05-31" // these are bounded to be around the submission date + ) + for (row <- fixture.records.select("activity_date").collect()) { + assert(validValues contains row(0)) + } + } + + "dimensions" must "be converted correctly" in { + val dimensionValues = pingDimensions.toMap + for (row <- fixture.records.select("dimensions").collect()) { + val dimensions = row.getJavaMap[String, String](0) + assert(dimensionValues("build_version") contains dimensions.getOrElse("build_version", null)) + assert(dimensionValues("build_id") contains dimensions.getOrElse("build_id", null)) + assert(dimensionValues("channel") contains dimensions.getOrElse("channel", null)) + assert(dimensionValues("application") contains dimensions.getOrElse("application", null)) + assert(dimensionValues("os_name") contains dimensions.getOrElse("os_name", null)) + assert(dimensionValues("os_version") contains dimensions.getOrElse("os_version", null)) + assert(dimensionValues("architecture") contains dimensions.getOrElse("architecture", null)) + assert(dimensionValues("country") contains dimensions.getOrElse("country", null)) + assert(dimensionValues("experiment_id") contains dimensions.getOrElse("experiment_id", null)) + assert(dimensionValues("experiment_branch") contains dimensions.getOrElse("experiment_branch", null)) + assert(List("True", "False") contains dimensions.getOrElse("e10s_enabled", null)) + assert(dimensionValues("e10s_cohort") contains dimensions.getOrElse("e10s_cohort", null)) + } + } + + "crash rates" must "be converted correctly" in { + for (row <- fixture.records.select("stats").collect()) { + val stats = row.getJavaMap[String, Double](0) + assert(stats("ping_count") == 1) + assert(stats("usage_hours") == 42 / 3600.0) + assert(stats("main_crashes") == 1) + assert(stats("content_crashes") == 42 * 2) + assert(stats("plugin_crashes") == 42 * 2) + assert(stats("gmplugin_crashes") == 42 * 2) + assert(stats("usage_hours_squared") == 0.00013611111111111113) + assert(stats("main_crashes_squared") == 1) + assert(stats("content_crashes_squared") == 3528) + assert(stats("plugin_crashes_squared") == 3528) + assert(stats("gmplugin_crashes_squared") == 3528) + } + 
} +} diff --git a/src/test/scala/Longitudinal.scala b/src/test/scala/Longitudinal.scala index ac1aff8e..e4667310 100644 --- a/src/test/scala/Longitudinal.scala +++ b/src/test/scala/Longitudinal.scala @@ -273,7 +273,6 @@ class LongitudinalTest extends FlatSpec with Matchers with PrivateMethodTester{ assert(records.length == fixture.payloads.length) records.foreach{ x => val record = x.asInstanceOf[Record] - println(record) assert(record.get("model") == "SHARP") } }