Permalink
Browse files

Fixed an issue that would occasionally cause

Armstrong to crash. The GC was collecting the 
context the request and response sockets were 
using and thus would take everything down when the
sockets would try to fetch the next message. This 
is a **critical** patch and is recommended for all
!!!.
  • Loading branch information...
1 parent b06cd14 commit 642f1d0be38893a8448acda80de83c2f67248900 @ox committed Nov 20, 2011
Showing with 60 additions and 25 deletions.
  1. +12 −3 lib/armstrong.rb
  2. +32 −15 lib/armstrong/connection.rb
  3. +11 −3 lib/armstrong/main_actors.rb
  4. +5 −4 response_benchmark.rb
View
@@ -1,8 +1,6 @@
require 'actor'
require 'rubygems'
require 'lazy'
-require 'open-uri'
-require 'json'
libdir = File.dirname(__FILE__)
$LOAD_PATH.unshift(libdir) unless $LOAD_PATH.include?(libdir)
@@ -16,6 +14,7 @@ class Base
class << self
attr_accessor :conn, :routes
+ # uuid generator. There's a pretty low chance of collision.
def new_uuid
values = [
rand(0x0010000),
@@ -76,7 +75,16 @@ def encoded(char)
end
end
- class Armstrong < Base
+ class Armstrong < Base
+
+ # the kicker. It all gets launched from here.
+ # this function makes a new connection object to handle the communication,
+ # promises to start the replier, request handler, and their supervisor,
+ # gives the replier the connection information, tells the request_handler
+ # what routes it should be able to match, then checks that all of the services
+ # are running correctly, gives us a launch time, then jumps into our main loop
+ # that waits for an incoming message, parses it, and sends it off to be
+ # operated on by the request handler. Boom.
def self.run!
uuid = new_uuid
puts "starting Armstrong as mongrel2 service #{uuid}"
@@ -138,6 +146,7 @@ class << self
self.target = Armstrong
end
+ # Sinatras secret sauce.
at_exit { Armstrong.run! }
end
@@ -1,7 +1,9 @@
+require 'ffi'
require 'ffi-rzmq'
+require 'json'
class Connection
- attr_reader :app_id, :sub_addr, :pub_addr, :request_sock, :response_sock
+ attr_reader :app_id, :sub_addr, :pub_addr, :request_sock, :response_sock, :context
def initialize(app_id, zmq_sub_pub_addr=["tcp://127.0.0.1", 9999, "tcp://127.0.0.1", 9998])
@app_id = app_id
@@ -12,40 +14,44 @@ def initialize(app_id, zmq_sub_pub_addr=["tcp://127.0.0.1", 9999, "tcp://127.0.0
end
def connect
- context = ZMQ::Context.new 1
- @request_sock = context.socket ZMQ::PULL
+ @context = ZMQ::Context.new 1
+ @request_sock = @context.socket ZMQ::PULL
@request_sock.connect @sub_addr
- @response_sock = context.socket ZMQ::PUB
+ @response_sock = @context.socket ZMQ::PUB
@response_sock.setsockopt ZMQ::IDENTITY, @app_id
@response_sock.connect @pub_addr
end
- #raw recv
+ #raw recv, unparsed message
def recv
msg = ""
- @request_sock.recv_string msg
+ rc = @request_sock.recv_string(msg)
+ puts "errno [#{ZMQ::Util.errno}] with description [#{ZMQ::Util.error_string}]" unless ZMQ::Util.resultcode_ok?(rc)
msg
end
#parse the request, this is the best way to get stuff back, as a Hash
def receive
- parse(self.recv)
+ parse(recv)
end
+ # sends the message off, formatted for Mongrel2 to understand
def send(uuid, conn_id, msg)
header = "%s %d:%s" % [uuid, conn_id.join(' ').length, conn_id.join(' ')]
string = header + ', ' + msg
#puts "\t\treplying to #{conn_id} with: ", string
@response_sock.send_string string, ZMQ::NOBLOCK
end
- def reply(request, message)
- self.send(request[:uuid], [request[:id]], message)
+ # reply to an env with `message` string
+ def reply(env, message)
+ self.send(env[:sender], [env[:conn_id]], message)
end
- def reply_http(req, code=200, headers={"Content-type" => "text/html"}, body="")
- self.reply(req, http_response(body, code, headers))
+ # reply to a req with a valid http header
+ def reply_http(env, body, code=200, headers={"Content-type" => "text/html"})
+ self.reply(env, http_response(body, code, headers))
end
private
@@ -56,14 +62,25 @@ def http_response(body, code, headers)
"HTTP/1.1 #{code} #{StatusMessage[code.to_i]}\r\n#{headers_s}\r\n\r\n#{body}"
end
+ def parse_netstring(ns)
+ len, rest = ns.split(':', 2)
+ len = len.to_i
+ raise "Netstring did not end in ','" unless rest[len].chr == ','
+ [ rest[0...len], rest[(len+1)..-1] ]
+ end
+
def parse(msg)
- if(msg.empty?)
+ if msg.nil? || msg.empty?
return nil
end
- uuid, id, path, header_size, headers, body_size, body = msg.match(/^(.{36}) (\d+) (.*?) (\d+):(.*?),(\d+):(.*?),$/).to_a[1..-1]
-
- return {:uuid => uuid, :id => id, :path => path, :header_size => header_size, :headers => headers, :body_size => body_size, :body => body}
+ env = {}
+ env[:sender], env[:conn_id], env[:path], rest = msg.split(' ', 4)
+ env[:headers], head_rest = parse_netstring(rest)
+ env[:body], _ = parse_netstring(head_rest)
+
+ env[:headers] = JSON.parse(env[:headers])
+ return env
end
# From WEBrick: thanks dawg.
@@ -7,6 +7,16 @@ class << self
end
end
+# take the route and pattern and keys and this function will match the keyworded params in
+# the url with the pattern. Example:
+#
+# url: /user/2/view/345
+# pattern: /user/:id/view/:comment
+#
+# returns:
+#
+# params = {id: 2, comment: 345}
+#
def process_route(route, pattern, keys, values = [])
return unless match = pattern.match(route)
values += match.captures.map { |v| URI.decode(v) if v }
@@ -32,7 +42,7 @@ def process_route(route, pattern, keys, values = [])
end
msg.when(Reply) do |m|
begin
- conn.reply_http(m.env, m.code, m.headers, m.body)
+ conn.reply_http(m.env, m.body, m.code, m.headers)
rescue Exception => e
puts "Actor[:replier]: I messed up with exception: #{e.message}"
end
@@ -44,7 +54,6 @@ def process_route(route, pattern, keys, values = [])
# this nifty mess helps us just to use the output of the Procs that handle
# the request instead of making the user manually catch messages and send
# them out to the replier.
-
Aleph::Base.container_proc = Proc.new do
data = Actor.receive
env, proccess = data.env, data.proccess
@@ -73,7 +82,6 @@ def process_route(route, pattern, keys, values = [])
f.when(Request) do |r|
failure = true
- r.env[:headers] = JSON.parse(r.env[:headers])
verb = r.env[:headers]["METHOD"]
routes[verb].each do |route|
if route.route[0].match(r.env[:path])
View
@@ -2,11 +2,12 @@
require 'benchmark'
d = 0
+x = (ARGV[0].nil? ? 100 : ARGV[0].to_i)
-puts Benchmark.measure {
- 100.times do
+bench = Benchmark.measure {
+ x.times do
begin
- open("http://localhost:6000/")
+ open("http://localhost:6767/")
rescue Exception => e
puts e.message
d += 1
@@ -15,4 +16,4 @@
end
}
-puts "failed: #{d}"
+puts "succeeded: #{x-d}", "failed: #{d}", "reqs/sec: #{(x-d)/bench.real}", "time elapsed: %.6f" % bench.real

0 comments on commit 642f1d0

Please sign in to comment.