From cb74e1b916cbc09782abcfb8341f961d73e5c3a7 Mon Sep 17 00:00:00 2001 From: Vincent Landgraf Date: Thu, 28 Aug 2008 14:20:29 +0200 Subject: [PATCH] added a new queue that stores it's messages in the file system and added a test and a sample config to config.ru --- default-server/config.ru | 12 +++++++----- lib/fmq/queues/file_persistent.rb | 32 ++++++++++++++++++++++++++----- test/test_file_persistent.rb | 18 +++++++++++++++++ 3 files changed, 52 insertions(+), 10 deletions(-) create mode 100644 test/test_file_persistent.rb diff --git a/default-server/config.ru b/default-server/config.ru index 0e777bf..bb7e26c 100644 --- a/default-server/config.ru +++ b/default-server/config.ru @@ -51,11 +51,13 @@ queue_manager = FreeMessageQueue::QueueManager.new(true) do q.forward_to = ["/fmq_test/test1", "/fmq_test/test2"] end - queue_manager = FreeMessageQueue::QueueManager.new(true) do - setup_queue "/fmq_test/file_persistent", FreeMessageQueue::FilePersistentQueue do |q| - q.folder = "./tmp/mail_box/threez" - q.max_messages = 10000 - end + # this is a file system queue that will save each method + # in the file system until they are polled + # the message is useful if you want to have a queue that will + # save it's state so that you can reboot the server + setup_queue "/fmq_test/file_persistent", FreeMessageQueue::FilePersistentQueue do |q| + q.folder = "./tmp/mail_box/threez" + q.max_messages = 10_000 end end diff --git a/lib/fmq/queues/file_persistent.rb b/lib/fmq/queues/file_persistent.rb index 0f3cf4c..bd4c786 100644 --- a/lib/fmq/queues/file_persistent.rb +++ b/lib/fmq/queues/file_persistent.rb @@ -33,20 +33,23 @@ module FreeMessageQueue class FilePersistentQueue < BaseQueue # Return the def poll() - messages = Dir[@folder_path + "/*.msg"].sort - first_file = messages.first + check_folder_name + messages = all_messages.sort! + return nil if messages.size == 0 - msg_bin = File.open(first_file, "rb") { |f| f.read } + msg_bin = File.open(messages.first, "rb") { |f| f.read } + FileUtils.rm messages.first remove_message(Marshal.load(msg_bin)) end # add one message to the queue (will be saved in file system) def put(message) + check_folder_name return false if message.nil? add_message(message) # check constraints and update stats - msg_bin = Marshal.dump(msg) + msg_bin = Marshal.dump(message) File.open(@folder_path + "/#{Time.now.to_f}.msg", "wb") do |f| f.write msg_bin end @@ -59,6 +62,25 @@ def put(message) def folder=(path) FileUtils.mkdir_p path unless File.exist? path @folder_path = path - end + end + + # remove all items from the queue + def clear + FileUtils.rm all_messages + @size = 0 + @bytes = 0 + end + + private + + # returns an array with all paths to queue messages + def all_messages + Dir[@folder_path + "/*.msg"] + end + + # raise an exceptin if the folder name is not set + def check_folder_name + raise QueueException.new("[FilePersistentQueue] The folder_path need to be specified", caller) if @folder_path.nil? + end end end diff --git a/test/test_file_persistent.rb b/test/test_file_persistent.rb new file mode 100644 index 0000000..4af90d9 --- /dev/null +++ b/test/test_file_persistent.rb @@ -0,0 +1,18 @@ +require File.dirname(__FILE__) + '/test_helper.rb' +require "fileutils" + +# This is the default test to the message interface +class TestFilePersistentQueue < Test::Unit::TestCase + include FifoQueueTests + + def setup + manager = nil # the manager is not needed in this test + @queue = FreeMessageQueue::FilePersistentQueue.new(manager) + @queue.folder = "/tmp/fmq/FilePersistentQueue/testqueue" + end + + def teardown + # remove the test data + FileUtils.rm_rf "/tmp/fmq" + end +end