feat: Allow an inline stream map to set output stream name dynamically #2502

Open
menzenski opened this issue Jun 24, 2024 · 1 comment

@menzenski
Contributor

Feature scope

Other

Description

See this thread (and this message in particular) in the Meltano Slack for more context.

Here's the scenario I'd like to be able to implement:

Given a tap that produces messages to a generic database_records stream, where each record has a namespace object with database and collection properties (for example "namespace": {"database": "customer_service", "collection": "Customer"}), I'd like to dynamically split the database_records stream into many streams, one for each combination of namespace.database and namespace.collection values. The example record above should be mapped to a new stream with stream_id customer_service-Customer, because the hyphenated name lets us take advantage of the target's handling of such names to write the record to a specific table.
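
A minimal sketch of the naming rule in Python, assuming every record carries the namespace object described above with string database and collection values. stream_name_for is a hypothetical helper for illustration, not an existing SDK or stream-map function.

```python
from typing import Any


def stream_name_for(record: dict[str, Any]) -> str:
    """Return '<database>-<collection>' for a record's namespace object.

    Hypothetical helper: assumes every database_records record has a
    'namespace' object with string 'database' and 'collection' properties.
    """
    namespace = record["namespace"]
    return f"{namespace['database']}-{namespace['collection']}"


example = {"namespace": {"database": "customer_service", "collection": "Customer"}}
print(stream_name_for(example))  # customer_service-Customer
```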

@menzenski added the kind/Feature and valuestream/SDK labels on Jun 24, 2024
@edgarrmondragon
Collaborator

This is even tougher than I imagined, and I think you were hinting at this in Slack. Stream maps currently work by generating:

  • one SCHEMA message for each stream map
  • one RECORD message for each record and stream map

So if a stream with 10 records has 2 stream maps applied to it, the mapped output will contain:

  • 2 SCHEMA messages
  • 20 RECORD messages
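
A toy Python model of that fan-out, for illustration only: apply_static_maps is a hypothetical function, not the SDK's actual code, and it assumes each stream map merely aliases the stream. It shows that the number of SCHEMA messages depends only on the maps, while RECORD messages scale with records × maps.

```python
from collections.abc import Iterator
from typing import Any


def apply_static_maps(
    schema: dict[str, Any],
    records: list[dict[str, Any]],
    aliases: list[str],
) -> Iterator[dict[str, Any]]:
    """Toy model of the current behavior: every map only renames the stream."""
    # SCHEMA messages depend only on the stream maps, never on record contents.
    for alias in aliases:
        yield {"type": "SCHEMA", "stream": alias, "schema": schema, "key_properties": []}
    # Each input record produces one RECORD message per stream map.
    for record in records:
        for alias in aliases:
            yield {"type": "RECORD", "stream": alias, "record": record}


schema = {"properties": {"id": {"type": "integer"}}, "type": "object"}
records = [{"id": i} for i in range(10)]
messages = list(apply_static_maps(schema, records, ["map_a", "map_b"]))
kinds = [m["type"] for m in messages]
print(kinds.count("SCHEMA"), kinds.count("RECORD"))  # 2 20
```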

Now, in the current implementation, the generated SCHEMA messages don't depend on the individual records, only on the stream map expression. With this proposal, however, SCHEMA messages would also need to be dynamic, since the stream alias would depend on the contents of each individual record. This change would therefore require some non-trivial refactoring of the stream maps implementation.

That is, for any implementation of a "stream splitter" like this, whether or not it is built on SDK stream maps, the following transformation would have to occur:

Original Singer output:

{"type": "SCHEMA", "stream": "tenant_resources", "schema": {"properties": {"tenant_id": {"type": "string"}, "resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_resources", "record": {"tenant_id": "tenant_001", "resource": "resource_A"}}
{"type": "RECORD", "stream": "tenant_resources", "record": {"tenant_id": "tenant_002", "resource": "resource_A"}}
{"type": "RECORD", "stream": "tenant_resources", "record": {"tenant_id": "tenant_001", "resource": "resource_B"}}
{"type": "RECORD", "stream": "tenant_resources", "record": {"tenant_id": "tenant_002", "resource": "resource_B"}}
{"type": "RECORD", "stream": "tenant_resources", "record": {"tenant_id": "tenant_001", "resource": "resource_C"}}

Transformed output based on the tenant_id property:

{"type": "SCHEMA", "stream": "tenant_001-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_A"}}
{"type": "SCHEMA", "stream": "tenant_002-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_002-resources", "record": {"resource": "resource_A"}}
{"type": "SCHEMA", "stream": "tenant_001-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_B"}}
{"type": "SCHEMA", "stream": "tenant_002-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_002-resources", "record": {"resource": "resource_B"}}
{"type": "SCHEMA", "stream": "tenant_001-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_C"}}
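
A minimal Python sketch of this naive splitter, assuming the split property is dropped from both schema and records and the new stream name is "<value>-<suffix>". split_naive and its suffix parameter are hypothetical; printing each message with json.dumps reproduces the transformed output above line for line.

```python
import json
from collections.abc import Iterator
from typing import Any


def split_naive(
    schema: dict[str, Any],
    records: list[dict[str, Any]],
    split_key: str,
    suffix: str,
) -> Iterator[dict[str, Any]]:
    """Route each record to '<record[split_key]>-<suffix>', dropping split_key.

    Naive version: a SCHEMA message precedes every RECORD message.
    """
    # The output schema is the input schema minus the split property.
    properties = {k: v for k, v in schema["properties"].items() if k != split_key}
    out_schema = {**schema, "properties": properties}
    for record in records:
        stream = f"{record[split_key]}-{suffix}"
        yield {"type": "SCHEMA", "stream": stream, "schema": out_schema, "key_properties": []}
        body = {k: v for k, v in record.items() if k != split_key}
        yield {"type": "RECORD", "stream": stream, "record": body}


schema = {
    "properties": {"tenant_id": {"type": "string"}, "resource": {"type": "string"}},
    "type": "object",
}
records = [
    {"tenant_id": "tenant_001", "resource": "resource_A"},
    {"tenant_id": "tenant_002", "resource": "resource_A"},
    {"tenant_id": "tenant_001", "resource": "resource_B"},
    {"tenant_id": "tenant_002", "resource": "resource_B"},
    {"tenant_id": "tenant_001", "resource": "resource_C"},
]
for message in split_naive(schema, records, "tenant_id", "resources"):
    print(json.dumps(message))
```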

Notice that, because records arrive in arbitrary order, the SCHEMA message for a given stream may be emitted multiple times. With some smart caching of already-emitted schemas, this could be simplified to:

{"type": "SCHEMA", "stream": "tenant_001-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_A"}}
{"type": "SCHEMA", "stream": "tenant_002-resources", "schema": {"properties": {"resource": {"type": "string"}}, "type": "object"}, "key_properties": []}
{"type": "RECORD", "stream": "tenant_002-resources", "record": {"resource": "resource_A"}}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_B"}}
{"type": "RECORD", "stream": "tenant_002-resources", "record": {"resource": "resource_B"}}
{"type": "RECORD", "stream": "tenant_001-resources", "record": {"resource": "resource_C"}}
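
One possible form of that caching, sketched in Python under the same assumptions as a naive splitter (split property dropped, hypothetical "<value>-<suffix>" naming): remember which split streams have already had a SCHEMA message and skip it afterwards. split_cached is a hypothetical name; printing its messages reproduces the simplified output above.

```python
import json
from collections.abc import Iterator
from typing import Any


def split_cached(
    schema: dict[str, Any],
    records: list[dict[str, Any]],
    split_key: str,
    suffix: str,
) -> Iterator[dict[str, Any]]:
    """Split records by split_key, emitting each stream's SCHEMA only once."""
    properties = {k: v for k, v in schema["properties"].items() if k != split_key}
    out_schema = {**schema, "properties": properties}
    announced: set[str] = set()  # streams whose SCHEMA was already emitted
    for record in records:
        stream = f"{record[split_key]}-{suffix}"
        if stream not in announced:
            announced.add(stream)
            yield {"type": "SCHEMA", "stream": stream, "schema": out_schema, "key_properties": []}
        body = {k: v for k, v in record.items() if k != split_key}
        yield {"type": "RECORD", "stream": stream, "record": body}


schema = {
    "properties": {"tenant_id": {"type": "string"}, "resource": {"type": "string"}},
    "type": "object",
}
records = [
    {"tenant_id": "tenant_001", "resource": "resource_A"},
    {"tenant_id": "tenant_002", "resource": "resource_A"},
    {"tenant_id": "tenant_001", "resource": "resource_B"},
    {"tenant_id": "tenant_002", "resource": "resource_B"},
    {"tenant_id": "tenant_001", "resource": "resource_C"},
]
for message in split_cached(schema, records, "tenant_id", "resources"):
    print(json.dumps(message))
```

A plain set is enough here only because every split stream shares one schema. If the schema could differ per stream, the cache would need to track the last schema sent for each stream. The set also grows with the number of distinct split values.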
