Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Topic messages API. Reactive consumer #50

Merged
merged 4 commits into from
May 27, 2020

Conversation

apetrovs
Copy link
Contributor

No description provided.

@apetrovs apetrovs requested a review from germanosin May 21, 2020 11:02
private final KafkaCluster cluster;
private final String topic;

public void emit(FluxSink<ConsumerRecord<String, String>> sink) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll use Bytes,Bytes here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an early version of the backend for messages API and it supports only the simplest case. Will change that along with adding support of different message types.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

Properties props = new Properties();
props.put(ConsumerConfig.CLIENT_ID_CONFIG, "kafka-ui");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.getBootstrapServers());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be Bytes Serde

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

Copy link
Contributor Author

@apetrovs apetrovs May 27, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, but with BytesDeserializer. BytesSerde doesn't implement required interfaces.

@apetrovs apetrovs requested a review from germanosin May 26, 2020 10:54
}

private void assignPartitions(KafkaConsumer<String, String> consumer, String topic) {
List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get number of partitions from internal state

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

@apetrovs apetrovs requested a review from germanosin May 27, 2020 14:07
@apetrovs apetrovs merged commit 803f0be into master May 27, 2020
@apetrovs apetrovs deleted the feature/10-reactive-consumer-concept branch May 27, 2020 14:16
javalover123 pushed a commit to javalover123/kafka-ui that referenced this pull request Dec 7, 2022
* Topic messages API. Reactive consumer

* Topic messages API. Reactive consumer

* Review changes
@sonarcloud
Copy link

sonarcloud bot commented Aug 7, 2023

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 0 Code Smells

No Coverage information No Coverage information
No Duplication information No Duplication information

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants