Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Simplification Review 1 #917

Merged
merged 21 commits into from
May 31, 2023
Merged

Simplification Review 1 #917

merged 21 commits into from
May 31, 2023

Conversation

scottf
Copy link
Contributor

@scottf scottf commented May 23, 2023

No description provided.

@@ -2128,6 +2128,20 @@ void lenientFlushBuffer() {
}
}

@Override
public StreamContext streamContext(String streamName) throws IOException, JetStreamApiException {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When looking at an example, it would become:

streamContext = nc.streamContext(STREAM);
streamContext.addConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
consumerContext = streamContext.getConsumerContext(CONSUMER_NAME);

Would it be nice to name it to getStreamContext so it's consistent with getConsumerContext?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So maybe, I'll ask the other team members. get is very java, but the existing Connection interface has methods like jetStream and jetStreamManagement so I wanted to stay consistent. Not sure why those where named like that to begin with, again they aren't very java like.

catch (Exception e) {
e.printStackTrace();
catch (RuntimeException e) {
// Synchronous pull calls, including raw calls, fetch, iterate and reader
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really like these docs! Makes it really clear just looking at the examples 👍

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the old pull examples so make sure people see the JetStreamStatusException runtime exception

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about the new examples.

  • ConsumeManuallyCallNext.java
  • ConsumeWithHandler.java
  • FetchExample.java
  • NextExample.java

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the examples are very clear. Don't have many/any comments on the examples themselves, more so on the API.

* Messages do not immediately stop
* @throws InterruptedException if one is thrown, in order to propagate it up
*/
void stop() throws InterruptedException;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There used to be a drain here. What would be the alternative approach to block and wait for the stop/drain to complete?

Copy link
Contributor Author

@scottf scottf May 24, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had originally called that drain, but stop was the choice of the design team, trying to distinguished it from the existing model.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at NatsConnection there is still a drain there as well, could maybe be a point for consistency.

Am mostly concerned with being able to know when drain/stop finishes though, to be able to run some other code after a consumer has drained/stopped for example.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a hasPending method to the SimpleConsumer

/**
 * Whether the Simple Consumer has pending messages to be sent from the server.
 * This does not indicate that messages are coming, just that the server has not fulfilled
 * the current request. Mostly helpful after stop has been called to check if any more messages
 * are coming
 * @return the pending state
 */
boolean hasPending();

Copy link
Contributor Author

@scottf scottf May 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I read that explanation, it's not quite right. The intention is for the dev to be able to know when there are no more messages coming after stop has been called, mostly for async consumers (those with handlers) as they won't get a null or terminal message.

StreamContext streamContext;
ConsumerContext consumerContext;
try {
streamContext = nc.streamContext(STREAM);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking at this example more I'm wondering why it's called on nc and not js.

Would having both under js be preferable, or is there a specific reason to have it under nc?

js.getStreamContext(STREAM);
js.getConsumerContext(STREAM, consumerName);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There actually is a getStreamContext and getConsumerContext off of js. I probably should make those consistent (remove the get).
Hopefully I'll get more feedback here.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To me it would sound logical to have to convert the nc: NatsConnection to nc.jetStream() first, so you are explicitly in the JetStream context. And only then having access to the js.getStreamContext / js.getConsumerContext.

That is, if you'd still want to make the distinction though. Since it would mean that the simplification is two calls away instead of just one (nc.jetStream().getStreamContext() vs nc.streamContext()). I would consider having it accessible more quickly to be valuable as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would probably lean towards just having js.getStreamContext() (or js.streamContext()) instead of having those methods on both NatsConnection and JetStream. Having it one additional call away does not really make it worse IMO and gives you clean separation - NatsConnction should not be handling streams, consumers etc.

try {
streamContext = nc.streamContext(STREAM);
streamContext.addConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
consumerContext = streamContext.getConsumerContext(CONSUMER_NAME);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Expected the returned value of streamContext.addConsumer to be a ConsumerContext.

Makes sense it's still a ConsumerInfo, but I would probably like to be able to do something like this:
(not thinking about errors for this example)

ConsumerContext consumerContext = streamContext.getConsumerContext(CONSUMER_NAME);
if (consumerContext == null) {
    consumerContext = streamContext.addConsumer(ConsumerConfiguration.builder().durable(CONSUMER_NAME).build());
}

That would allow for chaining these commands as well:

ConsumerContext consumerContext = js.getStreamContext(STREAM).addConsumer(...);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's a good point, I wrestled with that. I'll check and see what the other client are doing.

Copy link
Contributor

@piotrpio piotrpio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some questions and some nits I came across.

In general the API looks really good!

/**
* Management function to creates a consumer on this stream.
* @param config the consumer configuration to use.
* @return consumer information.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
* @return consumer information.
* @return a ConsumerContext object

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

@@ -0,0 +1,18 @@
// Copyright 2021 The NATS Authors
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Copyright 2021 The NATS Authors
// Copyright 2023 The NATS Authors

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
*/
StreamContext streamContext(String streamName) throws IOException, JetStreamApiException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure it's valuable to have stream/consumer methods on Connection - they seem to just clutter the interface, or is there any clear benefit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@piotrpio My thought was I don't need a JetStreamContext unless I'm publishing, so it's less steps to get it from the connection, so hanging off connection is my first choice and hanging off JetStream is the second choice. Especially since StreamContext has a lot of the same stuff as JetStreamManagement, it made sense to have it at connection level.
But I was never really settled on it either way, so I guess let's continue to think about this.

* @throws IOException covers various communication issues with the NATS
* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
ConsumerInfo getConsumerInfo() throws IOException, JetStreamApiException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it would be valuable to provide e.g. getCachedConsumerInfo() to be able to get to the consumer info returned when executing consumerContext() or addConsumer() without an extra call to the server?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a getCachedConsumerInfo that returns the last one that was read from the server. I added to the doc that technically it could be out of date.

@@ -13,6 +13,30 @@

package io.nats.client;

import java.time.Duration;

public interface ManualConsumer extends SimpleConsumer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to have some general docs on the whole interface. I am not a huge fan of the ManualConsumer name to be honest - just an idea of the top of my head, but maybe something like IterableConsumer or even have consumer implement Iterator?

Not strongly against the current naming, I just find it a little bit awkward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I'm did not care for ManualConsumer either. It can't just be Iterator, because since it's endless, I need a way to stop (which is in the superclass) and since this next has a timeout, I can't just make it also implement Java Iterator since it's next does not have a timeout. IterableConsumer is good but not my favorite since Iterator is a well known pattern in java.

I don't even know if Consumer is the best here. For instance the SimpleConsumer is really just a handle/holder/context to allow the dev to stop the consuming since messages are being automatically pushed to the handler they provide. The ManualConsumer is just that plus the next methods.

Naming is hard. Just going to go with IterableConsumer for now

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, naming is hard. In go client, for endless iteration, in addition to having consumer.Consume() - which takes a callback - I added a consumer.Messages() method which works pretty much like ManualConsumer:

	it, err := cons.Messages()
	if err != nil {
		log.Fatal(err)
	}
	for {
		msg, err := it.Next()
		if err != nil {
			fmt.Println("next err: ", err)
		}
		msg.Ack()
	}

* server such as timeout or interruption
* @throws JetStreamApiException the request had an error related to the data
*/
ConsumerContext addConsumer(ConsumerConfiguration config) throws IOException, JetStreamApiException;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just FYI - not sure which ack policy is default right now, but we recently agreed that at least for the js simplification part (to avoid breaking changes in the old implementation) we should default to AckExplicit). I mention this because go client uses AckNone as default in old implementation and I just changed it for simplified API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In Java/.NET, when you are creating a ConsumerConfiguration through code, Explicit is the default but of course can be set/changed by the developer. I'm not sure if I understand the comment, this method is accepting the ConsumerConfiguration that the user has supplied, so I can only assume that it's configured the way they want it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I just put the comment here because I was curious about the default (having in mind the recent issue in go client) - if it's explicit, that's correct

Copy link
Member

@aricart aricart left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@scottf scottf merged commit 6f43db6 into main May 31, 2023
2 checks passed
@scottf scottf deleted the simplification-review-1 branch May 31, 2023 19:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants