diff --git a/src/main/java/otter/jet/reader/ReaderConfiguration.java b/src/main/java/otter/jet/reader/ReaderConfiguration.java index 5f59ab6..bcf8e19 100644 --- a/src/main/java/otter/jet/reader/ReaderConfiguration.java +++ b/src/main/java/otter/jet/reader/ReaderConfiguration.java @@ -6,6 +6,9 @@ import org.springframework.context.annotation.Configuration; import otter.jet.store.MessageStore; +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; + @Configuration @EnableConfigurationProperties(ReaderConfigurationProperties.class) class ReaderConfiguration { @@ -20,13 +23,24 @@ class ReaderConfiguration { public ReaderService readerService( @Value("${nats.server.host}") String natsServerHost, @Value("${nats.server.port}") String natsServerPort, + @Value("${read.beginTimestamp:}") String startDate, MessageDeserializer messageDeserializer, MessageStore messageStore) { return new ReaderService( createNatsServerUrl(natsServerHost, natsServerPort), messageDeserializer, readerConfigurationProperties.getSubject(), - messageStore); + messageStore, + resolveBeginTimestamp(startDate)); + } + + private static LocalDateTime resolveBeginTimestamp(String startDate) { + if (!startDate.isBlank()) { + DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); + return LocalDateTime.parse(startDate, formatter); + } else { + return LocalDateTime.MIN; + } } private String createNatsServerUrl(String natsServerHost, String natsServerPort) { diff --git a/src/main/java/otter/jet/reader/ReaderService.java b/src/main/java/otter/jet/reader/ReaderService.java index 309e1f6..39bdf2d 100644 --- a/src/main/java/otter/jet/reader/ReaderService.java +++ b/src/main/java/otter/jet/reader/ReaderService.java @@ -9,6 +9,7 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.time.LocalDateTime; import java.util.concurrent.Executor; import java.util.concurrent.Executors; @@ -27,17 +28,20 @@ public class ReaderService { private final MessageDeserializer messageDeserializer; private final String subject; private final MessageStore messageStore; + private final LocalDateTime beginTimestamp; private final Executor executorService = Executors.newSingleThreadExecutor(); public ReaderService(String natsServerUrl, MessageDeserializer messageDeserializer, String subject, - MessageStore messageStore) { + MessageStore messageStore, + LocalDateTime beginTimestamp) { this.natsServerUrl = natsServerUrl; this.messageDeserializer = messageDeserializer; this.subject = subject; this.messageStore = messageStore; + this.beginTimestamp = beginTimestamp; } @EventListener(ApplicationReadyEvent.class) @@ -93,21 +97,35 @@ private void continuouslyReadMessages( Message message = subscription.nextMessage(100); // Print the message if (message != null) { - try { - DeserializedMessage deserializedMessage = - messageDeserializer.deserializeMessage(ByteBuffer.wrap(message.getData())); - ReadMessage msg = - new ReadMessage( - message.getSubject(), - deserializedMessage.name(), - deserializedMessage.content(), - message.metaData().timestamp().toLocalDateTime()); - messageStore.add(msg); + LocalDateTime messageTimestamp = message + .metaData() + .timestamp() + .toLocalDateTime(); + + if (messageTimestamp.isAfter(beginTimestamp)) { + deserializeMessage(messageDeserializer, message, messageTimestamp); + } else { + LOG.warn("Timestamp from message {}, smaller then begin timestamp {}, message {} will not be process", messageTimestamp, beginTimestamp, message.getSID()); message.ack(); - } catch (Exception e) { - LOG.warn("Unable to deserialize message", e); } } } } + + private void deserializeMessage(MessageDeserializer messageDeserializer, Message message, LocalDateTime timestamp) { + try { + DeserializedMessage deserializedMessage = + messageDeserializer.deserializeMessage(ByteBuffer.wrap(message.getData())); + ReadMessage msg = + new ReadMessage( + message.getSubject(), + deserializedMessage.name(), + deserializedMessage.content(), + timestamp); + messageStore.add(msg); + message.ack(); + } catch (Exception e) { + LOG.warn("Unable to deserialize message", e); + } + } } diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a3aefe8..3e15c3b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,5 +1,6 @@ read: mode: "proto" + beginTimestamp: "2024-06-10 10:21:20" # String in format yyyy-MM-dd HH:mm:ss empty for all messages proto: pathToDescriptor: "path_to_descriptor" subject: "*" diff --git a/src/test/java/otter/jet/proto/AnyProtoMessageReaderTest.java b/src/test/java/otter/jet/proto/AnyProtoMessageReaderTest.java index e42e751..5f3ee49 100644 --- a/src/test/java/otter/jet/proto/AnyProtoMessageReaderTest.java +++ b/src/test/java/otter/jet/proto/AnyProtoMessageReaderTest.java @@ -28,7 +28,8 @@ properties = { "read.mode=proto", "read.subject=any_person", - "read.proto.pathToDescriptor=src/test/resources/person.desc" + "read.proto.pathToDescriptor=src/test/resources/person.desc", + "read.beginTimestamp=" }) class AnyProtoMessageReaderTest extends AbstractIntegrationTest { diff --git a/src/test/java/otter/jet/proto/SimpleProtoMessageReaderTest.java b/src/test/java/otter/jet/proto/SimpleProtoMessageReaderTest.java index ec6d3a3..0927859 100644 --- a/src/test/java/otter/jet/proto/SimpleProtoMessageReaderTest.java +++ b/src/test/java/otter/jet/proto/SimpleProtoMessageReaderTest.java @@ -28,7 +28,8 @@ "read.mode=proto", "read.subject=typed_person", "read.proto.messageTypeName=protobuf.Person", - "read.proto.pathToDescriptor=src/test/resources/person.desc" + "read.proto.pathToDescriptor=src/test/resources/person.desc", + "read.beginTimestamp=" }) class SimpleProtoMessageReaderTest extends AbstractIntegrationTest { diff --git a/src/test/resources/application-local.yml b/src/test/resources/application-local.yml index 3c3b48b..adf5b63 100644 --- a/src/test/resources/application-local.yml +++ b/src/test/resources/application-local.yml @@ -1,5 +1,6 @@ read: mode: "proto" + beginTimestamp: "" proto: pathToDescriptor: "src/test/resources/person.desc" subject: "*"