Skip to content

Commit

Permalink
Update stream samples to latest model
Browse files Browse the repository at this point in the history
* Update Stream applicaiton Kafka samples to use the latest functional model
* Use the new test binder to test the components

Resolves #131
  • Loading branch information
sobychacko committed Mar 16, 2020
1 parent 158f455 commit 2b3d2b6
Show file tree
Hide file tree
Showing 13 changed files with 183 additions and 134 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.4.RELEASE</version>
<version>2.2.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

Expand All @@ -22,7 +22,7 @@
<docker.org>springcloudstream</docker.org>
<docker.version>${project.version}</docker.version>
<java.version>1.8</java.version>
<spring-cloud.version>Greenwich.SR1</spring-cloud.version>
<spring-cloud.version>Hoxton.SR2</spring-cloud.version>
</properties>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,33 @@
<artifactId>spring-boot-starter-cloud-connectors</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>



</project>
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package io.spring.dataflow.sample.usagecostlogger;

import java.util.function.Consumer;

import io.spring.dataflow.sample.UsageCostDetail;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
@Configuration
public class UsageCostLogger {

private static final Logger logger = LoggerFactory.getLogger(UsageCostLoggerApplication.class);

@StreamListener(Sink.INPUT)
public void process(UsageCostDetail usageCostDetail) {
logger.info(usageCostDetail.toString());
@Bean
public Consumer<UsageCostDetail> process() {
return usageCostDetail -> {
logger.info(usageCostDetail.toString());
};
}
}
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
spring.cloud.stream.function.bindings.process-in-0=input
spring.cloud.stream.bindings.input.destination=usage-cost
Original file line number Diff line number Diff line change
@@ -1,53 +1,55 @@
package io.spring.dataflow.sample.usagecostlogger;

import java.util.HashMap;
import java.util.Map;

import io.spring.dataflow.sample.UsageCostDetail;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.test.system.CapturedOutput;
import org.springframework.boot.test.system.OutputCaptureExtension;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

@ExtendWith(OutputCaptureExtension.class)
public class UsageCostLoggerApplicationTests {

@Autowired
protected Sink sink;

@Autowired
protected UsageCostLogger usageCostLogger;

@Test
public void contextLoads() {
}

@Test
public void testUsageCostLogger() throws Exception {
ArgumentCaptor<UsageCostDetail> captor = ArgumentCaptor.forClass(UsageCostDetail.class);
this.sink.input().send(MessageBuilder.withPayload("{\"userId\":\"user3\",\"callCost\":10.100000000000001,\"dataCost\":25.1}").build());
verify(this.usageCostLogger).process(captor.capture());
}
public void testUsageCostLogger(CapturedOutput output) {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration
.getCompleteConfiguration(UsageCostLoggerApplication.class))
.web(WebApplicationType.NONE)
.run()) {

InputDestination source = context.getBean(InputDestination.class);

UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId("user1");
usageCostDetail.setCallCost(3.0);
usageCostDetail.setDataCost(5.0);

final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
final Message<?> message = converter.toMessage(usageCostDetail, messageHeaders);

@EnableAutoConfiguration
@EnableBinding(Sink.class)
static class TestConfig {
source.send(message);

// Override `UsageCostLogger` bean for spying.
@Bean
@Primary
public UsageCostLogger usageCostLogger() {
return spy(new UsageCostLogger());
Awaitility.await().until(output::getOut, value -> value.contains("{\"userId\": \"user1\", \"callCost\": \"3.0\", \"dataCost\": \"5.0\" }"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,9 @@
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,27 @@
package io.spring.dataflow.sample.usagecostprocessor;

import java.util.function.Function;

import io.spring.dataflow.sample.UsageCostDetail;
import io.spring.dataflow.sample.UsageDetail;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(Processor.class)
@Configuration
public class UsageCostProcessor {

private double ratePerSecond = 0.1;

private double ratePerMB = 0.05;

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public UsageCostDetail processUsageCost(UsageDetail usageDetail) {
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId(usageDetail.getUserId());
usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
return usageCostDetail;
@Bean
public Function<UsageDetail, UsageCostDetail> processUsageCost() {
return usageDetail -> {
UsageCostDetail usageCostDetail = new UsageCostDetail();
usageCostDetail.setUserId(usageDetail.getUserId());
usageCostDetail.setCallCost(usageDetail.getDuration() * this.ratePerSecond);
usageCostDetail.setDataCost(usageDetail.getData() * this.ratePerMB);
return usageCostDetail;
};
}
}
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
spring.cloud.stream.function.bindings.processUsageCost-in-0=input
spring.cloud.stream.function.bindings.processUsageCost-out-0=output
spring.cloud.stream.bindings.input.destination=usage-detail
spring.cloud.stream.bindings.output.destination=usage-cost
Original file line number Diff line number Diff line change
@@ -1,16 +1,60 @@
package io.spring.dataflow.sample.usagecostprocessor;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import java.util.HashMap;
import java.util.Map;

import io.spring.dataflow.sample.UsageCostDetail;
import io.spring.dataflow.sample.UsageDetail;
import org.junit.jupiter.api.Test;
import org.springframework.boot.WebApplicationType;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.cloud.stream.binder.test.InputDestination;
import org.springframework.cloud.stream.binder.test.OutputDestination;
import org.springframework.cloud.stream.binder.test.TestChannelBinderConfiguration;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.converter.MessageConverter;

import static org.assertj.core.api.Assertions.assertThat;

@RunWith(SpringRunner.class)
@SpringBootTest
public class UsageCostProcessorApplicationTests {

@Test
public void contextLoads() {
}

@Test
public void testUsageCostProcessor() {
try (ConfigurableApplicationContext context = new SpringApplicationBuilder(
TestChannelBinderConfiguration.getCompleteConfiguration(
UsageCostProcessorApplication.class)).web(WebApplicationType.NONE)
.run()) {

InputDestination source = context.getBean(InputDestination.class);

UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId("user1");
usageDetail.setDuration(30L);
usageDetail.setData(100L);

final MessageConverter converter = context.getBean(CompositeMessageConverter.class);
Map<String, Object> headers = new HashMap<>();
headers.put("contentType", "application/json");
MessageHeaders messageHeaders = new MessageHeaders(headers);
final Message<?> message = converter.toMessage(usageDetail, messageHeaders);

source.send(message);

OutputDestination target = context.getBean(OutputDestination.class);
Message<byte[]> sourceMessage = target.receive(10000);

final UsageCostDetail usageCostDetail = (UsageCostDetail) converter
.fromMessage(sourceMessage, UsageCostDetail.class);

assertThat(usageCostDetail.getCallCost()).isEqualTo(3.0);
assertThat(usageCostDetail.getDataCost()).isEqualTo(5.0);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,12 @@
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<artifactId>spring-cloud-stream</artifactId>
<type>test-jar</type>
<classifier>test-binder</classifier>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,25 @@
package io.spring.dataflow.sample.usagedetailsender;

import java.util.Random;
import java.util.function.Supplier;

import io.spring.dataflow.sample.UsageDetail;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@EnableScheduling
@EnableBinding(Source.class)
@Configuration
public class UsageDetailSender {

@Autowired
private Source source;

private String[] users = {"user1", "user2", "user3", "user4", "user5"};

@Scheduled(fixedDelay = 1000)
public void sendEvents() {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
this.source.output().send(MessageBuilder.withPayload(usageDetail).build());
@Bean
public Supplier<UsageDetail> sendEvents() {
return () -> {
UsageDetail usageDetail = new UsageDetail();
usageDetail.setUserId(this.users[new Random().nextInt(5)]);
usageDetail.setDuration(new Random().nextInt(300));
usageDetail.setData(new Random().nextInt(700));
return usageDetail;
};
}
}
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
spring.cloud.stream.function.bindings.sendEvents-out-0=output
spring.cloud.stream.bindings.output.destination=usage-detail

0 comments on commit 2b3d2b6

Please sign in to comment.