Permalink
Browse files

load balancing and parallelism finally seems to work

  • Loading branch information...
1 parent c76a128 commit daab1d9b9f74ad1d6c7a069d598cb88715944c7f Nick Kallen committed Apr 17, 2009
Showing with 2,806 additions and 12,526 deletions.
  1. +1 −1 jb
  2. +2 −2 joke_server.rb
  3. +1,340 −0 log/joke_server.log
  4. +0 −9,162 log/joke_server.rb.log
  5. +1,411 −0 log/proxy.log
  6. +0 −3,321 log/proxy.rb.log
  7. +10 −31 proxy.rb
  8. +6 −4 proxy/balancers/least_connections.rb
  9. +25 −5 proxy/server.rb
  10. +10 −0 util/deferrable.rb
  11. +1 −0 util/statosaurus.rb
View
@@ -31,7 +31,7 @@ benchmark = Benchmark.measure do
socket = TCPSocket.new($options[:host], $options[:port])
count_per_worker.times do |i|
socket.print("\n")
- socket.gets
+ socket.readline
end
end
end
View
@@ -13,7 +13,7 @@
end
begin
- logfile = File.join(File.dirname(__FILE__), 'log', File.basename(__FILE__) + '.log')
+ logfile = File.join(File.dirname(__FILE__), 'log', File.basename(__FILE__, '.rb') + '.log')
$stats = Statosaurus.new(['job_user', 'job_sys', 'job_real', 'source_transaction_id'], Logger.new(logfile))
end
@@ -24,7 +24,7 @@ def receive_line(line)
$stats.transaction do
$stats.set('source_transaction_id', line)
$stats.measure('job') do
- 10000.times {}
+ 1000000.times {}
sleep rand
send_data("KNOCK KNOCK\n")
end
View
Oops, something went wrong.
View
Oops, something went wrong.
View
Oops, something went wrong.
View
Oops, something went wrong.
View
@@ -1,7 +1,7 @@
#!/usr/bin/env ruby
['rubygems', 'activesupport', 'eventmachine', 'socket', 'optparse'].each { |dependency| require dependency }
-['util/statosaurus', 'util/line_buffered_connection', 'proxy/server', 'proxy/balancers/first', 'proxy/balancers/random', 'proxy/balancers/round_robin', 'proxy/balancers/least_connections'].each { |dependency| require dependency }
+['util/statosaurus', 'util/line_buffered_connection', 'util/deferrable', 'proxy/server', 'proxy/balancers/first', 'proxy/balancers/random', 'proxy/balancers/round_robin', 'proxy/balancers/least_connections'].each { |dependency| require dependency }
begin
$options = {
@@ -18,61 +18,40 @@
end
begin
- logfile = File.join(File.dirname(__FILE__), 'log', File.basename(__FILE__) + '.log')
+ logfile = File.join(File.dirname(__FILE__), 'log', File.basename(__FILE__, '.rb') + '.log')
$stats = Statosaurus.new(['job_user', 'job_sys', 'job_real'], Logger.new(logfile))
end
module ProxyServer
- include LineBufferedConnection
+ include LineBufferedConnection, Deferrable
def receive_line(line)
- EventMachine.defer do
+ defer do
$stats.transaction do
$stats.measure('job') do
message = $stats.transaction_id + "\n"
- p "forwarding"
- sleep 0
- response = ProxyServer.forward(message)
- p "sending data"
- sleep 0
- send_data(response)
- p "sent"
+ send_data(ProxyServer.forward(message))
end
end
end
end
-
- def defer(&block)
- p "pushed block"
- (@queue ||= Queue.new) << block
- initialize_thread
- end
-
- def initialize_thread
- @thread ||= Thread.new do
- while true
- p "looking"
- @queue.pop.call
- end
- end
- end
def self.forward(data)
balancer.forward(data)
end
- private
+ private
def self.servers
- Thread.exclusive do
- @servers ||= (1..$options[:count]).inject([]) do |servers, i|
+ @servers ||= Thread.exclusive do
+ (1..$options[:count]).inject([]) do |servers, i|
servers << Server.new($options[:host], $options[:port] + i)
end
end
end
def self.balancer
- Thread.exclusive do
- @balancer ||= $options[:balancer].new(servers)
+ @balancer ||= Thread.exclusive do
+ $options[:balancer].new(servers)
end
end
end
@@ -11,13 +11,15 @@ def forward(data)
def next_server
server = nil
Thread.exclusive do
- server = servers.shift
+ server = servers.min do |s1, s2|
+ s1.connections <=> s2.connections
+ end
+ server.reserve
end
yield server
+
ensure
- Thread.exclusive do
- servers.push server
- end
+ server.release
end
end
View
@@ -1,13 +1,33 @@
class Server
+ include Deferrable
+
def initialize(host, port)
- @socket = TCPSocket.new(host, port)
- @free = true
+ @host, @port = host, port
+ end
+
+ def connections
+ @connections ||= 0
+ end
+
+ def connections=(connections)
+ @connections = connections
end
- def call(data)
+ def reserve
Thread.exclusive do
- @socket.print(data)
- @socket.gets
+ self.connections += 1
end
end
+
+ def release
+ Thread.exclusive do
+ self.connections -= 1
+ end
+ end
+
+ def call(data)
+ socket = TCPSocket.new(@host, @port)
+ socket.print(data)
+ socket.readline
+ end
end
View
@@ -0,0 +1,10 @@
+module Deferrable
+ def defer(&block)
+ (@queue ||= Queue.new) << block
+ @thread ||= Thread.new do
+ while true
+ @queue.pop.call
+ end
+ end
+ end
+end
View
@@ -9,6 +9,7 @@ def initialize(fields, logger)
end
def measure(field, &block)
+ result = nil
measurement = Benchmark.measure do
result = yield
end

0 comments on commit daab1d9

Please sign in to comment.