Permalink
Browse files

Add file to queue and queue to file examples

  • Loading branch information...
1 parent 0ee7d04 commit 672fc1e93489f3e9c5421baba9a461d1168021ca @reidmorrison committed Sep 8, 2011
View
@@ -1,3 +1,13 @@
+## 0.11.2 (2011-06-01)
+
+* Issue #8 Add ability to set Producer delivery mode using a Symbol
+* Include ActiveMQ InVM working example along with Log4J properties file
+
+## 0.11.1 (2011-05-25)
+
+* Fixes the condition where a bad session keeps being re-used in a session pool.
+ It is now removed from the pool anytime a JMS exception has occurred
+
## 0.11.0 (2011-04-18)
* Compatibility with JRuby 1.6
View
@@ -9,7 +9,7 @@ desc "Build gem"
task :gem do |t|
gemspec = Gem::Specification.new do |s|
s.name = 'jruby-jms'
- s.version = '0.11.0'
+ s.version = '0.11.2'
s.author = 'Reid Morrison'
s.email = 'rubywmq@gmail.com'
s.homepage = 'https://github.com/reidmorrison/jruby-jms'
@@ -0,0 +1,51 @@
+#
+# Example : files_to_q : Place all files in a directory to a queue
+# Each file is written as a separate message
+# Place the data in a file ending with '.data'
+# and the header information in a file with same name, but with an
+# extension of '.yml'
+#
+# jruby files_to_q.rb activemq my_queue
+#
+
+# Allow examples to be run in-place without requiring a gem install
+$LOAD_PATH.unshift File.dirname(__FILE__) + '/../../lib'
+
+require 'rubygems'
+require 'jms'
+require 'yaml'
+
+raise("Required Parameters: 'jms_provider' 'queue_name' 'input_directory'") unless ARGV.count >= 2
+jms_provider = ARGV[0]
+queue_name = ARGV[1]
+path = ARGV[2] || queue_name
+
+# Load Connection parameters from configuration file
+config = YAML.load_file(File.join(File.dirname(__FILE__), '..', 'jms.yml'))[jms_provider]
+raise "JMS Provider option:#{jms_provider} not found in jms.yml file" unless config
+
+counter = 0
+# Consume all available messages on the queue
+JMS::Connection.session(config) do |session|
+ session.producer(:queue_name => queue_name) do |producer|
+ Dir.glob(File.join(path,'*.data')) do |filename|
+ unless File.directory?(filename)
+ printf("%5d: #{filename}\n",counter = counter + 1)
+ data = File.open(filename, 'rb') {|file| file.read }
+ header_filename = File.join(File.dirname(filename), File.basename(filename))
+ header_filename = header_filename[0, header_filename.length - '.data'.length] + '.yml'
+ header = File.exist?(header_filename) ? YAML.load_file(header_filename) : nil
+ message = session.message(data, :bytes)
+ if header
+ header[:attributes].each_pair do |k,v|
+ next if k == :jms_destination
+ message.send("#{k}=".to_sym, v) if message.respond_to?("#{k}=".to_sym)
+ end if header[:attributes]
+ message.properties = header[:properties] || {}
+ end
+ producer.send(message)
+ end
+ end
+ end
+end
+puts "Read #{counter} messages from #{path} and wrote to #{queue_name}"
@@ -0,0 +1,44 @@
+#
+# Example: q_to_files:
+# Copy all messages in a queue to separate files in a directory
+# The messages are left on the queue by
+#
+# jruby q_to_files.rb activemq my_queue
+#
+
+# Allow examples to be run in-place without requiring a gem install
+$LOAD_PATH.unshift File.dirname(__FILE__) + '/../../lib'
+
+require 'rubygems'
+require 'jms'
+require 'yaml'
+require 'fileutils'
+
+raise("Required Parameters: 'jms_provider' 'queue_name' 'output_directory'") unless ARGV.count >= 2
+jms_provider = ARGV[0]
+queue_name = ARGV[1]
+path = ARGV[2] || queue_name
+
+# Load Connection parameters from configuration file
+config = YAML.load_file(File.join(File.dirname(__FILE__), '..', 'jms.yml'))[jms_provider]
+raise "JMS Provider option:#{jms_provider} not found in jms.yml file" unless config
+
+# Create supplied path if it does not exist
+FileUtils.mkdir_p(path)
+
+counter = 0
+# Consume all available messages on the queue
+JMS::Connection.session(config) do |session|
+ session.browse(:queue_name => queue_name, :timeout=>1000) do |message|
+ counter += 1
+ filename = File.join(path, "message_%03d" % counter)
+ File.open(filename+'.data', 'wb') {|file| file.write(message.data) }
+ header = {
+ :attributes => message.attributes,
+ :properties => message.properties
+ }
+ File.open(filename+'.yml', 'wb') {|file| file.write(header.to_yaml) }
+ end
+end
+
+puts "Saved #{counter} messages to #{path}"
View
@@ -0,0 +1,44 @@
+#
+# Sample ActiveMQ InVM Example:
+# Write to a queue and then consume the message in a separate thread
+#
+# Note: This example only works with ActiveMQ
+# Update the jar files path in ../jms.yml to point to your ActiveMQ installation
+
+# Allow examples to be run in-place without requiring a gem install
+$LOAD_PATH.unshift File.dirname(__FILE__) + '/../../lib'
+
+require 'rubygems'
+require 'yaml'
+require 'jms'
+require 'benchmark'
+
+# Set Log4J properties file so that it does not need to be in the CLASSPATH
+java.lang.System.properties['log4j.configuration'] = "file://#{File.join(File.dirname(__FILE__), 'log4j.properties')}"
+
+jms_provider = 'activemq-invm'
+
+# Load Connection parameters from configuration file
+config = YAML.load_file(File.join(File.dirname(__FILE__), '..', 'jms.yml'))[jms_provider]
+raise "JMS Provider option:#{jms_provider} not found in jms.yml file" unless config
+
+JMS::Connection.start(config) do |connection|
+ # Consume messages in a separate thread
+ connection.on_message(:queue_name => 'ExampleQueue') do |message|
+ JMS::logger.info "Consumed message from ExampleQueue: '#{message.data}'"
+ end
+
+ # Send a single message within a new session
+ connection.session do |session|
+ session.producer(:queue_name => 'ExampleQueue') do |producer|
+ producer.send(session.message("Hello World. #{Time.now}"))
+ end
+ end
+
+ JMS::logger.info "Put message on ExampleQueue"
+
+ # Give the consume thread time to process the message before terminating
+ sleep 1
+
+ JMS::logger.info "Shutting down"
+end
@@ -0,0 +1,58 @@
+#
+# This file controls most of the logging in ActiveMQ which is mainly based around
+# the commons logging API.
+#
+log4j.rootLogger=INFO, logfile, console
+log4j.logger.org.apache.activemq.spring=WARN
+log4j.logger.org.apache.activemq.web.handler=WARN
+log4j.logger.org.springframework=WARN
+log4j.logger.org.apache.xbean=WARN
+log4j.logger.org.apache.camel=INFO
+
+# When debugging or reporting problems to the ActiveMQ team,
+# comment out the above lines and uncomment the next.
+
+#log4j.rootLogger=DEBUG, logfile, console
+
+# Or for more fine grained debug logging uncomment one of these
+#log4j.logger.org.apache.activemq=DEBUG
+#log4j.logger.org.apache.camel=DEBUG
+
+# Console appender
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p: %m [%c %t]%n
+log4j.appender.console.threshold=INFO
+
+# File appender
+log4j.appender.logfile=org.apache.log4j.RollingFileAppender
+log4j.appender.logfile.file=activemq.log
+log4j.appender.logfile.maxFileSize=10240KB
+log4j.appender.logfile.maxBackupIndex=5
+log4j.appender.logfile.append=true
+log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
+log4j.appender.logfile.layout.ConversionPattern=%d{ISO8601} %-5p: %m [%c %t]%n
+# use some of the following patterns to see MDC logging data
+#
+# %X{activemq.broker}
+# %X{activemq.connector}
+# %X{activemq.destination}
+#
+# e.g.
+#
+# log4j.appender.logfile.layout.ConversionPattern=%d | %-20.20X{activemq.connector} | %-5p | %m | %c | %t%n
+
+###########
+# Audit log
+###########
+
+log4j.additivity.org.apache.activemq.audit=false
+log4j.logger.org.apache.activemq.audit=INFO, audit
+
+log4j.appender.audit=org.apache.log4j.FileAppender
+log4j.appender.audit.file=activemq-audit.log
+log4j.appender.logfile.maxFileSize=10240KB
+log4j.appender.logfile.maxBackupIndex=5
+log4j.appender.audit.append=true
+log4j.appender.audit.layout=org.apache.log4j.PatternLayout
+log4j.appender.audit.layout.ConversionPattern=%d{ISO8601} %-5p: %m [%c %t]%n
View
@@ -14,26 +14,28 @@ activemq:
:username: system
:password: manager
:require_jars:
- - ~/Applications/apache-activemq-5.4.2-fuse-03-09/activemq-all-5.4.2-fuse-03-09.jar
-# ActiveMQ 5.5 Requires additional jar files
-# - ~/Applications/apache-activemq-5.5.0/activemq-all-5.5.0.jar
-# - ~/Applications/apache-activemq-5.5.0/lib/optional/slf4j-log4j12-1.5.11.jar
-# - ~/Applications/apache-activemq-5.5.0/lib/optional/log4j-1.2.14.jar
+ - ~/Applications/apache-activemq-5.5.0/activemq-all-5.5.0.jar
+ - ~/Applications/apache-activemq-5.5.0/lib/optional/slf4j-log4j12-1.5.11.jar
+ - ~/Applications/apache-activemq-5.5.0/lib/optional/log4j-1.2.14.jar
# ActiveMQ In VM Broker (Supports messaging within a JVM instance)
-activemq-vm:
+activemq-invm:
:factory: org.apache.activemq.ActiveMQConnectionFactory
- :broker_url: vm://127.0.0.1
+ :broker_url: vm://mybroker
:object_message_serialization_defered: true
:require_jars:
- - ~/Applications/apache-activemq-5.4.2-fuse-03-09/activemq-all-5.4.2-fuse-03-09.jar
+ - ~/Applications/apache-activemq-5.5.0/activemq-all-5.5.0.jar
+ - ~/Applications/apache-activemq-5.5.0/lib/optional/slf4j-log4j12-1.5.11.jar
+ - ~/Applications/apache-activemq-5.5.0/lib/optional/log4j-1.2.14.jar
# ActiveMQ with failover to slave instance
activemq-ha:
:factory: org.apache.activemq.ActiveMQConnectionFactory
:broker_url: failover://(tcp://msg1:61616,tcp://msg2:61616)?randomize=false&timeout=30000&initialReconnectDelay=100&useExponentialBackOff=true
:require_jars:
- - ~/Applications/apache-activemq-5.4.2-fuse-03-09/activemq-all-5.4.2-fuse-03-09.jar
+ - ~/Applications/apache-activemq-5.5.0/activemq-all-5.5.0.jar
+ - ~/Applications/apache-activemq-5.5.0/lib/optional/slf4j-log4j12-1.5.11.jar
+ - ~/Applications/apache-activemq-5.5.0/lib/optional/log4j-1.2.14.jar
# JBoss 4 Messaging
:jndi_name: ConnectionFactory
@@ -18,7 +18,7 @@
JMS::Connection.session(config) do |session|
session.producer(:queue_name => 'ExampleQueue') do |producer|
- producer.delivery_mode = JMS::DeliveryMode::NON_PERSISTENT
+ producer.delivery_mode_sym = :non_persistent
producer.send(session.message("Hello World: #{Time.now}"))
JMS::logger.info "Successfully sent one message to queue ExampleQueue"
end
View
@@ -121,6 +121,7 @@ def fetch_dependencies(jar_list)
require 'jms/object_message'
require 'jms/session'
require 'jms/message_consumer'
+ require 'jms/message_producer'
require 'jms/queue_browser'
require 'jms/oracle_a_q_connection_factory'
end
View
@@ -225,8 +225,8 @@ def jms_delivery_mode_sym=(mode)
def attributes
{
:jms_correlation_id => jms_correlation_id,
- :jms_delivery_mode => jms_delivery_mode_sym,
- :jms_destination => jms_destination,
+ :jms_delivery_mode_sym => jms_delivery_mode_sym,
+ :jms_destination => jms_destination.nil? ? nil : jms_destination.to_string,
:jms_expiration => jms_expiration,
:jms_message_id => jms_message_id,
:jms_priority => jms_priority,
@@ -0,0 +1,59 @@
+################################################################################
+# Copyright 2008, 2009, 2010, 2011 J. Reid Morrison
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Extend JMS Message Producer Interface with Ruby methods
+#
+# For further help on javax.jms.Message
+# http://download.oracle.com/javaee/6/api/javax/jms/MessageProducer.html
+#
+# Interface javax.jms.Producer
+module JMS::MessageProducer
+
+ # Return the Delivery Mode as a Ruby symbol
+ # :persistent
+ # :non_persistent
+ # nil if unknown
+ def delivery_mode_sym
+ case delivery_mode
+ when JMS::DeliveryMode::PERSISTENT
+ :persistent
+ when JMS::DeliveryMode::NON_PERSISTENT
+ :non_persistent
+ else
+ nil
+ end
+ end
+
+ # Set the JMS Delivery Mode from a Ruby Symbol
+ # Valid values for mode
+ # :persistent
+ # :non_persistent
+ #
+ # Example:
+ # producer.delivery_mode_sym = :persistent
+ def delivery_mode_sym=(mode)
+ val = case mode
+ when :persistent
+ JMS::DeliveryMode::PERSISTENT
+ when :non_persistent
+ JMS::DeliveryMode::NON_PERSISTENT
+ else
+ raise "Unknown delivery mode symbol: #{mode}"
+ end
+ self.delivery_mode = val
+ end
+
+end
Oops, something went wrong.

0 comments on commit 672fc1e

Please sign in to comment.