This repository was archived by the owner on Feb 14, 2025. It is now read-only.
Merged
1 change: 1 addition & 0 deletions .gitignore
@@ -6,3 +6,4 @@ metastore_db/
.idea/
derby.log
*.iml
.DS_Store
3 changes: 2 additions & 1 deletion .sbtopts
@@ -1 +1,2 @@
-J-Xss4m
-J-Xss16M
-J-Xmx2048M
2 changes: 1 addition & 1 deletion build.sbt
@@ -2,7 +2,7 @@ javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")

sparkVersion := "1.6.0"

sparkComponents ++= Seq("yarn", "sql", "hive")
sparkComponents ++= Seq("core", "yarn", "sql", "hive")

spDependencies += "vitillo/spark-hyperloglog:1.0.2"

5 changes: 0 additions & 5 deletions src/main/scala/DerivedStream.scala
@@ -16,7 +16,6 @@ import org.json4s.jackson.JsonMethods._
import scala.collection.JavaConverters._
import scala.io.Source
import telemetry.streams.{E10sExperiment, ExecutiveStream, Churn, Longitudinal}
import telemetry.streams.main_summary.MainSummary
import telemetry.utils.Utils

// key is the S3 filename, size is the object size in bytes.
@@ -191,10 +190,6 @@ object DerivedStream {
val exp = E10sExperiment("e10s-beta47-cohorts", "telemetry/4/saved_session/Firefox/beta/47.0/")
Some(from, exp)

case "MainSummary" => // https://bugzilla.mozilla.org/show_bug.cgi?id=1260847
val mainSummary = MainSummary("telemetry/4/main/Firefox")
Some(options.getOrElse('fromDate, to), mainSummary)

case _ =>
None
}
9 changes: 1 addition & 8 deletions src/main/scala/parquet/ParquetFile.scala
@@ -21,18 +21,11 @@ object ParquetFile {

hadoopConf.set("fs.file.impl", classOf[org.apache.hadoop.fs.LocalFileSystem].getName)

private def temporaryFileName(): Path = {
val tmpDir = System.getProperty("java.io.tmpdir")
val vmid = new VMID().toString().replaceAll(":|-", "")
val uri = URI.create(s"file:///$tmpDir/$vmid.tmp")
return new Path(uri)
}

def serialize(data: Iterator[GenericRecord], schema: Schema, blockSizeMultiplier: Int = 1): Path = {
val blockSize = blockSizeMultiplier*ParquetWriter.DEFAULT_BLOCK_SIZE
val pageSize = ParquetWriter.DEFAULT_PAGE_SIZE
val enableDict = ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED
val parquetFile = temporaryFileName()
val parquetFile = telemetry.utils.Utils.temporaryFileName()
val parquetWriter = new AvroParquetWriter[GenericRecord](parquetFile, schema, CompressionCodecName.SNAPPY, blockSize, pageSize, enableDict, hadoopConf)

// Disable Parquet logging
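
A hedged call sketch for the serialize entry point shown above, now using the relocated temporary-file helper. The toy Avro schema, the records, and the telemetry.parquet package path are assumptions for illustration, not part of this PR.

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.generic.{GenericRecord, GenericRecordBuilder}
import telemetry.parquet.ParquetFile  // package path is an assumption

// Toy one-field schema and a couple of records.
val schema: Schema = SchemaBuilder.record("Example").fields().requiredString("name").endRecord()
val records: Iterator[GenericRecord] = List("a", "b").map { v =>
  new GenericRecordBuilder(schema).set("name", v).build(): GenericRecord
}.iterator

// Writes a Snappy-compressed Parquet file to a temporary local path and returns that path;
// blockSizeMultiplier scales Parquet's default block size.
val path = ParquetFile.serialize(records, schema, blockSizeMultiplier = 2)
println(path)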
426 changes: 0 additions & 426 deletions src/main/scala/streams/main_summary/MainSummary.scala

This file was deleted.

20 changes: 9 additions & 11 deletions src/main/scala/streams/main_summary/Utils.scala
@@ -1,5 +1,6 @@
package telemetry.streams.main_summary

import org.apache.spark.sql.Row
import org.json4s.JsonAST._

object Utils{
@@ -88,30 +89,27 @@ object Utils{
}

private val searchKeyPattern = "^(.+)\\.(.+)$".r
def searchHistogramToMap(name: String, hist: JValue): Option[Map[String, Any]] = {

def searchHistogramToRow(name: String, hist: JValue): Option[Row] = {
// Split name into engine and source, then insert count from histogram.
try {
val searchKeyPattern(engine, source) = name
val count = (hist \ "sum") match {
case JInt(x) => x.toInt
case JInt(x) => x.toLong
case _ => -1
}
Some(Map(
"engine" -> engine,
"source" -> source,
"count" -> count
))
Some(Row(engine, source, count))
} catch {
case e: scala.MatchError => None
}
}

def getSearchCounts(searchCounts: JValue): Option[List[Map[String,Any]]] = {
def getSearchCounts(searchCounts: JValue): Option[List[Row]] = {
searchCounts match {
case JObject(x) => {
val buf = scala.collection.mutable.ListBuffer.empty[Map[String,Any]]
val buf = scala.collection.mutable.ListBuffer.empty[Row]
for ((k, v) <- x) {
for (c <- searchHistogramToMap(k, v)) {
for (c <- searchHistogramToRow(k, v)) {
buf.append(c)
}
}
@@ -124,7 +122,7 @@


// Find the largest numeric bucket that contains a value greater than zero.
def enumHistogramToCount(h: JValue): Option[Long] = {
def enumHistogramToCount(h: JValue): Option[Int] = {
(h \ "values") match {
case JNothing => None
case JObject(x) => {
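
A small sketch of the new Row-based return shape, under the assumption that a SEARCH_COUNTS keyed-histogram entry carries a numeric "sum" as the pattern match above expects; the key and value here are made up.

import org.json4s.jackson.JsonMethods.parse
import telemetry.streams.main_summary.Utils

// "google.searchbar" splits into engine = "google" and source = "searchbar";
// the histogram's "sum" becomes the count (a Long after this change).
val hist = parse("""{"sum": 4}""")
Utils.searchHistogramToRow("google.searchbar", hist)  // Some(Row("google", "searchbar", 4L))
Utils.searchHistogramToRow("noseparator", hist)       // None: the key does not match engine.source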
93 changes: 93 additions & 0 deletions src/main/scala/utils/Telemetry.scala
@@ -0,0 +1,93 @@
package telemetry.utils

import scala.io.Source
import scala.collection.JavaConversions._
import scala.collection.JavaConverters._
import telemetry.heka.{HekaFrame, Message}
import org.json4s._
import org.json4s.jackson.JsonMethods.parse
import org.joda.time.DateTime
import org.apache.spark.rdd.RDD
import awscala.s3.{Bucket, S3}
import org.apache.spark.SparkContext
import telemetry.{DerivedStream, ObjectSummary}

object Telemetry {
implicit lazy val s3: S3 = S3()

private def listS3Keys(bucket: Bucket, prefix: String, delimiter: String = "/"): Stream[String] = {
import com.amazonaws.services.s3.model.{ ListObjectsRequest, ObjectListing }

val request = new ListObjectsRequest().withBucketName(bucket.getName).withPrefix(prefix).withDelimiter(delimiter)
val firstListing = s3.listObjects(request)

def completeStream(listing: ObjectListing): Stream[String] = {
val prefixes = listing.getCommonPrefixes.asScala.toStream
prefixes #::: (if (listing.isTruncated) completeStream(s3.listNextBatchOfObjects(listing)) else Stream.empty)
}

completeStream(firstListing)
}

private def matchingPrefixes(bucket: Bucket, seenPrefixes: Stream[String], pattern: List[String]): Stream[String] = {
if (pattern.isEmpty) {
seenPrefixes
} else {
val matching = seenPrefixes
.flatMap(prefix => listS3Keys(bucket, prefix))
.filter(prefix => (pattern.head == "*" || prefix.endsWith(pattern.head + "/")))
matchingPrefixes(bucket, matching, pattern.tail)
}
}

def appendToFile(p: String, s: String): Unit = {
val pw = new java.io.PrintWriter(new java.io.FileOutputStream(new java.io.File(p),true))
try pw.write(s) finally pw.close()
}

def getRecords(sc: SparkContext, submissionDate: DateTime, pingPath: List[String]): RDD[Map[String, Any]] = {
Review comments on this line:

Contributor: Why is this function defined here but never used?

Contributor (author): I copied this whole file from Anthony's PR, which only had the getRecords function - I added getMessages because I needed to access the main Heka fields (Timestamp specifically).

Contributor: This is a debugging function that I left in accidentally. It's pretty useful when debugging locally, but we probably don't need to keep it around.

(A usage sketch contrasting getRecords and getMessages follows this file's diff.)

getMessages(sc, submissionDate, pingPath).map(HekaFrame.fields)
}

def getMessages(sc: SparkContext, submissionDate: DateTime, pingPath: List[String]): RDD[Message] = {
// obtain the prefix of the telemetry data source
val metadataBucket = Bucket("net-mozaws-prod-us-west-2-pipeline-metadata")
val Some(sourcesObj) = metadataBucket.get("sources.json")
val metaSources = parse(Source.fromInputStream(sourcesObj.getObjectContent()).getLines().mkString("\n"))
val JString(telemetryPrefix) = metaSources \\ "telemetry" \\ "prefix"
val JString(dataBucket) = metaSources \\ "telemetry" \\ "bucket"

// get a stream of object summaries that match the desired criteria
val bucket = Bucket(dataBucket)

val summaries = matchingPrefixes(
bucket,
List("").toStream,
List(telemetryPrefix, submissionDate.toString("yyyyMMdd")) ++ pingPath
).flatMap(prefix => s3.objectSummaries(bucket, prefix)).map(summary => ObjectSummary(summary.getKey, summary.getSize))

// Partition the files into groups of approximately-equal size
val groups = DerivedStream.groupBySize(summaries.toIterator)
sc.parallelize(groups, groups.size).flatMap(x => x).flatMap(o => {
val hekaFile = bucket.getObject(o.key).getOrElse(throw new Exception(s"File missing on S3: ${o.key}"))
for (message <- HekaFrame.parse(hekaFile.getObjectContent, o.key)) yield message
})
}

def listOptions(submissionDate: DateTime, pingPath: List[String]): Stream[String] = {
// obtain the prefix of the telemetry data source
val metadataBucket = Bucket("net-mozaws-prod-us-west-2-pipeline-metadata")
val Some(sourcesObj) = metadataBucket.get("sources.json")
val metaSources = parse(Source.fromInputStream(sourcesObj.getObjectContent).getLines().mkString("\n"))
val JString(telemetryPrefix) = metaSources \\ "telemetry" \\ "prefix"
val JString(dataBucket) = metaSources \\ "telemetry" \\ "bucket"

// get a stream of object summaries that match the desired criteria
val bucket = Bucket(dataBucket)
matchingPrefixes(
bucket,
List("").toStream,
List(telemetryPrefix, submissionDate.toString("yyyyMMdd")) ++ pingPath ++ List("*")
)
}
}
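
Following the review thread above, a minimal usage sketch contrasting the two helpers. This is not part of the PR: the local SparkContext, the date, the ping-path elements, the "appUpdateChannel" field name, and the timestamp accessor on Message are all assumptions.

import org.apache.spark.{SparkConf, SparkContext}
import org.joda.time.DateTime
import telemetry.utils.Telemetry

// A local SparkContext and a made-up date/ping path, for illustration only.
val sc = new SparkContext(new SparkConf().setAppName("telemetry-sketch").setMaster("local[*]"))
val date = new DateTime(2016, 3, 20, 0, 0)
val pingPath = List("telemetry", "4", "main", "Firefox")  // path elements are an assumption

// getMessages keeps the whole Heka Message, so top-level fields stay accessible.
val messages = Telemetry.getMessages(sc, date, pingPath)
val timestamps = messages.map(_.timestamp)  // assumes Message exposes a `timestamp` field

// getRecords flattens each message to its field map via HekaFrame.fields.
val records = Telemetry.getRecords(sc, date, pingPath)
val channels = records.flatMap(_.get("appUpdateChannel"))  // field name is an assumption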
11 changes: 11 additions & 0 deletions src/main/scala/utils/Utils.scala
@@ -1,5 +1,9 @@
package telemetry.utils

import java.net.URI
import java.rmi.dgc.VMID

import org.apache.hadoop.fs.Path
import org.joda.time._

object Utils{
@@ -76,4 +80,11 @@ object Utils{
val millisecondsPerDay = 1000 * 60 * 60 * 24
dateFormatter.withZone(org.joda.time.DateTimeZone.UTC).print(new DateTime(timestamp.toLong * millisecondsPerDay))
}

def temporaryFileName(): Path = {
val tmpDir = System.getProperty("java.io.tmpdir")
val vmid = new VMID().toString().replaceAll(":|-", "")
val uri = URI.create(s"file:///$tmpDir/$vmid.tmp")
return new Path(uri)
}
}
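
A brief sketch of the relocated helper; the exact path differs on every call since the VMID component is unique per JVM instance.

import telemetry.utils.Utils

// Yields a unique Hadoop Path under java.io.tmpdir; ParquetFile.serialize above
// uses it as the local output file for the Parquet writer.
val tmp = Utils.temporaryFileName()
println(tmp)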