From 419009cc2091f5ce445e8b274dbc5699f8cf90cc Mon Sep 17 00:00:00 2001 From: Cory G Watson Date: Thu, 23 Feb 2012 14:36:29 -0600 Subject: [PATCH 1/8] Remove some metrics that were hanging around. --- src/main/scala/net/lag/kestrel/PersistentQueue.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/main/scala/net/lag/kestrel/PersistentQueue.scala b/src/main/scala/net/lag/kestrel/PersistentQueue.scala index 11da2b23..b8ecfea5 100644 --- a/src/main/scala/net/lag/kestrel/PersistentQueue.scala +++ b/src/main/scala/net/lag/kestrel/PersistentQueue.scala @@ -404,6 +404,11 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c Stats.clearGauge(statNamed("age_msec")) Stats.clearGauge(statNamed("waiters")) Stats.clearGauge(statNamed("open_transactions")) + Stats.removeMetric(statNamed("set_latency_usec")) + Stats.removeMetric(statNamed("get_timeout_msec")) + Stats.removeMetric(statNamed("delivery_latency_msec")) + Stats.removeMetric(statNamed("get_hit_latency_usec")) + Stats.removeMetric(statNamed("get_miss_latency_usec")) } private final def nextXid(): Int = { From 8c41a905116d374338018e6c487046eac5e9bf16 Mon Sep 17 00:00:00 2001 From: Cory G Watson Date: Mon, 27 Feb 2012 11:50:13 -0600 Subject: [PATCH 2/8] Bump the version of ostrich and clear create_time from stats. --- project/build/KestrelProject.scala | 2 +- src/main/scala/net/lag/kestrel/PersistentQueue.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/project/build/KestrelProject.scala b/project/build/KestrelProject.scala index 9d91892e..fc73e5ae 100644 --- a/project/build/KestrelProject.scala +++ b/project/build/KestrelProject.scala @@ -11,7 +11,7 @@ class KestrelProject(info: ProjectInfo) extends StandardServiceProject(info) wit val utilEval = "com.twitter" % "util-eval" % "1.12.4" val utilLogging = "com.twitter" % "util-logging" % "1.12.4" - val ostrich = "com.twitter" % "ostrich" % "4.10.0" + val ostrich = "com.twitter" % "ostrich" % "4.10.3" val naggati = "com.twitter" % "naggati" % "2.1.1" intransitive() // allow custom netty val netty = "org.jboss.netty" % "netty" % "3.2.6.Final" diff --git a/src/main/scala/net/lag/kestrel/PersistentQueue.scala b/src/main/scala/net/lag/kestrel/PersistentQueue.scala index b8ecfea5..e4892769 100644 --- a/src/main/scala/net/lag/kestrel/PersistentQueue.scala +++ b/src/main/scala/net/lag/kestrel/PersistentQueue.scala @@ -404,6 +404,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c Stats.clearGauge(statNamed("age_msec")) Stats.clearGauge(statNamed("waiters")) Stats.clearGauge(statNamed("open_transactions")) + Stats.clearGauge(statNamed("create_time")) Stats.removeMetric(statNamed("set_latency_usec")) Stats.removeMetric(statNamed("get_timeout_msec")) Stats.removeMetric(statNamed("delivery_latency_msec")) From fbc5548172153a466ea84999e073eab4b488c12e Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Tue, 6 Mar 2012 20:04:26 -0800 Subject: [PATCH 3/8] make sure create_time is in seconds, and erase it when a queue is deleted --- src/main/scala/net/lag/kestrel/PersistentQueue.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/main/scala/net/lag/kestrel/PersistentQueue.scala b/src/main/scala/net/lag/kestrel/PersistentQueue.scala index 11da2b23..4206a1e7 100644 --- a/src/main/scala/net/lag/kestrel/PersistentQueue.scala +++ b/src/main/scala/net/lag/kestrel/PersistentQueue.scala @@ -44,7 +44,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c // age of the last item read from the queue: private var _currentAge: Duration = 0.milliseconds - + // time the queue was created private var _createTime = Time.now @@ -95,7 +95,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c def currentAge: Duration = synchronized { if (queueSize == 0) 0.milliseconds else _currentAge } def waiterCount: Long = synchronized { waiters.size } def isClosed: Boolean = synchronized { closed || paused } - def createTime: Long = synchronized { _createTime } + def createTime: Long = synchronized { _createTime.inSeconds } // mostly for unit tests. def memoryLength: Long = synchronized { queue.size } @@ -404,6 +404,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c Stats.clearGauge(statNamed("age_msec")) Stats.clearGauge(statNamed("waiters")) Stats.clearGauge(statNamed("open_transactions")) + Stats.clearGauge(statNamed("create_time")) } private final def nextXid(): Int = { From 837584a3a744c9680e1dbc837928c8341b2b6378 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Thu, 8 Mar 2012 18:18:28 -0800 Subject: [PATCH 4/8] factor out socket connections in kcluster, and let offline servers be warnings. --- src/main/ruby/kcluster/bin/kcluster | 104 +++++++++++++++++----------- 1 file changed, 63 insertions(+), 41 deletions(-) diff --git a/src/main/ruby/kcluster/bin/kcluster b/src/main/ruby/kcluster/bin/kcluster index 4195dd0e..77ccca20 100755 --- a/src/main/ruby/kcluster/bin/kcluster +++ b/src/main/ruby/kcluster/bin/kcluster @@ -22,40 +22,66 @@ def verbose(s) puts s if $options[:verbose] end -def fetch_stats(host, port, data) - verbose "--- Fetching stats from #{host}:#{port}" - sock = TCPSocket.open(host, port) - sock.puts("stats") - done = false - while !done && line = sock.gets.chomp - if (line == 'END') then - done = true - elsif line =~ /STAT queue_([\w\+\-]+) (\d+)/ - key = $1 - value = $2.to_i - (stat, queue_name) = case key - when /([\w\+\-]+)_total_items/ then [:total_items, $1] - when /([\w\+\-]+)_expired_items/ then [:expired_items, $1] - when /([\w\+\-]+)_mem_items/ then [:mem_items, $1] - when /([\w\+\-]+)_items/ then [:items, $1] - when /([\w\+\-]+)_mem_bytes/ then [:mem_bytes, $1] - when /([\w\+\-]+)_bytes/ then [:bytes, $1] - when /([\w\+\-]+)_age/ then [:age, $1] - end +def with_server(hostname, port, &block) + begin + sock = TCPSocket.open(hostname, port) + rescue + puts "WARNING: Unable to connect to #{hostname}:#{port}" + return + end + begin + yield sock + ensure + sock.close + end +end - if (queue_name) - queue_name = queue_name.split('+', 2).first if $options[:rollup_fanouts] +def flush_all(queue_name) + $options[:server_list].each do |server| + print "--- Flushing queue #{queue_name} from #{server}:#{$options[:port]} ... " + STDOUT.flush + with_server(server, $options[:port]) do |sock| + sock.puts("flush " + queue_name) + puts sock.readline.chomp + end + end + puts "Done." +end + +def fetch_stats(hostname, port, data) + verbose "--- Fetching stats from #{hostname}:#{port}" + with_server(hostname, port) do |sock| + sock.puts("stats") + done = false + while !done && line = sock.gets.chomp + if (line == 'END') then + done = true + elsif line =~ /STAT queue_([\w\+\-]+) (\d+)/ + key = $1 + value = $2.to_i + (stat, queue_name) = case key + when /([\w\+\-]+)_total_items/ then [:total_items, $1] + when /([\w\+\-]+)_expired_items/ then [:expired_items, $1] + when /([\w\+\-]+)_mem_items/ then [:mem_items, $1] + when /([\w\+\-]+)_items/ then [:items, $1] + when /([\w\+\-]+)_mem_bytes/ then [:mem_bytes, $1] + when /([\w\+\-]+)_bytes/ then [:bytes, $1] + when /([\w\+\-]+)_age/ then [:age, $1] + end + + if (queue_name) + queue_name = queue_name.split('+', 2).first if $options[:rollup_fanouts] - if (stat == :age) - data[:min_age][queue_name] = value if value < data[:min_age][queue_name] - data[:max_age][queue_name] = value if value > data[:max_age][queue_name] - else - data[stat][queue_name] += value + if (stat == :age) + data[:min_age][queue_name] = value if value < data[:min_age][queue_name] + data[:max_age][queue_name] = value if value > data[:max_age][queue_name] + else + data[stat][queue_name] += value + end end end end end - sock.close end def fetch_all @@ -65,11 +91,7 @@ def fetch_all end end $options[:server_list].each do |server| - begin - fetch_stats(server, $options[:port], data) - rescue => e - puts "Could not connect to host #{server}: #{e}" - end + fetch_stats(server, $options[:port], data) end data end @@ -98,10 +120,10 @@ def delete_all(queue_name) $options[:server_list].each do |server| print "--- Deleting queue #{queue_name} from #{server}:#{$options[:port]} ... " STDOUT.flush - sock = TCPSocket.open(server, $options[:port]) - sock.puts("delete " + queue_name) - puts sock.readline.chomp - sock.close + with_server(server, $options[:port]) do |sock| + sock.puts("delete " + queue_name) + puts sock.readline.chomp + end end puts "Done." end @@ -110,10 +132,10 @@ def flush_all(queue_name) $options[:server_list].each do |server| print "--- Flushing queue #{queue_name} from #{server}:#{$options[:port]} ... " STDOUT.flush - sock = TCPSocket.open(server, $options[:port]) - sock.puts("flush " + queue_name) - puts sock.readline.chomp - sock.close + with_server(server, $options[:port]) do |sock| + sock.puts("flush " + queue_name) + puts sock.readline.chomp + end end puts "Done." end From e8196170a07b07d9ff891e3618695218b7f27d31 Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Fri, 9 Mar 2012 15:02:56 -0800 Subject: [PATCH 5/8] refactor --- src/main/ruby/kcluster/bin/kcluster | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/main/ruby/kcluster/bin/kcluster b/src/main/ruby/kcluster/bin/kcluster index 77ccca20..d2e93611 100755 --- a/src/main/ruby/kcluster/bin/kcluster +++ b/src/main/ruby/kcluster/bin/kcluster @@ -22,6 +22,11 @@ def verbose(s) puts s if $options[:verbose] end +def status(s) + print s + STDOUT.flush +end + def with_server(hostname, port, &block) begin sock = TCPSocket.open(hostname, port) @@ -116,28 +121,23 @@ def report_all(data, keys) puts "" end -def delete_all(queue_name) +def broadcast(command, queue_name, verb) $options[:server_list].each do |server| - print "--- Deleting queue #{queue_name} from #{server}:#{$options[:port]} ... " - STDOUT.flush + status "--- #{verb} queue #{queue_name} from #{server}:#{$options[:port]} ... " with_server(server, $options[:port]) do |sock| - sock.puts("delete " + queue_name) + sock.puts("#{command} " + queue_name) puts sock.readline.chomp end end puts "Done." end +def delete_all(queue_name) + broadcast("delete", queue_name, "Deleting") +end + def flush_all(queue_name) - $options[:server_list].each do |server| - print "--- Flushing queue #{queue_name} from #{server}:#{$options[:port]} ... " - STDOUT.flush - with_server(server, $options[:port]) do |sock| - sock.puts("flush " + queue_name) - puts sock.readline.chomp - end - end - puts "Done." + broadcast("flush", queue_name, "Flushing") end def keep_unchanged_data(last, current) From 2bf5fb0d0bcaefceaa142e00cb18ffa1cfce945e Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Fri, 9 Mar 2012 15:10:12 -0800 Subject: [PATCH 6/8] remove redundant copy of flush_all --- src/main/ruby/kcluster/bin/kcluster | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/src/main/ruby/kcluster/bin/kcluster b/src/main/ruby/kcluster/bin/kcluster index d2e93611..d836a1e5 100755 --- a/src/main/ruby/kcluster/bin/kcluster +++ b/src/main/ruby/kcluster/bin/kcluster @@ -41,18 +41,6 @@ def with_server(hostname, port, &block) end end -def flush_all(queue_name) - $options[:server_list].each do |server| - print "--- Flushing queue #{queue_name} from #{server}:#{$options[:port]} ... " - STDOUT.flush - with_server(server, $options[:port]) do |sock| - sock.puts("flush " + queue_name) - puts sock.readline.chomp - end - end - puts "Done." -end - def fetch_stats(hostname, port, data) verbose "--- Fetching stats from #{hostname}:#{port}" with_server(hostname, port) do |sock| From 2982d6ecd045babc112a9fc710ae895c2df67e3c Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Fri, 16 Mar 2012 16:51:27 -0700 Subject: [PATCH 7/8] bump kcluster gem version to 1.1.1 --- src/main/ruby/kcluster/VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/ruby/kcluster/VERSION b/src/main/ruby/kcluster/VERSION index 9084fa2f..524cb552 100644 --- a/src/main/ruby/kcluster/VERSION +++ b/src/main/ruby/kcluster/VERSION @@ -1 +1 @@ -1.1.0 +1.1.1 From 9028333aa04c55ba2ce32b189e7644771658b6ff Mon Sep 17 00:00:00 2001 From: Robey Pointer Date: Fri, 16 Mar 2012 17:03:46 -0700 Subject: [PATCH 8/8] fix this flaky test --- src/test/scala/net/lag/kestrel/ServerSpec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/scala/net/lag/kestrel/ServerSpec.scala b/src/test/scala/net/lag/kestrel/ServerSpec.scala index df2b27ac..68dee44f 100644 --- a/src/test/scala/net/lag/kestrel/ServerSpec.scala +++ b/src/test/scala/net/lag/kestrel/ServerSpec.scala @@ -273,7 +273,7 @@ class ServerSpec extends Specification with TempFolder with TestLogging { client.startGet("slow/open/t=3599000") Thread.sleep(10) client.disconnect() - kestrel.queueCollection.queue("slow").get.waiterCount mustEqual 0 + kestrel.queueCollection.queue("slow").get.waiterCount must eventually(be_==(0)) } }