Skip to content
Browse files

Initial commit with Embedded broker

  • Loading branch information...
1 parent 97c27fd commit a7f9bd5215e6efa8d418e3abcabe88fc2120cac7 @mchapala committed Sep 13, 2013
View
3 .gitignore
@@ -8,3 +8,6 @@
/.classpath
/.project
+/bin
+/data/
+/tmp/
View
35 project/Build.scala
@@ -5,11 +5,12 @@ object ApplicationBuild extends Build {
val appName = "TwsToAmqp"
val appVersion = "1.0-SNAPSHOT"
- val qpidversion ="0.22";
+ val qpidversion ="0.24";
var buildSettings = Seq(
- sbtVersion := "0.12.4",
+ sbtVersion := "0.13",
scalaVersion := "2.10.2",
+ fork:= true,
scalacOptions ++= Seq("-unchecked", "-deprecation", "-Yinline-warnings", "-Xcheckinit", "-encoding", "utf8", "-feature"),
scalacOptions ++= Seq("-language:higherKinds", "-language:postfixOps", "-language:implicitConversions", "-language:reflectiveCalls", "-language:existentials"),
javacOptions ++= Seq("-target", "1.6", "-source", "1.6", "-Xlint:deprecation"),
@@ -64,19 +65,29 @@ object ApplicationBuild extends Build {
// qpid dependenices for amqp client
//val qpidclient = "org.apache.qpid" % "qpid-client" % qpidversion
//val qpidcommon = "org.apache.qpid" % "qpid-common" % qpidversion
- val qpidbroker = "org.apache.qpid" % "qpid-broker" % qpidversion % "test"
+ // val qpidbroker = "org.apache.qpid" % "qpid-broker" % qpidversion % "test"
// proton dependencies for amqp broker/server
- val protonapi = "org.apache.qpid" % "proton-api" % "0.4"
- val protonimpl = "org.apache.qpid" % "proton-j-impl" % "0.4"
+ val protonapi = "org.apache.qpid" % "proton-api" % "0.5"
+ val protonimpl = "org.apache.qpid" % "proton-j-impl" % "0.5"
+ val proton_hawtdispatch = "org.apache.qpid" % "proton-hawtdispatch" % "0.5"
+
+
val apollo= "org.apache.activemq" % "apollo-broker" % "1.6"
- val apollo_leveldb= "org.apache.activemq" % "apollo-leveldb" % "1.6"
+ val apollo_leveldb= "org.apache.activemq" % "apollo-leveldb" % "1.6"
+ val apollo_jmx = "org.apache.activemq" % "apollo-jmx" % "1.6"
+ val apollo_web= "org.apache.activemq" % "apollo-web" % "1.6" excludeAll(
+ //ExclusionRule(organization = "com.sun.jersey"),
+ ExclusionRule(organization = "net.sf.josql")
+ )
+ val apollo_ampq = "org.apache.activemq" % "apollo-amqp" % "1.6"
-
-
-
+ val jettyWebApp= "org.eclipse.jetty" % "jetty-webapp" % "8.0.0.v20110901"
+ val jettyServer= "org.eclipse.jetty" % "jetty-server" % "8.0.0.v20110901"
+ val jettyServlet= "org.eclipse.jetty" % "jetty-servlet" % "8.0.0.v20110901"
+
}
import Compile._
@@ -87,8 +98,10 @@ object ApplicationBuild extends Build {
logback,slf4japi,slf4jimpl, xstream,
akka,scalareflect,
//qpidclient,qpidcommon,qpidbroker,
- protonapi,protonimpl,
- apollo,apollo_leveldb
+ protonapi,protonimpl,proton_hawtdispatch,
+ apollo,apollo_leveldb,apollo_web,apollo_jmx,apollo_ampq,
+ jettyServer, jettyWebApp,jettyServlet
+
)
}
View
2 src/main/resources/log4j.properties
@@ -1,4 +1,4 @@
-log4j.rootLogger=DEBUG, Console
+log4j.rootLogger=ALL, Console
log4j.appender.Console=org.apache.log4j.ConsoleAppender
log4j.appender.Console.layout=org.apache.log4j.PatternLayout
View
121 src/main/scala/broker/EmbeddedApolloBroker.scala
@@ -1,72 +1,89 @@
package broker
import org.apache.activemq.apollo.dto._
+import org.apache.activemq.apollo.broker.jmx.dto._
import org.apache.activemq.apollo.broker.store.leveldb.dto.LevelDBStoreDTO
import java.io.File
import org.apache.activemq.apollo.broker.Broker
-
-
+import constants.TwsConstants
+import org.apache.activemq.apollo.amqp.dto._
trait EmbeddedApolloBroker {
-
+
startBroker()
+ val broker = new Broker();
- def startBroker()={
- val broker = new Broker();
- broker.setTmp(new File("./tmp"));
- broker.setConfig(createConfig());
- // The broker starts asynchronously. The runnable is invoked once
- // the broker if fully started.
- System.out.println("Starting the broker.");
- broker.start(new Runnable(){
- def run() {
- System.out.println("Broker Started."+System.currentTimeMillis());
-
- }
- });
- Thread.sleep(100000);
+ def startBroker() = {
+ val broker = new Broker();
+ broker.setTmp(new File("./tmp"));
+ broker.setConfig(createConfig());
+ // The broker starts asynchronously. The runnable is invoked once
+ // the broker if fully started.
+ System.out.println("Starting the broker.");
+ broker.start(new Runnable() {
+ def run() {
+ System.out.println("Broker Started." + System.currentTimeMillis());
+
+ }
+ });
}
-
- def createConfig():BrokerDTO={
- val broker = new BrokerDTO();
- // Brokers support multiple virtual hosts.
- val host = new VirtualHostDTO();
- host.id = "localhost";
- host.host_names.add("localhost");
- host.host_names.add("127.0.0.1");
+ def createConfig(): BrokerDTO = {
+ val broker:BrokerDTO = new BrokerDTO();
+
+ // Brokers support multiple virtual hosts.
+ val host:VirtualHostDTO = new VirtualHostDTO();
+ host.id = "localhost";
+ host.host_names.add("localhost");
+ host.host_names.add("127.0.0.1");
- // The message store is configured on the virtual host.
- val store = new LevelDBStoreDTO();
- store.directory = new File("./data");
- host.store = store;
- broker.virtual_hosts.add(host);
+
+ //
+ val queue:QueueDTO = new QueueDTO();
+ queue.id = TwsConstants.TWS_QUEUE_LISTENER;
+
+ val queue2:QueueDTO = new QueueDTO();
+ queue2.id = TwsConstants.TWS_QUEUE_SENDER
+ host.queues.add(queue);
+ host.queues.add(queue2);
+
+ // The message store is configured on the virtual host.
+ val store = new LevelDBStoreDTO();
+ store.directory = new File("./data");
+ host.store = store;
+ broker.virtual_hosts.add(host);
-
-
- //
- // Control which ports and protocols the broker binds and accepts
- val connector = new AcceptingConnectorDTO();
- connector.id = "amqp";
- connector.bind = "tcp://0.0.0.0:5672";
- broker.connectors.add(connector);
+
+
+ //
+ // Control which ports and protocols the broker binds and accepts
+ val connector = new AcceptingConnectorDTO();
+ connector.id = "amqp";
+ connector.bind = TwsConstants.SERVER_ADDRESS;
+ //connector.protocol.add
+
+ broker.connectors.add(connector);
- //
- // Fires up the web admin console on HTTP.
- val webadmin = new WebAdminDTO();
- webadmin.bind = "http://localhost:8080";
- broker.web_admins.add(webadmin);
+ //
+ val jmxService = new JmxDTO();
+ jmxService.enabled = true;
+ broker.services.add(jmxService);
+ //
+ // Fires up the web admin console on HTTP.
+ val webadmin = new WebAdminDTO();
+ webadmin.bind = "http://localhost:61680";
+ broker.web_admins.add(webadmin);
- return broker;
- }
+ return broker;
+ }
- def createUpdate():BrokerDTO ={
- val broker = createConfig();
+ def createUpdate(): BrokerDTO = {
+ val broker = createConfig();
- // Lets change the port.
- val connector = broker.connectors.get(0).asInstanceOf[AcceptingConnectorDTO]
- connector.bind = "tcp://0.0.0.0:61614";
- return broker;
- }
+ // Lets change the port.
+ val connector = broker.connectors.get(0).asInstanceOf[AcceptingConnectorDTO]
+ connector.bind = "tcp://0.0.0.0:61614";
+ return broker;
+ }
}
View
5 src/main/scala/constants/TwsConstants.scala
@@ -4,12 +4,13 @@ object TwsConstants {
val TWS_DISCONNECT_ERROR:Integer=null;
- val TWS_QUEUE_LISTENER:String= "";
+ val TWS_QUEUE_LISTENER:String= "queueListener";
// This is the address where the app will listen for message and forward the request to IB-TWS
// Applications using this module will send messages to this address
- val TWS_QUEUE_SENDER:String="/queue/a";
+ val TWS_QUEUE_SENDER:String="queueSender";
+ val SERVER_ADDRESS:String = "tcp://127.0.0.1:5672";
}
View
21 src/main/scala/receiver/TwsReceiver.scala
@@ -10,6 +10,9 @@ import org.apache.qpid.proton.message.impl.MessageFactoryImpl
import constants.TwsConstants
import messages.TwsMessage
import adapter.{DummyTwsAdapterImpl, TwsMessageAdapter}
+import org.slf4j.LoggerFactory
+import org.slf4j.Logger
+import java.io.File
trait TwsReceiver{
@@ -18,23 +21,29 @@ trait TwsReceiver{
}
class TwsReceiverImpl(val adapter:TwsMessageAdapter = new DummyTwsAdapterImpl()) extends TwsReceiver {
+ val logger:Logger = LoggerFactory.getLogger(getClass().getName());
- val mng:Messenger = new MessengerFactoryImpl().createMessenger();
- val queueAddress = TwsConstants.TWS_QUEUE_SENDER;
- mng.start
+ val queueAddress = TwsConstants.SERVER_ADDRESS +File.separator+ TwsConstants.TWS_QUEUE_SENDER;
+
def send(twsMsg:TwsMessage)={
-
- /**
+ val mng:Messenger = new MessengerFactoryImpl().createMessenger();
+ mng.start
+ /**
Provide custom adapter if you want to convert TwsMessage type to some other message
**/
- val anyValue = adapter.convert(twsMsg);
+ var anyValue = adapter.convert(twsMsg);
+ //uncomment later
+ anyValue="TestMessage";
+ logger.debug("sending"+anyValue);
val msg:Message = new MessageFactoryImpl().createMessage();
msg.setAddress(queueAddress);
msg.setBody(new AmqpValue(anyValue));
// TODO how to handle custom msg delivary options ?
mng.put(msg);
+ mng.send();
+ mng.stop();
}
}
View
7 src/test/resources/log4j.properties
@@ -0,0 +1,7 @@
+log4j.rootLogger=ALL, Console
+
+log4j.appender.Console=org.apache.log4j.ConsoleAppender
+log4j.appender.Console.layout=org.apache.log4j.PatternLayout
+log4j.appender.Console.layout.ConversionPattern=%d{ABSOLUTE} %-5p ~ %C%m%n
+
+
View
24 src/test/scala/receiver/TwsReceiverImplTest.scala
@@ -6,6 +6,8 @@ import org.scalatest.FlatSpec
import org.scalatest.junit.AssertionsForJUnit
import org.scalatest.FunSuite
import broker.EmbeddedApolloBroker
+import org.slf4j.LoggerFactory
+import org.slf4j.Logger
@@ -17,11 +19,27 @@ import broker.EmbeddedApolloBroker
class TwsReceiverImplTest extends FunSuite with EmbeddedApolloBroker {
val impl:TwsReceiverImpl = new TwsReceiverImpl();
-
+ val logger:Logger = LoggerFactory.getLogger(classOf[TwsReceiverImplTest])
test("Able to send message to tws") {
- //impl.send(new TWSConnect());
+ Thread.sleep(9000);
+ logger.debug("Sending message TwsConnect to queue.")
+ try {
+ impl.send(new TWSConnect());
+ }catch {
+ case e: Exception => e.printStackTrace();
+ stopBroker
+ fail()
+ }
+ Thread.sleep(3000);
+ stopBroker
}
-
+ def stopBroker() {
+ broker.stop( new Runnable() {
+ def run() {
+ System.out.println("Stopping broker");
+ }
+ })
+ }
}

0 comments on commit a7f9bd5

Please sign in to comment.
Something went wrong with that request. Please try again.