Skip to content

Commit

Permalink
First commit
Browse files Browse the repository at this point in the history
  • Loading branch information
purplefox committed Oct 19, 2015
0 parents commit 03e7fd4
Show file tree
Hide file tree
Showing 5 changed files with 240 additions and 0 deletions.
22 changes: 22 additions & 0 deletions .gitignore
@@ -0,0 +1,22 @@
.DS_Store
.gradle
.idea
.classpath
.project
.settings
.yardoc
.yardopts
build
target
out
*.iml
*.ipr
*.iws
.vertx
test-output
src/scratchpad
test-results
test-tmp
*.class
*.swp
.vertx
39 changes: 39 additions & 0 deletions pom.xml
@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.vertx</groupId>
<artifactId>vertx-proton</artifactId>
<version>1.0-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>proton-j</artifactId>
<version>0.10</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>

<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>


</project>
63 changes: 63 additions & 0 deletions src/main/java/io/vertx/proton/HelloWorld.java
@@ -0,0 +1,63 @@
package io.vertx.proton;

import io.vertx.core.Vertx;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class HelloWorld {

public static void main(String[] args) {

// Create the Vert.x instance
Vertx vertx = Vertx.vertx();

// Create the Vert.x AMQP client instance
VertxAMQPClient client = new VertxAMQPClient(vertx, "localhost", 5672);

// Runt the example
helloWorldSendAndConsumeExample(client);

// Just stop main() from exiting
try {
System.in.read();
} catch (Exception ignore) {
}
}

private static void helloWorldSendAndConsumeExample(VertxAMQPClient client) {

// First connect, asynchronously
client.connect(res -> {

// Now we're connected!
System.out.println("We're connected");

if (res.succeeded()) {

// This is our connection object
VertxAMQPConnection conn = res.result();

// This is the address of the queue
String address = "queue://foo";

// Create a consumer that will receive messages from the queue
conn.setHandler(address, msg -> {

// Should print out; I received message: helloworld
System.out.println("I received message: " + msg);
});

// Now send a message to the queue
conn.sendMessage("queue://foo", "helloworld");

System.out.println("Sent a message");

// That's it!

} else {
res.cause().printStackTrace();
}
});
}
}
41 changes: 41 additions & 0 deletions src/main/java/io/vertx/proton/VertxAMQPClient.java
@@ -0,0 +1,41 @@
package io.vertx.proton;

import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.net.NetClient;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.engine.Connection;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class VertxAMQPClient {

private final Vertx vertx;
private final String host;
private final int port;
private final NetClient netClient;

public VertxAMQPClient(Vertx vertx, String host, int port) {
this.vertx = vertx;
this.host = host;
this.port = port;
Connection connection = Proton.connection();
connection.setContainer("client-id:1");
connection.open();
this.netClient = vertx.createNetClient();
}

public void connect(Handler<AsyncResult<VertxAMQPConnection>> connection) {
netClient.connect(port, host, res -> {
if (res.succeeded()) {
VertxAMQPConnection amqpConnection = new VertxAMQPConnection(res.result());
connection.handle(Future.succeededFuture(amqpConnection));
} else {
connection.handle(Future.failedFuture(res.cause()));
}
});
}
}
75 changes: 75 additions & 0 deletions src/main/java/io/vertx/proton/VertxAMQPConnection.java
@@ -0,0 +1,75 @@
package io.vertx.proton;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.Handler;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.message.Message;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/**
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class VertxAMQPConnection {

private final NetSocket socket;
private final Connection connection;
private final Transport transport;
private Handler<String> messageHandler;

public VertxAMQPConnection(NetSocket socket) {
this.socket = socket;

connection = Proton.connection();
connection.setContainer("client-id:1");
connection.open();
transport = Proton.transport();
transport.bind(connection);

socket.handler(buff -> {
transport.getInputBuffer().put(buff.getBytes());
});
}

public void sendMessage(String address, String body) {
Session session = connection.session();
session.open();
Target target = new Target();
target.setAddress(address);
Sender sender = session.sender("link1"); // Is this just an arbitrary string?
sender.setTarget(target);
sender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
sender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
sender.open();
int BUFFER_SIZE = 1024;
Message m = Proton.message();
m.setBody(new AmqpValue(body));
byte[] encodedMessage = new byte[BUFFER_SIZE];
int len = m.encode(encodedMessage, 0, BUFFER_SIZE);
String deliveryTag = "msg:1"; // Not sure the relevance of this
byte[] tag = deliveryTag.getBytes(StandardCharsets.UTF_8);
Delivery serverDelivery = sender.delivery(tag); // Not sure why this is necessary
sender.send(encodedMessage, 0, len);
sender.advance();
ByteBuffer outputBuffer = transport.getOutputBuffer();
while (outputBuffer.hasRemaining() ) {
byte buffer[] = new byte[outputBuffer.remaining()];
outputBuffer.get(buffer);
socket.write(Buffer.buffer(buffer));
transport.outputConsumed();
}
}

public void setHandler(String address, Handler<String> handler) {
this.messageHandler = handler;
// TODO - how to connect up the handler so it consumes messages, something to do with Receiver I guess?
}

}

0 comments on commit 03e7fd4

Please sign in to comment.