
Events notification system for Nessie - Reference Implementation #6943

Open · wants to merge 1 commit into base: main

Conversation

@adutra (Contributor) commented May 31, 2023

No description provided.

@adutra adutra requested review from omarsmak and tomekl007 May 31, 2023 09:54
@adutra (Contributor, Author) commented May 31, 2023

@omarsmak and @tomekl007 : adding you both as reviewers, since you showed interest in this :-)

@tomekl007 (Contributor) left a comment:

Awesome work!


### Serialization and deserialization

`KafkaEventSubscriber` uses the [Avro] library to define a domain model that is specific to the subscriber, and is not tied in any way to Nessie's.
@tomekl007 (Contributor) May 31, 2023:

That's great that we use Avro.


In this example, the destination Kafka topic is going to receive messages of three different types. This is a common
scenario in Kafka, but requires some extra work to be done on both the producer and consumer sides, especially with
respect to schema handling. The strategy we chose to use here is outlined in [Martin Kleppmann's excellent article].
Contributor:

Can you elaborate more on the most important aspect in our context of why we picked this strategy?
Was it any events that need to stay in a fixed order must go in the same topic

@adutra (Contributor, Author) May 31, 2023:

I chose this strategy for the reference implementation because it does not require pre-configuring the schema registry, which makes the producer easier to test.

In other words, when the topic is created and the first message comes in, the producer uploads the schema, since it's not present in the registry yet. It does this for each new message type, since the strategy is to have one schema per (topic + message type) pair.

Other strategies, for example those outlined in Robert Yokota's article, require the registry to be set up with all the schemas prior to using the topic.
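The producer-side settings this strategy relies on can be sketched as follows. This is only an illustration: plain string keys are used in place of the Confluent config constants, and the registry URL is a placeholder.

```java
import java.util.Properties;

public class ProducerConfigSketch {
  // Sketch of the producer settings the strategy above relies on.
  static Properties producerProps(String registryUrl) {
    Properties props = new Properties();
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
    props.put("schema.registry.url", registryUrl);
    // One schema per (topic + record type) pair:
    props.put("value.subject.name.strategy",
        "io.confluent.kafka.serializers.subject.TopicRecordNameStrategy");
    // Let the producer upload each schema on first use, so the registry
    // needs no pre-configuration:
    props.put("auto.register.schemas", "true");
    return props;
  }

  public static void main(String[] args) {
    System.out.println(
        producerProps("http://localhost:8081").getProperty("value.subject.name.strategy"));
  }
}
```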

### Using headers efficiently

The `KafkaEventSubscriber` implementation also illustrates how to use Kafka headers to pass additional information along
with the message. In this case, the subscriber adds the repository id, the user, and the event creation timestamp to the message headers.
Contributor:

Did we consider putting the creation timestamp in the event itself instead of in headers? It may be beneficial when the end user wants to store the events offline for further analysis: if the creation timestamp is not there, users will need to create a new Avro schema that contains the timestamp, map our event plus the timestamp to this new schema, and finally save the event.

Contributor Author:

Yes, the event timestamp can be part of the message if you prefer. Here I'm using headers, but it's just an example.
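As a sketch of the header approach (the header names here are illustrative, not Nessie's actual ones), the metadata can be carried as plain byte-array header values, with the timestamp kept human-readable as an ISO-8601 string:

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

public class HeaderSketch {
  // Builds a map of header name -> value bytes, as one might pass to a
  // Kafka ProducerRecord's headers. All header names are illustrative.
  static Map<String, byte[]> buildHeaders(String repositoryId, String user, Instant createdAt) {
    Map<String, byte[]> headers = new LinkedHashMap<>();
    headers.put("nessie.repository-id", repositoryId.getBytes(StandardCharsets.UTF_8));
    headers.put("nessie.user", user.getBytes(StandardCharsets.UTF_8));
    // ISO-8601 keeps the timestamp readable for consumers and offline tooling.
    headers.put("nessie.created-at", createdAt.toString().getBytes(StandardCharsets.UTF_8));
    return headers;
  }

  public static void main(String[] args) {
    Map<String, byte[]> h =
        buildHeaders("repo1", "alice", Instant.parse("2023-05-31T09:54:00Z"));
    System.out.println(new String(h.get("nessie.created-at"), StandardCharsets.UTF_8));
  }
}
```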


The `KafkaEventSubscriber` implementation also illustrates how to handle events in a non-blocking way, by publishing
messages without waiting for the broker's acknowledgement. This is achieved by using the `KafkaProducer#send` method in
a fire-and-forget mode, along with the `acks=0` producer configuration option. If your implementation needs to wait for
Contributor:

As far as I know, using acks=0 is not recommended in the majority of cases. I am not sure if using 0 in the reference implementation is the best solution.

Contributor Author:

Sorry for the late reply. You are right, and I had to open my Kafka in Action book to refresh my skills around acks and send methods :-)

In short, we have 3 send methods:

  1. fire-and-forget: don't wait for the future to complete, don't provide any callback;
  2. async: don't wait for the future to complete, but provide a completion callback;
  3. sync: wait for the future to complete (synchronously).

With Nessie, 1 and 2 are fine, but 3 should be avoided (or at least, isBlocking() should return true).

The way these methods really work will further depend on the acks setting (and retries). In short, the returned future is completed when:

  1. With acks=0 => immediately when the message is sent;
  2. With acks=1 => when the broker acknowledged the write;
  3. With acks=all => when the broker and all the in-sync replicas acknowledged the write.

Which brings us to the conclusion: the acks setting is largely orthogonal to the choice of send method.

With all that in mind, I will indeed change the example to use the async send method and acks=1, since both seem reasonable choices for this use case.
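The three send styles above can be sketched with a stand-in for KafkaProducer#send. A plain CompletableFuture is used here instead of the real producer (which returns a Future and accepts a Callback), so the sketch runs without a broker:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class SendStyles {
  // Stand-in for KafkaProducer#send: completes the future when the
  // (simulated) broker acknowledges the write. Not the real Kafka API.
  static CompletableFuture<Long> send(String message) {
    return CompletableFuture.supplyAsync(() -> (long) message.length()); // fake "offset"
  }

  public static void main(String[] args) throws Exception {
    AtomicInteger acked = new AtomicInteger();

    // 1. Fire-and-forget: ignore the returned future entirely.
    send("event-1");

    // 2. Async: attach a completion callback, but never block the caller.
    send("event-2").whenComplete((offset, error) -> {
      if (error == null) acked.incrementAndGet();
    });

    // 3. Sync: block until acknowledged (avoid this on an event loop thread).
    long offset = send("event-3").get();
    System.out.println("sync offset: " + offset);
  }
}
```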

@@ -0,0 +1,39 @@
@namespace("com.example.nessie.events.generated")
protocol NessieEvents {
Contributor:

Does it model all the possible events or only a subset of them?
If the latter, maybe we should document for end users what's the best way to extend it.

Contributor Author:

Indeed, it doesn't model merge and transplant events, since we don't process them in this example. I will add a note.

Contributor Author:

Done. I added comments to all record types in the Avro schema, and also included a new record type to model Merges and Transplants (it's not used in the example, but the record definition is there). Let me know what you think.
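As a purely hypothetical illustration (this is not the actual schema added in this PR), doc comments on Avro IDL records and fields look like this:

```avdl
/** Hypothetical sketch showing doc comments; not this PR's actual schema. */
@namespace("com.example.nessie.events.generated")
protocol NessieEventsSketch {

  /** Models merges and transplants, which both apply commits to a target branch. */
  record MergeEvent {
    /** The id of the repository that emitted the event. */
    string repositoryId;
    /** The branch that received the merged or transplanted commits. */
    string targetReference;
  }
}
```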

public boolean isBlocking() {
// We don't wait for the broker acknowledgement, so we can tell Nessie
// that we don't do blocking I/O.
return false;
Contributor:

How does Nessie handle the situation when isBlocking() == true?

Contributor Author:

It defers the call to onXYZ to a worker thread, as opposed to calling the method on the event loop thread.
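That dispatch rule can be sketched as follows. The Subscriber interface here is a stand-in for illustration, not Nessie's actual SPI:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class DispatchSketch {
  // Minimal stand-in for a subscriber; not Nessie's actual SPI.
  interface Subscriber {
    boolean isBlocking();
    void onEvent(String event);
  }

  private static final ExecutorService WORKERS = Executors.newFixedThreadPool(2);

  // Sketch of the rule described above: blocking subscribers are handed off
  // to a worker pool; non-blocking ones run on the calling (event loop) thread.
  static void dispatch(Subscriber s, String event) {
    if (s.isBlocking()) {
      WORKERS.submit(() -> s.onEvent(event));
    } else {
      s.onEvent(event);
    }
  }

  public static void main(String[] args) throws Exception {
    dispatch(new Subscriber() {
      public boolean isBlocking() { return false; }
      public void onEvent(String event) {
        System.out.println(event + " on " + Thread.currentThread().getName());
      }
    }, "commit");
    WORKERS.shutdown();
    WORKERS.awaitTermination(1, TimeUnit.SECONDS);
  }
}
```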

String repositoryId = upstreamEvent.getRepositoryId();
String referenceName;
if (downstreamEvent instanceof CommitEvent) {
referenceName = ((CommitEvent) downstreamEvent).getTargetReference();
Contributor:

Why the naming inconsistency? getTargetReference vs getReference below.

Contributor Author:

In Nessie's Events API, a CommitEvent inherits from CommittingEvent, where two methods are defined: getSourceReference() and getTargetReference(). In the case of a commit, both references are always the same (but they differ for merges and transplants).

A ReferenceEvent, on the other hand, has a distinct getReference() method, because it is always about one single reference.

Contributor Author:

FYI, this was recently changed: a CommitEvent now only has one getReference() method.

org.projectnessie.events.ri.console.PrintingEventSubscriber

# Kafka event subscriber
org.projectnessie.events.ri.kafka.KafkaEventSubscriber
Contributor:

Is there a limit on the number of Subscribers that we can support?

Contributor Author:

No hard limit, but the more subscribers, the more you mobilize Nessie's event loop and/or worker threads. I am planning some benchmarks later on.

AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
"http://localhost:" + SCHEMA_REGISTRY.getMappedPort(8081));
props.put(
AbstractKafkaSchemaSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class);
Contributor:

Where do we specify what part (which field) of the record is used by the TopicRecordNameStrategy?

Contributor Author:

In this strategy, the schema is registered under the subject <topic name>-<schema name>, so you don't need to select a field in the record. E.g. for the Avro ReferenceEvent message, the schema name is ReferenceEvent.
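A tiny sketch of the resulting subject name (the topic name below is a made-up example, and the subject is built from the record's fully-qualified schema name):

```java
public class SubjectNames {
  // With TopicRecordNameStrategy, the subject is derived from the topic and
  // the record's fully-qualified schema name; no record field is involved.
  static String subject(String topic, String recordFullName) {
    return topic + "-" + recordFullName;
  }

  public static void main(String[] args) {
    System.out.println(
        subject("nessie.events", "com.example.nessie.events.generated.ReferenceEvent"));
  }
}
```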

@snazy snazy added this to the 0.61.0 milestone May 31, 2023
@omarsmak (Contributor) left a comment:

Question: I guess we can't have an example based on Kafka Connect, since the producer runs in the Nessie server, correct?

### Serialization and deserialization

`KafkaEventSubscriber` uses the [Avro] library to define a domain model that is specific to the subscriber, and is not
tied in any way to Nessie's. This is indeed the recommended approach, as it allows the subscriber to define its own
Contributor:

I am not sure why we are tying the implementation to Avro. Wouldn't it make sense to let users define their format of choice? Some may prefer Avro, some protobuf, and some may even prefer JSON :). However, since this is an example, it could be fine.

Contributor Author:

I think this would go beyond the purpose of a reference implementation. As you said, this is an example, and what matters most IMHO is that people can easily adapt it to their own needs. Here, it shouldn't be very hard to switch to protobuf, for instance, so I don't see a lot of value in actually providing the protobuf bindings plus another subscriber.

@adutra (Contributor, Author) commented Jun 1, 2023

I guess we can't have an example based on Kafka Connect, since the producer runs in the Nessie server, correct?

That, and also the fact that providing a full-blown source connector for Nessie would certainly be an interesting project, but a bit out of scope for a reference implementation of the Events SPI module (especially because it would be less portable and adaptable by a variety of users).

@adutra (Contributor, Author) commented Jun 6, 2023

@tomekl007 or @omarsmak could I get another round of review please?


plugins {
id("nessie-conventions-server")
id("com.github.davidmc24.gradle.plugin.avro") version "1.7.1"
Member:

Should be defined in libs.versions.toml

@@ -6,6 +6,7 @@ nessie-compatibility-tests=compatibility/compatibility-tests
nessie-compatibility-jersey=compatibility/jersey
nessie-events-api=events/api
nessie-events-quarkus=events/quarkus
nessie-events-ri=events/ri
Member:

Not in the nessie-bom?

Contributor Author:

None of the event modules is declared in :nessie-bom... I guess that's an oversight.

@snazy snazy modified the milestones: 0.61.0, 0.62.0 Jun 12, 2023
@snazy snazy requested review from omarsmak and tomekl007 June 14, 2023 14:25
@snazy snazy modified the milestones: 0.62.0, 0.63.0, 0.64.0 Jun 14, 2023
@snazy snazy modified the milestones: 0.64.0, 0.65.0, 0.66.0 Jun 27, 2023
@snazy snazy removed this from the 0.66.0 milestone Jul 6, 2023
@adutra adutra force-pushed the events-ri branch 2 times, most recently from 9fe6a3e to 6409007 Compare August 4, 2023 09:30
This commit introduces a reference implementation
for the Nessie events notification system.

The reference implementation showcases an Apache
Kafka producer putting events from Nessie in a
Kafka topic, using Apache Avro for the messages.