Expanded the metadata fields supported in the AMQP input#39
Merged
Conversation
Syncing fork from master
Contributor
|
Hey @chrisriley, looks great. I'm just playing with the changes locally. I think I might take away the prefix for headers, the intention there is to make a best attempt at passing the unchanged preexisting metadata to sinks. A general rule I'm leaning towards right now is any specific headers/metadata fields that are taken from a message we try to copy as-is (obviously here you do a best attempt by walking recursive header tables, which I like), but things we extract that aren't explicitly metadata or headers (such as the key, timestamps, etc) we prefix so that they can be easily distinguished. |
Jeffail
added a commit
that referenced
this pull request
May 14, 2026
Picks up two upstream landings from the rolling-fix work: 1. twmb/avro PR #38 (Jeffail) — the field-level logicalType lift our own metadata parser already handles. Pulling it in means the value-side decoder now produces time.Time for sibling-form timestamp-millis (and the rest of the matrix) natively, instead of returning int64 and relying on the iceberg shredder's metadata-driven numeric scaling bridge to reconcile. 2. twmb/avro PR #39 (twmb) — a cumulative perf, parity, and spec-compliance pass. Includes "decimal precision/scale, spec form" which changes how decimal-typed values serialise under EncodeJSON: a scale-2 value 0.33 (wire bytes 0x21) now emits as the codepoint-mapped string "!" rather than the numeric 0.33, matching Java's JsonEncoder output. The shredder coerce bridge in iceberg/shredder/temporal.go stays — it's now a safety net rather than load-bearing infrastructure. The metadata-side fix in confluent/ecs_avro.go also stays because it parses schemas into schema.Common independently of twmb (the iceberg output's schema_metadata path uses Common, not twmb's schemaNode). Coverage: - TestUpstreamTwmbHonoursSiblingFormLogicalType (new): pins the upstream PR #38 behaviour by asserting that sibling-form schemas decode to time.Time end-to-end. If twmb ever regresses on this, the test surfaces it in the package that depends on the contract. - TestSchemaRegistryDecodeAvro / TestSchemaRegistryDecodeAvroRawJson: pos_0_33333333 default-mode expectation updated from `0.33` to `"!"` per the spec form. Preserved-mode expectation unchanged — our preserveLogicalTypeOpts decimal CustomType still produces json.Number, which the SetStructuredMut path preserves through Go's json.Marshal. CHANGELOG: a "Changed (potentially breaking)" entry documents the decimal serialisation shape change for default-mode users and points at preserve_logical_types: true as the migration knob.
Jeffail
added a commit
that referenced
this pull request
May 14, 2026
Picks up two upstream landings from the rolling-fix work: 1. twmb/avro PR #38 (Jeffail) — the field-level logicalType lift our own metadata parser already handles. Pulling it in means the value-side decoder now produces time.Time for sibling-form timestamp-millis (and the rest of the matrix) natively, instead of returning int64 and relying on the iceberg shredder's metadata-driven numeric scaling bridge to reconcile. 2. twmb/avro PR #39 (twmb) — a cumulative perf, parity, and spec-compliance pass. Includes "decimal precision/scale, spec form" which changes how decimal-typed values serialise under EncodeJSON: a scale-2 value 0.33 (wire bytes 0x21) now emits as the codepoint-mapped string "!" rather than the numeric 0.33, matching Java's JsonEncoder output. The shredder coerce bridge in iceberg/shredder/temporal.go stays — it's now a safety net rather than load-bearing infrastructure. The metadata-side fix in confluent/ecs_avro.go also stays because it parses schemas into schema.Common independently of twmb (the iceberg output's schema_metadata path uses Common, not twmb's schemaNode). Coverage: - TestUpstreamTwmbHonoursSiblingFormLogicalType (new): pins the upstream PR #38 behaviour by asserting that sibling-form schemas decode to time.Time end-to-end. If twmb ever regresses on this, the test surfaces it in the package that depends on the contract. - TestSchemaRegistryDecodeAvro / TestSchemaRegistryDecodeAvroRawJson: pos_0_33333333 default-mode expectation updated from `0.33` to `"!"` per the spec form. Preserved-mode expectation unchanged — our preserveLogicalTypeOpts decimal CustomType still produces json.Number, which the SetStructuredMut path preserves through Go's json.Marshal. CHANGELOG: a "Changed (potentially breaking)" entry documents the decimal serialisation shape change for default-mode users and points at preserve_logical_types: true as the migration knob.
Jeffail
added a commit
that referenced
this pull request
May 18, 2026
Picks up two upstream landings from the rolling-fix work: 1. twmb/avro PR #38 (Jeffail) — the field-level logicalType lift our own metadata parser already handles. Pulling it in means the value-side decoder now produces time.Time for sibling-form timestamp-millis (and the rest of the matrix) natively, instead of returning int64 and relying on the iceberg shredder's metadata-driven numeric scaling bridge to reconcile. 2. twmb/avro PR #39 (twmb) — a cumulative perf, parity, and spec-compliance pass. Includes "decimal precision/scale, spec form" which changes how decimal-typed values serialise under EncodeJSON: a scale-2 value 0.33 (wire bytes 0x21) now emits as the codepoint-mapped string "!" rather than the numeric 0.33, matching Java's JsonEncoder output. The shredder coerce bridge in iceberg/shredder/temporal.go stays — it's now a safety net rather than load-bearing infrastructure. The metadata-side fix in confluent/ecs_avro.go also stays because it parses schemas into schema.Common independently of twmb (the iceberg output's schema_metadata path uses Common, not twmb's schemaNode). Coverage: - TestUpstreamTwmbHonoursSiblingFormLogicalType (new): pins the upstream PR #38 behaviour by asserting that sibling-form schemas decode to time.Time end-to-end. If twmb ever regresses on this, the test surfaces it in the package that depends on the contract. - TestSchemaRegistryDecodeAvro / TestSchemaRegistryDecodeAvroRawJson: pos_0_33333333 default-mode expectation updated from `0.33` to `"!"` per the spec form. Preserved-mode expectation unchanged — our preserveLogicalTypeOpts decimal CustomType still produces json.Number, which the SetStructuredMut path preserves through Go's json.Marshal. CHANGELOG: a "Changed (potentially breaking)" entry documents the decimal serialisation shape change for default-mode users and points at preserve_logical_types: true as the migration knob.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
This PR expands AMQP input metadata functionality to include all fields in the amqp.Delivery struct with the exception of Acknowledger and MessageCount. It also adds support for all types supported by amqp.Table and includes support for nested amqp.Table structs.
I created a gist that demonstrates these additions.
-Chris