add support for hierarchical streams #97

Closed
MeltyBot opened this issue Apr 13, 2021 · 1 comment
MeltyBot commented Apr 13, 2021

Migrated from GitLab: https://gitlab.com/meltano/sdk/-/issues/97

Originally created by @kgpayne on 2021-04-13 15:44:53


In a similar vein to #20 and #22, there are cases where an upstream endpoint has a 1:many relationship to streams in a tap (rather than the simple 1:1 case).

A real-world example can be found in tap-tableau-wrangler, where Workbook files are fetched from Tableau Server and introspected to extract child entities (Datasource, Connection, Relation and Table Reference objects). In this case only one endpoint is used - get_workbook - yet multiple entities are extracted, each mapping to an individual Singer stream (hence 1:many). Specifically, I cannot extract a child Datasource without first downloading its parent Workbook.

To solve this, I created a shim Service class that downloads changed Workbooks and presents an interface to access flattened lists of child entities. This is not ideal, as it means:

  i) a long initialisation step while the Service class collects Workbooks and extracts child objects,
  ii) poor memory performance from storing long lists of dicts/records in memory, and
  iii) poor fault-tolerance, as bookmarking is only done once all upstream Workbooks have been downloaded during initialisation (the download effectively happens outside the Singer context, in the Service class).

I think this case could be handled 'natively' in the Singer SDK!
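The shim looks roughly like the following toy sketch (class and callable names here are illustrative, not the actual tap-tableau-wrangler code):

```python
class WorkbookService:
    """Shim that downloads all changed Workbooks up front and exposes
    flattened lists of child entities."""

    def __init__(self, fetch_workbooks, extract_connections):
        # Long initialisation step: every changed workbook is downloaded
        # and parsed before a single Singer message can be emitted, and
        # all records are then held in memory as plain lists.
        self.workbooks = list(fetch_workbooks())
        self.connections = [
            conn
            for wb in self.workbooks
            for conn in extract_connections(wb)
        ]

# Toy demo with stand-in fetch/extract callables:
svc = WorkbookService(
    fetch_workbooks=lambda: [{"id": 1}, {"id": 2}],
    extract_connections=lambda wb: [{"id": wb["id"] * 10, "workbook_id": wb["id"]}],
)
```

Everything is materialised in `__init__`, which is exactly where the initialisation-time, memory, and bookmarking problems come from.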

My current implementation effectively produces the following message stream:

{"type": "SCHEMA", "stream": "workbook", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "workbook", "record": {"id": 1, "name": "First Workbook"}}
{"type": "RECORD", "stream": "workbook", "record": {"id": 2, "name": "Second Workbook"}}
{"type": "RECORD", "stream": "workbook", "record": {"id": 3, "name": "Third Workbook"}}
{"type": "SCHEMA", "stream": "workbook_connection", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 1, "name": "First Workbook First Connection"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 2, "name": "First Workbook Second Connection"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 3, "name": "First Workbook Third Connection"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 4, "name": "Second Workbook First Connection"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 5, "name": "Third Workbook First Connection"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 6, "name": "Third Workbook Second Connection"}}
{"type": "STATE", "value": {"workbook": 3}}

Here we see the flattening in action: all workbook records are emitted, then all workbook_connection records, and so on.
If we successfully implement hierarchical streams, I would expect the output to be:

{"type": "SCHEMA", "stream": "workbook", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "workbook", "record": {"id": 1, "name": "First Workbook"}}
{"type": "SCHEMA", "stream": "workbook_connection", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 1, "name": "First Workbook First Connection"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 2, "name": "First Workbook Second Connection"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 3, "name": "First Workbook Third Connection"}}

{"type": "SCHEMA", "stream": "workbook", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "workbook", "record": {"id": 2, "name": "Second Workbook"}}
{"type": "SCHEMA", "stream": "workbook_connection", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 4, "name": "Second Workbook First Connection"}}

{"type": "SCHEMA", "stream": "workbook", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "workbook", "record": {"id": 3, "name": "Third Workbook"}}
{"type": "SCHEMA", "stream": "workbook_connection", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 5, "name": "Third Workbook First Connection"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 6, "name": "Third Workbook Second Connection"}}

{"type": "STATE", "value": {"workbook": 3}}

(blank lines are added for readability)

From the example output above, it is clear that the volume of SCHEMA messages increases dramatically - it becomes a multiple of the total number of parent records. This assumes that targets expect homogeneous blocks of RECORD messages until the next SCHEMA message. If that assumption is false (i.e. targets can consume a mix of RECORD messages from different streams, so long as a SCHEMA message precedes the first RECORD of each stream and the ordering of messages within a stream is maintained), the output below would be far more efficient:

{"type": "SCHEMA", "stream": "workbook", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}

{"type": "RECORD", "stream": "workbook", "record": {"id": 1, "name": "First Workbook"}}

{"type": "SCHEMA", "stream": "workbook_connection", "key_properties": ["id"], "schema": {"required": ["id"], "type": "object", "properties": {"id": {"type": "integer"}}}}

{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 1, "name": "First Workbook First Connection"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 2, "name": "First Workbook Second Connection"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 3, "name": "First Workbook Third Connection"}}

{"type": "RECORD", "stream": "workbook", "record": {"id": 2, "name": "Second Workbook"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 4, "name": "Second Workbook First Connection"}}

{"type": "RECORD", "stream": "workbook", "record": {"id": 3, "name": "Third Workbook"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 5, "name": "Third Workbook First Connection"}}
{"type": "RECORD", "stream": "workbook_connection", "record": {"id": 6, "name": "Third Workbook Second Connection"}}

{"type": "STATE", "value": {"workbook": 3}}

(blank lines are added for readability)

i.e. a nested loop.
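The nested loop can be sketched as a generator that emits each stream's SCHEMA at most once and interleaves RECORD messages; the function name and the hard-coded sample data below are illustrative, not SDK API:

```python
import json

def sync_nested(workbooks, connections_by_workbook):
    """Yield Singer messages for a parent stream and its child,
    emitting each SCHEMA only before the first RECORD of that stream."""
    schema_sent = set()

    def schema_msg(stream):
        return {"type": "SCHEMA", "stream": stream, "key_properties": ["id"],
                "schema": {"required": ["id"], "type": "object",
                           "properties": {"id": {"type": "integer"}}}}

    for wb in workbooks:
        if "workbook" not in schema_sent:
            schema_sent.add("workbook")
            yield schema_msg("workbook")
        yield {"type": "RECORD", "stream": "workbook", "record": wb}
        # Inner loop: the children of this specific parent record.
        for conn in connections_by_workbook.get(wb["id"], []):
            if "workbook_connection" not in schema_sent:
                schema_sent.add("workbook_connection")
                yield schema_msg("workbook_connection")
            yield {"type": "RECORD", "stream": "workbook_connection", "record": conn}
    yield {"type": "STATE", "value": {"workbook": workbooks[-1]["id"]}}

messages = list(sync_nested(
    [{"id": 1, "name": "First Workbook"}, {"id": 2, "name": "Second Workbook"}],
    {1: [{"id": 1, "name": "First Workbook First Connection"}],
     2: [{"id": 2, "name": "Second Workbook First Connection"}]},
))
for m in messages:
    print(json.dumps(m))
```

Note that per-stream ordering is preserved and each SCHEMA precedes the first RECORD of its stream, matching the assumption described above.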

To achieve this, we would need to:

  i) add a children attribute to the base Stream class, containing a list of child Stream classes,
  ii) update the Stream.sync() method called by the Tap class to iterate over child streams for each record yielded by .get_records(), passing context from the parent stream as input, and
  iii) support a (record, parent_context) tuple yielded by get_records() in parent streams, to be consumed by children.

This would allow a child Stream to receive context from its parent instance, effectively implementing our nested loop 🎉

For example:

class WorkbookConnection(Stream):

    name = 'workbook_connection'
    schema_filepath = SCHEMAS_DIR / 'workbook_connection.json'

    def get_records(self, workbook_xml):
        # Context (the parent's workbook XML) is passed in by the parent stream.
        for record in self.extract_workbook_connection_records(workbook_xml):
            yield record

class Workbook(Stream):

    name = 'workbook'
    primary_keys = ['id']
    replication_key = 'updated_at'
    schema_filepath = SCHEMAS_DIR / 'workbook.json'
    children = [WorkbookConnection]

    def get_records(self, *args, **kwargs):
        bookmark = self.get_stream_state().get(self.replication_key)
        for workbook_xml in self.get_new_workbooks(bookmark):
            wb_record = self.extract_workbook_record(workbook_xml)
            # This Stream has children, so the second tuple value is
            # passed to them as context.
            yield (wb_record, workbook_xml)
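A stand-alone toy version of the base-class mechanics (steps i-iii) might look like the sketch below. The `tap`/`write_record` plumbing is hypothetical scaffolding for illustration, not the SDK's actual implementation:

```python
class Stream:
    children = []     # step i: list of child Stream classes
    name = "stream"

    def __init__(self, tap=None):
        self.tap = tap

    def get_records(self, context=None):
        raise NotImplementedError

    def write_record(self, record):
        # A real implementation would emit a Singer RECORD message;
        # here we just collect output on the tap for illustration.
        self.tap.output.append((self.name, record))

    def sync(self, context=None):
        for item in self.get_records(context):
            # Step iii: a parent may yield (record, child_context) tuples;
            # plain records pass through with no child context.
            record, child_context = item if isinstance(item, tuple) else (item, None)
            self.write_record(record)
            # Step ii: sync each child stream with this record's context.
            for child_class in self.children:
                child_class(tap=self.tap).sync(context=child_context)

class WorkbookConnection(Stream):
    name = "workbook_connection"

    def get_records(self, context=None):
        yield from context["connections"]

class Workbook(Stream):
    name = "workbook"
    children = [WorkbookConnection]

    def get_records(self, context=None):
        # The second tuple value becomes the child's context.
        yield ({"id": 1}, {"connections": [{"id": 10}, {"id": 11}]})

class FakeTap:
    def __init__(self):
        self.output = []

tap = FakeTap()
Workbook(tap=tap).sync()
```

Each parent record is written first, then all of its children, giving exactly the interleaved ordering in the expected output above.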

Foreseeable Challenges

  • Discovery - how do we discover child streams? How do we support stream and stream attribute selection?
  • Instantiation - how are child streams instantiated? (This is currently done as part of discovery in the Tap.)

How is this different to Partitioning?

For partitioned RESTful endpoints (e.g. /groups/{id}/epics/{secondary_id}/issues, where child resources are accessible directly as long as a parent ID is known), this can be solved with the partitioning feature from #22. That is effectively still a 1:1 mapping of endpoint to stream, provided you have the parent ID (which is what the partitioning feature provides).
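The contrast can be sketched as follows: in the partitioned case, a partition context alone is enough to build the child request, with no parent payload downloaded first. The class and attribute names below are hypothetical, not the SDK's API:

```python
# Partitioned case: the child resource is directly addressable once the
# parent IDs are known, so one endpoint still maps 1:1 to one stream.
class EpicIssuesStream:
    path = "/groups/{group_id}/epics/{epic_id}/issues"
    partitions = [
        {"group_id": 1, "epic_id": 7},
        {"group_id": 1, "epic_id": 9},
    ]

    def request_urls(self):
        # Each partition context is enough to build the child URL directly;
        # nothing needs to be extracted from a parent record's payload.
        for ctx in self.partitions:
            yield self.path.format(**ctx)

urls = list(EpicIssuesStream().request_urls())
```

In the hierarchical case above, by contrast, the child's input (the workbook XML) only exists inside the parent's downloaded payload, which is why partitioning alone is not sufficient.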
