Feature: Add end_date support in generic tap config #922

Open
Tracked by #1
aaronsteers opened this issue Aug 23, 2022 · 7 comments

@aaronsteers
Contributor

aaronsteers commented Aug 23, 2022

Feature scope

Taps (catalog, state, stream maps, etc.)

Description

There are many use cases that could benefit from a generic, SDK-handled end_date config option to match the start_date config option, which is specified in the Singer Spec and which most taps already support.

A few use cases:

  1. Rerunning a backfill (aka a "restatement") covering just a specific time period. (E.g. restating all of "Q3 2021" due to data corruption or to account for an initial start_date value such as Jan 1 2022, which was not inclusive of all time periods.)
  2. Running date partition syncs in parallel, for instance: extracting all of 2020 in one batch, all of 2021 in another batch, and YTD 2022 in a final batch. (See the sketch after this list.)
  3. Especially for new EL pipelines: prioritizing recent dates' data over prior dates. For instance, we may start by extracting "current year YTD" as the highest priority. Later, once the pipeline is running and operational, we may want to keep backfilling one year at a time: 2021, 2020, 2019, etc., with recent periods taking priority over prior ones.
  4. Intentionally skipping over records which have not reached a minimal cool-off period.
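
To make use cases 2 and 3 concrete, here is a sketch of how three date-partitioned sync configs might look, assuming the exclusive end_date semantics discussed under "Spec Questions" below (the config keys are the standard Singer start_date plus the proposed end_date):

```python
# Hypothetical configs for three date-partitioned sync jobs. Exclusive
# end bounds let adjacent partitions tile with no gaps and no overlap.
partitions = [
    {"start_date": "2020-01-01T00:00:00Z", "end_date": "2021-01-01T00:00:00Z"},
    {"start_date": "2021-01-01T00:00:00Z", "end_date": "2022-01-01T00:00:00Z"},
    {"start_date": "2022-01-01T00:00:00Z"},  # YTD: open-ended, no end_date
]
```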

Spec

  1. The SDK would add support for an optional end_date config input. When provided, records received from the source would be ignored and/or filtered out if they fall after the provided end_date.
  2. The SDK would never advance the bookmark beyond the end_date.
  3. The SDK would likely treat this identically to the Signposts feature, which already performs both the record-filtering behavior and the bookmark-limiting behavior. (In the case of the signpost, the goal is to never mark a bookmark newer than the utcnow() calculation taken at the start of execution.)
  4. Different APIs would have differing levels of ability for performance optimization (see the sketch after this list):
    1. An API that supports both start and end filters can fetch exactly the records needed.
    2. Likewise, SQL-based taps can filter for exactly the records needed.
    3. APIs that do not support an end filter can stop paginating early if the output is known to be sorted and the end_date limit has already been passed.
    4. APIs that do not support an end filter and also do not produce sorted output may be forced to paginate through all records to the end of the stream. Extraction will be significantly slower in that case: even though the tap emits only the matching records, it still has to page through every record to find them.
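
A minimal sketch of the filtering and early-stop behavior described above (illustrative only; the class, attribute, and method names here approximate rather than match the SDK's real internals):

```python
from datetime import datetime

class DateBoundedStream:
    """Illustrative sketch of end_date filtering, not the SDK's actual API."""

    is_sorted = True  # True if the source emits records in replication-key order

    def __init__(self, config: dict):
        raw = config.get("end_date")
        self.end_date = datetime.fromisoformat(raw) if raw else None

    def sync_records(self, records):
        for record in records:
            record_date = datetime.fromisoformat(record["updated_at"])
            if self.end_date and record_date >= self.end_date:
                if self.is_sorted:
                    break  # sorted stream: safe to stop paginating early
                continue  # unsorted stream: skip and keep scanning
            yield record
        # Separately, bookmark logic would cap the saved state at end_date,
        # just as the signpost caps it at utcnow().
```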

Spec Questions (TBD)

Still TBD is whether the end_date should be inclusive or exclusive. Exclusive is probably the correct behavior, so that Jan 1 2022 0:00:00.000 (for instance) could be used as the end_date on one pipeline and the start_date on another. If we ask users to provide an inclusive date, they are likely to provide something like Dec 31 2021 11:59:59, which (depending on the precision of the source system) is subject to gaps, and therefore to unintentional data loss.

If we go with an exclusive logic, and given that start_date is inclusive, then the logic would be:

  • start_date <= record_date < end_date
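
In code, that half-open check would look like the following (field names here are placeholders):

```python
from datetime import datetime

def in_range(record_date: datetime, start_date: datetime, end_date: datetime) -> bool:
    # Start inclusive, end exclusive: adjacent partitions tile exactly,
    # so a timestamp equal to the boundary lands in exactly one pipeline.
    return start_date <= record_date < end_date

boundary = datetime(2022, 1, 1)
assert in_range(boundary, boundary, datetime(2023, 1, 1))      # next pipeline keeps it
assert not in_range(boundary, datetime(2021, 1, 1), boundary)  # prior pipeline skips it
```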

Caveats/warnings

For merge-insert operations, especially when operations run in parallel, it is important to note that the latest time block should always be loaded last. Over the course of a parallel execution, the same record may appear in a historic period and also in the latest time window. To avoid losing the most recent changes, the final/current time period should be loaded last or, safer yet, rerun after the prior periods have been extracted and loaded.

A theoretical example: loads for "2021" and "2022 YTD" are running in parallel. A customers table record, ABC Customer, has not been updated since Dec 2021. It is updated while the load is running, and "moves" into the 2022 YTD bucket after already being picked up in the 2021 bucket. If the 2021 bucket loads to the warehouse after the 2022 YTD bucket, the older change will overwrite the newer one, causing data loss.

The way to resolve this is either to wait until backfills have run before running the most recent period, or to rerun the latest period so that the newer version once again becomes the primary/correct version of the record in the warehouse.
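
In orchestration terms, that ordering constraint might look like the following sketch (run_sync is a hypothetical stand-in for invoking the tap/target pair over one window):

```python
def run_sync(start_date: str, end_date: str | None = None) -> None:
    """Hypothetical helper: runs one tap/target sync over the given window."""
    ...

historic_windows = [
    ("2020-01-01", "2021-01-01"),
    ("2021-01-01", "2022-01-01"),
]

# Historic backfills may run in any order, even in parallel with each other...
for start, end in historic_windows:
    run_sync(start, end)

# ...but the current/open-ended window runs last (or is rerun afterwards), so
# the newest version of any record wins in the merge-insert target.
run_sync("2022-01-01")
```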

Since these challenges are reasonably addressed with some best practice documentation and/or additional intelligence added to the orchestrator that wants to run these in parallel (such as Meltano), there doesn't seem to be a strong reason not to build this feature.

See also

@aaronsteers aaronsteers added kind/Feature New feature or request valuestream/SDK labels Aug 23, 2022
@qbatten
Contributor

qbatten commented Aug 23, 2022

If the end_date feature allowed an arbitrary Python or SQL expression, that would vastly expand its usefulness and would solve your use-case item 4, "Intentionally skipping over records which have not reached a minimal cool-off period," which I am working on solving in my Meltano setup right now. If the end_date statement were Python, it would be really helpful to have access to the datetime library in the execution environment or some equivalent (a datetime object would probably suffice too). Of course, SQL comes with some built-in datetime handling; Python does not... or rather, it doesn't in the simpleeval execution environment that's used in stream-maps expressions.

To expand on the use-case I'm hoping this feature could help solve:

I'm using the transferwise postgres tap to incrementally load from a read replica of a production DB. (One implication of this is that I can't alter the source data at all.) The table I'm having trouble with represents "daily visits": a row is created for each user each day upon their first visit to the site, and the values in it are updated regularly throughout the day if they return. The table is guaranteed to be finalized by EOD UTC. In theory this could be solved if the source table had an "updated_at" field to use for incremental loading, but this table doesn't have that field. (Since Meltano can't handle this use case right now, the solution we're pursuing is adding that field to the table.)

It seems like it should be straightforward to solve: somewhere in Meltano, I just need to be able to write a Python or SQL expression that selects only rows where created_at < utcnow() - datetime.timedelta(hours=24), or something similar. However, this is not achievable in stream maps due to the limitations of simpleeval (no datetime library 😞). This feature could help solve the problem if it's built in a flexible manner (allowing for execution of arbitrary Python or SQL statements).

There was a discussion of this problem in slack as well.

@aaronsteers
Contributor Author

aaronsteers commented Aug 23, 2022

@qbatten - Thanks for adding this detail. The question of if/how to accept dynamic expressions in the config is something we may need to think more about. I'd want to be especially careful with that.

The idea of adding dynamic datemath expression extensions into the Stream Maps capability is an easy "yes" in terms of deciding whether we should add it. You can search for md5 for an example of how we grafted custom functions into the mapper; a similar pattern could be used for some useful datemath expressions.
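
For reference, the grafting pattern with simpleeval looks roughly like this (a sketch: the SDK's actual mapper wiring differs, and the datemath helper names here are hypothetical):

```python
from datetime import datetime, timedelta, timezone

import simpleeval

# Extend the expression evaluator with datemath helpers, following the same
# pattern used to expose md5 to stream maps. Helper names are hypothetical.
evaluator = simpleeval.EvalWithCompoundTypes(
    functions={
        **simpleeval.DEFAULT_FUNCTIONS,
        "utcnow": lambda: datetime.now(timezone.utc),
        "hours_ago": lambda h: datetime.now(timezone.utc) - timedelta(hours=h),
    }
)

# A stream-map filter expression could then express a 24-hour cool-off:
evaluator.names = {"created_at": datetime(2022, 8, 1, tzinfo=timezone.utc)}
keep = evaluator.eval("created_at < hours_ago(24)")
```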

@aaronsteers
Contributor Author

On its own, for the end_date to be dynamic, we generally have to run a pre-step that seeds an environment variable from a Linux date expression. (This is what we do currently for start_date expressions.)

@tayloramurphy
Collaborator

@aaronsteers thoughts on this as part of a 1.0?

@aaronsteers
Contributor Author

aaronsteers commented Oct 20, 2022

@aaronsteers thoughts on this as part of a 1.0?

Yes, I think it would be great to get in by 1.0. It opens up new options for us and our users.

@aaronsteers
Contributor Author

aaronsteers commented Mar 2, 2023

Related: due to cool-off requirements on the source API, there's a WIP PR here to add this to tap-cloudwatch:

This specific comment includes a proposal for a relative end_date, which, if adopted, could also become a pattern applicable to start_date:
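
To make the relative-date idea concrete, a value like "-1d" might be resolved roughly as follows (hypothetical syntax; the actual proposal in the linked comment may differ):

```python
import re
from datetime import datetime, timedelta, timezone

_RELATIVE = re.compile(r"^-(\d+)([dh])$")  # e.g. "-1d", "-36h"; syntax is illustrative

def resolve_date(value: str) -> datetime:
    """Resolve a config date that may be absolute or relative to utcnow."""
    match = _RELATIVE.match(value)
    if match:
        amount, unit = int(match.group(1)), match.group(2)
        delta = timedelta(days=amount) if unit == "d" else timedelta(hours=amount)
        return datetime.now(timezone.utc) - delta
    return datetime.fromisoformat(value)  # fall back to an absolute timestamp
```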
