Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SE Reactive Messaging #1636

Merged
merged 11 commits into from
May 28, 2020
Merged

SE Reactive Messaging #1636

merged 11 commits into from
May 28, 2020

Conversation

danielkec
Copy link
Contributor

@danielkec danielkec commented Apr 12, 2020

API

API is providing same functionality as MP Reactive Messaging does, number of method signatures is reduced to accommodate slightly different requirements of SE Helidon. For connectors is used same api as for MP connectors, which makes connectors reusable for both flawors.

Acknowledgement

Due to no magic approach implicit acknowledgement has been reduced only to methods providing unwrapped payload, as it wouldn't be possible to do manual ack without org.eclipse.microprofile.reactive.messaging.Message wrapper.

Channel<String> channel1 = Channel.create("channel1");

Messaging.builder()
                .publisher(channel1, Multi.just("foo", "bar").map(Message::of))
                // Every message is acked before each invocation of consumer
                .listener(channel1, s -> System.out.println(s))
                .build()
                .start();

Such messages are acked in preprocesses, right before unwrapping.

Kafka connector usage example:

 Channel<ConsumerRecord<String, String>> fromKafka = Channel.<ConsumerRecord<String, String>>builder()
                .name("from-kafka")
                .publisherConfig(KafkaConnector.configBuilder()
                        .bootstrapServers(KAFKA_SERVER)
                        .groupId("test-group")
                        .topic(TEST_SE_TOPIC_1, TEST_SE_TOPIC_2)
                        .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
                        .enableAutoCommit(false)
                        .keyDeserializer(StringDeserializer.class)
                        .valueDeserializer(StringDeserializer.class)
                        .build()
                )
                .build();
        KafkaConnector kafkaConnector = KafkaConnector.create(Config.empty());
        Messaging messaging = Messaging.builder()
                .connector(kafkaConnector)
                .listener(fromKafka, consumerRecord -> {
                    LOGGER.info("Kafka says: " + consumerRecord.value());
                })
                .build();
messaging.start();

Emitter usage example:

Channel<String> simpleChannel = Channel.create("simple-channel");

Emitter<String> emitter = Emitter.create(simpleChannel);

Messaging messaging = Messaging.builder()
        .emitter(emitter)
        .listener(simpleChannel, System.out::println)
        .build();

messaging.start();

emitter.send(Message.of("test1"));
emitter.send("test2");
                                                     
messaging.stop();

@danielkec danielkec added the messaging Reactive Messaging label Apr 12, 2020
@danielkec danielkec added this to the 2.0.0 milestone Apr 12, 2020
@danielkec danielkec self-assigned this Apr 12, 2020
@danielkec danielkec mentioned this pull request Apr 16, 2020
2 tasks
@paulparkinson
Copy link
Contributor

Looks good/consistent. Let's work on some extended TestMessage types/use cases to prove it out. Will contact you...

@danielkec danielkec changed the title [WIP]: SE Reactive Messaging SE Reactive Messaging May 8, 2020
@danielkec danielkec marked this pull request as ready for review May 8, 2020 06:47
@danielkec
Copy link
Contributor Author

/trigger

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Remove internal-test bundle

Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
@danielkec
Copy link
Contributor Author

Rebased and ready for review

@danielkec danielkec modified the milestones: 2.0.0, 2.0.0-RC1 May 26, 2020
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
Copy link
Member

@tomas-langer tomas-langer left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@danielkec danielkec merged commit e33833e into helidon-io:master May 28, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
messaging Reactive Messaging
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants