Skip to content

Commit

Permalink
added a new queue that stores it's messages in the file system and ad…
Browse files Browse the repository at this point in the history
…ded a test and a sample config to config.ru
  • Loading branch information
Vincent Landgraf committed Aug 28, 2008
1 parent 6b231a8 commit cb74e1b
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 10 deletions.
12 changes: 7 additions & 5 deletions default-server/config.ru
Expand Up @@ -51,11 +51,13 @@ queue_manager = FreeMessageQueue::QueueManager.new(true) do
q.forward_to = ["/fmq_test/test1", "/fmq_test/test2"]
end

queue_manager = FreeMessageQueue::QueueManager.new(true) do
setup_queue "/fmq_test/file_persistent", FreeMessageQueue::FilePersistentQueue do |q|
q.folder = "./tmp/mail_box/threez"
q.max_messages = 10000
end
# this is a file system queue that will save each method
# in the file system until they are polled
# the message is useful if you want to have a queue that will
# save it's state so that you can reboot the server
setup_queue "/fmq_test/file_persistent", FreeMessageQueue::FilePersistentQueue do |q|
q.folder = "./tmp/mail_box/threez"
q.max_messages = 10_000
end
end

Expand Down
32 changes: 27 additions & 5 deletions lib/fmq/queues/file_persistent.rb
Expand Up @@ -33,20 +33,23 @@ module FreeMessageQueue
class FilePersistentQueue < BaseQueue
# Return the
def poll()
messages = Dir[@folder_path + "/*.msg"].sort
first_file = messages.first
check_folder_name
messages = all_messages.sort!
return nil if messages.size == 0

msg_bin = File.open(first_file, "rb") { |f| f.read }
msg_bin = File.open(messages.first, "rb") { |f| f.read }
FileUtils.rm messages.first
remove_message(Marshal.load(msg_bin))
end

# add one message to the queue (will be saved in file system)
def put(message)
check_folder_name
return false if message.nil?

add_message(message) # check constraints and update stats

msg_bin = Marshal.dump(msg)
msg_bin = Marshal.dump(message)
File.open(@folder_path + "/#{Time.now.to_f}.msg", "wb") do |f|
f.write msg_bin
end
Expand All @@ -59,6 +62,25 @@ def put(message)
def folder=(path)
FileUtils.mkdir_p path unless File.exist? path
@folder_path = path
end
end

# remove all items from the queue
def clear
FileUtils.rm all_messages
@size = 0
@bytes = 0
end

private

# returns an array with all paths to queue messages
def all_messages
Dir[@folder_path + "/*.msg"]
end

# raise an exceptin if the folder name is not set
def check_folder_name
raise QueueException.new("[FilePersistentQueue] The folder_path need to be specified", caller) if @folder_path.nil?
end
end
end
18 changes: 18 additions & 0 deletions test/test_file_persistent.rb
@@ -0,0 +1,18 @@
require File.dirname(__FILE__) + '/test_helper.rb'
require "fileutils"

# This is the default test to the message interface
class TestFilePersistentQueue < Test::Unit::TestCase
include FifoQueueTests

def setup
manager = nil # the manager is not needed in this test
@queue = FreeMessageQueue::FilePersistentQueue.new(manager)
@queue.folder = "/tmp/fmq/FilePersistentQueue/testqueue"
end

def teardown
# remove the test data
FileUtils.rm_rf "/tmp/fmq"
end
end

0 comments on commit cb74e1b

Please sign in to comment.