Permalink
Browse files

Adding flush, bumping to 0.2.0

  • Loading branch information...
1 parent 7f54cc5 commit 1bd4a7f87813b7f50cf8bc597c705cc4edd0df3d @calvinfo calvinfo committed Mar 21, 2013
View
@@ -1,3 +1,7 @@
+0.2.0 / 2013-3-21
+===========
+* Adding flush method
+
0.1.4 / 2013-3-19
===========
* Adding ClassMethods for more extensibility by [arronmabrey](https://github.com/arronmabrey)
View
@@ -17,6 +17,11 @@ def identify(options)
return false unless @client
@client.identify(options)
end
+
+ def flush
+ return false unless @client
+ @client.flush
+ end
end
extend ClassMethods
end
@@ -15,7 +15,7 @@ class Client
# :secret - String of your project's secret
# :max_queue_size - Fixnum of the max calls to remain queued (optional)
# :on_error - Proc which handles error calls from the API
- def initialize (options = {})
+ def initialize(options = {})
@queue = Queue.new
@secret = options[:secret]
@@ -27,14 +27,13 @@ def initialize (options = {})
@thread = Thread.new { @consumer.run }
end
- # public: Join on the thread to close
+ # public: Synchronously waits until the consumer has flushed the queue.
+ # Use only for scripts which are not long-running, and will
+ # specifically exit
#
- def close ()
- @consumer.close
- if @queue.length > 0
- @thread.join
- else
- @thread.join(1)
+ def flush
+ while !@queue.empty? || @consumer.is_requesting?
+ sleep(0.1)
end
end
@@ -18,44 +18,54 @@ class Consumer
# on_error - Proc of what to do on an error
#
def initialize(queue, secret, options = {})
- @open = true
@queue = queue
@secret = secret
@batch_size = options[:batch_size] || AnalyticsRuby::Defaults::Queue::BATCH_SIZE
@on_error = options[:on_error] || Proc.new { |status, error| }
@current_batch = []
+
+ @mutex = Mutex.new
end
# public: Continuously runs the loop to check for new events
#
def run
- while @open || !queue.empty?
+ while true
flush
end
end
# public: Flush some events from our queue
#
def flush
-
# Block until we have something to send
- @current_batch << @queue.pop()
+ item = @queue.pop()
- until @current_batch.length >= @batch_size || @queue.empty?
- @current_batch << @queue.pop()
- end
+ # Synchronize on additions to the current batch
+ @mutex.synchronize {
+ @current_batch << item
+ until @current_batch.length >= @batch_size || @queue.empty?
+ @current_batch << @queue.pop()
+ end
+ }
req = AnalyticsRuby::Request.new
res = req.post(@secret, @current_batch)
@on_error.call(res.status, res.error) unless res.status == 200
- @current_batch = []
+ @mutex.synchronize {
+ @current_batch = []
+ }
end
- # public: Close the consumer.
+ # public: Check whether we have outstanding requests.
#
- def close
- @open = false
+ def is_requesting?
+ requesting = nil
+ @mutex.synchronize {
+ requesting = !@current_batch.empty?
+ }
+ requesting
end
end
@@ -1,3 +1,3 @@
module AnalyticsRuby
- VERSION = '0.1.4'
+ VERSION = '0.2.0'
end
View
@@ -13,7 +13,6 @@
it 'should not error if a secret is supplied' do
Analytics::Client.new secret: AnalyticsHelpers::SECRET
end
-
end
describe '#track' do
@@ -33,7 +32,6 @@
it 'should not error with the required options' do
@client.track AnalyticsHelpers::Queued::TRACK
end
-
end
@@ -50,9 +48,20 @@
it 'should not error with the required options' do
@client.identify AnalyticsHelpers::Queued::IDENTIFY
end
-
end
+ describe '#flush' do
+ before(:all) do
+ @client = Analytics::Client.new secret: AnalyticsHelpers::SECRET
+ end
+
+ it 'should wait for the queue to finish on a flush' do
+ @client.identify AnalyticsHelpers::Queued::IDENTIFY
+ @client.track AnalyticsHelpers::Queued::TRACK
+ @client.flush
+ @client.queued_messages.should == 0
+ end
+ end
end
View
@@ -58,4 +58,31 @@
queue.should be_empty
end
end
+
+ describe '#is_requesting?' do
+
+ it 'should not return true if there isn\'t a current batch' do
+
+ queue = Queue.new
+ consumer = Analytics::Consumer.new(queue, 'testsecret')
+
+ consumer.is_requesting?.should == false
+ end
+
+ it 'should return true if there is a current batch' do
+
+ queue = Queue.new
+ queue << AnalyticsHelpers::Requested::TRACK
+ consumer = Analytics::Consumer.new(queue, 'testsecret')
+
+ Thread.new {
+ consumer.flush
+ consumer.is_requesting?.should == false
+ }
+
+ # sleep barely long enough to let thread flush the queue.
+ sleep(0.001)
+ consumer.is_requesting?.should == true
+ end
+ end
end
View
@@ -55,4 +55,12 @@
sleep(1)
end
end
+
+ describe '#flush' do
+
+ it 'should flush without error' do
+ Analytics.identify AnalyticsHelpers::Queued::IDENTIFY
+ Analytics.flush
+ end
+ end
end

0 comments on commit 1bd4a7f

Please sign in to comment.