
Callback with offset token once streaming blob is registered/queryable #649

Open
wesleyhillyext opened this issue Nov 3, 2023 · 1 comment

Comments

@wesleyhillyext

Currently, it appears that the way for a client using Snowpipe Streaming (via the Ingest SDK) to monitor what's actually been committed to a table (by which I mean "is queryable in Snowflake") is to use the SnowflakeStreamingIngestClient.getLatestCommittedOffsetTokens method (and its counterpart on the channel class).

Question: How often is it reasonable to poll this method? For example, is polling every 10 seconds with a list of a hundred or more channel names too much?
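A minimal sketch of the polling approach, assuming a hypothetical fetchCommitted function that stands in for a single batched SnowflakeStreamingIngestClient.getLatestCommittedOffsetTokens call. The real method takes channel objects rather than names, and the class and method names here are illustrative only, not part of the Ingest SDK.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical poller; fetchCommitted stands in for a batched
// client.getLatestCommittedOffsetTokens(...) call (an assumption, not the SDK API).
final class CommittedOffsetPoller implements AutoCloseable {
    private final Function<List<String>, Map<String, String>> fetchCommitted;
    private final List<String> channelNames;
    // Last non-null committed token observed for each channel.
    private final Map<String, String> lastSeen = new ConcurrentHashMap<>();
    // Daemon thread so the poller never keeps the JVM alive on its own.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "committed-offset-poller");
                t.setDaemon(true);
                return t;
            });

    CommittedOffsetPoller(Function<List<String>, Map<String, String>> fetchCommitted,
                          List<String> channelNames) {
        this.fetchCommitted = fetchCommitted;
        this.channelNames = Collections.unmodifiableList(new ArrayList<>(channelNames));
    }

    // One batched lookup for every channel per poll, rather than one call per channel.
    void pollOnce() {
        Map<String, String> committed = fetchCommitted.apply(channelNames);
        committed.forEach((channel, token) -> {
            if (token != null) { // null: nothing committed on this channel yet
                lastSeen.put(channel, token);
            }
        });
    }

    // Fixed delay, so a slow lookup never overlaps the next one.
    void start(long periodSeconds) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                pollOnce();
            } catch (RuntimeException e) {
                // An uncaught exception would cancel all future runs; a real app would log it.
            }
        }, 0, periodSeconds, TimeUnit.SECONDS);
    }

    String lastCommitted(String channel) {
        return lastSeen.get(channel);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Because each poll in this sketch is one batched lookup, start(10) issues six lookups per minute no matter how many channel names are in the list.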

Feature request: Provide a way for the SDK to give the client application a callback once a blob is registered and queryable in the table in Snowflake. The callback would contain the offset token committed alongside that blob.

This would let the client application implement some interesting features without the additional lag (and additional load on Snowflake) imposed by polling. Some use cases we have in mind to build on top of this in our application are:

  1. Realtime monitoring of ingestion progress.
  2. Wait for data to be available in one table before ingesting into another table. E.g. ensure that a row in a parent table is queryable before inserting rows into a child table that references it, or ensure that all rows pertaining to some operation are queryable before inserting a row stating that the operation is "complete".
  3. When something consumes from a Snowflake Stream defined on the table being ingested into, better align when that consumer runs with when new data becomes available in the stream.
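Without a callback, use case 2 has to be approximated by polling until the parent channel's committed token catches up. A minimal sketch, assuming offset tokens are decimal strings that the application assigns in increasing order per channel, and that latestCommitted wraps something like the channel's committed-offset lookup (CommitBarrier and awaitCommitted are hypothetical names):

```java
import java.time.Duration;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper; assumes tokens are monotonically increasing decimal strings.
final class CommitBarrier {
    private CommitBarrier() {}

    // Blocks until the committed token reaches targetOffset or the timeout elapses.
    // Returns true if the target was reached, false on timeout.
    static boolean awaitCommitted(Supplier<String> latestCommitted,
                                  long targetOffset,
                                  Duration timeout,
                                  Duration pollInterval) throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            String token = latestCommitted.get();
            // A null token means nothing has been committed on this channel yet.
            if (token != null && Long.parseLong(token) >= targetOffset) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // Sleep for the poll interval, but never past the deadline (and at least 1 ms).
            long sleepMillis = Math.min(pollInterval.toMillis(),
                    TimeUnit.NANOSECONDS.toMillis(remainingNanos));
            Thread.sleep(Math.max(1L, sleepMillis));
        }
    }
}
```

A callback of the kind requested above would replace the sleep loop with a single notification, removing up to one poll interval of lag from the child-table insert.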

In terms of the API for this in the SDK, some options include:

  1. Add a setCommittedOffsetTokenCallback(Consumer<String> callback, Executor executor) method to OpenChannelRequestBuilder.
  2. If this SDK targets Java 9 or later, add a Flow.Publisher<String> getCommittedOffsetTokensPublisher() method to SnowflakeStreamingIngestChannel (see [1], [2], [3]).
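Option 2 could be backed by the JDK's SubmissionPublisher. The sketch below is hypothetical: CommittedOffsetTokenFeed and its onBlobCommitted hook are illustrative names, and it assumes the SDK would call that hook after each successful blob registration. Neither exists in the Ingest SDK today.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

// Hypothetical: how a channel could expose committed offset tokens as a Flow.Publisher.
final class CommittedOffsetTokenFeed implements AutoCloseable {
    // Uses ForkJoinPool.commonPool() and the default per-subscriber buffer size.
    private final SubmissionPublisher<String> publisher = new SubmissionPublisher<>();

    // What option 2 proposes the channel could return.
    Flow.Publisher<String> getCommittedOffsetTokensPublisher() {
        return publisher;
    }

    // Hypothetical hook the SDK would call once a blob carrying offsetToken is registered.
    void onBlobCommitted(String offsetToken) {
        publisher.submit(offsetToken);
    }

    @Override
    public void close() {
        publisher.close(); // signals onComplete to current subscribers
    }
}

// Example subscriber that records every token it receives.
final class CollectingSubscriber implements Flow.Subscriber<String> {
    final List<String> received = new CopyOnWriteArrayList<>();
    final CompletableFuture<Void> done = new CompletableFuture<>();

    @Override public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // unbounded demand keeps the sketch simple
    }
    @Override public void onNext(String token) { received.add(token); }
    @Override public void onError(Throwable t) { done.completeExceptionally(t); }
    @Override public void onComplete() { done.complete(null); }
}
```

For a Java 8 target, the same shape maps onto the org.reactivestreams Publisher and Subscriber interfaces mentioned in [3].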

One nice property of the linear nature of ingest channels is that only the latest offset token is interesting, so the publisher is free to drop offset tokens while the callback consumer is still processing a previous one, as long as it never drops the latest one.
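This "drop intermediate tokens but never the latest" rule is a conflation pattern. A minimal sketch of how option 1's Consumer<String> plus Executor pair could be driven, assuming a hypothetical publish method that the SDK would call per committed blob (LatestOffsetDispatcher is an illustrative name, not an SDK class):

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical: delivers committed offset tokens to a callback on an Executor,
// dropping intermediate tokens while the callback is busy but never the latest one.
final class LatestOffsetDispatcher {
    private final Consumer<String> callback;
    private final Executor executor;
    // Most recent token not yet handed to the callback (null if none pending).
    private final AtomicReference<String> pending = new AtomicReference<>();
    // True while a drain task is scheduled or running; ensures one callback at a time.
    private final AtomicBoolean scheduled = new AtomicBoolean(false);

    LatestOffsetDispatcher(Consumer<String> callback, Executor executor) {
        this.callback = callback;
        this.executor = executor;
    }

    // Hypothetical hook the SDK would call each time a blob registration commits.
    void publish(String offsetToken) {
        pending.set(offsetToken); // overwrites (drops) any older undelivered token
        if (scheduled.compareAndSet(false, true)) {
            executor.execute(this::drain);
        }
    }

    private void drain() {
        while (true) {
            String token;
            while ((token = pending.getAndSet(null)) != null) {
                try {
                    callback.accept(token);
                } catch (RuntimeException e) {
                    // Keep draining; a real implementation would log the failure.
                }
            }
            scheduled.set(false);
            // A publish() may have landed after the inner loop but before the flag was
            // cleared; re-check so the latest token is never stranded.
            if (pending.get() == null || !scheduled.compareAndSet(false, true)) {
                return;
            }
        }
    }
}
```

Tokens reach the callback in commit order, possibly with gaps, and the final token published is always delivered, which matches the property described above.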

I'm admittedly assuming that the SDK actually knows when data becomes queryable in Snowflake as part of the blob registration service.

[1] https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/Flow.Publisher.html
[2] The SubmissionPublisher<T> implementation of that: https://docs.oracle.com/en/java/javase/21/docs/api/java.base/java/util/concurrent/SubmissionPublisher.html
[3] If targeting Java 8, could use the org.reactivestreams library for Publisher instead.

@sfc-gh-lsembera
Contributor

sfc-gh-lsembera commented Dec 19, 2023

Hi @wesleyhillyext, thank you for the feature idea. Regarding your question, feel free to poll for the channel status whenever your business logic demands it. 10 seconds for 100 channels is perfectly fine.
