Skip to content

Commit

Permalink
spring-cloud stream config removed and used simple spring-kafka for d…
Browse files Browse the repository at this point in the history
…ata-generator
  • Loading branch information
maemresen committed May 23, 2024
1 parent ff65084 commit 73a85c1
Show file tree
Hide file tree
Showing 14 changed files with 164 additions and 87 deletions.
24 changes: 24 additions & 0 deletions .run/Assemble.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="Assemble" type="GradleRunConfiguration" factoryName="Gradle" folderName="00-build">
<ExternalSystemSettings>
<option name="executionName" />
<option name="externalProjectPath" value="$PROJECT_DIR$" />
<option name="externalSystemIdString" value="GRADLE" />
<option name="scriptParameters" value="" />
<option name="taskDescriptions">
<list />
</option>
<option name="taskNames">
<list>
<option value="assemble" />
</list>
</option>
<option name="vmOptions" />
</ExternalSystemSettings>
<ExternalSystemDebugServerProcess>true</ExternalSystemDebugServerProcess>
<ExternalSystemReattachDebugProcess>true</ExternalSystemReattachDebugProcess>
<DebugAllEnabled>false</DebugAllEnabled>
<RunAsTest>false</RunAsTest>
<method v="2" />
</configuration>
</component>
12 changes: 10 additions & 2 deletions module/app/k8s-workshop-data-generator/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ plugins {
val avroVersion = "1.11.3"
val confluentVersion = "7.6.1"
val springCloudVersion = "2023.0.0"
val lombokVersion = "1.18.30"
val lombokVersion = "1.18.32"
val testcontainersVersion = "1.19.8"
val commonsCodecVersion = "1.17.0"

tasks {
withType<JacocoReport> {
Expand Down Expand Up @@ -37,7 +39,6 @@ dependencyManagement {

dependencies {
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
implementation("org.apache.avro:avro:$avroVersion")
implementation("io.confluent:kafka-avro-serializer:$confluentVersion")
implementation("io.confluent:kafka-schema-registry-client:$confluentVersion")
Expand All @@ -48,6 +49,13 @@ dependencies {
testImplementation("org.junit.jupiter:junit-jupiter-params")
testImplementation("org.assertj:assertj-core")

integrationTestImplementation("org.springframework.boot:spring-boot-testcontainers")
integrationTestImplementation("org.testcontainers:testcontainers-bom:$testcontainersVersion")
integrationTestImplementation("org.testcontainers:testcontainers")
integrationTestImplementation("org.testcontainers:junit-jupiter")
integrationTestImplementation("org.testcontainers:kafka")
integrationTestImplementation("commons-codec:commons-codec:$commonsCodecVersion")

compileOnly("org.projectlombok:lombok:$lombokVersion")
annotationProcessor("org.projectlombok:lombok:$lombokVersion")
testCompileOnly("org.projectlombok:lombok:$lombokVersion")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.maemresen.k8s.workshop.data.generator;

import com.maemresen.k8s.workshop.data.generator.task.DataGeneratorTask;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest
class DataGeneratorTaskIT {

@Autowired
private DataGeneratorTask dataGeneratorTask;

@Container
private KafkaContainer kafkaContainer =
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.6.1"));


@Test
void test1() {
dataGeneratorTask.produceSensorData();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.maemresen.k8s.workshop.data.generator;

import org.springframework.boot.test.context.TestConfiguration;

@TestConfiguration
public class KafkaTestConfig {
}
Original file line number Diff line number Diff line change
@@ -1,26 +1,16 @@
package com.maemresen.k8s.workshop.data.generator.messaging;

import com.maemresen.k8s.workshop.messaging.starter.BaseMessageProducer;
import com.maemresen.k8s.workshop.messaging.starter.BaseProducer;
import com.maemresen.k8s.workshop.messaging.starter.MessagingProps;
import com.maemresen.k8s.workshop.messaging.starter.Topic;
import com.maemresen.lib.message.dto.SensorData;
import org.springframework.cloud.function.cloudevent.CloudEventMessageBuilder;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.stereotype.Component;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Component
public class SensorDataProducer extends BaseMessageProducer<SensorData> {
@Service
public class SensorDataProducer extends BaseProducer<String, SensorData> {

protected SensorDataProducer(
final MessagingProps messagingProps,
final StreamBridge streamBridge) {
super(messagingProps, streamBridge, Topic.SENSOR_DATA);
}

@Override
protected CloudEventMessageBuilder<SensorData> messageCustomizer(
final SensorData payload,
final CloudEventMessageBuilder<SensorData> builder) {
return builder.setHeader(topicProps.partitionKey(), payload.getDevice().getLocation());
}
public SensorDataProducer(final MessagingProps messagingProps, final KafkaTemplate<String, SensorData> kafkaTemplate) {
super(Topic.SENSOR_DATA, messagingProps, kafkaTemplate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
public class DataGeneratorTask {

private final GeneratorSensorDataProps dataGeneratorProps;
private final SensorDataProducer sensorDataProducer;
private final SensorDataGeneratorService sensorDataGeneratorService;
private final SensorDataProducer sensorDataProducer;

@Scheduled(cron = "0/2 * * * * ?")
public void produceSensorData() {
Expand All @@ -29,7 +29,7 @@ public void produceSensorData() {
device.name(),
sensor.name(),
sensor.type());
sensorDataProducer.publish(sensorData);
sensorDataProducer.sendMessage(device.name(), sensorData);
})));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,26 +1,20 @@
spring:
profiles:
active: dev
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
sensorDataProducer-out-0:
producer:
configuration:
value.serializer: ${app.messaging.serializer}
schema.registry.url: ${app.messaging.scheme-registry-url}

bindings:
sensorDataProducer-out-0:
destination: ${app.messaging.topics.SENSOR_DATA.topic-name}
producer:
partition-key-expression: headers['${app.messaging.topics.SENSOR_DATA.partition-key}']
partition-count: 3
use-native-encoding: true

kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: group_id
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: http://localhost:8081
specific.avro.reader: true
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
properties:
schema.registry.url: http://localhost:8081
app:
generator:
constraint:
Expand Down Expand Up @@ -58,10 +52,9 @@ app:
type: TEMP

messaging:
serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
scheme-registry-url: http://localhost:8081
# serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
# scheme-registry-url: http://localhost:8081
topics:
SENSOR_DATA:
output-binding-name: sensorDataProducer-out-0
topic-name: sensor-data
name: sensor-data
partition-key: partitionKey
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ dependencyManagement {
}

dependencies {
implementation("org.springframework.boot:spring-boot-starter")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
implementation("io.confluent:kafka-avro-serializer:$confluentVersion")
implementation("io.confluent:kafka-schema-registry-client:$confluentVersion")
api("org.springframework.boot:spring-boot-starter")
api("org.springframework.kafka:spring-kafka")
api("io.confluent:kafka-avro-serializer:$confluentVersion")
api("io.confluent:kafka-schema-registry-client:$confluentVersion")

compileOnly("org.projectlombok:lombok:$lombokVersion")
annotationProcessor("org.projectlombok:lombok:$lombokVersion")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.maemresen.k8s.workshop.messaging.starter;

import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.ConsumerFactory;

@RequiredArgsConstructor
public abstract class BaseConsumer<K, V> {

private final String topic;
private final ConsumerFactory<K, V> consumerFactory;

@KafkaListener(topics = "#{@baseConsumer.topic}", groupId = "${spring.kafka.consumer.group-id}")
public void consume(final ConsumerRecord<K, V> consumerRecord) {
processMessage(consumerRecord);
}

protected abstract void processMessage(final ConsumerRecord<K, V> consumerRecord);
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.maemresen.k8s.workshop.messaging.starter;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;

@RequiredArgsConstructor
@Slf4j
public class BaseProducer<K, V> {
private final Topic topic;
private final MessagingProps messagingProps;
private final KafkaTemplate<K, V> kafkaTemplate;

public void sendMessage(final K key, final V value) {
try {
final var topicProps = messagingProps.getTopic(topic);
final var topicName = topicProps.name();
kafkaTemplate.send(topicName, key, value);
log.info("Producing message to topic {} with key {} and value {}", topicName, key, value);
} catch (Exception e) {
log.error("Failed to send message to topic {}", topic, e);
throw new MessagingException("Failed to send message to topic " + topic, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package com.maemresen.k8s.workshop.messaging.starter;


import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;

@EnableKafka
@Configuration
public class KafkaConfig {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package com.maemresen.k8s.workshop.messaging.starter;

public class MessagingException extends RuntimeException {

public MessagingException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,7 @@ public TopicProps getTopic(final Topic topic) {
}

public record TopicProps(
String outputBindingName,
String topicName,
String name,
String partitionKey,
Map<String, String> extra) {

Expand Down

0 comments on commit 73a85c1

Please sign in to comment.