Permalink
Browse files

Initial commit with Embedded broker

  • Loading branch information...
1 parent 97c27fd commit 0c03375f598e311e75b5d952c52d2dcfa849f169 @mchapala committed with Sep 13, 2013
View
@@ -8,3 +8,6 @@
/.classpath
/.project
+/bin
+/data/
+/tmp/
View
@@ -2,3 +2,8 @@ TwsToAmqp
=========
Listens to messages in queue ans sends to tws & vice versa
+
+## Default user/password could be admin/password
+
+## Broker url
+http://localhost:61680/broker
View
@@ -5,11 +5,13 @@ object ApplicationBuild extends Build {
val appName = "TwsToAmqp"
val appVersion = "1.0-SNAPSHOT"
- val qpidversion ="0.22";
+ val qpidVersion = "1.0-SNAPSHOT"
+ val qpidversion ="0.24";
var buildSettings = Seq(
- sbtVersion := "0.12.4",
+ sbtVersion := "0.13",
scalaVersion := "2.10.2",
+ fork:= true,
scalacOptions ++= Seq("-unchecked", "-deprecation", "-Yinline-warnings", "-Xcheckinit", "-encoding", "utf8", "-feature"),
scalacOptions ++= Seq("-language:higherKinds", "-language:postfixOps", "-language:implicitConversions", "-language:reflectiveCalls", "-language:existentials"),
javacOptions ++= Seq("-target", "1.6", "-source", "1.6", "-Xlint:deprecation"),
@@ -64,19 +66,30 @@ object ApplicationBuild extends Build {
// qpid dependenices for amqp client
//val qpidclient = "org.apache.qpid" % "qpid-client" % qpidversion
//val qpidcommon = "org.apache.qpid" % "qpid-common" % qpidversion
- val qpidbroker = "org.apache.qpid" % "qpid-broker" % qpidversion % "test"
+ // val qpidbroker = "org.apache.qpid" % "qpid-broker" % qpidversion % "test"
// proton dependencies for amqp broker/server
- val protonapi = "org.apache.qpid" % "proton-api" % "0.4"
- val protonimpl = "org.apache.qpid" % "proton-j-impl" % "0.4"
+ val protonapi = "org.apache.qpid" % "proton-api" % qpidVersion
+ val protonimpl = "org.apache.qpid" % "proton-j-impl" % qpidVersion
+ val proton_hawtdispatch = "org.apache.qpid" % "proton-hawtdispatch" % qpidVersion
+
+
+
val apollo= "org.apache.activemq" % "apollo-broker" % "1.6"
- val apollo_leveldb= "org.apache.activemq" % "apollo-leveldb" % "1.6"
+ val apollo_leveldb= "org.apache.activemq" % "apollo-leveldb" % "1.6"
+ val apollo_jmx = "org.apache.activemq" % "apollo-jmx" % "1.6"
+ val apollo_web= "org.apache.activemq" % "apollo-web" % "1.6" excludeAll(
+ //ExclusionRule(organization = "com.sun.jersey"),
+ ExclusionRule(organization = "net.sf.josql")
+ )
+ val apollo_ampq = "org.apache.activemq" % "apollo-amqp" % "1.6"
-
-
-
+ val jettyWebApp= "org.eclipse.jetty" % "jetty-webapp" % "8.0.0.v20110901"
+ val jettyServer= "org.eclipse.jetty" % "jetty-server" % "8.0.0.v20110901"
+ val jettyServlet= "org.eclipse.jetty" % "jetty-servlet" % "8.0.0.v20110901"
+
}
import Compile._
@@ -87,8 +100,10 @@ object ApplicationBuild extends Build {
logback,slf4japi,slf4jimpl, xstream,
akka,scalareflect,
//qpidclient,qpidcommon,qpidbroker,
- protonapi,protonimpl,
- apollo,apollo_leveldb
+ protonapi,protonimpl,proton_hawtdispatch,
+ apollo,apollo_leveldb,apollo_web,apollo_jmx,apollo_ampq,
+ jettyServer, jettyWebApp,jettyServlet
+
)
}
@@ -1,8 +1,12 @@
-log4j.rootLogger=DEBUG, Console
+log4j.rootLogger=ALL, Console, FILE
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
log4j.appender.Console.layout.ConversionPattern=%d{ABSOLUTE} %-5p ~ %C%m%n
+log4j.appender.FILE=twsbrokeradapter.log
+log4j.appender.FILE.layout=org.apache.log4j.PatternLayout
+log4j.appender.FILE.layout.ConversionPattern=%d{ABSOLUTE} %-5p ~ %C%m%n
+
logger.akka=DEBUG
@@ -1,72 +1,92 @@
package broker
import org.apache.activemq.apollo.dto._
+import org.apache.activemq.apollo.broker.jmx.dto._
import org.apache.activemq.apollo.broker.store.leveldb.dto.LevelDBStoreDTO
import java.io.File
import org.apache.activemq.apollo.broker.Broker
-
-
+import constants.TwsConstants
+import org.apache.activemq.apollo.amqp.dto._
trait EmbeddedApolloBroker {
-
+
startBroker()
-
- def startBroker()={
- val broker = new Broker();
- broker.setTmp(new File("./tmp"));
- broker.setConfig(createConfig());
- // The broker starts asynchronously. The runnable is invoked once
- // the broker if fully started.
- System.out.println("Starting the broker.");
- broker.start(new Runnable(){
- def run() {
- System.out.println("Broker Started."+System.currentTimeMillis());
-
- }
- });
- Thread.sleep(100000);
+ val broker = new Broker();
+
+ def startBroker() = {
+ val broker = new Broker();
+ broker.setTmp(new File("./tmp"));
+ broker.setConfig(createConfig());
+ // The broker starts asynchronously. The runnable is invoked once
+ // the broker if fully started.
+ System.out.println("Starting the broker.");
+ broker.start(new Runnable() {
+ def run() {
+ System.out.println("Broker Started." + System.currentTimeMillis());
+
+ }
+ });
+ }
+
+ def createConfig(): BrokerDTO = {
+ val broker: BrokerDTO = new BrokerDTO();
+
+ // Brokers support multiple virtual hosts.
+ val host: VirtualHostDTO = new VirtualHostDTO();
+ host.id = "localhost";
+ host.host_names.add("localhost");
+ host.host_names.add("127.0.0.1");
+
+
+ //
+ val queue: QueueDTO = new QueueDTO();
+ queue.id = TwsConstants.TWS_QUEUE_LISTENER;
+
+ val queue2: QueueDTO = new QueueDTO();
+ queue2.id = TwsConstants.TWS_QUEUE_SENDER
+ host.queues.add(queue);
+ host.queues.add(queue2);
+ host.auto_create_destinations = true
+
+ // The message store is configured on the virtual host.
+ val store = new LevelDBStoreDTO();
+ store.directory = new File("./data");
+ host.store = store;
+ broker.virtual_hosts.add(host);
+
+
+
+ //
+ // Control which ports and protocols the broker binds and accepts
+ val connector = new AcceptingConnectorDTO();
+ connector.id = "amqp";
+ connector.bind = TwsConstants.SERVER_ADDRESS;
+ connector.protocols.add(new AmqpDTO());
+ connector.protocol="amqp";
+
+ broker.connectors.add(connector);
+
+
+ //
+ val jmxService = new JmxDTO();
+ jmxService.enabled = true;
+ broker.services.add(jmxService);
+ //
+ // Fires up the web admin console on HTTP.
+ val webadmin = new WebAdminDTO();
+ webadmin.bind = "http://localhost:61680";
+ broker.web_admins.add(webadmin);
+
+ return broker;
+ }
+
+ def createUpdate(): BrokerDTO = {
+ val broker = createConfig();
+
+ // Lets change the port.
+ val connector = broker.connectors.get(0).asInstanceOf[AcceptingConnectorDTO]
+ connector.bind = "tcp://0.0.0.0:61614";
+ return broker;
}
-
- def createConfig():BrokerDTO={
- val broker = new BrokerDTO();
-
- // Brokers support multiple virtual hosts.
- val host = new VirtualHostDTO();
- host.id = "localhost";
- host.host_names.add("localhost");
- host.host_names.add("127.0.0.1");
-
- // The message store is configured on the virtual host.
- val store = new LevelDBStoreDTO();
- store.directory = new File("./data");
- host.store = store;
- broker.virtual_hosts.add(host);
-
-
-
- //
- // Control which ports and protocols the broker binds and accepts
- val connector = new AcceptingConnectorDTO();
- connector.id = "amqp";
- connector.bind = "tcp://0.0.0.0:5672";
- broker.connectors.add(connector);
-
- //
- // Fires up the web admin console on HTTP.
- val webadmin = new WebAdminDTO();
- webadmin.bind = "http://localhost:8080";
- broker.web_admins.add(webadmin);
-
- return broker;
- }
-
- def createUpdate():BrokerDTO ={
- val broker = createConfig();
-
- // Lets change the port.
- val connector = broker.connectors.get(0).asInstanceOf[AcceptingConnectorDTO]
- connector.bind = "tcp://0.0.0.0:61614";
- return broker;
- }
}
@@ -4,12 +4,13 @@ object TwsConstants {
val TWS_DISCONNECT_ERROR:Integer=null;
- val TWS_QUEUE_LISTENER:String= "";
+ val TWS_QUEUE_LISTENER:String= "queueListener";
// This is the address where the app will listen for message and forward the request to IB-TWS
// Applications using this module will send messages to this address
- val TWS_QUEUE_SENDER:String="/queue/a";
-
-
+ val TWS_QUEUE_SENDER:String="queueSender";
+ val SERVER_ADDRESS:String = "tcp://127.0.0.1:5672";
+ //val SERVER_ADDRESS:String = "tcp://127.0.0.1:5672";amqp://0.0.0.0/test
+ //val SERVER_ADDRESS:String = "amqp://0.0.0.0:5674/test";
}
@@ -2,39 +2,93 @@ package receiver
import org.apache.qpid.proton.amqp.messaging.AmqpValue
import org.apache.qpid.proton.message.Message
-import org.apache.qpid.proton.message.impl.MessageImpl
import org.apache.qpid.proton.messenger.Messenger
-import org.apache.qpid.proton.messenger.impl.MessengerImpl
import org.apache.qpid.proton.messenger.impl.MessengerFactoryImpl
import org.apache.qpid.proton.message.impl.MessageFactoryImpl
import constants.TwsConstants
import messages.TwsMessage
import adapter.{DummyTwsAdapterImpl, TwsMessageAdapter}
+import org.slf4j.LoggerFactory
+import org.slf4j.Logger
+import java.io.File
-trait TwsReceiver{
- def send(message:TwsMessage)
+trait TwsReceiver {
+ def send(message: TwsMessage)
}
-class TwsReceiverImpl(val adapter:TwsMessageAdapter = new DummyTwsAdapterImpl()) extends TwsReceiver {
-
- val mng:Messenger = new MessengerFactoryImpl().createMessenger();
-
- val queueAddress = TwsConstants.TWS_QUEUE_SENDER;
- mng.start
-
- def send(twsMsg:TwsMessage)={
-
+
+/**
+ * Makes use of qpid for implementing
+ *
+ * @param adapter
+ */
+class TwsReceiverImpl(val adapter: TwsMessageAdapter = new DummyTwsAdapterImpl()) extends TwsReceiver {
+ val logger: Logger = LoggerFactory.getLogger(getClass().getName());
+ val queueAddress = TwsConstants.SERVER_ADDRESS + File.separator + TwsConstants.TWS_QUEUE_SENDER;
+
+
+ def send(twsMsg: TwsMessage) = {
+ val mng: Messenger = new MessengerFactoryImpl().createMessenger();
+ mng.start
/**
- Provide custom adapter if you want to convert TwsMessage type to some other message
- **/
-
- val anyValue = adapter.convert(twsMsg);
- val msg:Message = new MessageFactoryImpl().createMessage();
+ Provide custom adapter if you want to convert TwsMessage type to some other message
+ * */
+
+ var anyValue = adapter.convert(twsMsg);
+ //uncomment later
+ anyValue = "TestMessage";
+ logger.debug("sending" + anyValue);
+ val msg: Message = new MessageFactoryImpl().createMessage();
msg.setAddress(queueAddress);
msg.setBody(new AmqpValue(anyValue));
+
// TODO how to handle custom msg delivary options ?
mng.put(msg);
+ mng.send();
+ mng.stop();
}
+
+ /**
+ def sendAmqp(){
+
+ val amqp = new AmqpConnectOptions();
+ amqp.setHost("localhost", port)
+ amqp.setUser("admin");
+ amqp.setPassword("password");
+
+ val connection = AmqpConnection.connect(amqp)
+ val dispatchQueue = connection.queue()
+ dispatchQueue.createQueue("queue://FOO")
+
+ connection.queue() {
+ val session = connection.createSession()
+ val target = new Target
+ target.setAddress("queue://FOO")
+ val sender = session.createSender(target);
+ val md = sender.send(session.createTextMessage("Hello World"))
+ md.onSettle
+
+
+ ( {
+ val source = new Source
+ source.setAddress("queue://FOO")
+ val receiver = session.createReceiver(source);
+ receiver.resume()
+ receiver.setDeliveryListener(new AmqpDeliveryListener {
+ def onMessageDelivery(delivery: MessageDelivery) = {
+ println("Received: " + delivery.getMessage().getBody().asInstanceOf[AmqpValue].getValue);
+ delivery.settle()
+ connection.close()
+ }
+ })
+ })
+ }
+ connection.waitForDisconnected()
+
+
+ }
+ * */
+
}
@@ -0,0 +1,7 @@
+log4j.rootLogger=ALL, Console
+
+log4j.appender.Console=org.apache.log4j.ConsoleAppender
+log4j.appender.Console.layout=org.apache.log4j.PatternLayout
+log4j.appender.Console.layout.ConversionPattern=%d{ABSOLUTE} %-5p ~ %C%m%n
+
+
Oops, something went wrong.

0 comments on commit 0c03375

Please sign in to comment.