
Questions about Fault-Tolerance and Exactly-Once Delivery mechanisms #58

Closed
yarmiganosca opened this issue Oct 9, 2019 · 7 comments

@yarmiganosca

The current docs for this connector state

Messages will neither be duplicated nor silently dropped. Messages will be delivered exactly once, or an error message will be generated. If an error is detected while loading a record (for example, the record was expected to be a well-formed JSON or Avro record, but wasn’t well-formed), then the record is not loaded; instead, an error message is returned.

I have several questions about this:

  1. Can you explain what the mechanism is for ensuring that rows are only inserted once into the ingest table?
  2. How is that mechanism tolerant of tasks crashing and restarting at a particular offset? Or of having that partition assigned to another task while the first is down?
  3. How can I see these error messages?
  4. Will one row in a batch causing an error stop the rest of the batch?

Later in the same section you call attention to

Instances of the Kafka connector do not communicate with each other. If you start multiple instances of the connector on the same topics or partitions, then multiple copies of the same row might be inserted into the table. This is not recommended; each topic should be processed by only one instance of the connector.

  1. When you use the word "instance" here, does that mean connector registration, or connector task? (I'm pretty sure I know the answer to this one, but I just want to have it explicitly confirmed).
@raphaelauv

raphaelauv commented Apr 6, 2021

  1. From Snowflake support: every file sent from the connector to Snowflake (containing x rows of messages) is stored and kept in an S3 bucket (for AWS), and they compute its MD5 so they never ingest the same file twice.

So it's absolutely not enough for a stream process, where file size and content are not deterministic. Far from exactly-once!
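The gap between MD5-based file de-duplication and exactly-once can be sketched like this (a minimal illustration with hypothetical names, not the actual Snowpipe implementation): ingestion is keyed on the file's digest, so a byte-identical re-upload is skipped, but the same rows re-chunked into a differently sized file get a new digest and are ingested again, duplicating data.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashSet;
import java.util.Set;

// Minimal sketch (hypothetical names, not the real Snowpipe code) of
// MD5-keyed file de-duplication: a byte-identical re-upload is skipped,
// but the same rows chunked into a different file get a new digest and
// are ingested again, which is why this alone is not exactly-once.
public class Md5Dedup {
    private final Set<String> seenDigests = new HashSet<>();

    /** Returns true if the file was ingested, false if skipped as a duplicate. */
    public boolean ingest(byte[] fileContent) {
        try {
            MessageDigest md5 = MessageDigest.getInstance("MD5");
            StringBuilder hex = new StringBuilder();
            for (byte b : md5.digest(fileContent)) {
                hex.append(String.format("%02x", b));
            }
            return seenDigests.add(hex.toString()); // add() returns false on duplicates
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Md5Dedup pipe = new Md5Dedup();
        byte[] fileA = "msg1\nmsg2\n".getBytes(StandardCharsets.UTF_8);
        // Same two rows plus one more, flushed after a restart as one bigger file:
        byte[] fileB = "msg1\nmsg2\nmsg3\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(pipe.ingest(fileA)); // true:  new digest
        System.out.println(pipe.ingest(fileA)); // false: exact duplicate skipped
        System.out.println(pipe.ingest(fileB)); // true:  msg1/msg2 are now duplicated
    }
}
```

The third call is the failure mode described above: overlapping rows in a differently chunked file sail straight past digest-based de-duplication.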

  2. At-least-once could be assumed if offsets were committed ONLY once a file has been correctly received by Snowflake and Snowpipe has asserted that the file can be ingested (no S3 consistency error in the Snowpipe ingestor), and the file has not been deleted before ingestion by the unsafe file-cleaning thread launched by the connector.

  3. From the recent commits I can see that logs are not verbose and not well managed.

  4. A few issues mention silent errors in case of problems with Avro, so one would need to check the source code.

After a first look at the code:

There is a huge amount of really NOT elegant concurrent code (locks and shared data structures to track "current work") and WRONG concurrent code (values shared between threads that are neither atomic nor even volatile). There are absolutely no assertions on the concurrent operations, and there are some weird manual operations on the committed offsets. The logic is way too complex and over-engineered for what it should be.

  1. The documentation is really not good. I think, like you, that they mean "connector registration", not task, since it's okay to have X tasks; no partition will be consumed more than once.

Conclusion: this connector is really weakly written and tested.

@raphaelauv

raphaelauv commented Apr 14, 2021

To be exactly-once, the flush of events has to be deterministic (if the connector restarts, it always recreates the same files, with the same number of events from the same offsets, like msg 5144 to 5155).

But with a NON-deterministic time-based flush rule it's impossible to be exactly-once ->

return (System.currentTimeMillis() - this.previousFlushTimeStamp) >= (getFlushTime() * 1000);

and even this logic is buggy ->
#245

The flush should be based on the time of the events (see the Confluent docs -> https://docs.confluent.io/kafka-connect-s3-sink/current/index.html#exactly-once-delivery-on-top-of-eventual-consistency)

With that done, and if Snowpipe really never re-ingests the same file (based on file name and MD5), then it will be exactly-once.
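The contrast can be sketched like this (hypothetical names; a simplified model, not the connector's code): the wall-clock predicate quoted above depends on when the task happens to run, while an offset-based predicate is a pure function of the offsets, so a restarted task reproduces the same file boundaries.

```java
// Sketch (hypothetical names, not the connector's code) contrasting a
// wall-clock flush predicate with a deterministic, offset-based one.
public class FlushPolicy {
    static final long FLUSH_TIME_SEC = 120;
    static final long FLUSH_RECORD_COUNT = 12;

    /** Non-deterministic: the answer changes with wall-clock time, so a
     *  restarted task may cut files at different offsets. */
    static boolean shouldFlushByTime(long previousFlushTimestampMs) {
        return (System.currentTimeMillis() - previousFlushTimestampMs) >= FLUSH_TIME_SEC * 1000;
    }

    /** Deterministic: depends only on offsets, so a restart always
     *  recreates the same file, e.g. msg 5144 to 5155. */
    static boolean shouldFlushByOffset(long firstBufferedOffset, long currentOffset) {
        return currentOffset - firstBufferedOffset + 1 >= FLUSH_RECORD_COUNT;
    }

    public static void main(String[] args) {
        System.out.println(shouldFlushByOffset(5144, 5154)); // false: only 11 records buffered
        System.out.println(shouldFlushByOffset(5144, 5155)); // true:  flush file 5144-5155
    }
}
```

With the offset-based rule, replaying the same input after a crash yields byte-identical files, which is exactly what a name/MD5 de-duplication check needs to be effective.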

@raphaelauv

raphaelauv commented Apr 14, 2021

The connector does not wait for a Snowpipe confirmation of the ingestion of the files before internally updating the offsets to commit at the next flush call by the kafka-connect framework.

->

So if anything happens to this file before Snowpipe ingests it, or if the file is impossible to ingest, the data could be lost (since not every topic has unlimited retention).

From my understanding the connector is not even at-least-once.
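The gating being asked for here could be sketched like this (hypothetical names; a simplified model of a safe precommit, not the connector's actual logic): the offset reported back to the framework never advances past a file whose ingestion has not been confirmed, so a crash leads to replay instead of data loss.

```java
import java.util.TreeMap;

// Sketch (hypothetical names, not the connector's code) of offset gating:
// precommit only advances past files whose ingestion has been confirmed,
// so unconfirmed data is replayed after a crash instead of being lost.
public class CommittedOffsetGate {
    // Files uploaded but not yet confirmed: startOffset -> endOffsetExclusive.
    private final TreeMap<Long, Long> unconfirmed = new TreeMap<>();
    private long highWatermark = 0; // end of the latest uploaded file

    void fileUploaded(long start, long endExclusive) {
        unconfirmed.put(start, endExclusive);
        highWatermark = Math.max(highWatermark, endExclusive);
    }

    /** Called once ingestion of the file starting at `start` is confirmed. */
    void fileConfirmed(long start) {
        unconfirmed.remove(start);
    }

    /** Offset safe to report at precommit: never beyond the earliest
     *  still-unconfirmed file, so its records would be replayed. */
    long committableOffset() {
        return unconfirmed.isEmpty() ? highWatermark : unconfirmed.firstKey();
    }

    public static void main(String[] args) {
        CommittedOffsetGate gate = new CommittedOffsetGate();
        gate.fileUploaded(0, 100);
        gate.fileUploaded(100, 200);
        System.out.println(gate.committableOffset()); // 0:   nothing confirmed yet
        gate.fileConfirmed(0);
        System.out.println(gate.committableOffset()); // 100: second file still pending
        gate.fileConfirmed(100);
        System.out.println(gate.committableOffset()); // 200: everything confirmed
    }
}
```

The cost of this scheme is possible duplication on replay, which is the expected trade: at-least-once instead of best-effort.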

@sfc-gh-japatel
Collaborator

Hi @raphaelauv, thank you for giving the feedback and looking into the code in detail.

  1. The Kafka connector is not exactly-once - as you correctly pointed out, since the flush time is non-deterministic.
  2. Snowpipe will not re-ingest the file again, since there is de-duplication logic on the server side. But let's say, for instance, a file with offsets 0-100 is created and ingested, but precommit was not successful; there is a possibility that next time a file with offsets 0-99 is created. Since this is a new file for Snowpipe, there can be data duplication, but this is extremely rare.
  3. Regarding at-least-once: if there is any failure in Snowpipe, we would put the file in the table stage as failure recovery. (A table stage comes with every Snowflake table.)

CC: @sfc-gh-zli

@raphaelauv

raphaelauv commented Apr 23, 2021

Hello @sfc-gh-japatel

about 1 -> the documentation needs corrections

about 2 -> the connector is not exactly-once; what you do to LIMIT the duplication is of no interest.

about 3 -> It's a Snowflake connector, not an S3 connector. I expect my data to be available in a Snowflake table.

The connector should not commit offsets if some data is not ingested, because having a corrupted file in a bucket is not what I expect.

I already see 3 cases that make the connector not at-least-once:

  • If the connector does not deserialize events correctly (bug in the code of the connector), then data will be corrupted.

    And there is already such a case: if the schema registry is not available, for any reason, to answer this line:

    then by default the connector does not fail and skips the message by writing it to table_stage.
    break.on.schema.registry.error should be true by default.

  • if the file sent is corrupted by the network (let me guess: you do not assert the MD5 of the sent file)

  • if the file is moved or deleted by the connector or any other S3 policy

    and there is such a case: files are moved if Snowpipe has more than 1 hour of lag: Files older than 1 hour are moved to table stage #172


Conclusion: by default the connector should be at-least-once and not a best-effort connector, or say so in your documentation.
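The fail-fast behavior requested for the first case could be sketched like this (`break.on.schema.registry.error` is the property named above; everything else is a hypothetical illustration, not the connector's code): with the flag on, a deserialization failure stops the task so the record is retried later; with it off, the record is diverted to the table stage and the offset advances past it.

```java
import java.nio.charset.StandardCharsets;

// Sketch (hypothetical names, not the connector's code) of the two
// error-handling modes: breakOnError=true fails the task so the record
// is replayed (at-least-once); breakOnError=false diverts the record to
// the table stage and moves on, silently dropping it from the target table.
public class DeserializePolicy {
    static String handleRecord(byte[] record, boolean breakOnError) {
        try {
            return deserialize(record);
        } catch (RuntimeException e) {
            if (breakOnError) {
                throw e; // fail the task: offset not committed, record retried
            }
            return "sent-to-table-stage"; // best-effort: record skipped
        }
    }

    static String deserialize(byte[] record) {
        if (record == null) { // stand-in for "schema registry unavailable"
            throw new RuntimeException("schema registry unavailable");
        }
        return new String(record, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        System.out.println(handleRecord(null, false)); // sent-to-table-stage
        try {
            handleRecord(null, true);
        } catch (RuntimeException e) {
            System.out.println("task failed: " + e.getMessage());
        }
    }
}
```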

@sfc-gh-rcheng
Collaborator

Snowpipe does not guarantee exactly-once. Snowpipe Streaming does guarantee exactly-once.

Closing this issue out due to age - please reopen if further discussion is needed.

@raphaelauv

@sfc-gh-japatel it's about at-least-once: do you have a formal proof that the connector is at-least-once?
