Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

Feature/chunk queued #19

Merged
merged 5 commits into from

1 participant

@nextmat
Owner
  • Direct persister now has the ability to chunk large groups of measurements into multiple requests via a max measurements per request option
  • Queues now take advantage of this behavior
  • Added middleware to track total requests made
@nextmat nextmat merged commit 5af8209 into master
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
This page is out of date. Refresh to see the latest.
View
2  lib/librato/metrics/connection.rb
@@ -1,4 +1,5 @@
require 'faraday'
+require 'metrics/middleware/count_requests'
require 'metrics/middleware/expects_status'
require 'metrics/middleware/retry'
@@ -28,6 +29,7 @@ def transport
raise NoClientProvided unless @client
@transport ||= Faraday::Connection.new(:url => api_endpoint + "/v1/") do |f|
#f.use FaradayMiddleware::EncodeJson
+ f.use Librato::Metrics::Middleware::CountRequests
f.use Librato::Metrics::Middleware::Retry
f.use Librato::Metrics::Middleware::ExpectsStatus
#f.use FaradayMiddleware::ParseJson, :content_type => /\bjson$/
View
29 lib/librato/metrics/middleware/count_requests.rb
@@ -0,0 +1,29 @@
+module Librato
+ module Metrics
+ module Middleware
+
+ class CountRequests < Faraday::Response::Middleware
+ @total_requests = 0
+
+ class << self
+ attr_reader :total_requests
+
+ def increment
+ @total_requests += 1
+ end
+
+ def reset
+ @total_requests = 0
+ end
+ end
+
+ def call(env)
+ self.class.increment
+ @app.call(env)
+ end
+ end
+
+ end
+ end
+end
+
View
40 lib/librato/metrics/persistence/direct.rb
@@ -9,10 +9,42 @@ class Direct
# Persist the queued metrics directly to the
# Metrics web API.
#
- def persist(client, queued)
- payload = MultiJson.encode(queued)
- # expects 200
- client.connection.post('metrics', payload)
+ def persist(client, queued, options={})
+ per_request = options[:per_request]
+ if per_request
+ requests = chunk_queued(queued, per_request)
+ else
+ requests = [queued]
+ end
+ requests.each do |request|
+ payload = MultiJson.encode(request)
+ # expects 200
+ client.connection.post('metrics', payload)
+ end
+ end
+
+ private
+
+ def chunk_queued(queued, per_request)
+ return [queued] if queue_count(queued) <= per_request
+ reqs = []
+ queued.each do |metric_type, measurements|
+ if measurements.size <= per_request
+ # we can fit all of this metric type in a single
+ # request, so do so
+ reqs << {metric_type => measurements}
+ else
+ # going to have to split things up
+ measurements.each_slice(per_request) do |elements|
+ reqs << {metric_type => elements}
+ end
+ end
+ end
+ reqs
+ end
+
+ def queue_count(queued)
+ queued.inject(0) { |result, data| result + data.last.size }
end
end
View
2  lib/librato/metrics/persistence/test.rb
@@ -6,7 +6,7 @@ module Persistence
class Test
# persist the given metrics
- def persist(client, metrics)
+ def persist(client, metrics, options={})
@persisted = metrics
return !@return_value.nil? ? @return_value : true
end
View
12 lib/librato/metrics/queue.rb
@@ -1,13 +1,16 @@
module Librato
module Metrics
class Queue
+ MEASUREMENTS_PER_REQUEST = 500
+ attr_reader :per_request
attr_accessor :skip_measurement_times
def initialize(options={})
- @queued ||= {}
- @skip_measurement_times = options.delete(:skip_measurement_times)
- @client = options.delete(:client) || Librato::Metrics.client
+ @queued = {}
+ @per_request = options[:per_request] || MEASUREMENTS_PER_REQUEST
+ @skip_measurement_times = options[:skip_measurement_times]
+ @client = options[:client] || Librato::Metrics.client
end
# Add a metric entry to the metric set:
@@ -99,7 +102,8 @@ def size
# @return Boolean
def submit
raise NoMetricsQueued if self.queued.empty?
- if persister.persist(self.client, self.queued)
+ options = {:per_request => @per_request}
+ if persister.persist(self.client, self.queued, options)
flush and return true
end
false
View
28 spec/integration/metrics/middleware/count_requests_spec.rb
@@ -0,0 +1,28 @@
+require 'spec_helper'
+
+module Librato
+ module Metrics
+ module Middleware
+
+ describe CountRequests do
+ before(:all) { prep_integration_tests }
+
+ it "should count requests" do
+ CountRequests.reset
+ Metrics.submit :foo => 123
+ Metrics.submit :foo => 135
+ CountRequests.total_requests.should == 2
+ end
+
+ it "should be resettable" do
+ Metrics.submit :foo => 123
+ CountRequests.total_requests.should > 0
+ CountRequests.reset
+ CountRequests.total_requests.should == 0
+ end
+
+ end
+
+ end
+ end
+end
View
43 spec/integration/metrics/queue_spec.rb
@@ -0,0 +1,43 @@
+require 'spec_helper'
+
+module Librato
+ module Metrics
+
+ describe Queue do
+ before(:all) { prep_integration_tests }
+
+ context "with a large number of metrics" do
+ it "should submit them in multiple requests" do
+ Middleware::CountRequests.reset
+ queue = Queue.new(:per_request => 3)
+ (1..10).each do |i|
+ queue.add "gauge_#{i}" => 1
+ end
+ queue.submit
+ Middleware::CountRequests.total_requests.should == 4
+ end
+
+ it "should persist all metrics" do
+ queue = Queue.new(:per_request => 2)
+ (1..5).each do |i|
+ queue.add "gauge_#{i}" => i
+ end
+ (1..3).each do |i|
+ queue.add "counter_#{i}" => {:type => :counter, :value => i}
+ end
+
+ delete_all_metrics
+ queue.submit
+
+ metrics = Metrics.list
+ metrics.length.should == 8
+ counter = Metrics.fetch :counter_3, :count => 1
+ counter['unassigned'][0]['value'].should == 3
+ gauge = Metrics.fetch :gauge_5, :count => 1
+ gauge['unassigned'][0]['value'].should == 5
+ end
+ end
+ end
+
+ end
+end
View
2  spec/integration/metrics_spec.rb
@@ -9,7 +9,7 @@ module Librato
delete_all_metrics
Metrics.submit :my_counter => {:type => :counter, :value => 0}
1.upto(2).each do |i|
- sleep 1
+ sleep 1.1
Metrics.submit :my_counter => {:type => :counter, :value => i}
Metrics.submit :my_counter => {:source => 'baz', :type => :counter, :value => i+1}
end
View
6 spec/unit/metrics/queue_spec.rb
@@ -114,6 +114,12 @@ module Metrics
subject.gauges.should eql []
end
end
+
+ describe "#per_request" do
+ it "should default to 500" do
+ subject.per_request.should == 500
+ end
+ end
describe "#size" do
it "should return empty if gauges and counters are emtpy" do
Something went wrong with that request. Please try again.