Skip to content

Commit

Permalink
Merge pull request #884 from nats-io/simplification-3
Browse files Browse the repository at this point in the history
Simplification pre-experimental-release
  • Loading branch information
arondi committed Apr 10, 2023
2 parents 8bb1f66 + a8f758c commit 463a167
Show file tree
Hide file tree
Showing 17 changed files with 317 additions and 57 deletions.
Original file line number Diff line number Diff line change
@@ -1,12 +1,29 @@
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples.jetstream.simple;

import io.nats.client.*;
import io.nats.client.api.ConsumerConfiguration;

import java.time.Duration;

import static io.nats.examples.jetstream.simple.Utils.Publisher;
import static io.nats.examples.jetstream.simple.Utils.setupStream;

/**
* This example will demonstrate simplified manual consume
* This example will demonstrate simplified manual consume.
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class ConsumeManuallyCallNext {
private static final String STREAM = "simple-stream";
Expand All @@ -25,7 +42,7 @@ public static void main(String[] args) {
JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = nc.jetStream();

Utils.setupStream(jsm, STREAM, SUBJECT);
setupStream(jsm, STREAM, SUBJECT);

String name = "simple-consumer-" + NUID.nextGlobal();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples.jetstream.simple;

import io.nats.client.*;
Expand All @@ -7,8 +20,12 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import static io.nats.examples.jetstream.simple.Utils.Publisher;
import static io.nats.examples.jetstream.simple.Utils.setupStream;

/**
* This example will demonstrate simplified fetch
* This example will demonstrate simplified consume with a handler
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class ConsumeWithHandler {
private static final String STREAM = "simple-stream";
Expand All @@ -27,7 +44,7 @@ public static void main(String[] args) {
JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = nc.jetStream();

Utils.setupStream(jsm, STREAM, SUBJECT);
setupStream(jsm, STREAM, SUBJECT);

String name = "simple-consumer-" + NUID.nextGlobal();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples.jetstream.simple;

import io.nats.client.*;
Expand All @@ -7,6 +20,7 @@

/**
* This example will demonstrate simplified fetch
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class FetchExample {
private static final String STREAM = "simple-stream";
Expand Down
36 changes: 0 additions & 36 deletions src/examples/java/io/nats/examples/jetstream/simple/Publisher.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples.jetstream.simple;

import io.nats.client.*;
import io.nats.client.api.ConsumerConfiguration;

import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import static io.nats.examples.jetstream.simple.Utils.Publisher;
import static io.nats.examples.jetstream.simple.Utils.setupStream;

/**
* This example will demonstrate all 3 simplified consumes running at the same time.
* It just runs forever until you manually stop it.
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class ThreeDifferentConsumers {
private static final String STREAM = "simple-stream";
private static final String SUBJECT = "simple-subject";
private static final int REPORT_EVERY = 100;
private static final int JITTER = 30;

// change this is you need to...
public static String SERVER = "nats://localhost:4222";

public static void main(String[] args) {
Options options = Options.builder().server(SERVER).build();
try (Connection nc = Nats.connect(options)) {

JetStreamManagement jsm = nc.jetStreamManagement();
JetStream js = nc.jetStream();

setupStream(jsm, STREAM, SUBJECT);

String name1 = "next-" + NUID.nextGlobal();
String name2 = "handle-" + NUID.nextGlobal();
String name3 = "fetch-" + NUID.nextGlobal();

jsm.addOrUpdateConsumer(STREAM, ConsumerConfiguration.builder().durable(name1).build());
jsm.addOrUpdateConsumer(STREAM, ConsumerConfiguration.builder().durable(name2).build());
jsm.addOrUpdateConsumer(STREAM, ConsumerConfiguration.builder().durable(name3).build());

// Consumer[Context]
ConsumerContext ctx1 = js.getConsumerContext(STREAM, name1);
ConsumerContext ctx2 = js.getConsumerContext(STREAM, name2);
ConsumerContext ctx3 = js.getConsumerContext(STREAM, name3);

// create the consumer then use it
ManualConsumer con1 = ctx1.consume();

Thread con1Thread = new Thread(() -> {
long mark = System.currentTimeMillis();
int count = 0;
long report = randomReportInterval();
try {
while (true) {
Message msg = con1.nextMessage(1000);
if (msg != null) {
msg.ack();
++count;
if (System.currentTimeMillis() - mark > report) {
System.out.println("Manual " + count + " messages.");
mark = System.currentTimeMillis();
report = randomReportInterval();
}
}
}
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
con1Thread.start();

Publisher publisher = new Publisher(js, SUBJECT, JITTER);
Thread pubThread = new Thread(publisher);
pubThread.start();

Thread.sleep(1000); // just makes the consumers be reading different messages
AtomicInteger atomicCount = new AtomicInteger();
AtomicLong atomicMark = new AtomicLong(System.currentTimeMillis());
AtomicLong atomicReport = new AtomicLong(randomReportInterval());

MessageHandler handler = msg -> {
msg.ack();
int count = atomicCount.incrementAndGet();
if (System.currentTimeMillis() - atomicMark.get() > atomicReport.get()) {
System.out.println("Handled " + count + " messages.");
atomicMark.set(System.currentTimeMillis());
atomicReport.set(randomReportInterval());
}
};
SimpleConsumer con2 = ctx2.consume(handler);

Thread.sleep(1000); // just makes the consumers be reading different messages
Thread con2Thread = new Thread(() -> {
int count = 0;
long mark = System.currentTimeMillis();
long report = randomReportInterval();
try {
while (true) {
FetchConsumer fc = ctx3.fetch(REPORT_EVERY);
Message msg = fc.nextMessage();
while (msg != null) {
msg.ack();
++count;
if (System.currentTimeMillis() - mark > report) {
System.out.println("Fetched " + count + " messages.");
mark = System.currentTimeMillis();
report = randomReportInterval();
}
msg = fc.nextMessage();
}
}
}
catch (Exception e) {
throw new RuntimeException(e);
}
});
con2Thread.start();

con2Thread.join(); // never ends so program runs until stopped.
}
catch (Exception e) {
e.printStackTrace();
}
}

private static long randomReportInterval() {
return 1000 + ThreadLocalRandom.current().nextLong(1000);
}
}
45 changes: 45 additions & 0 deletions src/examples/java/io/nats/examples/jetstream/simple/Utils.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
// Copyright 2020-2023 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package io.nats.examples.jetstream.simple;

import io.nats.client.Connection;
Expand All @@ -8,6 +21,8 @@
import io.nats.client.api.StorageType;

import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;

import static io.nats.examples.jetstream.NatsJsUtils.createOrReplaceStream;

Expand Down Expand Up @@ -63,4 +78,34 @@ public static void setupConsumer(JetStreamManagement jsm, String stream, String
.build();
jsm.addOrUpdateConsumer(stream, cc);
}

public static class Publisher implements Runnable {
private final JetStream js;
private final String subject;
private final int jitter;
private final AtomicBoolean keepGoing = new AtomicBoolean(true);
private int pubNo;

public Publisher(JetStream js, String subject, int jitter) {
this.js = js;
this.subject = subject;
this.jitter = jitter;
}

public void stop() {
keepGoing.set(false);
}

@Override
public void run() {
try {
while (keepGoing.get()) {
Thread.sleep(ThreadLocalRandom.current().nextLong(jitter));
js.publish(subject, ("simple-message-" + (++pubNo)).getBytes());
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/BaseConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
/**
* Base Consume Options are provided to customize the way the
* consume and fetch operate. It is the base class for FetchConsumeOptions
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class BaseConsumeOptions {
public static final int DEFAULT_MESSAGE_COUNT = 100;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/ConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

/**
* Consume Options are provided to customize the consume operation.
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class ConsumeOptions extends BaseConsumeOptions {
public static ConsumeOptions DEFAULT_CONSUME_OPTIONS = ConsumeOptions.builder().build();
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/ConsumerContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import java.io.IOException;

/**
* TODO
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public interface ConsumerContext {
String getName();
Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/nats/client/FetchConsumeOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

/**
* Fetch Consume Options are provided to customize the fetch operation.
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public class FetchConsumeOptions extends BaseConsumeOptions {
private FetchConsumeOptions(Builder b) {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/io/nats/client/FetchConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@

package io.nats.client;

/**
* A fetch consumer gets messages by calling nextMessage.
* nextMessage returns null when there are no more messages
* SIMPLIFICATION IS EXPERIMENTAL AND SUBJECT TO CHANGE
*/
public interface FetchConsumer extends SimpleConsumer {
Message nextMessage() throws InterruptedException;
}
Loading

0 comments on commit 463a167

Please sign in to comment.