Skip to content
This repository has been archived by the owner on Apr 18, 2021. It is now read-only.

Commit

Permalink
added queue class
Browse files Browse the repository at this point in the history
  • Loading branch information
Jakub Kuźma committed Aug 12, 2009
1 parent af968db commit 9c71d61
Show file tree
Hide file tree
Showing 7 changed files with 118 additions and 219 deletions.
187 changes: 0 additions & 187 deletions bin/stree

This file was deleted.

4 changes: 3 additions & 1 deletion lib/sqs.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
require "sqs/exceptions"
require "sqs/service"
require "sqs/signature"
require "sqs/support"
require "sqs/queue"

module Sqs
# Default (and only) host serving S3 stuff
# Default (and only) host serving SQS stuff
HOST = "queue.amazonaws.com"
end
56 changes: 56 additions & 0 deletions lib/sqs/queue.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
module Sqs
class Queue
extend Roxy::Moxie
extend Forwardable

attr_reader :path, :name, :service

def_instance_delegators :service, :service_request

def retrieve
queue_request(:get, :params => { :max_keys => 0 })
self
end

def ==(other)
self.name == other.name and self.service == other.service
end

def destroy(force)
queue_request(:delete)
true
end

def inspect #:nodoc:
"#<#{self.class}:#{name}>"
end

def initialize(service, url) #:nodoc:
self.service = service
self.url = url
end

private

attr_writer :service, :path

def url=(url)
parsed_url = URI.parse(url)
self.path = parsed_url.path[1..-1]
self.name = parsed_url.path.split("/").last
end

def name=(name)
raise ArgumentError.new("Invalid queue name: #{name}") unless name_valid?(name)
@name = name
end

def queue_request(method, options = {})
service_request(method, options.merge(:host => host, :path => path))
end

def name_valid?(name)
name =~ /\A[a-z0-9][a-z0-9\._-]{2,254}\Z/ and name !~ /\A#{URI::REGEXP::PATTERN::IPV4ADDR}\Z/
end
end
end
66 changes: 38 additions & 28 deletions lib/sqs/service.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@ def initialize(options)
@debug = options[:debug]
end

# Returns all buckets in the service and caches the result (see reload)
def buckets(reload = false)
if reload or @buckets.nil?
response = service_request(:get)
@buckets = parse_buckets(response.body)
# Returns all queues in the service and caches the result (see reload)
def queues(reload = false)
if reload or @queues.nil?
response = service_request(:params => { "Action" => "ListQueues" })
@queues = parse_queues(response.body)
else
@buckets
@queues
end
end

Expand All @@ -46,35 +46,36 @@ def port
use_ssl ? 443 : 80
end

proxy :buckets do
# Builds new bucket with given name
def build(name)
Bucket.new(proxy_owner, name)
proxy :queues do
# Builds new queue with given name
def create(name, default_visibility_timeout = nil)
url = proxy_owner.send(:create_queue, name, default_visibility_timeout)
Queue.new(proxy_owner, url)
end

# Finds the bucket with given name
# Finds the queue with given name
def find_first(name)
bucket = build(name)
bucket.retrieve
queue = build(name)
queue.retrieve
end
alias :find :find_first

# Find all buckets in the service
# Find all queues in the service
def find_all
proxy_target
end

# Reloads the bucket list (clears the cache)
# Reloads the queue list (clears the cache)
def reload
proxy_owner.buckets(true)
proxy_owner.queues(true)
end

# Destroy all buckets in the service. Doesn't destroy non-empty
# buckets by default, pass true to force destroy (USE WITH
# Destroy all queues in the service. Doesn't destroy non-empty
# queues by default, pass true to force destroy (USE WITH
# CARE!).
def destroy_all(force = false)
proxy_target.each do |bucket|
bucket.destroy(force)
proxy_target.each do |queue|
queue.destroy(force)
end
end
end
Expand All @@ -85,8 +86,17 @@ def inspect #:nodoc:

private

def service_request(method, options = {})
connection.request(method, options.merge(:path => "/#{options[:path]}"))
def create_queue(name, default_visibility_timeout = nil)
params = {
"Action" => "CreateQueue",
"QueueName" => name,
}
params["DefaultVisibilityTimeout"] = default_visibility_timeout if default_visibility_timeout
service_request({ :params => params })
end

def service_request(options = {})
connection.request(options.merge(:path => "/#{options[:path]}"))
end

def connection
Expand All @@ -101,13 +111,13 @@ def connection
@connection
end

def parse_buckets(xml_body)
def parse_queues(xml_body)
xml = XmlSimple.xml_in(xml_body)
buckets = xml["Buckets"].first["Bucket"]
if buckets
buckets_names = buckets.map { |bucket| bucket["Name"].first }
buckets_names.map do |bucket_name|
Bucket.new(self, bucket_name)
queues = xml["ListQueuesResult"]
if queues
queues_names = queues.map { |queue| queue["QueueUrl"].first }
queues_names.map do |queue_name|
Queue.new(self, queue_name)
end
else
[]
Expand Down
3 changes: 0 additions & 3 deletions lib/sqs/signature.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ module Sqs
#
# Implements algorithm defined by Amazon Web Services to sign
# request with secret private credentials
#
# === See:
# http://docs.amazonwebservices.com/AmazonS3/latest/index.html?RESTAuthentication.html

class Signature

Expand Down
11 changes: 11 additions & 0 deletions lib/sqs/support.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module Sqs
module Support
def self.classify(symbol)
camelize(symbol.to_s.sub(/.*\./, ''))
end

def self.camelize(lower_case_and_underscored_word)
lower_case_and_underscored_word.to_s.gsub(/(?:^|_)(.)/) { $1.upcase }
end
end
end
Loading

0 comments on commit 9c71d61

Please sign in to comment.