Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

spike out using em

  • Loading branch information...
commit 30cfc5b4ba0f0c6c4ba8beb9a8184d47e4e46e0b 1 parent 3e92f77
@qrush qrush authored
View
2  Gemfile
@@ -1,6 +1,8 @@
source "http://rubygems.org"
gem "daemons", "1.1.0"
+gem "em-hiredis", :path => "../em-hiredis"
+gem "eventmachine", "0.12.10"
gem "excon", "0.5.6"
gem "json_pure", "1.4.6"
gem "redis", "2.1.1"
View
10 Gemfile.lock
@@ -1,3 +1,9 @@
+PATH
+ remote: ../em-hiredis
+ specs:
+ em-hiredis (0.0.1)
+ hiredis (~> 0.3.0)
+
GEM
remote: http://rubygems.org/
specs:
@@ -12,11 +18,13 @@ GEM
term-ansicolor (~> 1.0.5)
daemons (1.1.0)
diff-lcs (1.1.2)
+ eventmachine (0.12.10)
excon (0.5.6)
gherkin (2.2.9)
json (~> 1.4.6)
term-ansicolor (~> 1.0.5)
git (1.2.5)
+ hiredis (0.3.1)
jeweler (1.5.1)
bundler (~> 1.0.0)
git (>= 1.2.5)
@@ -45,6 +53,8 @@ DEPENDENCIES
bourne
cucumber
daemons (= 1.1.0)
+ em-hiredis!
+ eventmachine (= 0.12.10)
excon (= 0.5.6)
jeweler
json_pure (= 1.4.6)
View
2  lib/daikon.rb
@@ -7,6 +7,8 @@
require 'stringio'
require 'thread'
+require 'em-hiredis'
+require 'eventmachine'
require 'excon'
require 'daemons'
require 'json'
View
31 lib/daikon/client.rb
@@ -7,26 +7,16 @@ class Client
JSON::ParserError,
Excon::Errors::SocketError]
- attr_accessor :redis, :logger, :config, :http, :monitor
+ attr_accessor :logger, :config, :http
def setup(config, logger = nil)
self.config = config
self.logger = logger
- self.redis = connect
- self.monitor = Monitor.new(connect, logger)
self.http = Excon.new(config.server_prefix)
log "Started Daikon v#{VERSION}"
end
- def connect
- Redis.connect(:url => config.redis_url)
- end
-
- def start_monitor
- monitor.start
- end
-
def log(message)
logger.info message if logger
end
@@ -57,19 +47,22 @@ def push(method, path, body)
"Content-Type" => "application/json"})
end
- def rotate_monitor(start, stop)
- payload = monitor.rotate.merge({
- "start" => start,
- "stop" => stop
- })
+ def report_summaries
+ Daikon::Monitor.pop do |summary|
+ require 'pp'
+ pp summary
+ report_summary(summary)
+ end
+ end
- push :post, "/api/v1/summaries.json", payload
+ def report_summary(summary)
+ push :post, "/api/v1/summaries.json", summary
rescue *EXCEPTIONS => ex
exception(ex)
end
- def report_info
- push :post, "/api/v1/infos.json", redis.info
+ def report_info(info)
+ push :post, "/api/v1/infos.json", info
rescue *EXCEPTIONS => ex
exception(ex)
end
View
56 lib/daikon/daemon.rb
@@ -3,59 +3,41 @@ class Daemon
INFO_INTERVAL = ENV["INFO_INTERVAL"] || 10
SUMMARY_INTERVAL = ENV["SUMMARY_INTERVAL"] || 60
- def self.sleep_time=(sleep_time)
- @@sleep_time = sleep_time
- end
-
- def self.sleep_time
- @@sleep_time ||= 1
- end
-
- def self.run=(run)
- @@run = run
- end
-
- def self.run
- @@run
- end
-
def self.start(argv, ontop = false)
- self.run = true
- config = Daikon::Configuration.new(argv)
+ Daemons.run_proc("daikon", :ARGV => argv, :log_output => true, :backtrace => true, :ontop => ontop) do
+ config = Daikon::Configuration.new(argv)
- if argv.include?("-v") || argv.include?("--version")
- puts "Daikon v#{VERSION}"
- return
- end
+ if argv.include?("-v") || argv.include?("--version")
+ puts "Daikon v#{VERSION}"
+ return
+ end
- Daemons.run_proc("daikon", :ARGV => argv, :log_output => true, :backtrace => true, :ontop => ontop) do
if argv.include?("run")
logger = Logger.new(STDOUT)
else
logger = Logger.new("/tmp/radish.log")
end
- rotated_at = Time.now
- reported_at = Time.now
- client = Daikon::Client.new
-
+ client = Daikon::Client.new
client.setup(config, logger)
- client.start_monitor
- while self.run do
- now = Time.now
+ EventMachine::run do
+ hiredis = EventMachine::Hiredis.connect
+ himonitor = EventMachine::Hiredis.connect
- if now - reported_at >= sleep_time * INFO_INTERVAL.to_i
- client.report_info
- reported_at = now
+ himonitor.monitor do |line|
+ Daikon::Monitor.parse(line)
end
- if now - rotated_at >= sleep_time * SUMMARY_INTERVAL.to_i
- client.rotate_monitor(rotated_at, now)
- rotated_at = now
+ EventMachine::add_periodic_timer(10) do
+ hiredis.info do |info|
+ client.report_info info
+ end
end
- sleep sleep_time
+ EventMachine::add_periodic_timer(60) do
+ client.report_summaries
+ end
end
end
end
View
89 lib/daikon/monitor.rb
@@ -1,7 +1,5 @@
module Daikon
class Monitor
- attr_accessor :data
-
NO_ARG_COMMANDS = ["BGREWRITEAOF", "BGSAVE", "CONFIG RESETSTAT", "DBSIZE", "DEBUG SEGFAULT", "DISCARD", "EXEC", "FLUSHALL", "FLUSHDB", "INFO", "LASTSAVE", "MONITOR", "MULTI", "PING", "QUIT", "RANDOMKEY", "SAVE", "SHUTDOWN", "SYNC", "UNWATCH"]
READ_COMMANDS = ["EXISTS", "GET", "GETBIT", "GETRANGE", "HEXISTS", "HGET", "HGETALL", "HKEYS", "HLEN", "HMGET", "HVALS", "KEYS", "LINDEX", "LLEN", "LRANGE", "MGET", "SCARD", "SDIFF", "SINTER", "SISMEMBER", "SMEMBERS", "SORT", "SRANDMEMBER", "STRLEN", "SUNION", "TTL", "TYPE", "ZCARD", "ZCOUNT", "ZRANGE", "ZRANGEBYSCORE", "ZRANK", "ZREVRANGE", "ZREVRANGEBYSCORE", "ZREVRANK", "ZSCORE"].to_set
WRITE_COMMANDS = ["APPEND", "BLPOP", "BRPOP", "BRPOPLPUSH", "DECR", "DECRBY", "DEL", "GETSET", "EXPIRE", "EXPIREAT", "HDEL", "HINCRBY", "HMSET", "HSET", "HSETNX", "INCR", "INCRBY", "LINSERT", "LPOP", "LPUSH", "LPUSHX", "LREM", "LSET", "LTRIM", "MOVE", "MSET", "MSETNX", "PERSIST", "RENAME", "RENAMENX", "RPOP", "RPOPLPUSH", "RPUSH", "RPUSHX", "SADD", "SDIFFSTORE", "SET", "SETBIT", "SETEX", "SETNX", "SETRANGE", "SINTERSTORE", "SMOVE", "SPOP", "SREM", "SUNIONSTORE", "ZADD", "ZINCRBY", "ZINTERSTORE", "ZREM", "ZREMRANGEBYRANK", "ZREMRANGEBYSCORE", "ZUNIONSTORE"].to_set
@@ -12,40 +10,37 @@ class Monitor
OLD_SINGLE_FORMAT = /^(#{NO_ARG_COMMANDS.join('|')})$/i
OLD_MORE_FORMAT = /^[A-Z]+ .*$/i
- def initialize(redis = nil, logger = nil)
- @data = data_hash
- @redis = redis
- @logger = logger
- @mutex = Mutex.new
+ def self.summaries
+ @@summaries ||= {}
end
- def data_hash
- {"commands" => Hash.new(0),
- "keys" => Hash.new(0),
- "namespaces" => Hash.new(0),
- "totals" => Hash.new(0)}
+ def self.current_summary(time)
+ summaries[time] ||=
+ {"commands" => Hash.new(0),
+ "keys" => Hash.new(0),
+ "namespaces" => Hash.new(0),
+ "totals" => Hash.new(0),
+ "start" => time,
+ "stop" => time}
end
- def start
- Thread.new do
- @redis.monitor do |line|
- parse(line)
- end
- end
+ def self.parse(line)
+ new.parse(line)
end
- def lock(&block)
- @mutex.synchronize(&block)
+ def self.pop
+ time, summary = summaries.first
+ summary["keys"] = Hash[*summary["keys"].sort_by(&:last).reverse[0..99].flatten]
+ yield summary
+ summaries.delete(time)
end
- def rotate
- this_data = nil
- lock do
- this_data = self.data.dup
- self.data.replace(data_hash)
- end
- this_data["keys"] = Hash[*this_data["keys"].sort_by(&:last).reverse[0..99].flatten]
- this_data
+ def initialize
+ @now = Time.now.utc.strftime("%Y-%m-%d %H:%M:00 %Z")
+ end
+
+ def current_summary
+ self.class.current_summary(@now)
end
def parse(line)
@@ -62,50 +57,48 @@ def push(split_command)
return unless ALL_COMMANDS.member?(command)
- lock do
- incr_command(command)
- incr_total(command)
- if key
- key.gsub!(".", "{PERIOD}") if key.include?('.')
- key.gsub!("$", "{DOLLAR}") if key.include?('$')
-
- incr_key(key)
- incr_namespace(key)
- else
- incr_global_namespace
- end
+ incr_command(command)
+ incr_total(command)
+ if key
+ key.gsub!(".", "{PERIOD}") if key.include?('.')
+ key.gsub!("$", "{DOLLAR}") if key.include?('$')
+
+ incr_key(key)
+ incr_namespace(key)
+ else
+ incr_global_namespace
end
end
def incr_namespace(key)
if marker = key =~ /:|-/
- data["namespaces"][key[0...marker]] += 1
+ current_summary["namespaces"][key[0...marker]] += 1
else
incr_global_namespace
end
end
def incr_global_namespace
- data["namespaces"]["global"] += 1
+ current_summary["namespaces"]["global"] += 1
end
def incr_command(command)
- data["commands"][command] += 1
+ current_summary["commands"][command] += 1
end
def incr_key(key)
- data["keys"][key] += 1
+ current_summary["keys"][key] += 1
end
def incr_total(command)
- data["totals"]["all"] += 1
+ current_summary["totals"]["all"] += 1
if READ_COMMANDS.member?(command)
- data["totals"]["read"] += 1
+ current_summary["totals"]["read"] += 1
elsif WRITE_COMMANDS.member?(command)
- data["totals"]["write"] += 1
+ current_summary["totals"]["write"] += 1
elsif OTHER_COMMANDS.member?(command)
- data["totals"]["other"] += 1
+ current_summary["totals"]["other"] += 1
end
end
end
Please sign in to comment.
Something went wrong with that request. Please try again.