Permalink
Browse files

with defer

  • Loading branch information...
Nick Kallen
Nick Kallen committed Apr 17, 2009
1 parent 275d75a commit c76a1284a9e28c6927a320758ae17dc00d39b321
Showing with 1,430 additions and 40 deletions.
  1. +1 −1 jb
  2. +6 −6 joke_server.rb
  3. +1,018 −0 log/joke_server.rb.log
  4. +361 −0 log/proxy.rb.log
  5. +27 −11 proxy.rb
  6. +2 −4 proxy/balancers/least_connections.rb
  7. +9 −0 util/line_buffered_connection.rb
  8. +0 −16 util/line_protocol.rb
  9. +6 −2 { → util}/statosaurus.rb
View
2 jb
@@ -38,5 +38,5 @@ benchmark = Benchmark.measure do
threads.each(&:join)
end
-mean = benchmark.real / count_per_worker
+mean = benchmark.real / $options[:count]
puts "Requests per second:\t%2f [#/sec] (mean)" % mean
View
@@ -1,7 +1,7 @@
#!/usr/bin/env ruby
-['rubygems', 'eventmachine', 'activesupport', 'statosaurus', 'optparse'].each { |dependency| require dependency }
-['util/line_protocol'].each { |dependency| require dependency }
+['rubygems', 'eventmachine', 'activesupport', 'optparse'].each { |dependency| require dependency }
+['util/statosaurus', 'util/line_buffered_connection'].each { |dependency| require dependency }
begin
$options = {
@@ -18,15 +18,15 @@
end
module JokeServer
- include LineProtocol
+ include LineBufferedConnection
- def call(data)
+ def receive_line(line)
$stats.transaction do
- $stats.set('source_transaction_id', data)
+ $stats.set('source_transaction_id', line)
$stats.measure('job') do
10000.times {}
sleep rand
- send_data("knock knock\n")
+ send_data("KNOCK KNOCK\n")
end
end
end
View

Large diffs are not rendered by default.

Oops, something went wrong.
View

Large diffs are not rendered by default.

Oops, something went wrong.
View
@@ -1,7 +1,7 @@
#!/usr/bin/env ruby
-['rubygems', 'activesupport', 'eventmachine', 'socket', 'optparse', 'statosaurus'].each { |dependency| require dependency }
-['util/line_protocol', 'proxy/server', 'proxy/balancers/first', 'proxy/balancers/random', 'proxy/balancers/round_robin', 'proxy/balancers/least_connections'].each { |dependency| require dependency }
+['rubygems', 'activesupport', 'eventmachine', 'socket', 'optparse'].each { |dependency| require dependency }
+['util/statosaurus', 'util/line_buffered_connection', 'proxy/server', 'proxy/balancers/first', 'proxy/balancers/random', 'proxy/balancers/round_robin', 'proxy/balancers/least_connections'].each { |dependency| require dependency }
begin
$options = {
@@ -23,22 +23,38 @@
end
module ProxyServer
- include LineProtocol
- include EventMachine::Deferrable
+ include LineBufferedConnection
- def call(data)
- proxy = self
- p "spawning"
- EventMachine.spawn do
+ def receive_line(line)
+ EventMachine.defer do
$stats.transaction do
$stats.measure('job') do
message = $stats.transaction_id + "\n"
+ p "forwarding"
+ sleep 0
response = ProxyServer.forward(message)
- proxy.send_data(response)
- p "finished"
+ p "sending data"
+ sleep 0
+ send_data(response)
+ p "sent"
end
end
- end.run
+ end
+ end
+
+ def defer(&block)
+ p "pushed block"
+ (@queue ||= Queue.new) << block
+ initialize_thread
+ end
+
+ def initialize_thread
+ @thread ||= Thread.new do
+ while true
+ p "looking"
+ @queue.pop.call
+ end
+ end
end
def self.forward(data)
@@ -14,12 +14,10 @@ def next_server
server = servers.shift
end
- result = yield server
-
+ yield server
+ ensure
Thread.exclusive do
servers.push server
end
-
- result
end
end
@@ -0,0 +1,9 @@
+['rubygems', 'eventmachine'].each { |dependency| require dependency }
+
+module LineBufferedConnection
+ def receive_data(data)
+ (@buffer ||= BufferedTokenizer.new).extract(data).each do |line|
+ receive_line(line)
+ end
+ end
+end
View
@@ -1,16 +0,0 @@
-['rubygems', 'eventmachine'].each { |dependency| require dependency }
-
-module LineProtocol
- def receive_data(data)
- @data ||= ""
- if first_newline = data.index("\n")
- call(@data + data[0..first_newline])
- @data = ""
- data_after_first_newline = \
- data[(first_newline+1)..-1]
- if data_after_first_newline
- receive_data(data_after_first_newline)
- end
- end
- end
-end
@@ -9,10 +9,13 @@ def initialize(fields, logger)
end
def measure(field, &block)
- measurement = Benchmark.measure(&block)
+ measurement = Benchmark.measure do
+ result = yield
+ end
@values["#{field}_real"] = min(measurement.real)
@values["#{field}_sys"] = min(measurement.stime)
@values["#{field}_user"] = min(measurement.utime)
+ result
end
def set(key, value)
@@ -21,9 +24,10 @@ def set(key, value)
def transaction
@transaction_id = "#{Process.pid}-#{Time.now.to_i}-#{rand(9999)}"
- yield
+ result = yield
print
@values, @transaction_id = {}, nil
+ result
end
private

0 comments on commit c76a128

Please sign in to comment.