Skip to content

Commit

Permalink
Renamed to script-exchange, and generalised to support python etc as …
Browse files Browse the repository at this point in the history
…well as js
  • Loading branch information
tonyg committed Mar 7, 2010
1 parent 5b3b7b7 commit 32cc375
Show file tree
Hide file tree
Showing 13 changed files with 246 additions and 74 deletions.
4 changes: 2 additions & 2 deletions COPYING
Original file line number Original file line Diff line number Diff line change
@@ -1,6 +1,6 @@
This package, js-exchange, an exchange type plugin for use with This package, script-exchange, an exchange type plugin for use with
RabbitMQ, is licensed under the MPL. For the MPL, please see RabbitMQ, is licensed under the MPL. For the MPL, please see
LICENSE-MPL-js-exchange. LICENSE-MPL-script-exchange.


If you have any questions regarding licensing, please contact If you have any questions regarding licensing, please contact
info@rabbitmq.com. info@rabbitmq.com.
2 changes: 1 addition & 1 deletion LICENSE-MPL-js-exchange → LICENSE-MPL-script-exchange
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ EXHIBIT A -Mozilla Public License.
License for the specific language governing rights and limitations License for the specific language governing rights and limitations
under the License. under the License.


The Original Code is js-exchange. The Original Code is script-exchange.


The Initial Developers of the Original Code are LShift Ltd and The Initial Developers of the Original Code are LShift Ltd and
Tony Garnock-Jones. Tony Garnock-Jones.
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Original file line Diff line number Diff line change
@@ -1,4 +1,4 @@
PACKAGE=rabbit_js_exchange PACKAGE=rabbit_script_exchange
DEPS=rabbitmq-server rabbitmq-erlang-client erlang-rfc4627 DEPS=rabbitmq-server rabbitmq-erlang-client erlang-rfc4627
EXTRA_PACKAGE_DIRS=priv EXTRA_PACKAGE_DIRS=priv


Expand Down
5 changes: 3 additions & 2 deletions README.md
Original file line number Original file line Diff line number Diff line change
@@ -1,3 +1,4 @@
## RabbitMQ Javascript exchange plugin ## RabbitMQ Script Exchange Plugin


Requires Spidermonkey. I've been using version "1.8.0 pre-release 1 2007-10-03". Requires Spidermonkey for Javascript support. I've been using version
"1.8.0 pre-release 1 2007-10-03".
11 changes: 0 additions & 11 deletions ebin/rabbit_js_exchange.app

This file was deleted.

12 changes: 12 additions & 0 deletions ebin/rabbit_script_exchange.app
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,12 @@
{application, rabbit_script_exchange,
[{description, "RabbitMQ Script Exchange Plugin"},
{vsn, "0.01"},
{modules, [script_exchange,
script_instance,
script_instance_manager,
script_manager_sup]},
{registered, []},
{env, [{languages, [{<<"text/javascript">>, [{command_line, "js js_exchange_boot.js"}]},
{<<"text/x-python">>, [{command_line, "python py_exchange_boot.py"}]}]},
{max_instance_count, 3}]},
{applications, [kernel, stdlib, rabbit, mnesia]}]}.
15 changes: 8 additions & 7 deletions examples/d.py → examples/example_js.py
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
ch = conn.channel() ch = conn.channel()


# try: # try:
# print ch.exchange_delete(exchange='x-js') # print ch.exchange_delete(exchange='x-script')
# except pika.exceptions.ChannelClosed: # except pika.exceptions.ChannelClosed:
# ch = conn.channel() # ch = conn.channel()


print ch.exchange_declare(exchange='x-js', type='x-js', arguments={ print ch.exchange_declare(exchange='x-script', type='x-script', arguments={
"type": "text/javascript",
"definition": r""" "definition": r"""
var counter = 0; var counter = 0;
function (msg) { function (msg) {
Expand All @@ -29,11 +30,11 @@
""" """
}) })


print ch.queue_declare(queue='x-js-q', auto_delete=True) print ch.queue_declare(queue='x-script-q', auto_delete=True)
print ch.queue_bind(queue='x-js-q', exchange='x-js', routing_key='hi') print ch.queue_bind(queue='x-script-q', exchange='x-script', routing_key='hi')
print ch.queue_bind(queue='x-js-q', exchange='x-js', routing_key='*.y.#') print ch.queue_bind(queue='x-script-q', exchange='x-script', routing_key='*.y.#')
print ch.queue_bind(queue='x-js-q', exchange='x-js', routing_key='x.#') print ch.queue_bind(queue='x-script-q', exchange='x-script', routing_key='x.#')
print ch.basic_publish(exchange='x-js', routing_key='testrk', body='testbody', print ch.basic_publish(exchange='x-script', routing_key='testrk', body='testbody',
properties=pika.BasicProperties(content_type='text/plain')) properties=pika.BasicProperties(content_type='text/plain'))


conn.close() conn.close()
Expand Down
41 changes: 41 additions & 0 deletions examples/example_python.py
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,41 @@
#!/usr/bin/env python
import sys
sys.path.append('../pika')
import pika
import asyncore

conn = pika.AsyncoreConnection(pika.ConnectionParameters(
'127.0.0.1',
credentials=pika.PlainCredentials('guest', 'guest')))

ch = conn.channel()

# try:
# print ch.exchange_delete(exchange='x-script')
# except pika.exceptions.ChannelClosed:
# ch = conn.channel()

print ch.exchange_declare(exchange='x-script', type='x-script', arguments={
"type": "text/x-python",
"definition": r"""
counter = 0
def handler(msg):
global counter
msg.fanout()
msg.direct("hi")
msg.topic("x.y." + msg.routing_key)
msg.properties['user_id'] = "THISISTHEUSER"
msg.body = str(counter) + "\n" + msg.body
counter = counter + 1
"""
})

print ch.queue_declare(queue='x-script-q', auto_delete=True)
print ch.queue_bind(queue='x-script-q', exchange='x-script', routing_key='hi')
print ch.queue_bind(queue='x-script-q', exchange='x-script', routing_key='*.y.#')
print ch.queue_bind(queue='x-script-q', exchange='x-script', routing_key='x.#')
print ch.basic_publish(exchange='x-script', routing_key='testrk', body='testbody',
properties=pika.BasicProperties(content_type='text/plain'))

conn.close()
asyncore.loop()
92 changes: 92 additions & 0 deletions priv/py_exchange_boot.py
Original file line number Original file line Diff line number Diff line change
@@ -0,0 +1,92 @@
import simplejson
import sys

modules = {}

def send_obj(id, op, payload):
print simplejson.dumps({'id': id, 'op': op, 'payload': payload})
sys.stdout.flush()

def dbg(v):
send_obj(None, "cast", ["io", "format", ["~p~n", [v]]])

def info(v):
send_obj(None, "cast", ["error_logger", "info_report", [v]])

def log_error(report):
send_obj(None, "cast", ["error_logger", "error_report", [report]])

def do_cast(payload):
(m, f, a) = payload
m = modules[m]
f = getattr(m, f)
return f(*a)

def mainloop1():
try:
line = raw_input()
except EOFError:
return False
req = simplejson.loads(line)
return dispatch_request(req)

def dispatch_request(req):
op = req['op']
if op == "call":
send_obj(req['id'], "reply", do_cast(req['payload']))
elif op == "cast":
do_cast(req['payload'])
else:
log_error({"message": "Invalid request sent to js instance", "req": req})
return True

def mainloop():
try:
while mainloop1():
pass
except:
import traceback
log_error({"message": "Exception in mainloop", "e": traceback.format_exc()})

###########################################################################

class Message(object):
def __init__(self, rk, props, body):
self.routing_key = rk
self.properties = props
self.body = body
self.routing_actions = []

def fanout(self):
self.routing_actions.append("fanout")

def direct(self, rk):
self.routing_actions.append(["direct", rk])

def topic(self, rk):
self.routing_actions.append(["topic", rk])

class Exchange:
def __init__(self):
self.handlers = {}

def validate(self, xname, args):
if not 'definition' in args:
return False
return True

def create(self, xname, args):
vars = {}
exec args['definition'] in globals(), vars
self.handlers[','.join(xname)] = vars['handler']

def publish(self, xname, rk, props, body):
handler = self.handlers[','.join(xname)]
msg = Message(rk, props, body)
handler(msg)
return [msg.properties, msg.body, msg.routing_actions]

modules['Exchange'] = Exchange()

info("Python instance started OK")
mainloop()
37 changes: 22 additions & 15 deletions src/rabbit_exchange_type_js.erl → src/script_exchange.erl
Original file line number Original file line Diff line number Diff line change
@@ -1,9 +1,9 @@
-module(rabbit_exchange_type_js). -module(script_exchange).
-include_lib("rfc4627_jsonrpc/include/rfc4627.hrl"). -include_lib("rfc4627_jsonrpc/include/rfc4627.hrl").
-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("rabbit_common/include/rabbit_framing.hrl").


-define(EXCHANGE_TYPE_BIN, <<"x-js">>). -define(EXCHANGE_TYPE_BIN, <<"x-script">>).


-rabbit_boot_step({?MODULE, -rabbit_boot_step({?MODULE,
[{mfa, {rabbit_exchange_type_registry, register, [?EXCHANGE_TYPE_BIN, ?MODULE]}}, [{mfa, {rabbit_exchange_type_registry, register, [?EXCHANGE_TYPE_BIN, ?MODULE]}},
Expand Down Expand Up @@ -47,7 +47,7 @@ do_routing_action(#exchange{name = Name}, Delivery, [<<"direct">>, RK]) ->
do_routing_action(Exchange, Delivery, [<<"topic">>, RK]) -> do_routing_action(Exchange, Delivery, [<<"topic">>, RK]) ->
rabbit_exchange_type_topic:publish(Exchange, update_delivery_rk(Delivery, RK)); rabbit_exchange_type_topic:publish(Exchange, update_delivery_rk(Delivery, RK));
do_routing_action(Exchange, _Delivery, Action) -> do_routing_action(Exchange, _Delivery, Action) ->
error_logger:error_report({bad_js_exchange_routing_action, Exchange, Action}), error_logger:error_report({bad_script_exchange_routing_action, Exchange, Action}),
{unroutable, []}. {unroutable, []}.


merge_pieces([]) -> merge_pieces([]) ->
Expand All @@ -68,6 +68,11 @@ choose_result(routed, _) -> routed;
choose_result(_, routed) -> routed; choose_result(_, routed) -> routed;
choose_result(V, _) -> V. choose_result(V, _) -> V.


script_manager_pid(#exchange{arguments = Args}) ->
{value, {_, _, MimeTypeBin}} = lists:keysearch(<<"type">>, 1, Args),
{ok, Pid} = script_manager_sup:lookup(MimeTypeBin),
Pid.

publish(Exchange = #exchange{name = Name}, publish(Exchange = #exchange{name = Name},
Delivery = #delivery{message = Message0 = #basic_message{ Delivery = #delivery{message = Message0 = #basic_message{
routing_key = RK, routing_key = RK,
Expand All @@ -77,30 +82,32 @@ publish(Exchange = #exchange{name = Name},
properties = Properties, properties = Properties,
payload_fragments_rev = PayloadRev payload_fragments_rev = PayloadRev
} = rabbit_binary_parser:ensure_content_decoded(Content0), } = rabbit_binary_parser:ensure_content_decoded(Content0),
case js_instance_manager:call(<<"Exchange">>, <<"publish">>, case script_instance_manager:call(
[name_to_js(Name), script_manager_pid(Exchange), <<"Exchange">>, <<"publish">>,
RK, [name_to_js(Name),
?RFC4627_FROM_RECORD('P_basic', Properties), RK,
list_to_binary(lists:reverse(PayloadRev))]) of ?RFC4627_FROM_RECORD('P_basic', Properties),
list_to_binary(lists:reverse(PayloadRev))]) of
{ok, [NewProps, NewBody, Actions]} -> {ok, [NewProps, NewBody, Actions]} ->
NewContent = js_to_content(Content0, NewProps, NewBody), NewContent = js_to_content(Content0, NewProps, NewBody),
NewDelivery = Delivery#delivery{message = Message0#basic_message{content = NewContent}}, NewDelivery = Delivery#delivery{message = Message0#basic_message{content = NewContent}},
merge_pieces([do_routing_action(Exchange, NewDelivery, Action) merge_pieces([do_routing_action(Exchange, NewDelivery, Action)
|| Action <- Actions]); || Action <- Actions]);
Other -> Other ->
error_logger:error_report({bad_reply_from_js_exchange_publish, Other}), error_logger:error_report({bad_reply_from_script_exchange_publish, Other}),
%% TODO FIXME do something more sensible here %% TODO FIXME do something more sensible here
[] []
end. end.


validate_or_create(MethodName, #exchange{name = Name, arguments = Args}) -> validate(X = #exchange{name = Name, arguments = Args}) ->
js_instance_manager:call(<<"Exchange">>, MethodName, {ok, true} = script_instance_manager:call(script_manager_pid(X), <<"Exchange">>, <<"validate">>,
[name_to_js(Name), [name_to_js(Name), table_to_js(Args)]),
table_to_js(Args)]),
ok. ok.


validate(X) -> validate_or_create(<<"validate">>, X). create(X = #exchange{name = Name, arguments = Args}) ->
create(X) -> validate_or_create(<<"create">>, X). {ok, _} = script_instance_manager:call(script_manager_pid(X), <<"Exchange">>, <<"create">>,
[name_to_js(Name), table_to_js(Args)]),
ok.


recover(X, _Bs) -> recover(X, _Bs) ->
create(X). create(X).
Expand Down
22 changes: 11 additions & 11 deletions src/js_instance.erl → src/script_instance.erl
Original file line number Original file line Diff line number Diff line change
@@ -1,4 +1,4 @@
-module(js_instance). -module(script_instance).


-behaviour(gen_server). -behaviour(gen_server).


Expand All @@ -11,8 +11,8 @@


%%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------


start_link(ScriptName) -> start_link(CommandLine) ->
gen_server:start_link(?MODULE, [ScriptName], []). gen_server:start_link(?MODULE, [CommandLine], []).


call(Pid, M, F, A) -> call(Pid, M, F, A) ->
gen_server:call(Pid, {call, M, F, A}). gen_server:call(Pid, {call, M, F, A}).
Expand All @@ -29,8 +29,8 @@ send_obj(Id, Op, Payload, #state{port = Port}) ->
[rfc4627:encode({obj, [{id, Id}, {op, Op}, {payload, Payload}]}), "\n"]), [rfc4627:encode({obj, [{id, Id}, {op, Op}, {payload, Payload}]}), "\n"]),
ok. ok.


bad_js_message(Message, State) -> bad_script_message(Message, State) ->
{stop, {bad_js_message, Message}, State}. {stop, {bad_script_message, Message}, State}.


handle_msg(Message, State = #state{requests = OldReq}) -> handle_msg(Message, State = #state{requests = OldReq}) ->
Id = rfc4627:get_field(Message, "id", null), Id = rfc4627:get_field(Message, "id", null),
Expand All @@ -40,7 +40,7 @@ handle_msg(Message, State = #state{requests = OldReq}) ->
<<"reply">> -> <<"reply">> ->
case dict:find(Id, OldReq) of case dict:find(Id, OldReq) of
error -> error ->
bad_js_message(Message, State); bad_script_message(Message, State);
{ok, From} -> {ok, From} ->
gen_server:reply(From, {ok, Payload}), gen_server:reply(From, {ok, Payload}),
{noreply, State#state{requests = dict:erase(Id, OldReq)}} {noreply, State#state{requests = dict:erase(Id, OldReq)}}
Expand All @@ -50,15 +50,15 @@ handle_msg(Message, State = #state{requests = OldReq}) ->
apply(list_to_atom(binary_to_list(MBin)), list_to_atom(binary_to_list(FBin)), A), apply(list_to_atom(binary_to_list(MBin)), list_to_atom(binary_to_list(FBin)), A),
{noreply, State}; {noreply, State};
_Other -> _Other ->
bad_js_message(Message, State) bad_script_message(Message, State)
end. end.


%%--------------------------------------------------------------------------- %%---------------------------------------------------------------------------


init([ScriptName]) -> init([CommandLine]) ->
ScriptDir = js_instance_manager:script_dir(), ScriptDir = script_instance_manager:script_dir(),
error_logger:info_report({starting_js, ScriptDir, ScriptName}), error_logger:info_report({starting_script, ScriptDir, CommandLine}),
Port = open_port({spawn, "js " ++ ScriptName}, Port = open_port({spawn, CommandLine},
[{line, 1024}, {cd, ScriptDir}, use_stdio, eof]), [{line, 1024}, {cd, ScriptDir}, use_stdio, eof]),
{ok, #state{port = Port, {ok, #state{port = Port,
fragments_rev = [], fragments_rev = [],
Expand Down
Loading

0 comments on commit 32cc375

Please sign in to comment.