Skip to content

Commit

Permalink
Feature/79 auto register event classes (#80)
Browse files Browse the repository at this point in the history
* Add @event annotation

* Add  EventClassRegister

* Add @event annotations on event DTO classes

* Combine KafkaConfigurations for proucer and consumer

* Update docstrings
  • Loading branch information
ChromaChroma authored Jul 28, 2022
1 parent 3364676 commit 416930d
Show file tree
Hide file tree
Showing 46 changed files with 270 additions and 295 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.tungstun.bill.application.bill.event;

import com.tungstun.common.messaging.Event;

@Event
public record BillCreated(
Long id,
Long barId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.tungstun.bill.application.bill.event;

import com.tungstun.common.messaging.Event;

@Event
public record BillDeleted(
Long id,
Long barId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.tungstun.bill.application.bill.event;

import com.tungstun.common.messaging.Event;

@Event
public record BillPayed(
Long id,
Long barId,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.tungstun.bill.port.messaging.config;


import com.tungstun.common.messaging.KafkaConfigBase;
import com.tungstun.common.messaging.KafkaMessageProducer;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.listener.CommonLoggingErrorHandler;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
public class KafkaConfig extends KafkaConfigBase {
private static final String TOPIC = "bill";

@Bean
public NewTopic bill() {
return new NewTopic(TOPIC, 1, (short) 1);
}

@Bean
public KafkaMessageProducer kafkaMessageProducer() {
return createMessageProducer(TOPIC);
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(new CommonLoggingErrorHandler());
factory.setConsumerFactory(defaultConsumerFactory());
return factory;
}
}

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.tungstun.bill.port.messaging.in.person.message;

import com.tungstun.common.messaging.Event;

@Event
public record PersonCreated(
Long id,
Long barId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.tungstun.bill.port.messaging.in.person.message;

import com.tungstun.common.messaging.Event;

@Event
public record PersonDeleted(
Long id) {
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.tungstun.bill.port.messaging.in.person.message;

import com.tungstun.common.messaging.Event;

@Event
public record PersonUpdated(
Long id,
String username) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.tungstun.bill.port.messaging.in.product.message;

import com.tungstun.common.messaging.Event;

@Event
public record ProductCreated(
Long id,
Long barId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.tungstun.bill.port.messaging.in.product.message;

import com.tungstun.common.messaging.Event;

@Event
public record ProductDeleted(
Long id) {
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.tungstun.bill.port.messaging.in.product.message;

import com.tungstun.common.messaging.Event;

@Event
public record ProductUpdated(
Long id,
String name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ class BillCommandHandlerMessageTest extends MessageProducerTestBases {
private JpaRepository<Bill, Long> repository;
@Autowired
private JpaRepository<Person, Long> personRepository;

private Person customer;

@BeforeAll
Expand All @@ -39,7 +40,6 @@ static void beforeAll() {
protected void setUp() {
super.setUp();
customer = personRepository.save(new Person(456L, BAR_ID, "customer"));

}

@AfterEach
Expand Down
1 change: 1 addition & 0 deletions common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation 'com.auth0:java-jwt:3.19.2'
implementation 'com.google.code.gson:gson:2.9.0'
implementation 'com.googlecode.libphonenumber:libphonenumber:8.12.49'
implementation 'org.reflections:reflections:0.10.2'
annotationProcessor "org.springframework.boot:spring-boot-configuration-processor"
}

Expand Down
16 changes: 16 additions & 0 deletions common/src/main/java/com/tungstun/common/messaging/Event.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.tungstun.common.messaging;

import org.springframework.context.annotation.Bean;

import java.lang.annotation.*;

/**
* Methods annotated with {@code @Event} are registered for serialization and deserialization of the class.
*/
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
@Bean
public @interface Event {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.tungstun.common.messaging;

import org.reflections.Reflections;
import org.springframework.kafka.annotation.KafkaHandler;

import java.lang.annotation.Annotation;
import java.util.Arrays;
import java.util.Collections;
import java.util.Set;
import java.util.stream.Stream;

/**
* Static function class that contains a set of all classes used as event DTO's.
*/
public class EventClassRegister {
private static Set<Class<?>> eventClasses;

/**
* Reflectively gets and registers all event DTO classes if eventClasses has not been initialized yet.
* Method gets all classes annotated with {@code @Event} and all parameter classes annotated with {@code @KafkaHandler}
*
* @return Set of event classes.
*/
public static Set<Class<?>> getEventClasses() {
if (eventClasses == null) {
eventClasses = new Reflections("com.tungstun")
.getTypesAnnotatedWith(Event.class);

new Reflections("com.tungstun")
.getSubTypesOf(KafkaMessageConsumer.class)
.parallelStream()
.flatMap(consumerClass -> Stream.of(consumerClass.getDeclaredMethods()))
.filter(consumerMethod -> Arrays
.stream(consumerMethod.getDeclaredAnnotations())
.map(Annotation::annotationType)
.anyMatch(annotation -> annotation.equals(KafkaHandler.class)))
.flatMap(consumerMethod -> Stream.of(consumerMethod.getParameterTypes()))
.forEach(eventClasses::add);
}
return Collections.unmodifiableSet(eventClasses);
}

private EventClassRegister() {
// Static class, there is no reason to construct an instance
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,39 +18,32 @@
import java.util.stream.Collectors;

/**
* <p>Base class for Consumer and Producer configuration classes.
* Class contains general kafka configuration properties with type mappings for a set of custom message classes</p>
* <p>Base class for Kafka Configuration classes.
* Class contains general kafka configuration properties with type mappings for a set event classes</p>
*
* <p>ConsumerConfig class example:</p>
* <p>KafkaConfig class example:</p>
* <pre>
* public class ConsumerConfig extends KafkaConfigBase {
* private static final Set< Class<?> > CLASSES = Set.of(
* SomeConsumableMessage.class
* );
* public class KafkaConfig extends KafkaConfigBase {
* private static final String TOPIC = "topic";
*
* &#64;Bean
* public KafkaListenerContainerFactory kafkaListenerContainerFactory() {
* ...
* factory.setConsumerFactory(defaultConsumerFactory(CLASSES));
* ...
* public NewTopic topic() {
* return new NewTopic(TOPIC, 1, (short) 1);
* }
* }
* </pre>
*
* <p>ProducerConfig class example:</p>
* <pre>
* public class ProducerConfig extends KafkaConfigBase {
* private static final String TOPIC = "topic";
* private static final Set< Class<?> > CLASSES = Set.of(
* SomeProducibleMessage.class
* );
*
* &#64;Bean
* public KafkaMessageProducer kafkaMessageProducer() {
* return new KafkaMessageProducer(TOPIC, defaultKafkaTemplate(CLASSES));
* return createMessageProducer(TOPIC);
* }
*
* &#64;Bean
* public KafkaListenerContainerFactory kafkaListenerContainerFactory() {
* KafkaListenerContainerFactory factory = ...
* factory.setSomeSetting(xyz);
* ...
* return factory;
* }
* }
* </pre>
*/
public abstract class KafkaConfigBase {
/**
Expand All @@ -59,43 +52,46 @@ public abstract class KafkaConfigBase {
@Value("${spring.kafka.bootstrap-servers:localhost:9292}")
protected String bootstrapServers;


/**
* Creates a new KafkaMessageProducer instance for the provided topic with the default kafka template
*/
public KafkaMessageProducer createMessageProducer(String topic) {
return new KafkaMessageProducer(topic, defaultKafkaTemplate());
}

/**
* Creates a simple KafkaTemplate for producer configs.
* KafkaTemplate keys are Strings and values can Object to allow custom kafka message classes.
*
* @param customClasses Collection of custom message classes to configure for (de)serialization
* KafkaTemplate keys are Strings and values can Object to allow custom kafka event classes.
*/
public KafkaTemplate<String, Object> defaultKafkaTemplate(Collection<Class<?>> customClasses) {
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configProps(customClasses)));
private KafkaTemplate<String, Object> defaultKafkaTemplate() {
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configProps()));
}

/**
* Creates DefaultKafkaConsumerFactory for consumer configs.
* Creates DefaultKafkaConsumerFactory for consumer configurations
* Key and value deserializers are wrapped in ErrorHandlingDeserializer to catch thrown exceptions
*
* @param customClasses Collection of custom message classes to configure for (de)serialization
*/
public ConsumerFactory<String, Object> defaultConsumerFactory(Collection<Class<?>> customClasses) {
public ConsumerFactory<String, Object> defaultConsumerFactory() {
return new DefaultKafkaConsumerFactory<>(
configProps(customClasses),
configProps(),
new ErrorHandlingDeserializer<>(new StringDeserializer()),
new ErrorHandlingDeserializer<>(new JsonDeserializer<>()));
}

/**
* Creates general configuration properties for consumers and producers.
* Configuration properties contain created type mappings of given classes
* Creates general kafka configuration properties.
* Configuration properties contain created type mappings of registered event DTO classes
*
* @param customClasses Collection of custom message classes to configure for (de)serialization
* @return Map of configuration properties
*/
private Map<String, Object> configProps(Collection<Class<?>> customClasses) {
private Map<String, Object> configProps() {
return new HashMap<>(Map.of(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class,
JsonDeserializer.TRUSTED_PACKAGES, "com.tungstun.**",
JsonDeserializer.TYPE_MAPPINGS, convertToMapping(customClasses)
JsonDeserializer.TYPE_MAPPINGS, convertToMapping(EventClassRegister.getEventClasses())
));
}

Expand Down
Loading

0 comments on commit 416930d

Please sign in to comment.