Option to use message index instead of record sequence for partitioning #470

Merged
6 commits merged into streamnative:master on Nov 1, 2022

Conversation

nicoloboschi (Contributor) commented Oct 17, 2022

Motivation

Currently all the partitioners use the recordSequence field to partition messages. That field doesn't account for batch messages, so objects can be lost on the target storage: the record sequence has the same value for all the entries in a batch message, and only the last entry of the batch ends up persisted on the storage.

Modifications

Documentation

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? (docs)

github-actions (bot) commented:

@nicoloboschi: Thanks for your contribution. For this PR, do we need to update docs?
(The PR template contains info about docs, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

The github-actions bot added the doc-info-missing label ("This PR needs to mark a document option in the description") on Oct 17, 2022.
alpreu (Contributor) commented Oct 17, 2022

> The record sequence has the same value for all the entries in a batch message. Only the last entry of the batch will be persisted on the storage.

Can you explain this scenario a bit more: why would only the last message be persisted? If all messages in a batch share the same recordSequence, I would expect them to end up in the same output file.

nicoloboschi (Contributor, Author) commented Oct 17, 2022

> The record sequence has the same value for all the entries in a batch message. Only the last entry of the batch will be persisted on the storage.
>
> Can you explain this scenario a bit more: why would only the last message be persisted? If all messages in a batch share the same recordSequence, I would expect them to end up in the same output file.

Sure. Let's take this scenario. We set batchSize=2.
We receive 3 records that are in the same BK entry, so they have the same recordSequence.
Records 1 and 2 are in the same flush cycle. Record 1 is taken to build the output filename (from its recordSequence), and the S3 object is created.

When the third record is flushed, the resulting output filename is the same, because its recordSequence is the same as the first record's. So the previous object is overwritten in S3.
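To make the overwrite concrete, here is a minimal, self-contained sketch (an illustration, not the connector's code; all class and method names are hypothetical) showing how a sequence-based name collides across the two flushes while an index-based name does not:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustration only: three records from the same BookKeeper entry share a
// recordSequence but carry distinct per-message indexes.
public class PartitionCollisionDemo {

    record Rec(long recordSequence, long index) {}

    // A sequence-based partitioner derives the object name from the first record's sequence.
    static String nameFromSequence(List<Rec> flush) {
        return "public/default/topic-partition-0/" + flush.get(0).recordSequence() + ".json";
    }

    // An index-based partitioner uses the per-message index instead.
    static String nameFromIndex(List<Rec> flush) {
        return "public/default/topic-partition-0/" + flush.get(0).index() + ".json";
    }

    public static void main(String[] args) {
        // batchSize = 2: records 1 and 2 go out in the first flush, record 3 in the second.
        List<Rec> firstFlush = List.of(new Rec(7, 100), new Rec(7, 101));
        List<Rec> secondFlush = List.of(new Rec(7, 102));

        Map<String, List<Rec>> bucket = new HashMap<>(); // stands in for the S3 bucket

        // Sequence-based: both flushes produce the same key, so the second PUT
        // overwrites the first object and records 1 and 2 are lost.
        bucket.put(nameFromSequence(firstFlush), firstFlush);
        bucket.put(nameFromSequence(secondFlush), secondFlush);
        System.out.println("sequence-based objects: " + bucket.keySet()); // one object

        bucket.clear();

        // Index-based: the keys differ, so both objects survive.
        bucket.put(nameFromIndex(firstFlush), firstFlush);
        bucket.put(nameFromIndex(secondFlush), secondFlush);
        System.out.println("index-based objects: " + bucket.keySet()); // two objects
    }
}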

freeznet (Member) left a comment

Thanks @nicoloboschi
The use of a message index for the partitioner is okay; it gives the user another option. So overall the feature LGTM. But this PR is not solving the root cause here, which is that the cloud storage connector does not handle the case where the target file is overwritten. When a target file already exists, the cloud storage connector should either load the existing file content, append the new content, and overwrite it, or use another file name to save the file.

nicoloboschi (Contributor, Author) replied

> But this PR is not solving the root cause here, which is that the cloud storage connector does not handle the case where the target file is overwritten.

That's true. Moreover, the index approach is not enabled by default. But the solution is not that easy.

> When a target file exists, the cloud storage connector should either load the file content, append the new content and overwrite it

This would violate the batchSize parameter.

> use another file name to save the file.

That would make sense. A time-based solution could be implemented as a new partitioner type.
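For illustration only (not part of this PR; the class and method names are hypothetical), a time-based name could append the flush timestamp so a repeated recordSequence no longer maps to the same object:

import java.time.Instant;

// Sketch only: appending a flush timestamp to the object name avoids the overwrite
// even when the recordSequence repeats, at the cost of non-deterministic names
// (and it still would not deduplicate redelivered records).
public class TimeBasedNameDemo {

    static String objectName(String topicPartition, long recordSequence) {
        return topicPartition + "/" + recordSequence + "-" + Instant.now().toEpochMilli() + ".json";
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(objectName("topic-partition-0", 7));
        Thread.sleep(2); // ensure the second flush falls in a later millisecond
        System.out.println(objectName("topic-partition-0", 7)); // different name, no overwrite
    }
}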

However, neither the time-based approach nor the index-based one guarantees the absence of duplicates when ProcessingGuarantees = ATLEAST_ONCE.
The only way to avoid duplicates is to set batchSize = 1 (but that's not a good idea in terms of efficiency).

I think it's fine to merge this pull request anyway.

Review comment on lines 85 to 93 of the changed file:
    } else {
        LOGGER.debug("Found message {} with hasIndex=true but index is empty, using recordSequence",
                message.getMessageId());
    }
} else {
    LOGGER.debug("partitionerUseIndexAsOffset configured to true but no index found on the message {}, "
            + "perhaps the broker didn't exposed the metadata, using recordSequence",
            message.getMessageId());
}
Contributor commented:

I don't think it's a good idea to implicitly fall back to the recordSequence if the user has configured the partitionerUseIndexAsOffset property. At the very least this should be a WARN log, or we should throw an exception here imo.

nicoloboschi (Contributor, Author) replied:

I understand your concern.
The problem is that whoever installs the sink may not know whether the broker will put the index in the record metadata.

In my first implementation this was logged at WARN level: 15659aa#diff-ed19908cc0ff954ebf1f795a0e3fd708a3b1be501cda66c22ea1906d771da90cR91

This is the same behavior (except for the special handling of batch messages) as in the Kafka Connect adaptor in Apache Pulsar: https://github.com/apache/pulsar/blob/4c22159f5a972e7a92382b9a90c6c70c43c5d166/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSink.java#L312-L359

To improve the usability we could:

  • Explicitly state the behavior in the parameter doc
  • Set those logs to WARN

Another (more complex) option would be to add a "noIndexAction" flag that controls whether to throw an error or accept records without an index when partitionerUseIndexAsOffset is true.
However, I would rather avoid adding another boilerplate option.
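A minimal sketch of what the WARN-level fall-back discussed above could look like (an illustration with hypothetical names and signature, not the connector's actual method):

import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch: prefer the message index when partitionerUseIndexAsOffset is enabled,
// otherwise fall back to recordSequence and say so at WARN level.
public final class OffsetResolver {

    private static final Logger LOGGER = LoggerFactory.getLogger(OffsetResolver.class);

    static long resolveOffset(boolean useIndexAsOffset,
                              Optional<Long> messageIndex,
                              long recordSequence,
                              String messageId) {
        if (useIndexAsOffset) {
            if (messageIndex.isPresent()) {
                return messageIndex.get();
            }
            // Raised from DEBUG to WARN, as agreed in this thread.
            LOGGER.warn("partitionerUseIndexAsOffset is true but message {} carries no index "
                    + "(the broker may not expose the entry metadata); falling back to recordSequence",
                    messageId);
        }
        return recordSequence;
    }
}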

Contributor replied:

Yeah, let's not add another flag :) I think moving the log to WARN and extending the parameter description is a good tradeoff.

alpreu (Contributor) commented Oct 26, 2022

Could you resolve the doc conflicts and add the new parameter to the Azure provider config properties too? Then it's good to merge IMO :)

alpreu (Contributor) left a comment

Thank you! LGTM

alpreu merged commit a7672fb into streamnative:master on Nov 1, 2022