Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

merged topics/middleware from fd - implements fnordmetric#embedded #5

  • Loading branch information...
commit 70f68b6ffbc6e257bf6757255f9429d47b86e9c1 2 parents f62b418 + 77cd359
Paul Asmuth authored
Showing with 61 additions and 45 deletions.
  1. +49 −33 lib/fnordmetric.rb
  2. +12 −12 lib/fnordmetric/app.rb
View
82 lib/fnordmetric.rb
@@ -12,8 +12,8 @@ module FnordMetric
@@namespaces = {}
- def self.namespace(key=nil, &block)
- @@namespaces[key] = block
+ def self.namespace(key=nil, &block)
+ @@namespaces[key] = block
end
def self.server_configuration=(configuration)
@@ -23,7 +23,7 @@ def self.server_configuration=(configuration)
def self.default_options(opts)
opts[:redis_url] ||= "redis://localhost:6379"
- opts[:redis_prefix] ||= "fnordmetric"
+ opts[:redis_prefix] ||= "fnordmetric"
opts[:inbound_stream] ||= ["0.0.0.0", "1337"]
opts[:web_interface] ||= ["0.0.0.0", "4242"]
@@ -38,8 +38,8 @@ def self.default_options(opts)
opts[:event_data_ttl] ||= 3600*24*30
# session data is kept for one month
- opts[:session_data_ttl] ||= 3600*24*30
-
+ opts[:session_data_ttl] ||= 3600*24*30
+
opts
end
@@ -49,25 +49,10 @@ def self.start_em(opts)
trap("TERM", &method(:shutdown))
trap("INT", &method(:shutdown))
- opts = default_options(opts)
-
- if opts[:start_worker]
- worker = Worker.new(@@namespaces.clone, opts)
- worker.ready!
- end
-
- if opts[:inbound_stream]
- begin
- inbound_stream = InboundStream.start(opts)
- log "listening on tcp##{opts[:inbound_stream].join(":")}"
- rescue
- log "cant start FnordMetric::InboundStream. port in use?"
- end
- end
+ app = embedded(opts)
if opts[:web_interface]
- begin
- app = FnordMetric::App.new(@@namespaces.clone, opts)
+ begin
Thin::Server.start(*opts[:web_interface], app)
log "listening on http##{opts[:web_interface].join(":")}"
rescue Exception => e
@@ -75,14 +60,7 @@ def self.start_em(opts)
end
end
- if opts[:print_stats]
- redis = connect_redis(opts[:redis_url])
- EM::PeriodicTimer.new(opts[:print_stats]) do
- print_stats(opts, redis)
- end
- end
-
- end
+ end
end
def self.log(msg)
@@ -113,19 +91,57 @@ def self.connect_redis(redis_url)
def self.print_stats(opts, redis) # FIXME: refactor this mess
keys = [:events_received, :events_processed]
- redis.llen("#{opts[:redis_prefix]}-queue") do |queue_length|
+ redis.llen("#{opts[:redis_prefix]}-queue") do |queue_length|
redis.hmget("#{opts[:redis_prefix]}-stats", *keys) do |data|
data_human = keys.size.times.map{|n|"#{keys[n]}: #{data[n]}"}
log "#{data_human.join(", ")}, queue_length: #{queue_length}"
- end
+ end
end
end
- def self.standalone
+ def self.standalone
require "fnordmetric/logger"
require "fnordmetric/standalone"
end
+ # returns a Rack app which can be mounted under any path.
+ # `:start_worker` starts a worker
+ # `:inbound_stream` starts the TCP interface
+ # `:print_stats` periodicaly prints worker stats
+ def self.embedded(opts={})
+ opts = default_options(opts)
+ app = nil
+
+ if opts[:rack_app] or opts[:web_interface]
+ app = FnordMetric::App.new(@@namespaces.clone, opts)
+ end
+
+ EM.next_tick do
+ if opts[:start_worker]
+ worker = Worker.new(@@namespaces.clone, opts)
+ worker.ready!
+ end
+
+ if opts[:inbound_stream]
+ begin
+ inbound_stream = InboundStream.start(opts)
+ log "listening on tcp##{opts[:inbound_stream].join(":")}"
+ rescue
+ log "cant start FnordMetric::InboundStream. port in use?"
+ end
+ end
+
+ if opts[:print_stats]
+ redis = connect_redis(opts[:redis_url])
+ EM::PeriodicTimer.new(opts[:print_stats]) do
+ print_stats(opts, redis)
+ end
+ end
+ end
+
+ app
+ end
+
end
require "fnordmetric/inbound_stream"
View
24 lib/fnordmetric/app.rb
@@ -1,16 +1,16 @@
# encoding: utf-8
class FnordMetric::App < Sinatra::Base
-
+
@@sessions = Hash.new
-
+
Encoding.default_external = Encoding::UTF_8
#use Rack::Reloader, 0
-
+
enable :session
- set :haml, :format => :html5
+ set :haml, :format => :html5
set :views, ::File.expand_path('../../../haml', __FILE__)
set :public_folder, ::File.expand_path('../../../pub', __FILE__)
@@ -25,7 +25,7 @@ def initialize(namespaces, opts)
end
super(nil)
end
-
+
helpers do
include Rack::Utils
alias_method :h, :escape_html
@@ -38,7 +38,7 @@ def namespaces
@namespaces
end
- def current_namespace
+ def current_namespace
@namespaces[@namespaces.keys.detect{ |k|
k.to_s == params[:namespace]
}.try(:intern)]
@@ -47,7 +47,7 @@ def current_namespace
end
if ENV['RACK_ENV'] == "test"
- set :raise_errors, true
+ set :raise_errors, true
end
get '/' do
@@ -78,7 +78,7 @@ def current_namespace
params[:sum] ? { :sum => _values.values.compact.map(&:to_i).sum } : _values
else
{ (_t = gauge.tick_at(Time.now.to_i-gauge.tick)) => gauge.value_at(_t) }
- end
+ end
data.to_json
end
@@ -88,14 +88,14 @@ def current_namespace
sessions = current_namespace.sessions(:all, :limit => 100).map do |session|
session.fetch_data!
session.to_json
- end
+ end
{ :sessions => sessions }.to_json
end
get '/:namespace/events' do
- events = if params[:type]
+ events = if params[:type]
current_namespace.events(:by_type, :type => params[:type])
elsif params[:session_key]
current_namespace.events(:by_session_key, :session_key => params[:session_key])
@@ -122,7 +122,7 @@ def current_namespace
end
post '/events' do
- halt 400, 'please specify the event_type (_type)' unless params["_type"]
+ halt 400, 'please specify the event_type (_type)' unless params["_type"]
track_event((8**32).to_s(36), parse_params(params))
end
@@ -130,7 +130,7 @@ def current_namespace
def parse_params(hash)
hash.tap do |h|
- h.keys.each{ |k| h[k] = parse_param(h[k]) }
+ h.keys.each{ |k| h[k] = parse_param(h[k]) }
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.