This repository was archived by the owner on Feb 14, 2025. It is now read-only.

Conversation

Uberi
Contributor

@Uberi Uberi commented Apr 29, 2016

Based on #56:

  • Convert the Churn and Executive streams to views rather than the old DerivedStream kind. When all of them are completely converted, we can remove the Avro stuff.
  • The executive view avoids a ton of shuffling by using getRecords with the channel name directly. This seems to make it a lot faster.
  • telemetry.utils.Telemetry contains some useful helper functions: getRecords and listOptions (see the sketch below).
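
For reference, here is a rough sketch of how these helpers are used (lifted from the view code later in this PR; `messageToRow` is the view's own message parser, and the path components are just the main-ping example):

```
// Load one day's worth of raw telemetry records directly from S3, filtered
// down to main pings from Firefox. Under the hood, listOptions expands the
// S3 prefix combinations and getRecords reads the matching objects.
val messages = Telemetry.getRecords(sc, currentDate, List("telemetry", "4", "main", "Firefox"))
// Turn each decoded message into a Row matching the SQL schema.
val rowRDD = messages.flatMap(messageToRow)
```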

@codecov-io

codecov-io commented Apr 29, 2016

Current coverage is 50.95%

Merging #62 into master will decrease coverage by 3.31%

  1. 2 files in src/main/scala were modified.
    • Misses -11
  2. 2 files (not in diff) in ...streams/main_summary were modified.
  3. 2 files (not in diff) in ...c/main/scala/streams were modified.
  4. 1 file (not in diff) in src/main/scala was modified.
    • Misses -2
@@             master        #62   diff @@
==========================================
  Files            17         19     +2   
  Lines          1539       1639   +100   
  Methods        1481       1584   +103   
  Messages          0          0          
  Branches         55         49     -6   
==========================================
  Hits            835        835          
- Misses          704        804   +100   
  Partials          0          0          

Powered by Codecov. Last updated by d36ff31...07dafc0

val JString(telemetryPrefix) = metaSources \\ "telemetry" \\ "prefix"

// get a stream of object summaries that match the desired criteria
val bucket = Bucket("net-mozaws-prod-us-west-2-pipeline-data")
Contributor

@mreid-moz mreid-moz May 2, 2016

You should get the bucket from the metadata as well: val JString(bucketName) = metaSources \\ "telemetry" \\ "bucket"
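
Putting the two together, the metadata-driven lookup would read roughly like this (a sketch using the json4s extraction already in the diff; `metaSources` is the parsed sources metadata used above):

```
// Pull both the prefix and the bucket for the telemetry source out of the
// sources metadata instead of hard-coding the bucket name.
val JString(telemetryPrefix) = metaSources \\ "telemetry" \\ "prefix"
val JString(bucketName) = metaSources \\ "telemetry" \\ "bucket"
val bucket = Bucket(bucketName)
```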

@Uberi
Contributor Author

Uberi commented May 2, 2016

Updated to use data bucket from metadata.

@vitillo
Contributor

vitillo commented May 3, 2016

Thanks for picking this up @Uberi! Could you please rebase it?

}
}

def appendToFile(p: String, s: String): Unit = {
Contributor

Where is this function used?

@vitillo
Contributor

vitillo commented May 3, 2016

@mreid-moz are you happy with the churn rewrite?

@mreid-moz
Contributor

Taking a look now

```
sbt "run-main telemetry.views.CrashAggregateView --from 20160410 --to 20160411"
```

**Note:** Currently, due to [avro-parquet issues](https://issues.apache.org/jira/browse/HIVE-12828), Parquet writing only works under the `spark-submit` commands - the above example will fail. This will be fixed when avro-parquet updates or is removed.
Contributor

@mreid-moz mreid-moz May 3, 2016

s/Currently/As of 2016-05-03/

This will hopefully help later when figuring out timelines while debugging the linked issue.

val schema = buildSchema()
val messages = Telemetry.getRecords(sc, currentDate, List("telemetry", "4", "main", "Firefox"))
val rowRDD = messages.flatMap(messageToRow).repartition(100) // TODO: partition by sampleId
val records = sqlContext.createDataFrame(rowRDD.coalesce(1), schema)
Contributor

Do we need the coalesce(1) here? Shouldn't we leave rowRDD as-is since it already has the desired number of partitions (in this case 100)?
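
Concretely, the suggestion amounts to something like this (a sketch; the names are the ones already used in the diff above):

```
// Build the rows with the desired 100 partitions...
val rowRDD = messages.flatMap(messageToRow).repartition(100) // TODO: partition by sampleId
// ...and let createDataFrame keep them, rather than collapsing everything into
// a single partition with coalesce(1).
val records = sqlContext.createDataFrame(rowRDD, schema)
```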

@Uberi
Contributor Author

Uberi commented May 7, 2016

Changes:

  • Implement all of the above feedback.
  • Note: Since I'm not in the AWS cloud services group, I haven't tested these on real data.

@mreid-moz
Contributor

Changes look good to me. Can you test it on real data @vitillo?

bucket,
List("").toStream,
List(telemetryPrefix, submissionDate.toString("yyyyMMdd")) ++ pingPath
).flatMap(prefix => s3.objectSummaries(bucket, prefix)).map(summary => summary.getKey())
Contributor

Should we include something like the logic in DerivedStream.groupBySize to try to balance out the size of each partition / task?
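
For reference, the rough idea behind that suggestion (an illustrative approximation, not the actual DerivedStream.groupBySize source; ObjSummary is a hypothetical stand-in for the S3 object summaries returned by s3.objectSummaries, which expose a key and a size):

```
// Hypothetical lightweight stand-in for an S3 object summary.
case class ObjSummary(key: String, size: Long)

// Greedily pack object summaries into groups whose combined size stays under a
// threshold, so that each Spark partition/task reads a comparable amount of data.
def groupBySize(summaries: List[ObjSummary],
                maxGroupBytes: Long = 2L * 1024 * 1024 * 1024): List[List[ObjSummary]] =
  summaries.foldLeft((0L, List(List.empty[ObjSummary]))) {
    case ((groupSize, groups), obj) =>
      if (groupSize + obj.size <= maxGroupBytes)
        (groupSize + obj.size, (obj :: groups.head) :: groups.tail)
      else
        (obj.size, List(obj) :: groups)
  }._2.filter(_.nonEmpty)
```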

@vitillo
Contributor

vitillo commented May 10, 2016

@mreid-moz I can test the executive view, can you take the churn view?

@mreid-moz
Contributor

@vitillo Yeah, I'll test out the churn view today.

@mreid-moz
Contributor

Ok, I ran a test of the ChurnView code and found the following for the data on 20160401 (running on a cluster of 20 nodes):

Test 1: DerivedStream-based code:

time spark-submit \
    --master yarn-client \
    --class telemetry.DerivedStream \
    telemetry-batch-view-1.1-orig.jar \
    --from-date 20160401 \
    --to-date 20160401 \
    Churn

Test 2: ChurnView code:

time spark-submit \
    --master yarn-client \
    --class telemetry.views.ChurnView \
    telemetry-batch-view-1.1-views.jar \
    --from 20160401 \
    --to 20160401
  • The old DerivedStream-based code ran in 26m31.701s and generated a correct-looking number of records.
  • The View-based code ran in 137m48.952s (about 5 times slower), and generated a dataset with the correct structure, but no actual records (a count of the dataframe was zero).
  • The tasks in the ChurnView code were very unbalanced time-wise, with the longest task taking 2.2hr. See this screenshot from the Spark UI:
    [screenshot from the Spark UI, 2016-05-12, showing the unbalanced task durations]
  • I left the generated datasets in telemetry-test-bucket/churn/v1 (DerivedStream based) and telemetry-test-bucket/churn/v2 (ChurnView) for further inspection, and I saved the "spark.log" for the test if that's of interest.
  • I also have a simple notebook I used to count the resulting datasets if needed.
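
For reference, the count check is only a couple of lines against the generated datasets (a sketch; bucket paths as listed above, with the s3:// scheme assumed):

```
// Count the rows in each output to compare the two runs.
val v1Count = sqlContext.read.parquet("s3://telemetry-test-bucket/churn/v1").count()
val v2Count = sqlContext.read.parquet("s3://telemetry-test-bucket/churn/v2").count()
println(s"DerivedStream: $v1Count, ChurnView: $v2Count")
```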

def buildSchema(): StructType = {
  StructType(
    StructField("clientId", StringType, false) ::
    StructField("sampleId", IntegerType, false) ::
Contributor

Either s/IntegerType/LongType/ here or output an Integer value in messageToRow. I noticed that Spark types are less forgiving of Int vs. Long conversion than the Avro stuff.
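
Either way the fix is small; a sketch of the two options (only one is needed, and the schema is truncated to the fields shown in the diff):

```
import org.apache.spark.sql.types._

// Option 1: widen the schema field so it matches the Long that messageToRow emits.
def buildSchema(): StructType =
  StructType(
    StructField("clientId", StringType, false) ::
    StructField("sampleId", LongType, false) ::
    Nil)

// Option 2: keep IntegerType in the schema and have messageToRow put an Int into
// the Row instead, e.g. by calling .toInt on the parsed sample id.
```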

mreid-moz added a commit to mreid-moz/telemetry-batch-view that referenced this pull request May 19, 2016
Per bug 1272334, refactor the Main Summary code to use Spark SQL types
directly (rather than using Avro Schema), and switch from the "Derived
Stream" approach to the "Views" approach.

This incorporates the "utils.Telemetry" code in mozilla#62.

Processing a single day's data on a 20-node cluster (April 1, 2016 was
my reference date) takes a bit more than 90 minutes with this current
code. Using the DerivedStreams approach from a few commits back takes
about half that amount of time.

Consumer-facing changes in the v3 dataset include:
- Renaming the "submission_date_s3" field to "submission_date"
- Changing the type of the "sample_id" field to a string due to
  using it as an s3 partitioning field.
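
For context on the sample_id change: Spark's partitioned parquet writer turns each partition column value into a directory name in the output path, which is why the commit stores sample_id as a string. A minimal sketch (the output location is illustrative; `records` is the DataFrame built by the view):

```
// Each distinct (submission_date, sample_id) pair becomes a directory such as
// .../submission_date=20160401/sample_id=42/, hence sample_id as a string.
records.write
  .mode("overwrite")
  .partitionBy("submission_date", "sample_id")
  .parquet("s3://telemetry-parquet/main_summary/v3")
```
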
vitillo pushed a commit that referenced this pull request May 23, 2016
* Refactor "Main Summary" to drop avro, use "Views".

Per bug 1272334, refactor the Main Summary code to use Spark SQL types
directly (rather than using Avro Schema), and switch from the "Derived
Stream" approach to the "Views" approach.

This incorporates the "utils.Telemetry" code in #62.

Processing a single day's data on a 20-node cluster (April 1, 2016 was
my reference date) takes a bit more than 90 minutes with this current
code. Using the DerivedStreams approach from a few commits back takes
about half that amount of time.

Consumer-facing changes in the v3 dataset include:
- Renaming the "submission_date_s3" field to "submission_date"
- Changing the type of the "sample_id" field to a string due to
  using it as an s3 partitioning field.

* Fix the S3 partitioning logic.

* Re-enable the tests for "Longitudinal".

The MainSummaryView serialization doesn't run when using
parquet-avro 1.8.1, so disable that for now. Also adjust
the .sbtopts file to avoid an OOM when running tests.

* Remove a spurious print stmt
@vitillo vitillo force-pushed the master branch 5 times, most recently from 1b94720 to ccfd798 on June 1, 2016 at 22:41
@vitillo vitillo closed this Jun 20, 2016
SamPenrose pushed a commit to SamPenrose/telemetry-batch-view that referenced this pull request May 20, 2017
* Refactor "Main Summary" to drop avro, use "Views".