Skip to content

Commit

Permalink
Added a multi-binder sample
Browse files Browse the repository at this point in the history
Cleaned up a few syntax complaints
Added samples parent pom
Added second level target to git ignore
Cleaned up code so that it is working based on the sample
  • Loading branch information
Stephen Asbury committed Jun 24, 2019
1 parent beea4c6 commit d12e734
Show file tree
Hide file tree
Showing 18 changed files with 198 additions and 13 deletions.
1 change: 1 addition & 0 deletions .gitignore
Expand Up @@ -49,6 +49,7 @@ Thumbs.db
# Build output directies
/target
*/target
*/*/target
/build
*/build

Expand Down
37 changes: 37 additions & 0 deletions nats-samples/multi-binder-sample/pom.xml
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<artifactId>nats-binder-sample</artifactId>
<version>0.0.1.BUILD-SNAPSHOT</version>
<packaging>jar</packaging>
<name>multi-binder-sample</name>
<description>A simple sample for the nats cloud stream binder</description>

<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>nats-samples</artifactId>
<version>0.0.1.BUILD-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<properties>
<main.basedir>${basedir}/..</main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-nats</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,29 @@
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.nats;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class MultiBinderSample {

public static void main(String[] args) {
SpringApplication.run(MultiBinderSample.class, args);
}

}
@@ -0,0 +1,45 @@
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.nats;

import java.nio.charset.StandardCharsets;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@EnableBinding(Processor.class)
public class MultiBinderTransformer {
private static final Log logger = LogFactory.getLog(MultiBinderTransformer.class);

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object transform(Object message) {
if (message instanceof byte[]) {
String value = new String((byte[]) message, StandardCharsets.UTF_8);
message = value.toUpperCase().getBytes(StandardCharsets.UTF_8);
}
else if (message instanceof String) {
message = ((String) message).toUpperCase();
}
return message;
}
}
@@ -0,0 +1,10 @@
spring:
cloud:
stream:
bindings:
input:
destination: dataIn
binder: nats
output:
destination: dataOut
binder: nats
37 changes: 37 additions & 0 deletions nats-samples/pom.xml
@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<artifactId>nats-samples</artifactId>
<version>0.0.1.BUILD-SNAPSHOT</version>
<packaging>pom</packaging>

<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-nats-parent</artifactId>
<version>0.0.1.BUILD-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>

<properties>
<spring-boot.version>2.1.6.RELEASE</spring-boot.version>
<spring-cloud-stream.version>2.2.0.RELEASE</spring-cloud-stream.version>
<java.version>1.8</java.version>
</properties>

<modules>
<module>multi-binder-sample</module>
</modules>

<dependencyManagement>
</dependencyManagement>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
1 change: 1 addition & 0 deletions pom.xml
Expand Up @@ -22,6 +22,7 @@
<modules>
<module>spring-nats</module>
<module>spring-cloud-stream-binder-nats</module>
<module>nats-samples</module>
</modules>

<dependencyManagement>
Expand Down
2 changes: 1 addition & 1 deletion spring-cloud-stream-binder-nats/bin/pom.xml
Expand Up @@ -4,7 +4,7 @@
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-nats-parent</artifactId>
<artifactId>spring-nats-parent</artifactId>
<version>0.0.1.BUILD-SNAPSHOT</version>
</parent>
<artifactId>spring-cloud-starter-stream-nats</artifactId>
Expand Down
2 changes: 2 additions & 0 deletions spring-cloud-stream-binder-nats/pom.xml
Expand Up @@ -5,6 +5,7 @@

<artifactId>spring-cloud-stream-binder-nats</artifactId>
<description>Spring Cloud Stream Binder for NATS</description>
<version>0.0.1.BUILD-SNAPSHOT</version>
<organization>
<name>CNCF</name>
<url>https://www.nats.io</url>
Expand All @@ -14,6 +15,7 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-nats-parent</artifactId>
<version>0.0.1.BUILD-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>


Expand Down
Expand Up @@ -16,6 +16,9 @@

package org.springframework.cloud.stream.binder.nats;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.stream.binder.AbstractMessageChannelBinder;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
Expand All @@ -25,8 +28,9 @@
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

public class NatsChannelBinder extends
AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, NatsChannelProvisioner> {
public class NatsChannelBinder
extends AbstractMessageChannelBinder<ConsumerProperties, ProducerProperties, NatsChannelProvisioner> {
private static final Log logger = LogFactory.getLog(NatsChannelBinder.class);

private final NatsChannelProvisioner provisioner;

Expand All @@ -50,7 +54,8 @@ protected MessageProducer createConsumerEndpoint(ConsumerDestination destination
@Override
protected PolledConsumerResources createPolledConsumerResources(String name, String group,
ConsumerDestination destination, ConsumerProperties consumerProperties) {
return new PolledConsumerResources(new NatsMessageSource((NatsConsumerDestination) destination, provisioner.getConnection()),
return new PolledConsumerResources(
new NatsMessageSource((NatsConsumerDestination) destination, provisioner.getConnection()),
registerErrorInfrastructure(destination, group, consumerProperties, true));
}
}
Expand Up @@ -18,6 +18,8 @@

import io.nats.client.Connection;
import io.nats.client.NUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
Expand All @@ -27,6 +29,7 @@
import org.springframework.cloud.stream.provisioning.ProvisioningProvider;

public class NatsChannelProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {
private static final Log logger = LogFactory.getLog(NatsChannelProvisioner.class);

private final Connection connection;

Expand All @@ -49,7 +52,7 @@ public ConsumerDestination provisionConsumerDestination(String subject, String g
throws ProvisioningException {
String subscriptionName;

if (group != null || group.length() > 0) {
if (group != null && group.length() > 0) {
subscriptionName = subject + "#" + group;
}
else {
Expand Down
Expand Up @@ -16,9 +16,13 @@

package org.springframework.cloud.stream.binder.nats;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.stream.provisioning.ConsumerDestination;

public class NatsConsumerDestination implements ConsumerDestination {
private static final Log logger = LogFactory.getLog(NatsConsumerDestination.class);

private String name;

Expand Down
Expand Up @@ -20,7 +20,6 @@
import java.nio.charset.StandardCharsets;

import io.nats.client.Connection;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down
Expand Up @@ -18,7 +18,6 @@

import io.nats.client.Connection;
import io.nats.client.Dispatcher;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down
Expand Up @@ -21,14 +21,11 @@
import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Subscription;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.context.Lifecycle;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.messaging.MessageChannel;

public class NatsMessageSource extends AbstractMessageSource<Object> implements Lifecycle {
private static final Log logger = LogFactory.getLog(NatsMessageHandler.class);
Expand Down
Expand Up @@ -16,9 +16,13 @@

package org.springframework.cloud.stream.binder.nats;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.cloud.stream.provisioning.ProducerDestination;

public class NatsProducerDestination implements ProducerDestination {
private static final Log logger = LogFactory.getLog(NatsProducerDestination.class);

private String name;

Expand Down
3 changes: 2 additions & 1 deletion spring-nats/pom.xml
Expand Up @@ -5,6 +5,7 @@

<artifactId>spring-nats</artifactId>
<description>NATS Implementation for Spring</description>
<version>0.0.1.BUILD-SNAPSHOT</version>
<organization>
<name>CNCF</name>
<url>https://www.nats.io</url>
Expand All @@ -14,9 +15,9 @@
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-nats-parent</artifactId>
<version>0.0.1.BUILD-SNAPSHOT</version>
<relativePath>..</relativePath>
</parent>


<properties>
<main.basedir>${basedir}/..</main.basedir>
</properties>
Expand Down
Expand Up @@ -17,10 +17,11 @@
package org.springframework.boot.autoconfigure.nats;

import java.io.IOException;
import java.lang.InterruptedException;

import io.nats.client.Connection;
import io.nats.client.Nats;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
Expand All @@ -35,11 +36,21 @@
@ConditionalOnClass({ Connection.class })
@EnableConfigurationProperties(NatsProperties.class)
public class NatsAutoConfiguration {
private static final Log logger = LogFactory.getLog(NatsAutoConfiguration.class);

@Bean
@ConditionalOnMissingBean
public Connection natsConnection(NatsProperties properties) throws IOException, InterruptedException {
return Nats.connect(properties.toOptions());
Connection nc = null;

try {
nc = Nats.connect(properties.toOptions());
}
catch (Exception e) {
e.printStackTrace();
throw e;
}
return nc;
}

}

0 comments on commit d12e734

Please sign in to comment.