Skip to content

Commit

Permalink
Version 0.3.4
Browse files Browse the repository at this point in the history
Renamed input_queue to message_queue.
Added optional timeout to Client.
Server now deletes message queue when started.
PHP: Fixed exception handling in the Client.
Python: Example code can now run without installation.
  • Loading branch information
nfarring committed Apr 4, 2012
1 parent 2366bd6 commit b0caa18
Show file tree
Hide file tree
Showing 15 changed files with 133 additions and 70 deletions.
28 changes: 17 additions & 11 deletions README.markdown
Expand Up @@ -61,7 +61,7 @@ Ruby Usage

```ruby
redis_server = Redis.new
input_queue = 'calc'
message_queue = 'calc'
calculator = RedisRPC::Client.new redis_server, 'calc'
calculator.clr
calculator.add 5
Expand All @@ -75,9 +75,9 @@ assert calculator.val == 4

```ruby
redis_server = Redis.new
input_queue = 'calc'
message_queue = 'calc'
local_object = Calculator.new
server = RedisRPC::Server.new redis_server, input_queue, local_object
server = RedisRPC::Server.new redis_server, message_queue, local_object
server.run
```

Expand All @@ -90,8 +90,8 @@ PHP Usage

```php
$redis_server = new Predis\Client();
$input_queue = 'calc';
$calculator = new RedisRPC\Client($redis_server, $input_queue);
$message_queue = 'calc';
$calculator = new RedisRPC\Client($redis_server, $message_queue);
$calculator->clr();
$calculator->add(5);
$calculator->sub(3);
Expand All @@ -104,9 +104,9 @@ assert($calculator->val() == 4);

```php
$redis_server = new Predis\Client();
$input_queue = 'calc';
$message_queue = 'calc';
$local_object = new Calculator();
$server = new RedisRPC\Server($redis_server, $input_queue, $local_object);
$server = new RedisRPC\Server($redis_server, $message_queue, $local_object);
$server->run();
```

Expand All @@ -117,8 +117,8 @@ Python Usage

```python
redis_server = redis.Redis()
input_queue = 'calc'
calculator = redisrpc.Client(redis_server, input_queue)
message_queue = 'calc'
calculator = redisrpc.Client(redis_server, message_queue)
calculator.clr()
calculator.add(5)
calculator.sub(3)
Expand All @@ -131,9 +131,9 @@ assert calculator.val() == 4

```python
redis_server = redis.Redis()
input_queue = 'calc'
message_queue = 'calc'
local_object = calc.Calculator()
server = redisrpc.Server(redis_server, input_queue, local_object)
server = redisrpc.Server(redis_server, message_queue, local_object)
server.run()
```

Expand Down Expand Up @@ -226,6 +226,12 @@ This software is available under the [GPLv3][GPLv3] or later.
Changelog
----------
Version 0.3.4
* Client now supports optional timeout.
* Server now deletes message queue when starting.
* PHP: Fixed exception handling.
Version 0.3.3
* Ruby: Added a Ruby library implementation.
Expand Down
5 changes: 3 additions & 2 deletions php/examples/client.php
Expand Up @@ -47,8 +47,9 @@ function do_calculations($calculator) {

// 2. Remote object, should act like local object
$redis_server = new Predis\Client();
$input_queue = 'calc';
$calculator = new RedisRPC\Client($redis_server, $input_queue);
$message_queue = 'calc';
$timeout = 1;
$calculator = new RedisRPC\Client($redis_server, $message_queue, $timeout);
do_calculations($calculator);
print "success!\n";

Expand Down
4 changes: 2 additions & 2 deletions php/examples/server.php
Expand Up @@ -33,9 +33,9 @@
require_once 'calc.php';

$redis_server = new Predis\Client();
$input_queue = 'calc';
$message_queue = 'calc';
$local_object = new Calculator();
$server = new RedisRPC\Server($redis_server, $input_queue, $local_object);
$server = new RedisRPC\Server($redis_server, $message_queue, $local_object);
$server->run();

?>
32 changes: 17 additions & 15 deletions php/src/RedisRPC/Client.php
Expand Up @@ -71,11 +71,12 @@ function random_string($size, $valid_chars='ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghij
class Client {

private $redis_server;
private $input_queue;
private $message_queue;

public function __construct($redis_server, $input_queue) {
public function __construct($redis_server, $message_queue, $timeout = 0) {
$this->redis_server = $redis_server;
$this->input_queue = $input_queue;
$this->message_queue = $message_queue;
$this->timeout = $timeout;
}

public function __call($name, $arguments) {
Expand All @@ -87,28 +88,29 @@ public function __call($name, $arguments) {
if (count($arguments) > 0) {
$function_call['args'] = $arguments;
}
$response_queue = "$this->input_queue:rpc:" . random_string(8);
$response_queue = "$this->message_queue:rpc:" . random_string(8);
$rpc_request = array(
'function_call' => $function_call,
'response_queue' => $response_queue
);
$message = json_encode($rpc_request);
debug_print("RPC Request: $message");
$this->redis_server->rpush($this->input_queue, $message);
$timeout_s = 0; # Block forever.
list($message_queue, $message) = $this->redis_server->blpop($response_queue, $timeout_s);
$this->redis_server->rpush($this->message_queue, $message);
$result = $this->redis_server->blpop($response_queue, $this->timeout);
if ($result == NULL) {
throw new TimeoutException();
}
list($message_queue, $message) = $result;
assert($message_queue == $response_queue);
debug_print("RPC Response: $message\n");
$rpc_response = json_decode($message);
if (array_key_exists('exception',$rpc_response) && $rpc_response->exception != NULL) {
throw new RemoteException($rpc_response->exception);
}
if (!array_key_exists('return_value',$rpc_response)) {
throw new RemoteException('Malformed RPC Response message');
}
return $rpc_response->return_value;
# exception = rpc_response.get('exception')
# if exception is not None:
# if DEBUG: print('exception: %r\n' % exception)
# raise RemoteException(exception)
# if 'return_value' not in rpc_response:
# raise RempteException('malformed RPC Response message: %r' % rpc_response)
# return rpc_response['return_value']

}
}

Expand Down
2 changes: 1 addition & 1 deletion php/src/RedisRPC/RemoteException.php
Expand Up @@ -20,7 +20,7 @@
/**
*
*/
class RemoteException extends Exception {
class RemoteException extends \Exception {
}

?>
15 changes: 8 additions & 7 deletions php/src/RedisRPC/Server.php
Expand Up @@ -40,33 +40,34 @@ function debug_print($string,$flag=NULL) {
class Server {

private $redis_server;
private $input_queue;
private $message_queue;
private $local_object;

/**
* Initializes a new server.
*
* @param mixed $redis_server Handle to a Redis server object.
* @param string $input_queue Name of Redis message queue.
* @param string $message_queue Name of Redis message queue.
* @param mixed $local_object Handle to local wrapped object that will receive the RPC calls.
*/
public function __construct($redis_server, $input_queue, $local_object) {
public function __construct($redis_server, $message_queue, $local_object) {
$this->redis_server = $redis_server;
$this->input_queue = $input_queue;
$this->message_queue = $message_queue;
$this->local_object = $local_object;
}

/**
* Starts the server.
*/
public function run() {
$timeout_s = 0;
$this->redis_server->del($this->message_queue);
$timeout = 0;
while (1) {
# Pop a message from the queue.
# Decode the message.
# Check that the function exists.
list($message_queue, $message) = $this->redis_server->blpop($this->input_queue, $timeout_s);
assert($message_queue == $this->input_queue);
list($message_queue, $message) = $this->redis_server->blpop($this->message_queue, $timeout);
assert($message_queue == $this->message_queue);
debug_print("RPC Request: $message");
$rpc_request = json_decode($message);
$response_queue = $rpc_request->response_queue;
Expand Down
26 changes: 26 additions & 0 deletions php/src/RedisRPC/TimeoutException.php
@@ -0,0 +1,26 @@
<?php

# Copyright (C) 2012. Nathan Farrington <nfarring@gmail.com>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

namespace RedisRPC;

/**
*
*/
class TimeoutException extends \Exception {
}

?>
7 changes: 5 additions & 2 deletions python/examples/client.py
Expand Up @@ -5,6 +5,9 @@
import sys

import redis

# Allow this script to run without installing redisrpc.
sys.path.append('..')
import redisrpc

import calc
Expand Down Expand Up @@ -34,7 +37,7 @@ def do_calculations(calculator):

# 2. Remote object, should act like local object
redis_server = redis.Redis()
input_queue = 'calc'
calculator = redisrpc.Client(redis_server, input_queue)
message_queue = 'calc'
calculator = redisrpc.Client(redis_server, message_queue, timeout=1)
do_calculations(calculator)
print('success!')
7 changes: 5 additions & 2 deletions python/examples/server.py
Expand Up @@ -4,6 +4,9 @@
import sys

import redis

# Allow this script to run without installing redisrpc.
sys.path.append('..')
import redisrpc

import calc
Expand All @@ -14,7 +17,7 @@


redis_server = redis.Redis()
input_queue = 'calc'
message_queue = 'calc'
local_object = calc.Calculator()
server = redisrpc.Server(redis_server, input_queue, local_object)
server = redisrpc.Server(redis_server, message_queue, local_object)
server.run()
32 changes: 21 additions & 11 deletions python/redisrpc.py
Expand Up @@ -26,7 +26,8 @@
__all__ = [
'Client',
'Server',
'RemoteException'
'RemoteException',
'TimeoutException'
]


Expand Down Expand Up @@ -90,19 +91,21 @@ def as_python_code(self):
class Client(object):
"""Calls remote functions using Redis as a message queue."""

def __init__(self, redis_server, input_queue):
def __init__(self, redis_server, message_queue, timeout=0):
self.redis_server = redis_server
self.input_queue = input_queue
self.message_queue = message_queue
self.timeout = timeout

def call(self, method_name, *args, **kwargs):
function_call = FunctionCall(method_name, args, kwargs)
response_queue = self.input_queue + ':rpc:' + random_string()
response_queue = self.message_queue + ':rpc:' + random_string()
rpc_request = dict(function_call=function_call, response_queue=response_queue)
message = json.dumps(rpc_request)
logging.debug('RPC Request: %s' % message)
self.redis_server.rpush(self.input_queue, message)
timeout_s = 0 # Block forever.
message_queue, message = self.redis_server.blpop(response_queue, timeout_s)
self.redis_server.rpush(self.message_queue, message)
result = self.redis_server.blpop(response_queue, self.timeout)
if result is None: raise TimeoutException()
message_queue, message = result
assert message_queue == response_queue
logging.debug('RPC Response: %s' % message)
rpc_response = json.loads(message)
Expand All @@ -121,15 +124,17 @@ def __getattr__(self, name):
class Server(object):
"""Executes function calls received from a Redis queue."""

def __init__(self, redis_server, input_queue, local_object):
def __init__(self, redis_server, message_queue, local_object):
self.redis_server = redis_server
self.input_queue = input_queue
self.message_queue = message_queue
self.local_object = local_object

def run(self):
# Flush the message queue.
self.redis_server.delete(self.message_queue)
while True:
message_queue, message = self.redis_server.blpop(self.input_queue)
assert message_queue == self.input_queue
message_queue, message = self.redis_server.blpop(self.message_queue)
assert message_queue == self.message_queue
logging.debug('RPC Request: %s' % message)
rpc_request = json.loads(message)
response_queue = rpc_request['response_queue']
Expand All @@ -149,3 +154,8 @@ def run(self):
class RemoteException(Exception):
"""Raised by an RPC client when an exception occurs on the RPC server."""
pass


class TimeoutException(Exception):
"""Raised by an RPC client when a timeout occurs."""
pass
2 changes: 1 addition & 1 deletion python/setup.py
@@ -1,7 +1,7 @@
from setuptools import setup

# Ref: http://semver.org/
VERSION='0.3.3'
VERSION='0.3.4'

AUTHOR='Nathan Farrington'
AUTHOR_EMAIL='nfarring@gmail.com'
Expand Down
5 changes: 3 additions & 2 deletions ruby/examples/client.rb
Expand Up @@ -32,7 +32,8 @@ def do_calculations(calculator)

# 2. Remote object, should act like local object
redis_server = Redis.new
input_queue = 'calc'
calculator = RedisRPC::Client.new redis_server, 'calc'
message_queue = 'calc'
timeout = 1
calculator = RedisRPC::Client.new redis_server, message_queue, timeout
do_calculations calculator
puts 'success!'
4 changes: 2 additions & 2 deletions ruby/examples/server.rb
Expand Up @@ -6,7 +6,7 @@
require_relative './calc'

redis_server = Redis.new
input_queue = 'calc'
message_queue = 'calc'
local_object = Calculator.new
server = RedisRPC::Server.new redis_server, input_queue, local_object
server = RedisRPC::Server.new redis_server, message_queue, local_object
server.run

0 comments on commit b0caa18

Please sign in to comment.