Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Removing camel queue and adding hazelcast queue

  • Loading branch information...
commit 722a3e83d40fbf78f8331c9f1a459ea703251e8a 1 parent b5c9e70
Chapalamadugu Muralikrishna authored
View
7 app/src/main/scala/com/ats/app/config/spring/AtsHazelcast.java
@@ -1,14 +1,10 @@
package com.ats.app.config.spring;
-import com.hazelcast.config.ClasspathXmlConfig;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
-import org.springframework.aop.scope.DefaultScopedObject;
import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.Import;
/**
@@ -31,4 +27,7 @@ public HazelcastInstance hazelcastInstance(){
+
+
+
}
View
35 app/src/main/scala/com/ats/common/AtsMessagingUtil.scala
@@ -0,0 +1,35 @@
+package com.ats.common
+
+import _root_.util.SpringUtil
+import org.springframework.stereotype.Component
+import com.hazelcast.core.HazelcastInstance
+import org.springframework.beans.factory.annotation.Autowired
+import java.util.concurrent.BlockingQueue
+
+/**
+ * Created by chapamu on 6/11/13.
+ */
+
+
+object AtsMessagingUtil {
+
+
+ val hazelcastInstance:HazelcastInstance = SpringUtil.getBean(classOf[HazelcastInstance]);
+
+
+ def sendMessageToQueue(queueName:String,message:Any ){
+
+ }
+
+ /**
+ * Adds the specified element to this queue, waiting if necessary for space to become available.
+ * @param queueName
+ * @param message
+ */
+ def putMessageInQueue(queueName:String,message:Any){
+ val queue:BlockingQueue[Any] = hazelcastInstance.getQueue(queueName);
+ queue.put(message);
+
+ }
+
+}
View
7 app/src/main/scala/com/ats/common/Constants.java
@@ -28,9 +28,8 @@
public static final String GENERIC_TICK_LIST_MARK_PRICE="221";
public static final Integer TWS_DISCONNECT_ERROR=502;
- // AKKA Actors
- //public static final String TWSPUBLISHER = "/user/TwsAdapterActorPublisher";
- public static final String SEND_MSG_TO_TWS = "direct:send_msg_to_tws";
- public static final String RECEIVE_MSG_FROM_TWS = "direct:receive_msg_from_tws";
+
+ public static final String SEND_MSG_TO_BROKER = "SEND_MSG_TO_BROKER";
+ public static final String RECEIVE_MSG_FROM_BROKER = "RECEIVE_MSG_FROM_BROKER";
}
View
102 app/src/main/scala/com/ats/impl/tws/EWrapperImpl.scala
@@ -6,6 +6,7 @@ import messages.TWSNextValidId
import messages.TwsUpdateAccount
import messages.TwsUpdatePortifolio
import com.ats.app.config.spring.AtsCamelContext
+import com.ats.common.{AtsMessagingUtil, Constants}
// TODO remove this import
import logger.Logs
@@ -13,6 +14,8 @@ import messages._
import util.Util
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.stereotype.Component
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -25,96 +28,93 @@ import org.springframework.stereotype.Component
@Component
-class EWrapperImpl() extends EWrapper with Logs {
+class EWrapperImpl() extends EWrapper{
- // This will forward all messages from TWS to this destination
- lazy val endPointUri: String = com.ats.common.Constants.RECEIVE_MSG_FROM_TWS;
-
- @Autowired
- var camelContext:AtsCamelContext=null;
+ val queueName = Constants.RECEIVE_MSG_FROM_BROKER;
+ val logger:Logger = LoggerFactory.getLogger(getClass().getName());
def bondContractDetails(requestId: Int, contractDetails: ContractDetails) {
- debug("EWrapperImpl.bondContractDetails");
+ logger.debug("EWrapperImpl.bondContractDetails");
}
def contractDetails(requestId: Int, contractDetails: ContractDetails) {
//ewadapter.contractDetails(requestId, contractDetails);
- debug("EWrapperImpl.contractDetails")
+ logger.debug("EWrapperImpl.contractDetails")
}
def contractDetailsEnd(requestId: Int) {
//ewadapter.contractDetailsEnd(requestId);
- debug("EWrapperImpl.contractDetailsEnd")
+ logger.debug("EWrapperImpl.contractDetailsEnd")
}
def currentTime(currentTime: Long) {
- debug("EWrapperImpl.currentTime")
+ logger.debug("EWrapperImpl.currentTime")
}
def execDetails(id: Int, contract: Contract, execution: Execution) {
- debug("id" + id + ":contarct" + Util.toXML(contract) + "Execution" + Util.toXML(execution));
+ logger.debug("id" + id + ":contarct" + Util.toXML(contract) + "Execution" + Util.toXML(execution));
}
def fundamentalData(id: Int, data: String) {
- debug("" + id + "data" + Util.toXML(data));
+ logger.debug("" + id + "data" + Util.toXML(data));
}
def historicalData(reqId: Int, date: String, open: Double, high: Double, low: Double, close: Double, volume: Int, count: Int, wap: Double, hasgaps: Boolean) {
- debug("EWrapperImpl.historicalData")
+ logger.debug("EWrapperImpl.historicalData")
}
def managedAccounts(accounts: String) {
- debug("EWrapperImpl")
+ logger.debug("EWrapperImpl")
}
def nextValidId(nextValidId: Int) {
- camelContext.sendMesgToDestination(endPointUri, new TWSNextValidId(nextValidId))
+ AtsMessagingUtil.putMessageInQueue(queueName, new TWSNextValidId(nextValidId))
}
def openOrder(orderId: Int, contract: Contract, order: Order, orderState: OrderState) {
- debug("EWrapperImpl.openOrder")
+ logger.debug("EWrapperImpl.openOrder")
}
def orderStatus(orderId: Int, status: String, filled: Int, remaining: Int, avgFillPrice: Double, permId: Int, parentId: Int, lastFillPrice: Double, clientId: Int, whyHeld: String) {
- debug("EWrapperImpl.orderStatus")
+ logger.debug("EWrapperImpl.orderStatus")
}
def realtimeBar(arg0: Int, arg1: Long, arg2: Double, arg3: Double, arg4: Double, arg5: Double, arg6: Long, arg7: Double, arg8: Int) {
- debug("EWrapperImpl.realtimeBar")
+ logger.debug("EWrapperImpl.realtimeBar")
}
def receiveFA(arg0: Int, arg1: String) {
- debug("EWrapperImpl.receiveFA")
+ logger.debug("EWrapperImpl.receiveFA")
}
def scannerData(arg0: Int, arg1: Int, arg2: ContractDetails, arg3: String, arg4: String, arg5: String, arg6: String) {
- debug("EWrapperImpl.scannerData")
+ logger.debug("EWrapperImpl.scannerData")
}
def scannerDataEnd(arg0: Int) {
- debug("EWrapperImpl.scannerDataEnd")
+ logger.debug("EWrapperImpl.scannerDataEnd")
}
def scannerParameters(arg0: String) {
- debug("EWrapperImpl.scannerParameters")
+ logger.debug("EWrapperImpl.scannerParameters")
}
def tickEFP(arg0: Int, arg1: Int, arg2: Double, arg3: String, arg4: Double, arg5: Int, arg6: String, arg7: Double, arg8: Double) {
- debug("EWrapperImpl.tickEFP")
+ logger.debug("EWrapperImpl.tickEFP")
}
def tickGeneric(arg0: Int, arg1: Int, arg2: Double) {
- debug("EWrapperImpl.tickGeneric")
+ logger.debug("EWrapperImpl.tickGeneric")
}
def tickOptionComputation(arg0: Int, arg1: Int, arg2: Double, arg3: Double, arg4: Double, arg5: Double) {
- debug("EWrapperImpl.tickOptionComputation")
+ logger.debug("EWrapperImpl.tickOptionComputation")
}
/**
@@ -127,26 +127,26 @@ class EWrapperImpl() extends EWrapper with Logs {
**/
def tickPrice(tickerId: Int, field: Int, price: Double, canAutoExecute: Int) {
- debug("EWrapperImpl.tickPrice")
+ logger.debug("EWrapperImpl.tickPrice")
//this.ewadapter.tickPrice(tickerId, field, price, canAutoExecute);
}
def tickSize(arg0: Int, arg1: Int, arg2: Int) {
- debug("EWrapperImpl.tickSize")
+ logger.debug("EWrapperImpl.tickSize")
}
//void tickString(int tickerId, int tickType, String value)
def tickString(tickerId: Int, tickType: Int, value: String) {
//this.ewadapter.tickString(tickerId, tickType, value);
- debug("EWrapperImpl.tickString")
+ logger.debug("EWrapperImpl.tickString")
}
def updateAccountTime(time: String) {
- debug("EWrapperImpl.updateAccountTime")
+ logger.debug("EWrapperImpl.updateAccountTime")
- camelContext.sendMesgToDestination(endPointUri, new TWSAccountTime(time))
+ AtsMessagingUtil.putMessageInQueue(queueName, new TWSAccountTime(time))
}
/*
@@ -169,64 +169,64 @@ A string that indicates one type of account value. There is a long list of possi
*/
def updateAccountValue(key: String, value: String, currency: String, accountName: String) {
- debug("EWrapperImpl.updateAccountValue")
+ logger.debug("EWrapperImpl.updateAccountValue")
var account:TwsUpdateAccount = new TwsUpdateAccount(key,value,currency,accountName);
- camelContext.sendMesgToDestination(endPointUri, account);
+ AtsMessagingUtil.putMessageInQueue(queueName, account);
//this.ewadapter.updateAccountValue(key, value, currency, accountName);
}
def updateMktDepth(arg0: Int, arg1: Int, arg2: Int, arg3: Int, arg4: Double, arg5: Int) {
- debug("EWrapperImpl.updateMktDepth")
+ logger.debug("EWrapperImpl.updateMktDepth")
}
def updateMktDepthL2(arg0: Int, arg1: Int, arg2: String, arg3: Int, arg4: Int, arg5: Double, arg6: Int) {
- debug("EWrapperImpl.updateMktDepthL2")
+ logger.debug("EWrapperImpl.updateMktDepthL2")
}
def updateNewsBulletin(arg0: Int, arg1: Int, arg2: String, arg3: String) {
- debug("EWrapperImpl.updateNewsBulletin")
+ logger.debug("EWrapperImpl.updateNewsBulletin")
}
def updatePortfolio(contract: Contract, position: Int, marketPrice: Double, marketValue: Double, avgCost: Double, unrealizedPNL: Double, realizedPNL: Double, accountName: String) {
- debug("EWrapperImpl.updatePortfolio.contract"+Util.toXML(contract))
- debug("EWrapperImpl.updatePortfolio.position"+position)
- debug("EWrapperImpl.updatePortfolio.marketPrice"+marketPrice)
- debug("EWrapperImpl.updatePortfolio.marketValue"+marketValue)
- debug("EWrapperImpl.updatePortfolio.avgCost"+avgCost)
- debug("EWrapperImpl.updatePortfolio.unrealizedPNL"+unrealizedPNL)
- debug("EWrapperImpl.updatePortfolio.realizedPNL"+realizedPNL)
- debug("EWrapperImpl.updatePortfolio.accountName"+accountName)
+ logger.debug("EWrapperImpl.updatePortfolio.contract"+Util.toXML(contract))
+ logger.debug("EWrapperImpl.updatePortfolio.position"+position)
+ logger.debug("EWrapperImpl.updatePortfolio.marketPrice"+marketPrice)
+ logger.debug("EWrapperImpl.updatePortfolio.marketValue"+marketValue)
+ logger.debug("EWrapperImpl.updatePortfolio.avgCost"+avgCost)
+ logger.debug("EWrapperImpl.updatePortfolio.unrealizedPNL"+unrealizedPNL)
+ logger.debug("EWrapperImpl.updatePortfolio.realizedPNL"+realizedPNL)
+ logger.debug("EWrapperImpl.updatePortfolio.accountName"+accountName)
- camelContext.sendMesgToDestination(endPointUri, new TwsUpdatePortifolio(contract, position, marketPrice, marketValue, avgCost, unrealizedPNL, realizedPNL, accountName))
+ AtsMessagingUtil.putMessageInQueue(queueName, new TwsUpdatePortifolio(contract, position, marketPrice, marketValue, avgCost, unrealizedPNL, realizedPNL, accountName))
}
def connectionClosed() {
- debug("EWrapperImpl.connectionClosed")
- camelContext.sendMesgToDestination(endPointUri, TWSConnectionClosed)
+ logger.debug("EWrapperImpl.connectionClosed")
+ AtsMessagingUtil.putMessageInQueue(queueName, TWSConnectionClosed)
}
def error(ex: Exception) {
- error("EWrapperImpl.errorexception"+ex.getMessage,ex)
+ logger.error("EWrapperImpl.logger.errorexception"+ex.getMessage,ex)
val twsError:TwsError = new TwsError(null,ex,null,ex.getMessage);
- camelContext.sendMesgToDestination(endPointUri, twsError)
+ AtsMessagingUtil.putMessageInQueue(queueName, twsError)
}
override def error(error: String) {
- super[Logs].error(error);
+ logger.error(error);
val twsError:TwsError = new TwsError(null,null,null,error);
- camelContext.sendMesgToDestination(endPointUri, twsError)
+ AtsMessagingUtil.putMessageInQueue(queueName, twsError)
}
def error(tickerIdOrOrderId: Int, errorCode: Int, error: String) {
val twsError:TwsError = new TwsError(tickerIdOrOrderId,null,errorCode,error);
- camelContext.sendMesgToDestination(endPointUri, twsError)
+ AtsMessagingUtil.putMessageInQueue(queueName, twsError)
}
View
2  app/src/main/scala/com/ats/impl/tws/TwsClient.scala
@@ -43,7 +43,7 @@ class TwsClient {
var clientId: Integer = 1;
// publish messages from tws to this endpoint
- lazy val endpointUri = com.ats.common.Constants.SEND_MSG_TO_TWS;
+ lazy val endpointUri = com.ats.common.Constants.SEND_MSG_TO_BROKER;
@Autowired
var clientSocket: EClientSocket = null;
View
6 app/src/main/scala/com/ats/impl/tws/TwsStarter.scala
@@ -1,7 +1,7 @@
package com.ats.impl.tws
-import com.ats.common.Constants
+import com.ats.common.{AtsMessagingUtil, Constants}
import messages.TWSConnect
import org.slf4j.{Logger, LoggerFactory}
import org.springframework.stereotype.Component
@@ -29,8 +29,8 @@ class TwsStarter {
def connectTws() {
logger.debug("Comes to connectTws")
val connectMsg = new TWSConnect();
- logger.debug("Sending connect msg to endpoint",Constants.SEND_MSG_TO_TWS);
- camelContext.sendMesgToDestination(Constants.SEND_MSG_TO_TWS, connectMsg);
+ logger.debug("Sending connect msg to endpoint",Constants.SEND_MSG_TO_BROKER);
+ AtsMessagingUtil.putMessageInQueue(Constants.SEND_MSG_TO_BROKER, connectMsg);
}
def requestMarketData() {
View
23 app/src/main/scala/com/ats/impl/tws/config/camel/TwsConfigRouteBuilder.scala
@@ -1,23 +0,0 @@
-package com.ats.impl.tws.config.camel
-
-import org.apache.camel.builder.RouteBuilder
-import com.ats.impl.tws.TwsClient
-;
-
-//import org.apache.camel.scala.dsl.builder.RouteBuilder
-
-/**
- * Created with IntelliJ IDEA.
- * User: mchapala
- * Date: 3/1/13
- * Time: 10:28 AM
- * To change this template use File | Settings | File Templates.
- */
-class TwsConfigRouteBuilder extends RouteBuilder{
-
- def configure() {
- from(com.ats.common.Constants.SEND_MSG_TO_TWS).beanRef("twsClient", "receive");
- from(com.ats.common.Constants.RECEIVE_MSG_FROM_TWS).beanRef("twsClient", "receive");
- }
-
-}
View
3  app/src/main/scala/com/ats/start/AtsApp.scala
@@ -14,7 +14,7 @@ import org.springframework.context.annotation.AnnotationConfigApplicationContext
import com.ats.app.config.spring.{GatewayRouteBuilder, AtsCamelContext, AtsSpringConfig}
import org.slf4j.{Logger, LoggerFactory}
-import com.ats.impl.tws.config.camel.TwsConfigRouteBuilder
+
import com.ats.impl.tws.TwsStarter
@@ -33,7 +33,6 @@ object AtsApp {
val twsStarter:TwsStarter = springContext.getBean(classOf[TwsStarter])
logger.debug("Global.Tws Started..End")
camelContext.addRoute(new GatewayRouteBuilder)
- camelContext.addRoute(new TwsConfigRouteBuilder)
camelContext.onStartUp();
twsStarter.init();
}
View
23 project/Build.scala
@@ -30,15 +30,6 @@ object ApplicationBuild extends Build {
object Dependencies {
object Compile {
- // Compile
- //exclude("org.slf4j", "slf4j-api") // ApacheV2
-
- val config = "com.typesafe" % "config" % "1.0.0"
- // ApacheV2
- val netty = "io.netty" % "netty" % "3.6.1.Final" // ApacheV2
-
- val junit = "junit" % "junit" % "4.11" % "test"
-
// Common Public License 1.0
val logback = "ch.qos.logback" % "logback-classic" % "1.0.7" % "test"
val slf4japi = "org.slf4j" % "slf4j-api" % "1.7.2"
@@ -48,7 +39,6 @@ object ApplicationBuild extends Build {
val mockito = "org.mockito" % "mockito-all" % "1.8.1" % "test" // MIT
-
val xalan = "xalan" % "xalan" % "2.7.1"
val xpp3 = "xpp3" % "xpp3_min" % "1.1.4c"
val xstream = "com.thoughtworks.xstream" % "xstream" % "1.4.3"
@@ -62,17 +52,15 @@ object ApplicationBuild extends Build {
val akka = "com.typesafe.akka" % "akka-actor_2.10" % "2.1.0"
-// val neodatis = "org.neodatis.odb" % "neodatis-odb" % "1.9.30.689"
-
- val hazelcast = "com.hazelcast" % "hazelcast" % "2.5"
+ // TODO add maven artifact right now in lib folder
+ //val hazelcast = "com.hazelcast" % "hazelcast" % "3.0-SNAPSHOT"
val springcontext = "org.springframework" % "spring-context" % "3.2.2.RELEASE"
val springcore = "org.springframework" % "spring-core" % "3.2.2.RELEASE"
val springtx = "org.springframework" % "spring-tx" % "3.2.2.RELEASE"
val springtest = "org.springframework" % "spring-test" % "3.2.2.RELEASE" % "test"
- //val spring-core = "org.springframework" % "spring-context" % "3.2.1"
- //val spring-core = "org.springframework" % "spring-context" % "3.2.1"
+
val springdata = "org.springframework.data" % "spring-data-jpa" % "1.3.1.RELEASE"
val hibernate ="org.hibernate.javax.persistence" % "hibernate-jpa-2.0-api" % "1.0.0.Final"
@@ -81,7 +69,6 @@ object ApplicationBuild extends Build {
val camelspring = "org.apache.camel" % "camel-spring" % "2.10.4"
val camelscala = "org.apache.camel" % "camel-scala" % "2.10.4"
val camelCore = "org.apache.camel" % "camel-core" % "2.10.4"
- val camelAkka = "com.typesafe.akka" % "akka-camel_2.10" % "2.1.0"
val camelspringjavaconfig = "org.apache.camel" % "camel-spring-javaconfig" % "2.10.4"
val dbcp = "commons-dbcp" % "commons-dbcp" % "1.4"
@@ -95,8 +82,8 @@ object ApplicationBuild extends Build {
import Compile._
- val appDependencies = Seq(guice,scalatest,springtest,testng,hibernate,hibernateentity,dbcp,springdata,springtx,ib,camelCore,camelspring, camelAkka,camelscala,config, junit, logback, xstream,akka,slf4japi,slf4jimpl,scalareflect,config,hazelcast,springcore,springcontext,h2db)
- val actor = Seq(config)
+ val appDependencies = Seq(guice,scalatest,springtest,testng,hibernate,hibernateentity,dbcp,springdata,springtx,ib,camelCore,camelspring, camelscala,logback, xstream,akka,slf4japi,slf4jimpl,scalareflect,springcore,springcontext,h2db)
+
}
}
Please sign in to comment.
Something went wrong with that request. Please try again.