Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Merge branch 'release_2_2' into graph
Browse files Browse the repository at this point in the history
  • Loading branch information
Robey Pointer committed Mar 29, 2012
2 parents 5be4502 + abdc873 commit aed4364
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 52 deletions.
2 changes: 1 addition & 1 deletion src/main/ruby/kcluster/VERSION
@@ -1 +1 @@
1.1.0
1.1.1
108 changes: 59 additions & 49 deletions src/main/ruby/kcluster/bin/kcluster
Expand Up @@ -22,40 +22,59 @@ def verbose(s)
puts s if $options[:verbose]
end

def fetch_stats(host, port, data)
verbose "--- Fetching stats from #{host}:#{port}"
sock = TCPSocket.open(host, port)
sock.puts("stats")
done = false
while !done && line = sock.gets.chomp
if (line == 'END') then
done = true
elsif line =~ /STAT queue_([\w\+\-]+) (\d+)/
key = $1
value = $2.to_i
(stat, queue_name) = case key
when /([\w\+\-]+)_total_items/ then [:total_items, $1]
when /([\w\+\-]+)_expired_items/ then [:expired_items, $1]
when /([\w\+\-]+)_mem_items/ then [:mem_items, $1]
when /([\w\+\-]+)_items/ then [:items, $1]
when /([\w\+\-]+)_mem_bytes/ then [:mem_bytes, $1]
when /([\w\+\-]+)_bytes/ then [:bytes, $1]
when /([\w\+\-]+)_age/ then [:age, $1]
end
def status(s)
print s
STDOUT.flush
end

if (queue_name)
queue_name = queue_name.split('+', 2).first if $options[:rollup_fanouts]
def with_server(hostname, port, &block)
begin
sock = TCPSocket.open(hostname, port)
rescue
puts "WARNING: Unable to connect to #{hostname}:#{port}"
return
end
begin
yield sock
ensure
sock.close
end
end

if (stat == :age)
data[:min_age][queue_name] = value if value < data[:min_age][queue_name]
data[:max_age][queue_name] = value if value > data[:max_age][queue_name]
else
data[stat][queue_name] += value
def fetch_stats(hostname, port, data)
verbose "--- Fetching stats from #{hostname}:#{port}"
with_server(hostname, port) do |sock|
sock.puts("stats")
done = false
while !done && line = sock.gets.chomp
if (line == 'END') then
done = true
elsif line =~ /STAT queue_([\w\+\-]+) (\d+)/
key = $1
value = $2.to_i
(stat, queue_name) = case key
when /([\w\+\-]+)_total_items/ then [:total_items, $1]
when /([\w\+\-]+)_expired_items/ then [:expired_items, $1]
when /([\w\+\-]+)_mem_items/ then [:mem_items, $1]
when /([\w\+\-]+)_items/ then [:items, $1]
when /([\w\+\-]+)_mem_bytes/ then [:mem_bytes, $1]
when /([\w\+\-]+)_bytes/ then [:bytes, $1]
when /([\w\+\-]+)_age/ then [:age, $1]
end

if (queue_name)
queue_name = queue_name.split('+', 2).first if $options[:rollup_fanouts]

if (stat == :age)
data[:min_age][queue_name] = value if value < data[:min_age][queue_name]
data[:max_age][queue_name] = value if value > data[:max_age][queue_name]
else
data[stat][queue_name] += value
end
end
end
end
end
sock.close
end

def fetch_all
Expand All @@ -65,11 +84,7 @@ def fetch_all
end
end
$options[:server_list].each do |server|
begin
fetch_stats(server, $options[:port], data)
rescue => e
puts "Could not connect to host #{server}: #{e}"
end
fetch_stats(server, $options[:port], data)
end
data
end
Expand All @@ -94,28 +109,23 @@ def report_all(data, keys)
puts ""
end

def delete_all(queue_name)
def broadcast(command, queue_name, verb)
$options[:server_list].each do |server|
print "--- Deleting queue #{queue_name} from #{server}:#{$options[:port]} ... "
STDOUT.flush
sock = TCPSocket.open(server, $options[:port])
sock.puts("delete " + queue_name)
puts sock.readline.chomp
sock.close
status "--- #{verb} queue #{queue_name} from #{server}:#{$options[:port]} ... "
with_server(server, $options[:port]) do |sock|
sock.puts("#{command} " + queue_name)
puts sock.readline.chomp
end
end
puts "Done."
end

def delete_all(queue_name)
broadcast("delete", queue_name, "Deleting")
end

def flush_all(queue_name)
$options[:server_list].each do |server|
print "--- Flushing queue #{queue_name} from #{server}:#{$options[:port]} ... "
STDOUT.flush
sock = TCPSocket.open(server, $options[:port])
sock.puts("flush " + queue_name)
puts sock.readline.chomp
sock.close
end
puts "Done."
broadcast("flush", queue_name, "Flushing")
end

def keep_unchanged_data(last, current)
Expand Down
10 changes: 8 additions & 2 deletions src/main/scala/net/lag/kestrel/PersistentQueue.scala
Expand Up @@ -44,7 +44,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c

// age of the last item read from the queue:
private var _currentAge: Duration = 0.milliseconds

// time the queue was created
private var _createTime = Time.now

Expand Down Expand Up @@ -95,7 +95,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
def currentAge: Duration = synchronized { if (queueSize == 0) 0.milliseconds else _currentAge }
def waiterCount: Long = synchronized { waiters.size }
def isClosed: Boolean = synchronized { closed || paused }
def createTime: Long = synchronized { _createTime }
def createTime: Long = synchronized { _createTime.inSeconds }

// mostly for unit tests.
def memoryLength: Long = synchronized { queue.size }
Expand Down Expand Up @@ -414,6 +414,12 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
Stats.clearGauge(statNamed("age_msec"))
Stats.clearGauge(statNamed("waiters"))
Stats.clearGauge(statNamed("open_transactions"))
Stats.clearGauge(statNamed("create_time"))
Stats.removeMetric(statNamed("set_latency_usec"))
Stats.removeMetric(statNamed("get_timeout_msec"))
Stats.removeMetric(statNamed("delivery_latency_msec"))
Stats.removeMetric(statNamed("get_hit_latency_usec"))
Stats.removeMetric(statNamed("get_miss_latency_usec"))
}

private final def nextXid(): Int = {
Expand Down

0 comments on commit aed4364

Please sign in to comment.