Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Initial checkin

  • Loading branch information...
commit 2527c639cc42869447f97e895982c51c10a5ea69 0 parents
Andrey Paramonov authored
5 .gitignore
@@ -0,0 +1,5 @@
+/.classpath
+/.project
+/.settings
+/target
+
100 pom.xml
@@ -0,0 +1,100 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>com.ndpar</groupId>
+ <artifactId>tibrv-rabbitmq</artifactId>
+ <packaging>war</packaging>
+ <version>1.0-SNAPSHOT</version>
+
+ <properties>
+ <tibrvj.version>7.5.4</tibrvj.version>
+ <rabbitmq.version>2.1.0</rabbitmq.version>
+ <spring.framework.version>3.0.5.RELEASE</spring.framework.version>
+ <spring.amqp.version>1.0.0.BUILD-SNAPSHOT</spring.amqp.version>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>2.0.2</version>
+ <configuration>
+ <source>1.6</source>
+ <target>1.6</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.mortbay.jetty</groupId>
+ <artifactId>maven-jetty-plugin</artifactId>
+ <version>6.1.14</version>
+ <configuration>
+ <scanIntervalSeconds>10</scanIntervalSeconds>
+ <connectors>
+ <connector implementation="org.mortbay.jetty.nio.SelectChannelConnector">
+ <port>8888</port>
+ <maxIdleTime>60000</maxIdleTime>
+ </connector>
+ </connectors>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <dependencies>
+ <!-- Tibco RV -->
+ <dependency>
+ <groupId>com.tibco</groupId>
+ <artifactId>tibrvj</artifactId>
+ <version>${tibrvj.version}</version>
+ </dependency>
+
+ <!-- RabbitMQ -->
+ <dependency>
+ <groupId>com.rabbitmq</groupId>
+ <artifactId>amqp-client</artifactId>
+ <version>${rabbitmq.version}</version>
+ </dependency>
+
+ <!-- Spring AMQP -->
+ <dependency>
+ <groupId>org.springframework.amqp</groupId>
+ <artifactId>spring-amqp</artifactId>
+ <version>${spring.amqp.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.amqp</groupId>
+ <artifactId>spring-rabbit</artifactId>
+ <version>${spring.amqp.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.amqp</groupId>
+ <artifactId>spring-rabbit-admin</artifactId>
+ <version>${spring.amqp.version}</version>
+ </dependency>
+
+ <!-- Spring Framework -->
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>${spring.framework.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-web</artifactId>
+ <version>${spring.framework.version}</version>
+ </dependency>
+
+ <!-- Misc -->
+ <dependency>
+ <artifactId>commons-logging</artifactId>
+ <groupId>commons-logging</groupId>
+ <version>1.1.1</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.13</version>
+ </dependency>
+ </dependencies>
+</project>
8 src/main/java/com/ndpar/tibrv/MessageListener.java
@@ -0,0 +1,8 @@
+package com.ndpar.tibrv;
+
+import com.tibco.tibrv.TibrvMsg;
+
+public interface MessageListener {
+
+ void onMessage(TibrvMsg message);
+}
154 src/main/java/com/ndpar/tibrv/SimpleMessageListenerContainer.java
@@ -0,0 +1,154 @@
+package com.ndpar.tibrv;
+
+import java.util.concurrent.Executor;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.BeanNameAware;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+
+import com.tibco.tibrv.Tibrv;
+import com.tibco.tibrv.TibrvDispatcher;
+import com.tibco.tibrv.TibrvException;
+import com.tibco.tibrv.TibrvListener;
+import com.tibco.tibrv.TibrvMsg;
+import com.tibco.tibrv.TibrvMsgCallback;
+import com.tibco.tibrv.TibrvQueue;
+import com.tibco.tibrv.TibrvRvdTransport;
+import com.tibco.tibrv.TibrvTransport;
+
+/**
+ * Small abstraction for Tibco RV to make it Spring friendly.
+ *
+ * @author Andrey Paramonov
+ */
+public class SimpleMessageListenerContainer implements TibrvMsgCallback, BeanNameAware {
+
+ protected final Log log = LogFactory.getLog(getClass());
+
+ private String threadName;
+
+ private Executor taskExecutor;
+ private MessageListener messageListener;
+
+ private String service;
+ private String network;
+ private String daemon;
+ private String subject;
+
+ public void setTaskExecutor(Executor taskExecutor) {
+ this.taskExecutor = taskExecutor;
+ }
+
+ public void setMessageListener(MessageListener messageListener) {
+ this.messageListener = messageListener;
+ }
+
+ //---------------------------------------------------------------
+ // Configuration
+ //---------------------------------------------------------------
+
+ @ManagedAttribute
+ public String getThreadName() {
+ return threadName;
+ }
+
+ @ManagedAttribute
+ public String getService() {
+ return service;
+ }
+
+ public void setService(String service) {
+ this.service = service;
+ }
+
+ @ManagedAttribute
+ public String getNetwork() {
+ return network;
+ }
+
+ public void setNetwork(String network) {
+ this.network = network;
+ }
+
+ @ManagedAttribute
+ public String getDaemon() {
+ return daemon;
+ }
+
+ public void setDaemon(String daemon) {
+ this.daemon = daemon;
+ }
+
+ @ManagedAttribute
+ public String getSubject() {
+ return subject;
+ }
+
+ public void setSubject(String subject) {
+ this.subject = subject;
+ }
+
+ //---------------------------------------------------------------
+ // Implementation of BeanNameAware interface
+ //---------------------------------------------------------------
+
+ @Override
+ public void setBeanName(String name) {
+ this.threadName = name;
+ }
+
+ //---------------------------------------------------------------
+ // TibRv life cycle
+ //---------------------------------------------------------------
+
+ @PostConstruct
+ public void init() {
+ try {
+ Tibrv.open(Tibrv.IMPL_NATIVE);
+ TibrvTransport transport = new TibrvRvdTransport(service, network, daemon);
+ TibrvQueue queue = new TibrvQueue();
+ new TibrvListener(queue, this, transport, subject, null);
+ new TibrvDispatcher(threadName, queue);
+
+ log.info(String.format("TibRV listener initialized: service=%s; network=%s; daemon=%s; subject=%s",
+ service, network, daemon, subject));
+
+ } catch (Exception e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ @PreDestroy
+ public void destroy() {
+ try {
+ Tibrv.close();
+ } catch (TibrvException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ //---------------------------------------------------------------
+ // Implementation of TibrvMsgCallback interface
+ //---------------------------------------------------------------
+
+ @Override
+ public void onMsg(TibrvListener tibrvlistener, final TibrvMsg message) {
+ if (log.isTraceEnabled()) log.trace(message);
+ if (messageListener == null) {
+ throw new IllegalStateException("No message listener specified - see property 'messageListener'");
+ }
+ if (taskExecutor != null) {
+ taskExecutor.execute(new Runnable() {
+ @Override public void run() {
+ messageListener.onMessage(message);
+ }
+ });
+ } else {
+ messageListener.onMessage(message);
+ }
+ }
+}
31 src/main/java/com/ndpar/tibrv/rabbit/AmqpMessageConsumer.java
@@ -0,0 +1,31 @@
+package com.ndpar.tibrv.rabbit;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessageListener;
+
+import com.tibco.tibrv.TibrvMsg;
+
+public class AmqpMessageConsumer implements MessageListener {
+
+ protected final Log log = LogFactory.getLog(getClass());
+
+ private AtomicLong count = new AtomicLong(0);
+
+ @Override
+ public void onMessage(Message message) {
+ long c = count.incrementAndGet();
+ if (c % 1000 == 0) {
+ log.debug(count);
+ try {
+ new TibrvMsg(message.getBody());
+ log.debug(message.toString() + " <<" + message.getBody().length + ">>");
+ } catch (Exception e) {
+ new RuntimeException(e);
+ }
+ }
+ }
+}
71 src/main/java/com/ndpar/tibrv/rabbit/RabbitmqAdaptor.java
@@ -0,0 +1,71 @@
+package com.ndpar.tibrv.rabbit;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.annotation.PostConstruct;
+import javax.annotation.PreDestroy;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jmx.export.annotation.ManagedAttribute;
+import org.springframework.jmx.export.annotation.ManagedResource;
+
+import com.ndpar.tibrv.MessageListener;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.tibco.tibrv.TibrvMsg;
+
+@ManagedResource(objectName = "com.ndpar:type=tibrv,id=RabbitmqAdaptor")
+public class RabbitmqAdaptor implements MessageListener {
+
+ private AtomicLong count = new AtomicLong(0);
+
+ @Autowired
+ private ConnectionFactory cf;
+ private String exchange;
+
+ private Connection conn;
+ private Channel channel;
+
+
+ public void setExchange(String exchange) {
+ this.exchange = exchange;
+ }
+
+ @PostConstruct
+ public void init() {
+ try {
+ conn = cf.newConnection();
+ channel = conn.createChannel();
+ } catch (IOException e) {
+ new RuntimeException(e);
+ }
+ }
+
+ @PreDestroy
+ public void destroy() {
+ try {
+ channel.close();
+ conn.close();
+ } catch (IOException e) {
+ new RuntimeException(e);
+ }
+ }
+
+ @ManagedAttribute
+ public long getMessageCount() {
+ return count.get();
+ }
+
+ @Override
+ public void onMessage(TibrvMsg message) {
+ try {
+ count.incrementAndGet();
+ String routing = message.getSendSubject();
+ channel.basicPublish(exchange, routing, null, message.getAsBytes());
+ } catch (Exception e) {
+ new RuntimeException(e);
+ }
+ }
+}
87 src/main/resources/applicationContext.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns:p="http://www.springframework.org/schema/p"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
+ http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd">
+
+ <context:component-scan base-package="com.ndpar" />
+ <context:mbean-export />
+
+ <!-- Tibco RV containers -->
+
+ <bean id="fxTibrvListenerContainer" class="com.ndpar.tibrv.SimpleMessageListenerContainer"
+ p:service="51011"
+ p:network=";239.255.10.7"
+ p:daemon="tcp:localhost:7500"
+ p:subject=">"
+ p:messageListener-ref="tibrvRabbitAdaptor"
+ p:taskExecutor-ref="executor" />
+
+ <bean id="stockTibrvListenerContainer" class="com.ndpar.tibrv.SimpleMessageListenerContainer"
+ p:service="17785"
+ p:network=";239.255.10.42"
+ p:daemon="tcp:localhost:7500"
+ p:subject=">"
+ p:messageListener-ref="tibrvRabbitAdaptor"
+ p:taskExecutor-ref="executor" />
+
+ <bean id="executor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"
+ p:corePoolSize="2" p:maxPoolSize="5" p:queueCapacity="10">
+ <property name="rejectedExecutionHandler">
+ <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
+ </property>
+ </bean>
+
+
+ <!-- TibRV-RabbitMQ adaptor -->
+
+ <bean id="tibrvRabbitAdaptor" class="com.ndpar.tibrv.rabbit.RabbitmqAdaptor"
+ p:exchange="ndpar.topic" />
+
+ <bean id="rabbitConnectionFactory" class="com.rabbitmq.client.ConnectionFactory"
+ p:username="guest" p:password="guest" p:port="5672" p:host="localhost" />
+
+
+ <!-- Spring-AMQP client -->
+
+ <bean class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer"
+ p:connectionFactory-ref="amqpConnectionFactory"
+ p:queueName="ndpar.queue"
+ p:messageListener-ref="amqpMessageListener" />
+
+ <bean id="amqpConnectionFactory" class="org.springframework.amqp.rabbit.connection.SingleConnectionFactory"
+ p:username="guest" p:password="guest" p:port="5672">
+ <constructor-arg value="localhost" />
+ </bean>
+
+ <bean id="amqpMessageListener" class="com.ndpar.tibrv.rabbit.AmqpMessageConsumer" />
+
+
+ <!-- Spring-AMQP admin (optional, to create infrastructure) -->
+
+ <bean id="rabbitAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
+ <constructor-arg ref="amqpConnectionFactory" />
+ </bean>
+
+ <bean id="topic" class="org.springframework.amqp.core.TopicExchange">
+ <constructor-arg value="ndpar.topic" />
+ </bean>
+
+ <bean id="queue" class="org.springframework.amqp.core.Queue">
+ <constructor-arg value="ndpar.queue" />
+ </bean>
+
+ <bean id="usdCcyRates" class="org.springframework.amqp.core.Binding">
+ <constructor-arg index="0" ref="queue" />
+ <constructor-arg index="1" ref="topic" />
+ <constructor-arg index="2" value="#.USD.#" />
+ </bean>
+
+ <bean id="usdStockPrices" class="org.springframework.amqp.core.Binding">
+ <constructor-arg index="0" ref="queue" />
+ <constructor-arg index="1" ref="topic" />
+ <constructor-arg index="2" value="USD.#" />
+ </bean>
+</beans>
25 src/main/resources/log4j.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Threshold" value="TRACE" />
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p [%t:%c] %m%n" />
+ </layout>
+ </appender>
+
+ <logger name="com.ndpar">
+ <level value="TRACE" />
+ </logger>
+
+ <logger name="org.springframework.amqp">
+ <level value="INFO" />
+ </logger>
+
+ <root>
+ <level value="DEBUG" />
+ <appender-ref ref="CONSOLE" />
+ </root>
+
+</log4j:configuration>
16 src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,16 @@
+<!DOCTYPE web-app PUBLIC "-//Sun Microsystems, Inc.//DTD Web Application 2.3//EN" "http://java.sun.com/dtd/web-app_2_3.dtd" >
+
+<web-app>
+ <context-param>
+ <param-name>contextConfigLocation</param-name>
+ <param-value>classpath:applicationContext.xml</param-value>
+ </context-param>
+
+ <listener>
+ <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
+ </listener>
+
+ <listener>
+ <listener-class>org.springframework.web.context.request.RequestContextListener</listener-class>
+ </listener>
+</web-app>

0 comments on commit 2527c63

Please sign in to comment.
Something went wrong with that request. Please try again.