forked from igrigorik/em-proxy
/
balancing.rb
161 lines (128 loc) · 3.8 KB
/
balancing.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
require 'lib/em-proxy'
require 'ansi/code'
require 'rack'
module BalancingProxy
extend self
# Backend pool: one single-pair hash per backend, mapping its URL to the
# number of requests it is currently serving (starts at zero).
# Deliberately left unfrozen: the counters are updated in place.
BACKENDS = %w[
  http://0.0.0.0:3000
  http://0.0.0.0:3001
  http://0.0.0.0:3002
].map { |backend_url| { backend_url => 0 } }
# Represents a "backend", i.e. the endpoint for the proxy.
#
# This could be e.g. a WEBrick webserver (see below), so the proxy server works as a _reverse_ proxy.
# But it could also be a proxy server, so the proxy server works as a _forward_ proxy.
#
# Load counters are kept in the hashes returned by Backend.list: one
# single-pair hash per backend, mapping its URL to the number of requests
# it is currently serving.
#
class Backend
  attr_reader :url, :host, :port, :strategy
  alias :to_s :url

  # url      - String URL of the backend, e.g. "http://0.0.0.0:3000".
  # strategy - Symbol naming the strategy that picked this backend (optional).
  #            It is stored on the instance so that code holding the backend
  #            can read it back through #strategy.
  def initialize(url, strategy = nil)
    @url      = url
    @strategy = strategy
    parsed    = URI.parse(@url)
    @host, @port = parsed.host, parsed.port
  end

  # Select a backend according to the given strategy.
  #
  # strategy - :balanced   (default) picks the backend with the lowest counter
  #                        and increments that counter,
  #            :roundrobin hands out the backends in list order, starting
  #                        over once every backend has been used,
  #            :random     picks any backend with equal probability.
  #            Anything responding to #to_sym is accepted.
  #
  # Yields the selected Backend when a block is given.
  #
  # Returns the selected Backend.
  # Raises ArgumentError for an unknown strategy.
  # Raises RuntimeError when there are no backends to choose from.
  def self.select(strategy = :balanced)
    strategy = strategy.to_sym

    entry =
      case strategy
      when :balanced
        list.min_by { |candidate| candidate.values.first }
      when :roundrobin
        # Refill the pool from a copy so shifting never empties the list itself.
        @pool = list.clone if @pool.nil? || @pool.empty?
        @pool.shift
      when :random
        # rand(n) is exclusive of n, so every index 0...size is reachable.
        list[rand(list.size)]
      else
        raise ArgumentError, "Unknown strategy: #{strategy}"
      end

    raise "No backends configured" if entry.nil?

    # Hand the strategy to the instance; the reader is instance-level, so a
    # class-level variable would leave backend.strategy permanently nil.
    backend = new(entry.keys.first, strategy)

    puts "---> Selecting #{backend}"
    backend.increment_counter if strategy == :balanced
    yield backend if block_given?
    backend
  end

  # List of backends
  #
  def self.list
    @list ||= BACKENDS
  end

  # Increment "currently serving requests" counter
  #
  # Returns the new counter value.
  def increment_counter
    adjust_counter(1)
  end

  # Decrement "currently serving requests" counter
  #
  # Returns the new counter value.
  def decrement_counter
    adjust_counter(-1)
  end

  private

  # Adds delta to the counter stored for this backend's URL in Backend.list.
  def adjust_counter(delta)
    entry = Backend.list.find { |candidate| candidate.keys.first == url }
    entry[url] += delta
  end
end
# Callbacks for em-proxy events
#
# Each method returns a lambda; the Server module passes these as blocks to
# the connection's on_connect / on_data / on_response / on_finish hooks.
#
module Callbacks
  include ANSI::Code
  extend self

  # Lambda that prints the backend it is called with.
  def on_connect
    lambda do |backend|
      label = black_on_magenta { 'on_connect'.ljust(12) }
      puts "#{label} #{bold { backend }}"
    end
  end

  # Lambda that prints the data it is called with and returns it unchanged.
  def on_data
    lambda do |data|
      label = black_on_yellow { 'on_data'.ljust(12) }
      puts label, data
      data
    end
  end

  # Lambda that prints the response together with the backend it is called
  # with, and returns the response unchanged.
  def on_response
    lambda do |backend, resp|
      label = black_on_green { 'on_response'.ljust(12) }
      puts "#{label} from #{backend}", resp
      resp
    end
  end

  # Lambda that prints the backend and, when the backend reports the
  # :balanced strategy, decrements its counter.
  def on_finish
    lambda do |backend|
      label = black_on_magenta { 'on_finish'.ljust(12) }
      puts "#{label} for #{backend}", ''
      backend.decrement_counter if backend.strategy == :balanced
    end
  end
end
# Wrapping the proxy server
#
module Server
  # Starts the proxy and wires each connection to a backend chosen by
  # Backend.select (called without arguments, i.e. its default strategy).
  #
  # host - address handed to Proxy.start as :host (default '0.0.0.0').
  # port - port handed to Proxy.start as :port (default 9999).
  def run(host='0.0.0.0', port=9999)
    banner = ANSI::Code.bold { "Launching proxy at #{host}:#{port}...\n" }
    puts banner

    Proxy.start(:host => host, :port => port, :debug => false) do |conn|
      Backend.select do |backend|
        # The backend object itself is used as the server's name.
        conn.server(backend, :host => backend.host, :port => backend.port)

        conn.on_connect(&Callbacks.on_connect)
        conn.on_data(&Callbacks.on_data)
        conn.on_response(&Callbacks.on_response)
        conn.on_finish(&Callbacks.on_finish)
      end
    end
  end

  module_function :run
end
end
# Demo entry point: only runs when this file is executed directly.
if __FILE__ == $PROGRAM_NAME
  class Proxy
    # Stops EventMachine, then kills every forked webserver whose pid was
    # recorded in $servers.
    def self.stop
      puts "Terminating ProxyServer"
      EventMachine.stop
      $servers.each { |pid|
        puts "Terminating webserver #{pid}"
        Process.kill('KILL', pid)
      }
    end
  end

  # Simple Rack app to run
  app = proc do |env|
    [ 200, {'Content-Type' => 'text/plain'}, ["Hello World!"] ]
  end

  # Run app on ports 3000-3002, one forked WEBrick process per port;
  # the child pids are collected so Proxy.stop can kill them.
  $servers = []
  (0..2).each do |i|
    pid = Process.fork do
      Rack::Handler::WEBrick.run(app, {:Host => "0.0.0.0", :Port => "300#{i}"})
    end
    $servers << pid
  end

  puts ANSI::Code.green_on_black { "\n=> Send multiple requests to the proxy by running `ruby balancing-client.rb`\n" }

  # Start proxy
  BalancingProxy::Server.run
end