Skip to content

Commit

Permalink
Roughed out binder code
Browse files Browse the repository at this point in the history
Very minimal nats code and ultra minimal config.
Ready to start writing tests and an example to see if this does anything.
  • Loading branch information
Stephen Asbury committed Jun 21, 2019
1 parent a8976f4 commit beea4c6
Show file tree
Hide file tree
Showing 6 changed files with 264 additions and 23 deletions.
Expand Up @@ -38,32 +38,19 @@ public NatsChannelBinder(NatsChannelProvisioner provisioningProvider) {
@Override
protected MessageHandler createProducerMessageHandler(ProducerDestination destination,
ProducerProperties producerProperties, MessageChannel errorChannel) {
/* TODO PubSubMessageHandler messageHandler = new PubSubMessageHandler(this.pubSubTemplate, destination.getName());
messageHandler.setBeanFactory(getBeanFactory());
return messageHandler; */
return null;
return new NatsMessageHandler(destination.getName(), provisioner.getConnection());
}

@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group,
ConsumerProperties properties) {

// TODO return new PubSubInboundChannelAdapter(this.pubSubTemplate, destination.getName());
return null;
}

@Override
protected void afterUnbindConsumer(ConsumerDestination destination, String group,
ConsumerProperties consumerProperties) {
super.afterUnbindConsumer(destination, group, consumerProperties);
// TODO clean up
return new NatsMessageProducer((NatsConsumerDestination) destination, provisioner.getConnection());
}

@Override
protected PolledConsumerResources createPolledConsumerResources(String name, String group,
ConsumerDestination destination, ConsumerProperties consumerProperties) {
/* return new PolledConsumerResources(new PubSubMessageSource(this.pubSubTemplate, destination.getName()),
registerErrorInfrastructure(destination, group, consumerProperties, true)); */
return null;
return new PolledConsumerResources(new NatsMessageSource((NatsConsumerDestination) destination, provisioner.getConnection()),
registerErrorInfrastructure(destination, group, consumerProperties, true));
}
}
Expand Up @@ -28,10 +28,14 @@

public class NatsChannelProvisioner implements ProvisioningProvider<ConsumerProperties, ProducerProperties> {

private final Connection nc;
private final Connection connection;

public NatsChannelProvisioner(Connection nc) {
this.nc = nc;
this.connection = nc;
}

public Connection getConnection() {
return this.connection;
}

@Override
Expand Down
Expand Up @@ -30,4 +30,28 @@ public NatsConsumerDestination(String name) {
public String getName() {
return this.name;
}

public String getSubject() {
String[] parts = this.name.split("#");

if (parts.length > 2) {
return parts[1];
}
else {
return parts[0];
}
}

public String getQueueGroup() {
String[] parts = this.name.split("#");

if (parts.length > 2) {
return parts[2];
}
else if (parts.length > 1) {
return parts[1];
}

return "";
}
}
Expand Up @@ -16,7 +16,11 @@

package org.springframework.cloud.stream.binder.nats;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import io.nats.client.Connection;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -61,13 +65,24 @@ public NatsMessageHandler subject(String subject) {
@Override
protected void handleMessageInternal(Message<?> message) {
Object payload = message.getPayload();
byte[] bytes = null;

if (!(payload instanceof byte[])) {
logger.warn("NATS handler only supports byte array messages");
return;
if (payload instanceof byte[]) {
bytes = (byte[]) payload;
}
else if (payload instanceof ByteBuffer) {
ByteBuffer buf = ((ByteBuffer) payload);
bytes = new byte[buf.remaining()];
buf.get(bytes);
}
else if (payload instanceof String) {
bytes = ((String) payload).getBytes(StandardCharsets.UTF_8);
}

byte[] bytes = (byte[]) payload;
if (bytes == null) {
logger.warn("NATS handler only supports byte array, byte buffer and string messages");
return;
}

if (this.connection != null && this.subject != null && this.subject.length() > 0) {
this.connection.publish(this.subject, bytes);
Expand Down
@@ -0,0 +1,111 @@
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.nats;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.context.Lifecycle;
import org.springframework.integration.core.MessageProducer;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.GenericMessage;

public class NatsMessageProducer implements MessageProducer, Lifecycle {
private static final Log logger = LogFactory.getLog(NatsMessageHandler.class);

private NatsConsumerDestination destination;
private Connection connection;
private MessageChannel output;
private Dispatcher dispatcher;

public NatsMessageProducer() {
}

public NatsMessageProducer(NatsConsumerDestination destination, Connection nc) {
this.destination = destination;
this.connection = nc;
}

public NatsConsumerDestination getDestination() {
return this.destination;
}

public void setDestination(NatsConsumerDestination destination) {
this.destination = destination;
}

public Connection getConnection() {
return this.connection;
}

public void setConnection(Connection connection) {
this.connection = connection;
}

@Override
public void setOutputChannel(MessageChannel outputChannel) {
this.output = outputChannel;
}

@Override
public MessageChannel getOutputChannel() {
return this.output;
}

@Override
public boolean isRunning() {
return this.dispatcher != null;
}

@Override
public void start() {
if (this.dispatcher != null) {
return;
}

this.dispatcher = this.connection.createDispatcher((msg) -> {
try {
this.output.send(new GenericMessage<byte[]>(msg.getData()));
}
catch (Exception e) {
logger.warn("exception sending message to output channel", e);
}
});

String sub = this.destination.getSubject();
String queue = this.destination.getQueueGroup();

if (queue != null && queue.length() > 0) {
this.dispatcher.subscribe(sub, queue);
}
else {
this.dispatcher.subscribe(sub);
}
}

@Override
public void stop() {
if (this.dispatcher == null) {
return;
}

this.connection.closeDispatcher(this.dispatcher);
}
}
@@ -0,0 +1,100 @@
/*
* Copyright 2017-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.cloud.stream.binder.nats;

import java.time.Duration;

import io.nats.client.Connection;
import io.nats.client.Message;
import io.nats.client.Subscription;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.context.Lifecycle;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.endpoint.AbstractMessageSource;
import org.springframework.messaging.MessageChannel;

public class NatsMessageSource extends AbstractMessageSource<Object> implements Lifecycle {
private static final Log logger = LogFactory.getLog(NatsMessageHandler.class);

private NatsConsumerDestination destination;
private Connection connection;
private Subscription sub;

public NatsMessageSource(NatsConsumerDestination destination, Connection nc) {
this.destination = destination;
this.connection = nc;
}

@Override
protected Object doReceive() {
if (this.sub == null) {
return null;
}

try {
Message m = this.sub.nextMessage(Duration.ZERO);

if (m != null) {
return m.getData();
}
}
catch (InterruptedException exp) {
logger.info("wait for message interrupted");
}

return null;
}

@Override
public boolean isRunning() {
return this.sub != null;
}

@Override
public void start() {
if (this.sub != null) {
return;
}

String sub = this.destination.getSubject();
String queue = this.destination.getQueueGroup();

if (queue != null && queue.length() > 0) {
this.sub = this.connection.subscribe(sub, queue);
}
else {
this.sub = this.connection.subscribe(sub);
}
}

@Override
public void stop() {
if (this.sub == null) {
return;
}

this.sub.unsubscribe();
}

@Override
public String getComponentType() {
return "nats:message-source";
}
}

0 comments on commit beea4c6

Please sign in to comment.