Skip to content

Commit

Permalink
Add the concept of transports to the client library in anticipation of
Browse files Browse the repository at this point in the history
adding fibered em as a transport but also redis / 0mq based transports.

Also added benchmark
  • Loading branch information
Tobias Lütke committed Mar 26, 2011
1 parent e94cdda commit c0c890c
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 14 deletions.
42 changes: 42 additions & 0 deletions benchmark.rb
@@ -0,0 +1,42 @@
require 'client'
require 'benchmark'

expectations = []

# Make rand predictable
srand(0)

# Build a series of 5000 tests, every test has format [expectation, method, *args]
expectations = (0..5000).collect do
case rand(3)
when 0
num = rand(1000000)
[num+num, :add, num, num]
when 1
['hello', :hi]
when 2
data = '.' * rand(10000)
[data, :echo, data]
end
end

# Create 5 concurrent workers to pound the server with the fuzz test
pids = (0..4).collect do
fork do
socket = MessagePipe.new(TcpTransport.new('localhost', 9191))


ms = Benchmark.realtime do
expectations.each do |expectation, method, *args|
raise 'fail' unless socket.call(method, *args) == expectation
end
end

puts "benchmark finished in #{ms}s"
exit 0

end
end

# Wait for all forks to complete
Process.waitall
45 changes: 31 additions & 14 deletions client.rb
Expand Up @@ -2,24 +2,42 @@
require 'eventmachine'
require 'msgpack'

class MessagePipeSocket
class TcpTransport
def initialize(host, port)
@socket = TCPSocket.open(host, port)
end

def read
@socket.recv(4096)
end

def write(data)
@socket.print(data)
end

def open?
true
end
end

class MessagePipe
CMD_CALL = 0x01
RET_OK = 0x02
RET_E = 0x03

class RemoteError < StandardError
end

def initialize(host, port)
@socket = TCPSocket.new(host, port)
def initialize(transport)
@transport = transport
@unpacker = MessagePack::Unpacker.new
end

def call(method, *args)
@socket.print([CMD_CALL, method, args].to_msgpack)
@transport.write([CMD_CALL, method, args].to_msgpack)

loop do
@unpacker.feed(@socket.recv(4096))
while @transport.open?
@unpacker.feed(@transport.read)
@unpacker.each do |msg|
case msg.first
when RET_E
Expand All @@ -29,22 +47,21 @@ def call(method, *args)
else
raise RemoteError, "recieved invalid message: #{msg.inspect}"
end
end
raise RemoteError, 'disconnected' unless @socket.open?
end
end

raise RemoteError, 'disconnected'
end
end




if __FILE__ == $0
require "test/unit"

class TestCase < Test::Unit::TestCase

def setup
$socket ||= MessagePipeSocket.new 'localhost', 9191
$socket ||= MessagePipe.new(TcpTransport.new('localhost', 9191))
end

def test_simple_rpc
Expand All @@ -57,19 +74,19 @@ def test_rpc_with_params
end

def test_throw_exception
assert_raise(MessagePipeSocket::RemoteError) do
assert_raise(MessagePipe::RemoteError) do
$socket.call :throw
end
end

def test_cannot_call_non_existing_method
assert_raise(MessagePipeSocket::RemoteError) do
assert_raise(MessagePipe::RemoteError) do
$socket.call :does_not_exist
end
end

def test_cannot_call_private_method
assert_raise(MessagePipeSocket::RemoteError) do
assert_raise(MessagePipe::RemoteError) do
$socket.call :private_method
end
end
Expand Down
5 changes: 5 additions & 0 deletions server.rb
Expand Up @@ -55,6 +55,11 @@ def hi
'hello'
end

def echo(string)
string
end


def throw
raise StandardError, 'hell'
end
Expand Down

0 comments on commit c0c890c

Please sign in to comment.