
Feat. JetStream Support for Source #4

Open · wants to merge 25 commits into main

Conversation

somratdutta (Author):

We have added support for NATS JetStream for Source.
Also added is a test-case for the Source class.
We are open to feedback and actively working to improve the current state of this project.

return batchSize;
}

public boolean isCumulativeAck() {
Collaborator:

What is this?

Author:

It was pushed by mistake; we have removed it now.
Thanks for noticing.

private final String consumerName;
private final int batchSize;

private NATSConsumerConfig(Builder builder) {
Collaborator:

I'm concerned about introducing yet another consumer config when there are already so many options to choose from in the library. Is there something we could make serializable in the client that would help?

Author:

Yes, I agree, we shouldn't be using so many options.
The further plan is to send a PR to the nats-client library to make it Serializable.

Author:

Since the PR to make certain classes Serializable was rejected, I don't think we have other options.

Collaborator:

I see this now; we will have to make something.

Collaborator:

Here's what I'm doing: I'm changing the ConsumerConfiguration builder to accept JSON. That's just step one. It allows us to make a simple class to use:


public static class SerializableConsumerConfiguration implements Serializable {
    private static final long serialVersionUID = 1L;

    private String json;
    private transient ConsumerConfiguration cc;

    public SerializableConsumerConfiguration() {
        cc(ConsumerConfiguration.builder().build());
    }

    public SerializableConsumerConfiguration(ConsumerConfiguration cc) {
        cc(cc);
    }

    public SerializableConsumerConfiguration cc(ConsumerConfiguration cc) {
        this.cc = cc;
        json = cc.toJson();
        return this;
    }

    public String getJson() {
        return json;
    }

    public ConsumerConfiguration getConsumerConfiguration() {
        return cc;
    }

    private void writeObject(java.io.ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
    }

    private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        cc = ConsumerConfiguration.builder().json(json).build();
    }
}
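The mechanics of the class above can be exercised without jnats. The following is a minimal, self-contained analogue of the same pattern (a stand-in `Config` class replaces the real `ConsumerConfiguration`, which is not serializable): the JSON string travels through Java serialization, and the transient object is rebuilt in `readObject`.

```java
import java.io.*;

// Self-contained analogue of SerializableConsumerConfiguration above.
// "Config" is a hypothetical stand-in for the non-serializable ConsumerConfiguration.
class TransientRebuildDemo {

    static class Config {
        final String json;
        Config(String json) { this.json = json; }
        String toJson() { return json; }
    }

    static class SerializableConfig implements Serializable {
        private static final long serialVersionUID = 1L;

        private String json;          // survives serialization
        private transient Config cc;  // rebuilt on the receiving side

        SerializableConfig(Config cc) {
            this.cc = cc;
            this.json = cc.toJson();
        }

        Config getConfig() { return cc; }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            // Same trick as ConsumerConfiguration.builder().json(json).build()
            cc = new Config(json);
        }
    }

    // Serialize then deserialize, returning the rebuilt config's JSON.
    static String roundTrip(String json) throws Exception {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
            oos.writeObject(new SerializableConfig(new Config(json)));
        }
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            SerializableConfig sc = (SerializableConfig) ois.readObject();
            return sc.getConfig().toJson();
        }
    }
}
```

This matters for Flink because operators and their configuration are shipped to task managers via Java serialization; only the JSON needs to cross the wire.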

Collaborator:

I'm adding SerializableConsumerConfiguration directly to JNATS in the helpers section. Please review: nats-io/nats.java#1156

Author:

That's great @scottf, will review.

private final String id;

// Package-private constructor to ensure usage of the Builder for object creation
NATSJetstreamSource(DeserializationSchema<T> deserializationSchema, ConnectionFactory connectionFactory, String natsSubject, NATSConsumerConfig config) {
Collaborator:

Can DeserializationSchema be replaced with existing PayloadDeserializer? Or is there a specific reason you made this choice?

Author:

Flink has built-in implementations of deserializers and serializers.
In our application code we are heavily invested in Flink's native deserializers (and serializers), since our current Pulsar-Flink connector implements them. In my humble opinion, this would be the same for other folks moving from another messaging framework. Thoughts?

Collaborator:

I don't think so. Consider the PayloadDeserializer interface

OutputT getObject(String subject, byte[] input, Headers headers);

versus the primary methods in the DeserializationSchema

T deserialize(byte[] var1) throws IOException;

The NATS message is more than just the data byte array. But your code could implement both interfaces and delegate the getObject implementation to call deserialize.
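The delegation suggested above could look like the sketch below. Both interfaces are simplified stand-ins here (the real `PayloadDeserializer` lives in this connector and `DeserializationSchema` in Flink; the headers type is reduced to a plain map for illustration): one class implements both, and the NATS-aware `getObject` simply forwards the payload bytes to the Flink-style `deserialize`.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Sketch of implementing both interfaces and delegating one to the other.
// The interface shapes are simplified stand-ins, not the real declarations.
class DualDeserializerDemo {

    interface PayloadDeserializer<T> {
        T getObject(String subject, byte[] input, Map<String, String> headers);
    }

    interface DeserializationSchema<T> {
        T deserialize(byte[] bytes) throws IOException;
    }

    static class StringDeserializer implements PayloadDeserializer<String>, DeserializationSchema<String> {
        @Override
        public String deserialize(byte[] bytes) {
            return new String(bytes, StandardCharsets.UTF_8);
        }

        // Ignores the NATS-specific subject/headers and delegates to deserialize.
        @Override
        public String getObject(String subject, byte[] input, Map<String, String> headers) {
            return deserialize(input);
        }
    }

    static String demo(String payload) {
        StringDeserializer d = new StringDeserializer();
        return d.getObject("orders.new", payload.getBytes(StandardCharsets.UTF_8), Map.of());
    }
}
```

This keeps code migrating from another messaging framework able to reuse its existing Flink deserializers while still satisfying the connector's interface.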

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NATSJetstreamSource<T> implements Source<T, NatsSubjectSplit, Collection<NatsSubjectSplit>>, ResultTypeQueryable<T> {
Collaborator:

Can you use more descriptive names instead of T? See the existing source for examples.

Author:

Thanks, will make the changes accordingly.

return new NATSConsumerConfig(this);
}
}
}
Collaborator:

Author:

Added.

@@ -0,0 +1,48 @@
package io.synadia.flink.source;
Collaborator:

All source files must have the license. Look at any existing source file. This is a requirement of contributing to this specific repo.

@@ -0,0 +1,48 @@
package io.synadia.flink.source;
Collaborator:

I wonder if we should make this a different package, i.e. io.synadia.flink.jssource, or I suppose repackage the existing into io.synadia.flink.source.core and then have io.synadia.flink.source.js

Collaborator:

Please use Nats instead of NATS for class names.

Author:

Okay, refactored to Nats

Author:

I totally agree with the idea of repackaging.
For easier review of this PR, I will make those changes in the next PR, as they would involve reviewing a lot of file changes.


Alternatively we could put it in a different commit if @scottf is fine to review by picking out commits individually.

somratdutta and others added 12 commits November 29, 2023 14:45
- Add license to all files
- Refactored classes using NATS to Nats
- Removed isCumulativeAck() block (as never used)
- Add blank spaces at EOF to all classes
- Added more descriptive generics <OutputT> instead of <T>
Add extra line after license header
Improvements based on Contributor's comments
- Add license to all files
- Add blank spaces at EOF to all classes
- Added more descriptive generics <OutputT> instead of <T>
Refactored packages for jetstream and core Sources and Sink
- Added Sink feature for NATS JetStream
- Added test case for Sink
Add Sink Support to NATS-Flink Connector
- Removed functionality that allowed the connector to create its own stream and consumers.
- Introduced usage of the `bind` method in NatsJetstreamSourceReader to attach to existing consumers.
- Refactored NatsJetstreamSourceBuilder to accept and propagate streamName, facilitating the binding process.
Restrict to binding existing NATS Jetstream consumers in Source
somratdutta and others added 4 commits January 8, 2024 14:16
- Implement client option to choose between bounded/unbounded modes
- Write and integrate test case for unbounded mode validation
Add support for unbounded streaming mode in NATS-Flink connector
- keeping separate PRs for JetStream Source and Sink
- Test file removed
Temporary remove NATS JetStream Sink Implementation
@scottf (Collaborator) left a comment:

This is just an initial pass, I still need to run validate.

@@ -1,7 +1,7 @@
// Copyright (c) 2023 Synadia Communications Inc. All Rights Reserved.
// See LICENSE and NOTICE file for details.

package io.synadia.flink.sink;
package io.synadia.flink.sink.core;
Collaborator:

Cannot repackage; this has been released and I do not want to bump the major version.

package io.synadia.flink.source.js;
import java.io.Serializable;

public class NatsConsumerConfig implements Serializable {
Collaborator:

No on this.

  • We need to find a different name for this.
    • batch size is not a consumer configuration detail, it's a consume behavior detail.
    • it could be confusing with actual consumer configuration.
  • Is there anything from the simplification api that we can match to?

Author:

Noted. Here's a list of suggested names for the above class; a recommendation is needed:

  • NatsConsumeOptions
  • NatsBehaviorSettings
  • NatsRuntimeParameters
  • NatsOperationalConfig
  • NatsProcessingOptions
  • NatsBehaviorConfig
  • NatsDynamicConfig
  • NatsPerformanceSettings
  • NatsConsumeBehavior
  • NatsConsumeParameters

Going for NatsConsumeOptions for now.

public Builder() {
}

public Builder withConsumerName(String consumerName) {
Collaborator:

The JNats client does not use "with" in its builders, so I would prefer to be consistent with that.
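Combining the two naming points above (class renamed to NatsConsumeOptions, builder methods without the "with" prefix), the class could look roughly like this. The exact field set is an assumption drawn from the discussion (consumerName, batchSize, and the maxWait added later in the thread), not the final implementation:

```java
import java.io.Serializable;

// Sketch of the renamed options class with JNats-style builder methods
// (no "with" prefix). Fields and defaults here are illustrative.
class NatsConsumeOptions implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String consumerName;
    private final int batchSize;
    private final long maxWaitMillis;

    private NatsConsumeOptions(Builder b) {
        this.consumerName = b.consumerName;
        this.batchSize = b.batchSize;
        this.maxWaitMillis = b.maxWaitMillis;
    }

    public String getConsumerName() { return consumerName; }
    public int getBatchSize() { return batchSize; }
    public long getMaxWaitMillis() { return maxWaitMillis; }

    public static class Builder {
        private String consumerName;
        private int batchSize = 100;       // hypothetical default
        private long maxWaitMillis = 5000; // hypothetical default

        public Builder consumerName(String consumerName) { this.consumerName = consumerName; return this; }
        public Builder batchSize(int batchSize) { this.batchSize = batchSize; return this; }
        public Builder maxWait(long millis) { this.maxWaitMillis = millis; return this; }

        public NatsConsumeOptions build() { return new NatsConsumeOptions(this); }
    }
}
```

Usage would then read like JNats builders: `new NatsConsumeOptions.Builder().consumerName("c1").batchSize(50).build()`.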

somratdutta and others added 2 commits February 12, 2024 14:48
- Capitalization correction for 'Jetstream' to 'JetStream' across the codebase, ensuring consistency in naming conventions.
- Renamed builder method prefixes from 'with' to match the JNats client style, promoting consistency with existing JNats client usage patterns.
- Improved code readability in NatsConsumerConfig class by adding spaces around '=' operators, aligning with best coding practices.
- Renamed NatsConsumerConfig to NatsConsumeOptions to better reflect its purpose. The change addresses feedback on the naming convention, distinguishing between static consumer configurations and dynamic consume behavior settings, particularly around the 'batchSize' parameter.
- Reorganized package structure, moving repackaged classes from nats core and JetStream to a common source and sink package, enhancing project modularity and clarity.
Cosmetic changes to classes as per request
}

@Override
public SplitEnumerator<NatsSubjectSplit, Collection<NatsSubjectSplit>> createEnumerator(


If I understand correctly, a split will be a NATS subject. I'm curious whether Flink will spawn multiple source readers in parallel for a single split.

I'm asking because with Kafka, a split is a partition, and apparently only one consumer reads a partition at a time. NATS is different in that multiple consumers can read from the same subject. If I need multiple readers to read a very big subject, how will Flink handle that?

Collaborator:

Yes, the subject is a split. There should be only one reader per split (per subject).


If one has only one huge subject, one reader might not be enough to handle the rate. Usually we can spawn multiple NATS clients to read from the same subject, but with this pattern there can only be one, I believe.

Collaborator:

I think we could try to figure out how to use multiple instances of the same pull consumer (works the same as push delivery group). So when that option is configured we would have to know the number of consumer instances and build the splits accordingly.


I agree that we should build support for multiple consumers reading from one subject. I will work on it.
However, for this PR, I would prefer to limit it to one subject, one consumer.
We can create an issue in this project as a requirement, for record keeping.

@sourabhaggrawal Jun 10, 2024:

Hello @scottf, just to check that I understand right, you are suggesting that we should have:

  1. one consumer for every subject under a regex subject like (ORDER.*)?
  2. one consumer for every subject from a list of subjects that the user will provide?
  3. multiple copies of one consumer to increase read throughput, if the subject is big?

If I have understood correctly, you are suggesting multiple copies of one consumer, right?

Collaborator:

The original implementation made one split per subject filter. I'm suggesting that we need some sort of configuration hint that tells us to make N splits for a single subject filter. I suppose that could be accomplished by allowing the same subject filter to appear multiple times in the input list.
Also, I would not characterize a subject like "order.*" as a regex; it's a wildcard. As presented, it is simply a single subject filter for a consumer.


Yes, order.* is one subject, and that's how I see it too. Now I have more clarity on this multiple-consumer part, which I was confusing with another use case.
Let me look again at how we can fit creating multiple instances of the same consumer for one subject.


What if we don't need a split at all? NATS supports multiple readers out of the box already. So a NATS source just needs to create multiple readers with the same config (same stream, same subject) and they will work in parallel without further configuration.

@souravagrawal Jun 23, 2024:

Hi @scottf @vu-hoang-phan-form3. It does make sense to ignore the split if we do not want checkpointing, which is the current implementation; we are not making use of the split.
But to implement checkpointing we need the split: it holds the data fetched from the source, and the same data needs to be provided back to Flink during a snapshot.
I have changed the code to implement checkpointing here:

souravagrawal@edfe8e2

Please go through it once; I plan to merge it into somrat's fork soon.

Comment on lines +86 to +87
boolean ackMessageFlag = (i == messages.size() - 1);
processMessage(output, message, ackMessageFlag);


If a message is acknowledged but checkpointing fails, we lose the message.
I think message ack should be inside notifyCheckpointComplete method instead. Either that or inside snapshotState.

Collaborator:

Agreed, ack should be done after checkpoint.

Author:

Sure, will resolve this.


Hi @vu-hoang-phan-form3, the message is acknowledged in pollNext as we read. If checkpointing fails after the ack, the work for that message is already done anyway. There is a chance of getting the same message again if the connector reboots during delivery, before it could ack the message; the consumer will then redeliver from where it stopped.
Can you help me understand how we lose the message here if checkpointing fails?


@vu-hoang-phan-form3 you mean the processed data for the message might get lost?


@shiv4289 yes
@sourabhaggrawal After processMessage here, the message is acknowledged and goes down the pipeline. Then, if the next checkpoint fails, that same message will be lost.

There is indeed a chance of delivering the same message twice if it's acknowledged in notifyCheckpointComplete; that method is best-effort anyway. But that's not a problem: deduplication can be implemented elsewhere. Plus, NATS is at-least-once delivery, so duplication can always happen.
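The "ack after checkpoint" scheme discussed above can be sketched independently of Flink and jnats. In this illustration (class and method names are hypothetical; in the real reader an ack would call the NATS message's ack), acks are buffered as they are read, bound to a checkpoint id in snapshotState, and fired only in notifyCheckpointComplete; a failed checkpoint's acks are never fired, so those messages get redelivered:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch: defer acks until the checkpoint covering them completes.
// Acks are modeled as Runnables standing in for Message.ack().
class DeferredAcks {

    private final List<Runnable> pending = new ArrayList<>();
    private final NavigableMap<Long, List<Runnable>> byCheckpoint = new TreeMap<>();

    // Called from pollNext: record the ack instead of firing it immediately.
    void onMessage(Runnable ack) {
        pending.add(ack);
    }

    // Called from snapshotState: bind all pending acks to this checkpoint id.
    void snapshotState(long checkpointId) {
        byCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    // Called from notifyCheckpointComplete: fire acks for this checkpoint
    // and any earlier ones (completion notifications can be skipped/merged).
    // Unfired acks for a failed checkpoint lead to redelivery: at-least-once.
    int notifyCheckpointComplete(long checkpointId) {
        int fired = 0;
        Iterator<Map.Entry<Long, List<Runnable>>> it =
            byCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            for (Runnable ack : it.next().getValue()) {
                ack.run();
                fired++;
            }
            it.remove();
        }
        return fired;
    }
}
```

One practical consideration under this scheme: the consumer's ack wait must exceed the checkpoint interval, or the server will begin redelivering messages that are merely waiting on a checkpoint.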

scottf and others added 2 commits May 20, 2024 15:28
#14)

- Replaced DeserializationSchema with PayloadDeserializer in NatsJetStreamSource, NatsJetStreamSourceReader, and NatsJetStreamSourceBuilder.
- Updated NatsJetstreamSourceTest to use StringPayloadDeserializer instead of SimpleStringSchema.
- Introduced maxWait configuration in NatsConsumeOptions.
- Updated NatsJetStreamSourceBuilder and NatsJetStreamSourceReader to support configurable maxWait.
Refactored test cases to include maxWait configuration.
somratdutta (Author) commented Jun 6, 2024:

Hi @scottf,
I have made some changes as per the latest discussion.
You can see the commit here.
I also agree with your PR supporting a serializable consumer configuration; I will refactor the JetStream connector code once it's merged.

Upcoming improvements:

  • add support for both Flink and Nats deserializers.
  • implement checkpointing in JetstreamReader as per discussion.
  • support for multiple subjects.

…ation for JSON serialization (#17)

* Refactor Jetstream Source classes to use SerializableConsumerConfiguration for JSON serialization (#16)

- Refactor source code to use SerializableConsumerConfiguration for JSON serialization
- Remove NatsConsumeOptions and update configurations in NatsJetStreamSource and related classes
- Fix method visibility in ExampleErrorListener to match interface requirements
- Implement missing methods in ExampleErrorListener and add appropriate logging
somratdutta (Author) commented Jun 12, 2024:

Updates in this commit :

  • Removed NatsConsumeOptions and updated configurations in NatsJetStreamSource and related classes:

    • Replaced NatsConsumeOptions with SerializableConsumerConfiguration for more efficient JSON serialization.
    • Updated NatsJetStreamSource, NatsJetStreamSourceBuilder, and related classes to use SerializableConsumerConfiguration.
    • Ensured proper handling of ConsumerConfiguration instances through SerializableConsumerConfiguration.
  • Fixed Method Visibility in ExampleErrorListener to Match Interface Requirements:

    • Adjusted method visibility in ExampleErrorListener to comply with the interface requirements.
    • Ensured all interface methods are properly implemented with the correct access modifiers.
  • Refactored NatsJetStreamSourceTest:

    • Modified test cases to use SerializableConsumerConfiguration instead of NatsConsumeOptions.
    • Ensured proper thread interruption handling and Flink environment cleanup.
    • Added explicit null checks and descriptive comments for better code readability.

Note:
This refactoring is based on this PR, which adds Serializable wrappers. It hasn't been released yet, so the jnats version is bumped for the next release.

@scottf
