Skip to content

Commit

Permalink
listener refactored
Browse files Browse the repository at this point in the history
  • Loading branch information
maemresen committed May 23, 2024
1 parent 73a85c1 commit d5c0553
Show file tree
Hide file tree
Showing 8 changed files with 42 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,6 @@ spring:
active: dev
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: group_id
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: http://localhost:8081
specific.avro.reader: true
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
Expand All @@ -36,7 +29,7 @@ app:
type: TEMP
- name: location2
devices:
- name: device2
- name: device2-1
sensors:
- name: sensor2-2-1
type: HUMIDITY
Expand All @@ -52,8 +45,6 @@ app:
type: TEMP

messaging:
# serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
# scheme-registry-url: http://localhost:8081
topics:
SENSOR_DATA:
name: sensor-data
Expand Down
7 changes: 3 additions & 4 deletions module/app/k8s-workshop-listener/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ plugins {
val avroVersion = "1.11.3"
val confluentVersion = "7.6.1"
val springCloudVersion = "2023.0.0"
val lombokVersion = "1.18.30"
val lombokVersion = "1.18.32"

repositories {
maven {
Expand All @@ -25,11 +25,10 @@ dependencyManagement {

dependencies {
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
implementation("org.apache.avro:avro:$avroVersion")
implementation("io.confluent:kafka-avro-serializer:$confluentVersion")
implementation("io.confluent:kafka-schema-registry-client:$confluentVersion")
implementation(project(":module:lib:k8s-workshop-avro-schemes"))
implementation(project(":module:lib:k8s-workshop-messaging-spring-boot-starter"))


compileOnly("org.projectlombok:lombok:$lombokVersion")
annotationProcessor("org.projectlombok:lombok:$lombokVersion")
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package com.maemresen.k8s.workshop.listener;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.ComponentScan;

@Slf4j
@RequiredArgsConstructor
@ComponentScan("com.maemresen")
@SpringBootApplication
public class ListenerApp {

Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
package com.maemresen.k8s.workshop.listener.messaging;

import com.maemresen.k8s.workshop.messaging.starter.BaseConsumer;
import com.maemresen.k8s.workshop.messaging.starter.MessagingProps;
import com.maemresen.k8s.workshop.messaging.starter.Topic;
import com.maemresen.lib.message.dto.SensorData;
import java.util.function.Consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Service;

@Slf4j
@Component
public class SensorDataConsumer implements Consumer<Message<SensorData>> {
@Service
public class SensorDataConsumer extends BaseConsumer<String, SensorData> {

public SensorDataConsumer(final MessagingProps messagingProps) {
super(Topic.SENSOR_DATA, messagingProps);
}

@Override
public void accept(final Message<SensorData> message) {
log.info("Received sensor data: {}", message);
protected void processMessage(final ConsumerRecord<String, SensorData> consumerRecord) {
log.info("Received sensor data with key {} and value {}", consumerRecord.key(), consumerRecord.value());
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,30 +1,23 @@
spring:
profiles:
active: dev
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
auto-add-partitions: true
bindings:
sensorDataConsumer-in-0:
consumer:
configuration:
value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
schema.registry.url: http://localhost:8081
specific.avro.reader: true

bindings:
sensorDataConsumer-in-0:
destination: ${app.messaging.topics.SENSOR_DATA.topic-name}
group: sensor-data-listener
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: group_id
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: http://localhost:8081
specific.avro.reader: true
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
properties:
schema.registry.url: http://localhost:8081

app:
messaging:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
scheme-registry-url: http://localhost:8081
topics:
SENSOR_DATA:
output-binding-name: sensorDataConsumer-in-0
topic-name: sensor-data
name: sensor-data
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,22 @@
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.ConsumerFactory;

@RequiredArgsConstructor
public abstract class BaseConsumer<K, V> {

private final String topic;
private final ConsumerFactory<K, V> consumerFactory;
private final Topic topic;
private final MessagingProps messagingProps;

@KafkaListener(topics = "#{@baseConsumer.topic}", groupId = "${spring.kafka.consumer.group-id}")
@KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "${spring.kafka.consumer.group-id}")
public void consume(final ConsumerRecord<K, V> consumerRecord) {
processMessage(consumerRecord);
}

public String getTopicName() {
final var topicProps = messagingProps.getTopic(topic);
return topicProps.name();
}

protected abstract void processMessage(final ConsumerRecord<K, V> consumerRecord);
}

0 comments on commit d5c0553

Please sign in to comment.