From d5c05533cb3c068a26c5fd7393e3482d42b16878 Mon Sep 17 00:00:00 2001 From: Emre Sen Date: Fri, 24 May 2024 01:49:11 +0300 Subject: [PATCH] listener refactored --- .../src/main/resources/application.yaml | 11 +----- .../k8s-workshop-listener/build.gradle.kts | 7 ++-- .../k8s/workshop/listener/ListenerApp.java | 6 +-- .../messaging/SensorDataConsumer.java | 20 ++++++---- .../listener/props/MessagingProps.java | 34 ----------------- .../k8s/workshop/listener/util/Topic.java | 5 --- .../src/main/resources/application.yaml | 37 ++++++++----------- .../messaging/starter/BaseConsumer.java | 12 ++++-- 8 files changed, 42 insertions(+), 90 deletions(-) delete mode 100644 module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/props/MessagingProps.java delete mode 100644 module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/util/Topic.java diff --git a/module/app/k8s-workshop-data-generator/src/main/resources/application.yaml b/module/app/k8s-workshop-data-generator/src/main/resources/application.yaml index f9e84b8..adaa589 100644 --- a/module/app/k8s-workshop-data-generator/src/main/resources/application.yaml +++ b/module/app/k8s-workshop-data-generator/src/main/resources/application.yaml @@ -3,13 +3,6 @@ spring: active: dev kafka: bootstrap-servers: localhost:9092 - consumer: - group-id: group_id - key-deserializer: org.apache.kafka.common.serialization.StringDeserializer - value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer - properties: - schema.registry.url: http://localhost:8081 - specific.avro.reader: true producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer @@ -36,7 +29,7 @@ app: type: TEMP - name: location2 devices: - - name: device2 + - name: device2-1 sensors: - name: sensor2-2-1 type: HUMIDITY @@ -52,8 +45,6 @@ app: type: TEMP messaging: -# serializer: io.confluent.kafka.serializers.KafkaAvroSerializer -# scheme-registry-url: http://localhost:8081 topics: SENSOR_DATA: name: sensor-data diff --git a/module/app/k8s-workshop-listener/build.gradle.kts b/module/app/k8s-workshop-listener/build.gradle.kts index dda8925..42cda12 100644 --- a/module/app/k8s-workshop-listener/build.gradle.kts +++ b/module/app/k8s-workshop-listener/build.gradle.kts @@ -9,7 +9,7 @@ plugins { val avroVersion = "1.11.3" val confluentVersion = "7.6.1" val springCloudVersion = "2023.0.0" -val lombokVersion = "1.18.30" +val lombokVersion = "1.18.32" repositories { maven { @@ -25,11 +25,10 @@ dependencyManagement { dependencies { implementation("org.springframework.boot:spring-boot-starter") - implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka") implementation("org.apache.avro:avro:$avroVersion") - implementation("io.confluent:kafka-avro-serializer:$confluentVersion") - implementation("io.confluent:kafka-schema-registry-client:$confluentVersion") implementation(project(":module:lib:k8s-workshop-avro-schemes")) + implementation(project(":module:lib:k8s-workshop-messaging-spring-boot-starter")) + compileOnly("org.projectlombok:lombok:$lombokVersion") annotationProcessor("org.projectlombok:lombok:$lombokVersion") diff --git a/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/ListenerApp.java b/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/ListenerApp.java index e508eed..2fd8b2b 100644 --- a/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/ListenerApp.java +++ b/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/ListenerApp.java @@ -1,13 +1,11 @@ package com.maemresen.k8s.workshop.listener; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; import org.springframework.boot.WebApplicationType; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; +import org.springframework.context.annotation.ComponentScan; -@Slf4j -@RequiredArgsConstructor +@ComponentScan("com.maemresen") @SpringBootApplication public class ListenerApp { diff --git a/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/messaging/SensorDataConsumer.java b/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/messaging/SensorDataConsumer.java index 2f7d2f4..685c359 100644 --- a/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/messaging/SensorDataConsumer.java +++ b/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/messaging/SensorDataConsumer.java @@ -1,17 +1,23 @@ package com.maemresen.k8s.workshop.listener.messaging; +import com.maemresen.k8s.workshop.messaging.starter.BaseConsumer; +import com.maemresen.k8s.workshop.messaging.starter.MessagingProps; +import com.maemresen.k8s.workshop.messaging.starter.Topic; import com.maemresen.lib.message.dto.SensorData; -import java.util.function.Consumer; import lombok.extern.slf4j.Slf4j; -import org.springframework.messaging.Message; -import org.springframework.stereotype.Component; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.stereotype.Service; @Slf4j -@Component -public class SensorDataConsumer implements Consumer> { +@Service +public class SensorDataConsumer extends BaseConsumer { + + public SensorDataConsumer(final MessagingProps messagingProps) { + super(Topic.SENSOR_DATA, messagingProps); + } @Override - public void accept(final Message message) { - log.info("Received sensor data: {}", message); + protected void processMessage(final ConsumerRecord consumerRecord) { + log.info("Received sensor data with key {} and value {}", consumerRecord.key(), consumerRecord.value()); } } diff --git a/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/props/MessagingProps.java b/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/props/MessagingProps.java deleted file mode 100644 index f8207dd..0000000 --- a/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/props/MessagingProps.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.maemresen.k8s.workshop.listener.props; - -import com.maemresen.k8s.workshop.listener.util.Topic; -import java.util.EnumMap; -import java.util.Map; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.NoArgsConstructor; -import lombok.Setter; -import org.springframework.boot.context.properties.ConfigurationProperties; -import org.springframework.context.annotation.Configuration; - -@Getter -@Setter -@NoArgsConstructor -@AllArgsConstructor -@ConfigurationProperties(prefix = "app.messaging") -@Configuration -public class MessagingProps { - - private EnumMap topics; - - public TopicProps getTopic(final Topic topic) { - return topics.getOrDefault(topic, null); - } - - public record TopicProps( - String outputBindingName, - String topicName, - String partitionKey, - Map extra) { - - } -} diff --git a/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/util/Topic.java b/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/util/Topic.java deleted file mode 100644 index db96c42..0000000 --- a/module/app/k8s-workshop-listener/src/main/java/com/maemresen/k8s/workshop/listener/util/Topic.java +++ /dev/null @@ -1,5 +0,0 @@ -package com.maemresen.k8s.workshop.listener.util; - -public enum Topic { - SENSOR_DATA -} diff --git a/module/app/k8s-workshop-listener/src/main/resources/application.yaml b/module/app/k8s-workshop-listener/src/main/resources/application.yaml index d91dffe..9d2ee7a 100644 --- a/module/app/k8s-workshop-listener/src/main/resources/application.yaml +++ b/module/app/k8s-workshop-listener/src/main/resources/application.yaml @@ -1,30 +1,23 @@ spring: profiles: active: dev - cloud: - stream: - kafka: - binder: - brokers: localhost:9092 - auto-add-partitions: true - bindings: - sensorDataConsumer-in-0: - consumer: - configuration: - value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer - schema.registry.url: http://localhost:8081 - specific.avro.reader: true - - bindings: - sensorDataConsumer-in-0: - destination: ${app.messaging.topics.SENSOR_DATA.topic-name} - group: sensor-data-listener + kafka: + bootstrap-servers: localhost:9092 + consumer: + group-id: group_id + key-deserializer: org.apache.kafka.common.serialization.StringDeserializer + value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer + properties: + schema.registry.url: http://localhost:8081 + specific.avro.reader: true + producer: + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer + properties: + schema.registry.url: http://localhost:8081 app: messaging: - serializer: io.confluent.kafka.serializers.KafkaAvroSerializer - scheme-registry-url: http://localhost:8081 topics: SENSOR_DATA: - output-binding-name: sensorDataConsumer-in-0 - topic-name: sensor-data \ No newline at end of file + name: sensor-data \ No newline at end of file diff --git a/module/lib/k8s-workshop-messaging-spring-boot-starter/src/main/java/com/maemresen/k8s/workshop/messaging/starter/BaseConsumer.java b/module/lib/k8s-workshop-messaging-spring-boot-starter/src/main/java/com/maemresen/k8s/workshop/messaging/starter/BaseConsumer.java index baa9aa7..7e337fb 100644 --- a/module/lib/k8s-workshop-messaging-spring-boot-starter/src/main/java/com/maemresen/k8s/workshop/messaging/starter/BaseConsumer.java +++ b/module/lib/k8s-workshop-messaging-spring-boot-starter/src/main/java/com/maemresen/k8s/workshop/messaging/starter/BaseConsumer.java @@ -3,18 +3,22 @@ import lombok.RequiredArgsConstructor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.core.ConsumerFactory; @RequiredArgsConstructor public abstract class BaseConsumer { - private final String topic; - private final ConsumerFactory consumerFactory; + private final Topic topic; + private final MessagingProps messagingProps; - @KafkaListener(topics = "#{@baseConsumer.topic}", groupId = "${spring.kafka.consumer.group-id}") + @KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "${spring.kafka.consumer.group-id}") public void consume(final ConsumerRecord consumerRecord) { processMessage(consumerRecord); } + public String getTopicName() { + final var topicProps = messagingProps.getTopic(topic); + return topicProps.name(); + } + protected abstract void processMessage(final ConsumerRecord consumerRecord); } \ No newline at end of file