Bug 1568042 - Allow ingesting heka-framed blobs #781
Conversation
Codecov Report
@@             Coverage Diff              @@
##             master      #781       +/-   ##
==============================================
- Coverage      89.04%    49.61%    -39.43%
+ Complexity       562       555         -7
==============================================
  Files             81        77         -4
  Lines           2966      4815      +1849
  Branches         260       701       +441
==============================================
- Hits            2641      2389       -252
- Misses           239      2315      +2076
- Partials          86       111        +25
==============================================
Continue to review full report at Codecov.
Gotcha, could you point me to where we can suppress spotless checks?
On Wed, Aug 28, 2019, 11:44, Daniel Thorn commented on this pull request:
In checkstyle/suppressions.xml
<#781 (comment)>:
> @@ -15,6 +15,9 @@
<!-- suppress all checks in files we have copied in from other projects -->
<suppress checks=".*" files="WriteTables.java" />
+ <!-- heka protobuf file is autogenerated -->
+ <suppress checks=".*" files="Heka.java" />
this isn't checkstyle complaining, it's spotless
Force-pushed from 4ef7919 to 7459ab0
Resolved (outdated) review threads on:
- ingestion-beam/src/main/java/com/mozilla/telemetry/io/Read.java
- ingestion-beam/src/main/java/com/mozilla/telemetry/heka/HekaTransform.java
- ingestion-beam/src/main/java/com/mozilla/telemetry/io/Read.java
- ingestion-beam/src/main/java/com/mozilla/telemetry/heka/HekaReader.java
- ingestion-beam/src/main/java/com/mozilla/telemetry/heka/HekaTransform.java
Force-pushed from 0b8b3cb to 204b24b
Resolved (outdated) review thread on ingestion-beam/src/main/java/com/mozilla/telemetry/heka/HekaReader.java
insertPath(payload, Arrays.asList("meta", "Hostname"), message.getHostname());
insertPath(payload, Arrays.asList("meta", "Timestamp"), new Long(message.getTimestamp()));
insertPath(payload, Arrays.asList("meta", "Type"), message.getDtype());
Am I correct in understanding that these lines are injecting fields into the JSON payload? If we're going to be passing this JSON object through the decoder, it would be better to have these available as attributes on the PubsubMessage rather than in the payload to match the layout of attributes coming from the edge server (see https://mozilla.github.io/gcp-ingestion/architecture/edge_service_specification/#general-data-flow). We should probably have a chat to think through implications there.
Hmm, I think these fields might have been added by hindsight to the JSON payload in our AWS implementation. @mreid-moz can you confirm?
We do something similar in the GCP pipeline, where we put various metadata into a metadata blob as one of the last steps of the decoder; see AddMetadata.java.
I believe the plan is that we'll pass these heka payloads through the Decoder rather than just the Sink so that they get validated before we try to send them to BigQuery. We'll need to make sure the message is in a format that the Decoder can understand. The Decoder will check the JSON payload to see if it contains a metadata struct and will extract various attributes from it if they exist, so it is still an option to provide them that way rather than directly making them PubsubMessage attributes. But in either case we need to make sure the names line up. Perhaps we should save that for a separate PR where @relud or I can apply our knowledge of the metadata schema that this repo expects.
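To make the alternative concrete, here is a rough sketch of the attribute approach (hedged: the class name, helper name, and field names are illustrative, not the repo's actual metadata schema), hoisting entries out of a top-level "meta" struct in the JSON payload into a flat map of the kind PubsubMessage attributes expect:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.util.HashMap;
import java.util.Map;

public class MetaToAttributes {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Remove the top-level "meta" object from the payload and return its
  // entries as a flat string-to-string attribute map.
  public static Map<String, String> extractMeta(String json) {
    try {
      ObjectNode payload = (ObjectNode) MAPPER.readTree(json);
      Map<String, String> attributes = new HashMap<>();
      ObjectNode meta = (ObjectNode) payload.remove("meta");
      if (meta != null) {
        meta.fields()
            .forEachRemaining(e -> attributes.put(e.getKey(), e.getValue().asText()));
      }
      return attributes;
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    Map<String, String> attrs = extractMeta(
        "{\"meta\":{\"Hostname\":\"host-1\",\"Type\":\"telemetry\"},\"clientId\":\"abc\"}");
    if (!"host-1".equals(attrs.get("Hostname"))) throw new AssertionError();
    if (!"telemetry".equals(attrs.get("Type"))) throw new AssertionError();
    System.out.println("ok");
  }
}
```

Whether the names ("Hostname" vs. the edge server's attribute names) line up is exactly the open question above, so this is only a shape, not a schema.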
Force-pushed from 09b8466 to eb0ea53
@jklukas I think this is ready for another look. The ingestion-beam tests are now passing; the ingestion-sink-integration tests aren't, but I don't think that's related to this PR. AFAICT there are 3 issues remaining, but I'm not sure any of them block merging. It might be easier to get this in and then follow up in separate PRs; thoughts?
This needs a rebase on master before being ready to merge, which will require refactoring to use Jackson ObjectNode rather than org.json JSONObject.
Added some Heka arcana :)
Two resolved (outdated) review threads on ingestion-beam/src/main/java/com/mozilla/telemetry/heka/HekaReader.java
        : Arrays.asList("meta", key);
    if (f.getValueType() == Heka.Field.ValueType.STRING) {
      String value = f.getValueString(0);
      if (value.charAt(0) == '{') {
I think we can also reliably look at the Heka Field's "representation", which should be set to "json" for anything that should be parsed as JSON. This needs to be verified, as my recollection is fairly hazy on that. If this is the case, then everything with a "json" representation should be inserted into the main document body, and all other Heka message fields should be inserted into the "meta" struct.
Hmm, yes, this might need a bit more refinement. It looks like the Scala version attempts to parse the value as JSON, falling back to just setting the value to a string if that fails.
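For reference, the parse-with-fallback behavior described above could look roughly like this on the Jackson side (a sketch, not the actual implementation; the class and method names are made up):

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.TextNode;

public class ParseOrString {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  // Try to parse a Heka field value as JSON; if parsing fails, fall back
  // to treating the value as a plain string (mirroring the Scala reader).
  public static JsonNode parseOrString(String value) {
    try {
      return MAPPER.readTree(value);
    } catch (Exception e) {
      return TextNode.valueOf(value);
    }
  }

  public static void main(String[] args) {
    if (!parseOrString("{\"a\": 1}").isObject()) throw new AssertionError();
    if (!parseOrString("plain text").isTextual()) throw new AssertionError();
    System.out.println("ok");
  }
}
```

Combining this with the "representation" check suggested above would avoid parse attempts on values that are known not to be JSON.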
Force-pushed from eb0ea53 to 17b82a8
    .ifPresent(s -> attributes.put(Attribute.GEO_SUBDIVISION1, s));
Optional.ofNullable(meta.path("geoSubdivision2").textValue())
    .ifPresent(s -> attributes.put(Attribute.GEO_SUBDIVISION2, s));
// TODO: Do heka messages contain parsed user agent info? DNT? Method? Protocol?
@mreid-moz Do heka messages contain parsed user agent info? Other headers?
I don't think we parse/store user agent info in the telemetry data on AWS.
cc @trink, can you confirm?
We do not store/parse user agent info for telemetry data in AWS (we do for structured). We do store these headers: https://github.com/mozilla-services/lua_sandbox_extensions/blob/master/moz_telemetry/io_modules/decoders/moz_ingest/telemetry.lua#L321-L324.
Force-pushed from 24cc864 to 323e684
Force-pushed from 78b9e27 to e8146cb
@@ -10,7 +10,7 @@
     "version": "69.0",
     "xpcomAbi": "x86-msvc"
   },
-  "clientId": "a136763f-3b9a-492d-8964-bfe783b68dc6",
+  "clientId": "0e8e41d2-d46f-4818-b071-c6605257e6aa",
This test was failing before the commit. I'm assuming the JSON is just not up to date with the heka data for the test, but correct me if I'm wrong on that, @wlach
Force-pushed from bbd6649 to f836980
Force-pushed from f836980 to 92a914d
Let's merge this! It seems to work reasonably well for the data we've thrown at it so far. We can always address any issues in follow-ups.
Work in progress. This adds heka blob parsing, some tests for that, and the very beginnings of a heka->pubsub sink. Still needed:
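For readers unfamiliar with the framing this PR targets, here is a minimal sketch of a heka frame reader (hedged: this follows the heka/lua_sandbox framed-protobuf stream format as I recall it — record separator 0x1E, a header-length byte, a protobuf header whose field 1 is a varint message length, a unit separator 0x1F, then the message bytes — verify against the real spec and the PR's HekaReader.java before relying on it):

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

public class HekaFrames {
  // Read every framed message out of the stream:
  //   0x1E <header length byte> <protobuf header> 0x1F <message bytes>
  public static List<byte[]> readAll(InputStream in) throws IOException {
    List<byte[]> messages = new ArrayList<>();
    int b;
    while ((b = in.read()) != -1) {
      if (b != 0x1E) throw new IOException("expected record separator 0x1E");
      int headerLen = in.read();
      byte[] header = readFully(in, headerLen);
      int messageLen = decodeVarintField1(header);
      if (in.read() != 0x1F) throw new IOException("expected unit separator 0x1F");
      messages.add(readFully(in, messageLen));
    }
    return messages;
  }

  // Convenience wrapper with unchecked exceptions, handy for tests.
  public static List<byte[]> readAllBytes(byte[] data) {
    try {
      return readAll(new ByteArrayInputStream(data));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  // Minimal protobuf decoding: assume the header starts with tag 0x08
  // (field 1, wire type varint) followed by the varint message length.
  private static int decodeVarintField1(byte[] header) throws IOException {
    if (header.length < 2 || header[0] != 0x08) throw new IOException("unexpected header");
    int value = 0;
    int shift = 0;
    int i = 1;
    while (true) {
      int part = header[i++] & 0xFF;
      value |= (part & 0x7F) << shift;
      if ((part & 0x80) == 0) return value;
      shift += 7;
    }
  }

  private static byte[] readFully(InputStream in, int n) throws IOException {
    byte[] buf = new byte[n];
    int off = 0;
    while (off < n) {
      int read = in.read(buf, off, n - off);
      if (read == -1) throw new EOFException("truncated frame");
      off += read;
    }
    return buf;
  }

  public static void main(String[] args) {
    // One frame: header {0x08, 5} says the message is 5 bytes long.
    byte[] framed = {0x1E, 2, 0x08, 5, 0x1F, 1, 2, 3, 4, 5};
    List<byte[]> out = readAllBytes(framed);
    if (out.size() != 1 || out.get(0).length != 5) throw new AssertionError();
    System.out.println("ok");
  }
}
```

A real reader would decode the message bytes with the generated Heka protobuf class rather than returning raw byte arrays, and would need to handle headers with more fields than just the message length.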