Spark schema main summary to view #67
Conversation
Please rebase.
Merged master. I'll squash all the WIP commits once this is ready to land :)
build.sbt
Outdated
ivyScala := ivyScala.value map { _.copy(overrideScalaVersion = true) },
libraryDependencies += "org.apache.avro" % "avro" % "1.7.7",
libraryDependencies += "org.apache.parquet" % "parquet-avro" % "1.8.1",
libraryDependencies += "org.apache.parquet" % "parquet-avro" % "1.7.0",
parquet-avro 1.8.1 conflicted with writing a DataFrame to Parquet format.
@Uberi that might fix the bug you had seen when writing the crash dataset.
It does seem like it. Are there any issues with downgrading though? 7c7494c seems to have upgraded for reasons related to functionality.
I think 1.8.1 added support for writing some of the data structures used by the Longitudinal dataset (Map of structs or something, I'm not sure). You can see the errors I got by checking out this PR and re-enabling the serialization test in src/test/scala/Longitudinal.scala, then running sbt test.
Current coverage is 54.03%
@@            master      #67    diff @@
==========================================
  Files           17       18      +1
  Lines         1541     1638     +97
  Methods       1483     1580     +97
  Messages         0        0
  Branches        55       50      -5
==========================================
+ Hits           837      885     +48
- Misses         704      753     +49
  Partials         0        0
src/test/scala/Longitudinal.scala
Outdated
"Records" can "be serialized" in { | ||
ParquetFile.serialize(List(fixture.record).toIterator, fixture.schema) | ||
println("TODO: fix Avro+Parquet Serialization test") |
Ideally we can just get rid of this by changing Longitudinal to use the SparkSQL / DataFrame serialization too.
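For illustration, a minimal sketch of what DataFrame-based serialization could look like, assuming Spark 1.x with an explicit StructType; the schema fields, paths, and context setup below are placeholders, not the actual Longitudinal code:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

// Hypothetical sketch: build Rows against an explicit Spark SQL schema and let
// the DataFrame writer produce Parquet, instead of going through Avro + parquet-avro.
val sc = new SparkContext(new SparkConf().setAppName("longitudinal-sketch").setMaster("local[*]"))
val sqlContext = new SQLContext(sc)

// Placeholder schema; the real Longitudinal schema is much larger.
val schema = StructType(List(
  StructField("client_id", StringType, nullable = false),
  StructField("submission_date", StringType, nullable = true)
))

val rows = sc.parallelize(Seq(Row("some-client-id", "20160401")))
val df = sqlContext.createDataFrame(rows, schema)
df.write.parquet("/tmp/longitudinal-sketch")  // placeholder output path
```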
Just FYI, I am planning to write a proper telemetry API in the coming weeks, which is going to provide fast access to telemetry records. @Uberi
Where can I find the output of this job?
You can find the sample data at
Do you know why
Also note that v2 has practically no skew and all files have the same size, while the same isn't true for v3.
The extra 3 files in
Re: skew, I believe that is due to actually repartitioning the records (by sampleId) in v2, whereas the current iteration of v3 just does a "coalesce", which combines existing partitions.
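To illustrate the distinction (the RDD and partition counts below are placeholders, not the actual job code):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// repartition() does a full shuffle, so the resulting partitions come out
// roughly equal in size; coalesce() only merges existing partitions, so any
// skew that is already present is preserved.
val sc = new SparkContext(new SparkConf().setAppName("skew-sketch").setMaster("local[*]"))
val records = sc.parallelize(1 to 1000000, 200)  // stand-in for the ping records

val balanced = records.repartition(100)  // shuffle; even partition sizes
val combined = records.coalesce(100)     // no shuffle; keeps the existing layout
```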
hadoopConf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem") | ||
|
||
// We want to end up with reasonably large parquet files on S3. | ||
hadoopConf.setInt("parquet.block.size", 128 * 1024 * 1024) |
@vitillo Is 128MB a reasonable size for parquet files? Should we go larger? I've read conflicting advice.
Depends: how many files are generated per day (with 128MB as the size), and how many days are usually read by an analysis job?
There are about 20-30GB per day of data (in the latest gzipped-parquet form per this PR). If we partition by sample_id, we should have 100 partitions of ~235MB each. So likely two files per day+sample_id partition. Analysis jobs frequently look at MAU, so using 30 days of data is common (40 days, including data latency for activity date vs. submission date).
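Roughly, the arithmetic behind that estimate (illustrative numbers only, using the mid-range figure quoted above):

```scala
// ~23.5 GB/day split across 100 sample_id partitions is ~240 MB per partition,
// which at a 128 MB Parquet block size comes out to about two files each.
val bytesPerDay       = 23.5 * 1024 * 1024 * 1024
val partitions        = 100
val mbPerPartition    = bytesPerDay / partitions / (1024 * 1024)  // ~240 MB
val filesPerPartition = math.ceil(mbPerPartition / 128.0)         // 2
```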
We should have a single file per day+sample_id then.
Ok, I'll go with 256MB for now.
Let's make it 512 to avoid generating very small files.
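A minimal sketch of the size agreed on here, applied through the SparkContext's Hadoop configuration (the same hadoopConf object shown in the diff above); the context setup is a placeholder:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Bump the Parquet block size so each output file is large (512 MB),
// avoiding lots of small files on S3.
val sc = new SparkContext(new SparkConf().setAppName("block-size-sketch").setMaster("local[*]"))
val hadoopConf = sc.hadoopConfiguration
hadoopConf.setInt("parquet.block.size", 512 * 1024 * 1024)
```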
Ok, I tested that the data can be read from Spark, and that the record counts match exactly for one day's data between the existing v2 data, the v3 data using manual partitioning, and the v4 data using
r+ once you confirm that the dataset can be read by Presto as well.
I checked, and the v3 and v4 datasets can't be auto-imported into Hive with the current
One other issue I ran into is that
I manually edited the resulting Hive commands to make
Casting sample_id to an integer in the query might be good enough.
Ok, there's a workaround where you can just cast the partition field to a number in your query if you want to treat it as one. Null values work as expected too. These queries all do the expected thing:

SELECT count(*) FROM test_main_summary_v4 WHERE sample_id = '5'
SELECT count(*) FROM test_main_summary_v4 WHERE cast(sample_id AS bigint) < 10
SELECT count(*) FROM test_main_summary_v4 WHERE sample_id IS NULL
SELECT count(*) FROM test_main_summary_v4 WHERE cast(sample_id AS bigint) IS NULL
Per bug 1272334, refactor the Main Summary code to use Spark SQL types directly (rather than using Avro Schema), and switch from the "Derived Stream" approach to the "Views" approach. This incorporates the "utils.Telemetry" code in mozilla#62. Processing a single day's data on a 20-node cluster (April 1, 2016 was my reference date) takes a bit more than 90 minutes with this current code. Using the DerivedStreams approach from a few commits back takes about half that amount of time. Consumer-facing changes in the v3 dataset include:
- Renaming the "submission_date_s3" field to "submission_date"
- Changing the type of the "sample_id" field to a string due to using it as an s3 partitioning field.
75adce2 to 55ab210
@vitillo I've cleaned up the history in this branch; it should be ready for final review / merge.
Argh, I noticed a problem where the "overwrite" deletes the entire "version" prefix before adding data.
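One possible shape of a fix, purely as a sketch and not what this PR actually does: write each day under its own submission-date path so that "overwrite" only replaces that day rather than the whole version prefix. The bucket, prefix, date, and DataFrame names are placeholders:

```scala
import org.apache.spark.sql.DataFrame

// Hypothetical: scope the write to a single day's partition path so that
// mode("overwrite") cannot delete the rest of the dataset.
val df: DataFrame = ???  // placeholder for one day's worth of main_summary rows
val date = "20160401"
val outputPath = s"s3://example-bucket/main_summary/v3/submission_date_s3=$date"
df.write.mode("overwrite").parquet(outputPath)
```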
As discussed on IRC, we should make sure the longitudinal dataset is not affected by the changes.
The MainSummaryView serialization doesn't run when using parquet-avro 1.8.1, so disable that for now. Also adjust the .sbtopts file to avoid an OOM when running tests.
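The commit mentions tweaking .sbtopts for memory; a hypothetical .sbtopts along these lines (the actual values in the PR may differ) passes a larger heap to the sbt JVM so the test suite doesn't hit an OOM:

```
-J-Xmx2G
```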
Ok, I've re-enabled the serialization test for Longitudinal. I confirmed that the MainSummaryView generation itself still works fine.
This also restores the "submission_date_s3" field, so that's one less change for consumers of the data between v2 and v3.
src/test/scala/Longitudinal.scala
Outdated
}

"Records" can "be serialized" in {
  println("TODO: fix Avro+Parquet Serialization test")
Please remove this line.
@mreid-moz this can be merged; we should probably do that when you are back so we can deploy it shortly afterwards.
Summary of where we ended up with this PR:
If this sounds acceptable, then we're ready to merge.
r+
Per bug 1272334, refactor the Main Summary code to use Spark SQL types directly (rather than using Avro Schema), and switch from the "Derived Stream" approach to the "Views" approach.
This builds on the code in #62, but still has some performance problems to work out.
Processing a single day's data on a 20-node cluster (April 1, 2016 was my reference date) takes a bit more than 90 minutes with this current code. Using the DerivedStreams approach from a few commits back takes about half that amount of time.
Note that I also added a step to Telemetry.getMessages to balance the partition sizes, which reduced the run time from more than 3 hours down to 90 minutes or so.

@vitillo @Uberi Feedback and suggestions appreciated, particularly about what I can do to identify and fix the performance difference.