diff --git a/pom.xml b/pom.xml
index d52a5d5..46bdef5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -43,11 +43,12 @@
dev.vality
damsel
+ 1.685-5c25c2e
dev.vality
machinegun-proto
- 1.38-347c5c4
+ 1.43-3decc8f
dev.vality
@@ -61,7 +62,7 @@
dev.vality
reporter-proto
- 1.16-cc187f5
+ 1.18-d7d9995
dev.vality
@@ -203,6 +204,11 @@
3.2.0
test
+
+ org.springframework.kafka
+ spring-kafka-test
+ test
+
org.awaitility
awaitility
diff --git a/src/main/java/dev/vality/reporter/service/impl/DominantServiceImpl.java b/src/main/java/dev/vality/reporter/service/impl/DominantServiceImpl.java
index f080e1c..74b55c0 100644
--- a/src/main/java/dev/vality/reporter/service/impl/DominantServiceImpl.java
+++ b/src/main/java/dev/vality/reporter/service/impl/DominantServiceImpl.java
@@ -11,7 +11,6 @@
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Service;
-import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -56,6 +55,7 @@ public Map getShopConfigs(String partyId) {
var versionedObjectWithReferences = getPartyObject(partyId);
var shopConfigObjects = versionedObjectWithReferences.getReferencedBy().stream()
.map(VersionedObject::getObject)
+ .filter(DomainObject::isSetShopConfig)
.map(DomainObject::getShopConfig)
.toList();
log.debug("Receive shops for partyId: {}, shopConfigObjects ='{}'", partyId, shopConfigObjects);
diff --git a/src/test/java/dev/vality/reporter/kafka/KafkaListenerTest.java b/src/test/java/dev/vality/reporter/kafka/KafkaListenerTest.java
index b41d9f2..f0492c4 100644
--- a/src/test/java/dev/vality/reporter/kafka/KafkaListenerTest.java
+++ b/src/test/java/dev/vality/reporter/kafka/KafkaListenerTest.java
@@ -5,38 +5,50 @@
import dev.vality.reporter.config.MockitoSharedServices;
import dev.vality.reporter.model.KafkaEvent;
import dev.vality.reporter.service.impl.InvoicingService;
-import dev.vality.testcontainers.annotations.KafkaConfig;
-import dev.vality.testcontainers.annotations.kafka.KafkaTestcontainerSingleton;
-import dev.vality.testcontainers.annotations.kafka.config.KafkaProducer;
import dev.vality.testcontainers.annotations.postgresql.PostgresqlTestcontainerSingleton;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.thrift.TBase;
+import org.apache.thrift.TSerializer;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.listener.MessageListenerContainer;
+import org.springframework.kafka.test.EmbeddedKafkaBroker;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.bean.override.mockito.MockitoBean;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
+import java.util.Properties;
import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
@SpringBootTest
-@KafkaConfig
@DirtiesContext
-@KafkaTestcontainerSingleton(
- properties = {
- "kafka.topics.invoicing.enabled=true",
- "kafka.consumer.max-poll-records=1"
- },
- topicsKeys = "kafka.topics.invoicing.id"
-)
+@EmbeddedKafka(partitions = 1, topics = "reporter-invoicing-test")
+@TestPropertySource(properties = {
+ "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
+ "spring.kafka.consumer.group-id=reporter-kafka-test",
+ "kafka.topics.invoicing.id=reporter-invoicing-test",
+ "kafka.topics.invoicing.enabled=true",
+ "kafka.consumer.max-poll-records=1"
+})
@MockitoSharedServices
@PostgresqlTestcontainerSingleton(excludeTruncateTables = "schema_version")
class KafkaListenerTest {
@@ -48,16 +60,26 @@ class KafkaListenerTest {
private InvoicingService invoicingService;
@Autowired
- private KafkaProducer> testThriftKafkaProducer;
+ private EmbeddedKafkaBroker embeddedKafkaBroker;
+
+ @Autowired
+ private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@Captor
private ArgumentCaptor> arg;
+ @BeforeEach
+ void waitForKafkaListenersAssignment() {
+ for (MessageListenerContainer listenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
+ ContainerTestUtils.waitForAssignment(listenerContainer, embeddedKafkaBroker.getPartitionsPerTopic());
+ }
+ }
+
@Test
void listenInvoicingChanges() throws Exception {
int eventsCount = 1;
for (int i = 0; i < eventsCount; i++) {
- testThriftKafkaProducer.send(invoicingTopic, createSinkEvent());
+ send(invoicingTopic, createSinkEvent());
}
verify(invoicingService, timeout(50000).times(eventsCount))
.handleEvents(arg.capture());
@@ -82,4 +104,21 @@ private MachineEvent createMessage() {
message.setData(data);
return message;
}
+
+ private void send(String topic, TBase, ?> event) throws Exception {
+ try (var producer = new KafkaProducer(producerProperties())) {
+ var serializer = new TSerializer(new TBinaryProtocol.Factory());
+ producer.send(new ProducerRecord<>(topic, serializer.serialize(event))).get();
+ producer.flush();
+ }
+ }
+
+ private Properties producerProperties() {
+ var properties = new Properties();
+ properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ properties.put(ProducerConfig.ACKS_CONFIG, "all");
+ return properties;
+ }
}