Browse files

first commit

  • Loading branch information...
0 parents commit fef459a033b229e1a128b3ff61849f9c4f9bed48 @videlalvaro committed Feb 26, 2011
3 .gitignore
@@ -0,0 +1,3 @@
+deps/*
+ebin/*
+erl_crash.dump
13 priv/log_watcher.config
@@ -0,0 +1,13 @@
+[{consumer, [
+
+ {consumer_mod, xmpp_consumer},
+
+ {'amqp.params',
+
+ [{host, <<"localhost">>}, {port, 5672}, {user, <<"guest">>}, {password, <<"guest">>}, {virtual_host, <<"/">>}]},
+
+ {exchange_options,
+
+ [{exchange, <<"amq.rabbitmq.log">>}, {type, "topic"}]}
+
+]}].
1 rebar
13 rebar.config
@@ -0,0 +1,13 @@
+{sub_dirs, ["rel"]}.
+
+% {deps, [
+% {amqp_client, "", {hg, "rabbitmq-erlang-client", "amqp_client"}}
+% ]}.
+
+{erl_opts, [debug_info, fail_on_warning,
+ {i, "include"},
+ {i, "deps/amqp_client/include"},
+ {i, "deps/amqp_client/ebin"},
+ {i, "deps/amqp_client/include/rabbit_common/include"},
+ {i, "deps/amqp_client/include/rabbit_common/ebin"}
+]}.
11 rel/files/app.config
@@ -0,0 +1,11 @@
+[
+ %% SASL config
+ {sasl, [
+ {sasl_error_logger, {file, "log/sasl-error.log"}},
+ {errlog_type, error},
+ {error_logger_mf_dir, "log/sasl"}, % Log directory
+ {error_logger_mf_maxbytes, 10485760}, % 10 MB max file size
+ {error_logger_mf_maxfiles, 5} % 5 files max
+ ]}
+].
+
34 rel/files/erl
@@ -0,0 +1,34 @@
+#!/bin/bash
+
+## This script replaces the default "erl" in erts-VSN/bin. This is necessary
+## as escript depends on erl and in turn, erl depends on having access to a
+## bootscript (start.boot). Note that this script is ONLY invoked as a side-effect
+## of running escript -- the embedded node bypasses erl and uses erlexec directly
+## (as it should).
+##
+## Note that this script makes the assumption that there is a start_clean.boot
+## file available in $ROOTDIR/release/VSN.
+
+# Determine the abspath of where this script is executing from.
+ERTS_BIN_DIR=$(cd ${0%/*} && pwd)
+
+# Now determine the root directory -- this script runs from erts-VSN/bin,
+# so we simply need to strip off two dirs from the end of the ERTS_BIN_DIR
+# path.
+ROOTDIR=${ERTS_BIN_DIR%/*/*}
+
+# Parse out release and erts info
+START_ERL=`cat $ROOTDIR/releases/start_erl.data`
+ERTS_VSN=${START_ERL% *}
+APP_VSN=${START_ERL#* }
+
+BINDIR=$ROOTDIR/erts-$ERTS_VSN/bin
+EMU=beam
+PROGNAME=`echo $0 | sed 's/.*\\///'`
+CMD="$BINDIR/erlexec"
+export EMU
+export ROOTDIR
+export BINDIR
+export PROGNAME
+
+exec $CMD -boot $ROOTDIR/releases/$APP_VSN/start_clean ${1+"$@"}
138 rel/files/nodetool
@@ -0,0 +1,138 @@
+%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*-
+%% ex: ft=erlang ts=4 sw=4 et
+%% -------------------------------------------------------------------
+%%
+%% nodetool: Helper Script for interacting with live nodes
+%%
+%% -------------------------------------------------------------------
+
+main(Args) ->
+ ok = start_epmd(),
+ %% Extract the args
+ {RestArgs, TargetNode} = process_args(Args, [], undefined),
+
+ %% See if the node is currently running -- if it's not, we'll bail
+ case {net_kernel:hidden_connect_node(TargetNode), net_adm:ping(TargetNode)} of
+ {true, pong} ->
+ ok;
+ {_, pang} ->
+ io:format("Node ~p not responding to pings.\n", [TargetNode]),
+ halt(1)
+ end,
+
+ case RestArgs of
+ ["ping"] ->
+ %% If we got this far, the node already responsed to a ping, so just dump
+ %% a "pong"
+ io:format("pong\n");
+ ["stop"] ->
+ io:format("~p\n", [rpc:call(TargetNode, init, stop, [], 60000)]);
+ ["restart"] ->
+ io:format("~p\n", [rpc:call(TargetNode, init, restart, [], 60000)]);
+ ["reboot"] ->
+ io:format("~p\n", [rpc:call(TargetNode, init, reboot, [], 60000)]);
+ ["rpc", Module, Function | RpcArgs] ->
+ case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function),
+ [RpcArgs], 60000) of
+ ok ->
+ ok;
+ {badrpc, Reason} ->
+ io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
+ halt(1);
+ _ ->
+ halt(1)
+ end;
+ ["rpcterms", Module, Function, ArgsAsString] ->
+ case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function),
+ consult(ArgsAsString), 60000) of
+ {badrpc, Reason} ->
+ io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]),
+ halt(1);
+ Other ->
+ io:format("~p\n", [Other])
+ end;
+ Other ->
+ io:format("Other: ~p\n", [Other]),
+ io:format("Usage: nodetool {ping|stop|restart|reboot}\n")
+ end,
+ net_kernel:stop().
+
+process_args([], Acc, TargetNode) ->
+ {lists:reverse(Acc), TargetNode};
+process_args(["-setcookie", Cookie | Rest], Acc, TargetNode) ->
+ erlang:set_cookie(node(), list_to_atom(Cookie)),
+ process_args(Rest, Acc, TargetNode);
+process_args(["-name", TargetName | Rest], Acc, _) ->
+ ThisNode = append_node_suffix(TargetName, "_maint_"),
+ {ok, _} = net_kernel:start([ThisNode, longnames]),
+ process_args(Rest, Acc, nodename(TargetName));
+process_args(["-sname", TargetName | Rest], Acc, _) ->
+ ThisNode = append_node_suffix(TargetName, "_maint_"),
+ {ok, _} = net_kernel:start([ThisNode, shortnames]),
+ process_args(Rest, Acc, nodename(TargetName));
+process_args([Arg | Rest], Acc, Opts) ->
+ process_args(Rest, [Arg | Acc], Opts).
+
+
+start_epmd() ->
+ [] = os:cmd(epmd_path() ++ " -daemon"),
+ ok.
+
+epmd_path() ->
+ ErtsBinDir = filename:dirname(escript:script_name()),
+ Name = "epmd",
+ case os:find_executable(Name, ErtsBinDir) of
+ false ->
+ case os:find_executable(Name) of
+ false ->
+ io:format("Could not find epmd.~n"),
+ halt(1);
+ GlobalEpmd ->
+ GlobalEpmd
+ end;
+ Epmd ->
+ Epmd
+ end.
+
+
+nodename(Name) ->
+ case string:tokens(Name, "@") of
+ [_Node, _Host] ->
+ list_to_atom(Name);
+ [Node] ->
+ [_, Host] = string:tokens(atom_to_list(node()), "@"),
+ list_to_atom(lists:concat([Node, "@", Host]))
+ end.
+
+append_node_suffix(Name, Suffix) ->
+ case string:tokens(Name, "@") of
+ [Node, Host] ->
+ list_to_atom(lists:concat([Node, Suffix, os:getpid(), "@", Host]));
+ [Node] ->
+ list_to_atom(lists:concat([Node, Suffix, os:getpid()]))
+ end.
+
+
+%%
+%% Given a string or binary, parse it into a list of terms, ala file:consult/0
+%%
+consult(Str) when is_list(Str) ->
+ consult([], Str, []);
+consult(Bin) when is_binary(Bin)->
+ consult([], binary_to_list(Bin), []).
+
+consult(Cont, Str, Acc) ->
+ case erl_scan:tokens(Cont, Str, 0) of
+ {done, Result, Remaining} ->
+ case Result of
+ {ok, Tokens, _} ->
+ {ok, Term} = erl_parse:parse_term(Tokens),
+ consult([], Remaining, [Term | Acc]);
+ {eof, _Other} ->
+ lists:reverse(Acc);
+ {error, Info, _} ->
+ {error, Info}
+ end;
+ {more, Cont1} ->
+ consult(Cont1, eof, Acc)
+ end.
155 rel/files/rmq_patterns
@@ -0,0 +1,155 @@
+#!/bin/bash
+# -*- tab-width:4;indent-tabs-mode:nil -*-
+# ex: ts=4 sw=4 et
+
+RUNNER_SCRIPT_DIR=$(cd ${0%/*} && pwd)
+
+RUNNER_BASE_DIR=${RUNNER_SCRIPT_DIR%/*}
+RUNNER_ETC_DIR=$RUNNER_BASE_DIR/etc
+RUNNER_LOG_DIR=$RUNNER_BASE_DIR/log
+# Note the trailing slash on $PIPE_DIR/
+PIPE_DIR=/tmp/$RUNNER_BASE_DIR/
+RUNNER_USER=
+
+# Make sure this script is running as the appropriate user
+if [ ! -z "$RUNNER_USER" ] && [ `whoami` != "$RUNNER_USER" ]; then
+ exec sudo -u $RUNNER_USER -i $0 $@
+fi
+
+# Make sure CWD is set to runner base dir
+cd $RUNNER_BASE_DIR
+
+# Make sure log directory exists
+mkdir -p $RUNNER_LOG_DIR
+
+# Extract the target node name from node.args
+NAME_ARG=`grep -e '-[s]*name' $RUNNER_ETC_DIR/vm.args`
+if [ -z "$NAME_ARG" ]; then
+ echo "vm.args needs to have either -name or -sname parameter."
+ exit 1
+fi
+
+# Extract the target cookie
+COOKIE_ARG=`grep -e '-setcookie' $RUNNER_ETC_DIR/vm.args`
+if [ -z "$COOKIE_ARG" ]; then
+ echo "vm.args needs to have a -setcookie parameter."
+ exit 1
+fi
+
+# Identify the script name
+SCRIPT=`basename $0`
+
+# Parse out release and erts info
+START_ERL=`cat $RUNNER_BASE_DIR/releases/start_erl.data`
+ERTS_VSN=${START_ERL% *}
+APP_VSN=${START_ERL#* }
+
+# Add ERTS bin dir to our path
+ERTS_PATH=$RUNNER_BASE_DIR/erts-$ERTS_VSN/bin
+
+# Setup command to control the node
+NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG"
+
+# Check the first argument for instructions
+case "$1" in
+ start)
+ # Make sure there is not already a node running
+ RES=`$NODETOOL ping`
+ if [ "$RES" = "pong" ]; then
+ echo "Node is already running!"
+ exit 1
+ fi
+ HEART_COMMAND="$RUNNER_BASE_DIR/bin/$SCRIPT start"
+ export HEART_COMMAND
+ mkdir -p $PIPE_DIR
+ $ERTS_PATH/run_erl -daemon $PIPE_DIR $RUNNER_LOG_DIR "exec $RUNNER_BASE_DIR/bin/$SCRIPT console" 2>&1
+ ;;
+
+ stop)
+ # Wait for the node to completely stop...
+ case `uname -s` in
+ Linux|Darwin|FreeBSD|DragonFly|NetBSD|OpenBSD)
+ # PID COMMAND
+ PID=`ps ax -o pid= -o command=|\
+ grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $1}'`
+ ;;
+ SunOS)
+ # PID COMMAND
+ PID=`ps -ef -o pid= -o args=|\
+ grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $1}'`
+ ;;
+ CYGWIN*)
+ # UID PID PPID TTY STIME COMMAND
+ PID=`ps -efW|grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $2}'`
+ ;;
+ esac
+ $NODETOOL stop
+ while `kill -0 $PID 2>/dev/null`;
+ do
+ sleep 1
+ done
+ ;;
+
+ restart)
+ ## Restart the VM without exiting the process
+ $NODETOOL restart
+ ;;
+
+ reboot)
+ ## Restart the VM completely (uses heart to restart it)
+ $NODETOOL reboot
+ ;;
+
+ ping)
+ ## See if the VM is alive
+ $NODETOOL ping
+ ;;
+
+ attach)
+ # Make sure a node IS running
+ RES=`$NODETOOL ping`
+ if [ "$RES" != "pong" ]; then
+ echo "Node is not running!"
+ exit 1
+ fi
+
+ shift
+ $ERTS_PATH/to_erl $PIPE_DIR
+ ;;
+
+ console|console_clean)
+ # .boot file typically just $SCRIPT (ie, the app name)
+ # however, for debugging, sometimes start_clean.boot is useful:
+ case "$1" in
+ console) BOOTFILE=$SCRIPT ;;
+ console_clean) BOOTFILE=start_clean ;;
+ esac
+ # Setup beam-required vars
+ ROOTDIR=$RUNNER_BASE_DIR
+ BINDIR=$ROOTDIR/erts-$ERTS_VSN/bin
+ EMU=beam
+ PROGNAME=`echo $0 | sed 's/.*\\///'`
+ CMD="$BINDIR/erlexec -boot $RUNNER_BASE_DIR/releases/$APP_VSN/$BOOTFILE -embedded -config $RUNNER_ETC_DIR/app.config -args_file $RUNNER_ETC_DIR/vm.args -- ${1+"$@"}"
+ export EMU
+ export ROOTDIR
+ export BINDIR
+ export PROGNAME
+
+ # Dump environment info for logging purposes
+ echo "Exec: $CMD"
+ echo "Root: $ROOTDIR"
+
+ # Log the startup
+ logger -t "$SCRIPT[$$]" "Starting up"
+
+ # Start the VM
+ exec $CMD
+ ;;
+
+ *)
+ echo "Usage: $SCRIPT {start|stop|restart|reboot|ping|console|attach}"
+ exit 1
+ ;;
+esac
+
+exit 0
22 rel/files/vm.args
@@ -0,0 +1,22 @@
+
+## Name of the node
+## -name rmq_patterns@127.0.0.1
+-sname rmq_patterns
+
+## Cookie for distributed erlang
+-setcookie rmq_patterns
+
+## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive
+## (Disabled by default..use with caution!)
+##-heart
+
+## Enable kernel poll and a few async threads
++K true
++A 5
+
+## Increase number of concurrent ports/sockets
+-env ERL_MAX_PORTS 4096
+
+## Tweak GC to run more often
+-env ERL_FULLSWEEP_AFTER 10
+
34 rel/reltool.config
@@ -0,0 +1,34 @@
+{sys, [
+ {lib_dirs, ["../../"]},
+ {rel, "rmq_patterns", "1",
+ [
+ kernel,
+ stdlib,
+ sasl,
+ rmq_patterns
+ ]},
+ {rel, "start_clean", "",
+ [
+ kernel,
+ stdlib
+ ]},
+ {boot_rel, "rmq_patterns"},
+ {profile, embedded},
+ {excl_sys_filters, ["^bin/.*",
+ "^erts.*/bin/(dialyzer|typer)"]},
+ {app, rmq_patterns, [{incl_cond, include}]},
+ {app, sasl, [{incl_cond, include}]}
+ ]}.
+
+{target_dir, "rmq_patterns"}.
+
+{overlay, [
+ {mkdir, "log/sasl"},
+ {copy, "files/erl", "{{erts_vsn}}/bin/erl"},
+ {copy, "files/nodetool", "{{erts_vsn}}/bin/nodetool"},
+ {copy, "files/rmq_patterns", "bin/rmq_patterns"},
+ {copy, "files/app.config", "etc/app.config"},
+ {copy, "files/vm.args", "etc/vm.args"},
+ {copy, "deps/amqp_client.ez", "lib/amqp_client.ez"},
+ {copy, "deps/rabbit_common.ez", "lib/rabbit_common.ez "}
+ ]}.
203 src/amqp_smart_proxy.erl
@@ -0,0 +1,203 @@
+%% @doc This is a utility module that is used to expose an arbitrary function
+%% via an asynchronous RPC over AMQP mechanism. It frees the implementor of
+%% a simple function from having to plumb this into AMQP. Note that the
+%% RPC server does not handle any data encoding, so it is up to the callback
+%% function to marshall and unmarshall message payloads accordingly.
+-module(amqp_smart_proxy).
+
+-behaviour(gen_server).
+
+-include("amqp_client.hrl").
+
+-export([init/1, terminate/2, code_change/3, handle_call/3,
+ handle_cast/2, handle_info/2]).
+-export([start/5]).
+-export([stop/1]).
+-export([stats/1]).
+
+-record(state, {channel,
+ handler,
+ proxy_queue,
+ proxy_exchange,
+ rpc_exchange,
+ correlation_id = 0,
+ continuations = dict:new(),
+ stats}).
+
+-record(proxied_req, {correlation_id,
+ reply_queue,
+ req_time}).
+
+-record(stats, {msgs_nb = 0,
+ time_total = 0}).
+
+%%--------------------------------------------------------------------------
+%% API
+%%--------------------------------------------------------------------------
+
+%% @spec (Connection, Queue, RpcHandler) -> RpcServer
+%% where
+%% Connection = pid()
+%% ProxyEx = binary()
+%% RpcHandler = binary()
+%% RpcHandler = function()
+%% RpcServer = pid()
+%% @doc Starts a new RPC server instance that receives requests via a
+%% specified queue and dispatches them to a specified handler function. This
+%% function returns the pid of the RPC server that can be used to stop the
+%% server.
+start(Connection, ProxyEx, RpcExchange, Fun, Opts) ->
+ {ok, Pid} = gen_server:start(?MODULE, [Connection, ProxyEx, RpcExchange, Fun], Opts),
+ Pid.
+
+%% @spec (RpcServer) -> ok
+%% where
+%% RpcServer = pid()
+%% @doc Stops an exisiting RPC server.
+stop(Pid) ->
+ gen_server:call(Pid, stop, infinity).
+
+stats(Pid) ->
+ gen_server:call(Pid, stats).
+
+%%--------------------------------------------------------------------------
+%% gen_server callbacks
+%%--------------------------------------------------------------------------
+
+%% @private
+init([Connection, ProxyEx, RpcExchange, Fun]) ->
+ {ok, Channel} = amqp_connection:open_channel(Connection),
+
+ %%setup reply queue to get messages from RPC Server.
+ #'queue.declare_ok'{queue = ReplyQ}
+ = amqp_channel:call(Channel, #'queue.declare'{}),
+ log("queue.declare", "ok"),
+ amqp_channel:subscribe(Channel, #'basic.consume'{queue = ReplyQ}, self()),
+ log("basic.consume", "ok"),
+
+ %%setup proxy exchange
+ ExDeclare = #'exchange.declare'{exchange=ProxyEx, durable = true},
+ #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare),
+
+ %%setup proxy queue to get messages from RPC Client
+ #'queue.declare_ok'{queue = ProxyQ}
+ = amqp_channel:call(Channel, #'queue.declare'{}),
+ log("queue.declare", "ok2"),
+ QueueBind = #'queue.bind'{queue = ProxyQ,
+ exchange = ProxyEx},
+ #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind),
+ log("queue.bind", "ok"),
+ amqp_channel:subscribe(Channel, #'basic.consume'{queue = ProxyQ}, self()),
+ log("subscribe", "ok"),
+
+ {ok, #state{channel = Channel, handler = Fun, proxy_queue = ReplyQ,
+ proxy_exchange = ProxyEx, rpc_exchange = RpcExchange, stats = #stats{}}}.
+
+%% @private
+handle_info(shutdown, State) ->
+ {stop, normal, State};
+
+%% @private
+handle_info(#'basic.consume_ok'{}, State) ->
+ {noreply, State};
+
+%% @private
+handle_info(#'basic.cancel_ok'{}, State) ->
+ {stop, normal, State};
+
+%% @private
+%% handles message from the RPC Client and forwards to the RPC server
+handle_info({#'basic.deliver'{delivery_tag = DeliveryTag, exchange = ProxyEx},
+ #amqp_msg{props = Props} = Msg},
+ #state{handler = _Fun, channel = Channel, proxy_queue = ProxyQ,
+ proxy_exchange = ProxyEx, rpc_exchange = RpcExchange,
+ correlation_id = CorrelationId, continuations = Continuations,
+ stats = Stats} = State) ->
+
+ %%extract original message props and store for later
+ #'P_basic'{correlation_id = ClientCorrelationId,
+ reply_to = ClientQ} = Props,
+
+ %% add our own correlation_id and reply_to fields
+ Properties = Props#'P_basic'{correlation_id = <<CorrelationId:64>>, reply_to = ProxyQ},
+
+ Publish = #'basic.publish'{exchange = RpcExchange},
+
+ Now = date_utils:now_to_milliseconds_hires(erlang:now()),
+
+ amqp_channel:call(Channel, Publish, Msg#amqp_msg{props = Properties}),
+ amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
+
+ ProxiedReq = #proxied_req{correlation_id = ClientCorrelationId,
+ reply_queue = ClientQ, req_time = Now},
+
+ NewStats = Stats#stats{msgs_nb = Stats#stats.msgs_nb + 1},
+
+ NewState = State#state{correlation_id = CorrelationId + 1,
+ continuations = dict:store(CorrelationId, ProxiedReq, Continuations),
+ stats = NewStats},
+
+ {noreply, NewState};
+
+%% @private
+%% handles message from the RPC server and sends it back to the client
+handle_info({#'basic.deliver'{delivery_tag = DeliveryTag},
+ #amqp_msg{props = Props} = Msg},
+ #state{handler = _Fun, channel = Channel,
+ continuations = Continuations, stats = Stats} = State) ->
+
+ Now = date_utils:now_to_milliseconds_hires(erlang:now()),
+
+ #'P_basic'{correlation_id = <<CorrelationId:64>>} = Props,
+
+ #proxied_req{correlation_id = ClientCorrelationId, reply_queue = ClientQ,
+ req_time = ReqTime} = dict:fetch(CorrelationId, Continuations),
+
+ Cont = dict:erase(CorrelationId, Continuations),
+
+ Properties = Props#'P_basic'{correlation_id = ClientCorrelationId},
+ Publish = #'basic.publish'{exchange = <<>>,
+ routing_key = ClientQ},
+
+ amqp_channel:call(Channel, Publish, Msg#amqp_msg{props = Properties}),
+ amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
+
+ Elapsed = Now - ReqTime,
+
+ log("Now", Now),
+ log("ReqTime", ReqTime),
+ log("Elapsed", Elapsed),
+ log("time_total", Stats#stats.time_total),
+
+ NewStats = Stats#stats{time_total = Stats#stats.time_total + Elapsed},
+
+ {noreply, State#state{stats = NewStats, continuations = Cont}}.
+
+handle_call(stats, _From, #state{stats = Stats} = State) ->
+ io:format("Stats ~p~n", [Stats]),
+ {reply, ok, State};
+
+%% @private
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State}.
+
+%%--------------------------------------------------------------------------
+%% Rest of the gen_server callbacks
+%%--------------------------------------------------------------------------
+
+%% @private
+handle_cast(_Message, State) ->
+ {noreply, State}.
+
+%% Closes the channel this gen_server instance started
+%% @private
+terminate(_Reason, #state{channel = Channel}) ->
+ amqp_channel:close(Channel),
+ ok.
+
+%% @private
+code_change(_OldVsn, State, _Extra) ->
+ State.
+
+log(Key,Value) ->
+ io:format("~p: ~p~n",[Key,Value]).
18 src/amqp_test.erl
@@ -0,0 +1,18 @@
+-module(amqp_test).
+
+-export([start/0, start_debug/0]).
+
+-include("amqp_client.hrl").
+
+start(Opts) ->
+ {ok, Connection} = amqp_connection:start(network, #amqp_params{}),
+ ProxyEx = <<"char_count">>,
+ RpcExchange = <<"char_count_server">>,
+ Fun = undefined,
+ amqp_smart_proxy:start(Connection, ProxyEx, RpcExchange, Fun, Opts).
+
+start() ->
+ start([]).
+
+start_debug() ->
+ start([{debug, [trace]}]).
115 src/date_utils.erl
@@ -0,0 +1,115 @@
+%% from https://gist.github.com/104903
+-module(date_utils).
+-compile(export_all).
+
+epoch() ->
+ now_to_seconds(now())
+.
+
+epoch_hires() ->
+ now_to_seconds_hires(now())
+.
+
+now_to_seconds({Mega, Sec, _}) ->
+ (Mega * 1000000) + Sec
+.
+
+now_to_milliseconds({Mega, Sec, Micro}) ->
+ now_to_seconds({Mega, Sec, Micro}) * 1000
+.
+
+now_to_seconds_hires({Mega, Sec, Micro}) ->
+ now_to_seconds({Mega, Sec, Micro}) + (Micro / 1000000)
+.
+
+now_to_milliseconds_hires({Mega, Sec, Micro}) ->
+ now_to_seconds_hires({Mega, Sec, Micro}) * 1000
+.
+
+epoch_gregorian_seconds() ->
+ calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}})
+.
+
+now_to_gregorian_seconds() ->
+ epoch_to_gregorian_seconds(now())
+.
+
+epoch_to_gregorian_seconds({Mega, Sec, Micro}) ->
+ epoch_to_gregorian_seconds(now_to_seconds({Mega, Sec, Micro}));
+epoch_to_gregorian_seconds(Now) ->
+ EpochSecs = epoch_gregorian_seconds()
+ , Now + EpochSecs
+.
+
+gregorian_seconds_to_epoch(Secs) ->
+ EpochSecs = epoch_gregorian_seconds()
+ , Secs - EpochSecs
+.
+
+date_to_epoch(Date) ->
+ datetime_to_epoch({Date, {0,0,0} })
+.
+
+datetime_to_epoch({Date, Time}) ->
+ gregorian_seconds_to_epoch(
+ calendar:datetime_to_gregorian_seconds({Date, Time}))
+.
+
+is_older_by(T1, T2, {days, N}) ->
+ N1 = day_difference(T1, T2)
+ , case N1 of
+ N2 when (-N < N2) ->
+ true;
+ _ ->
+ false
+ end
+.
+
+is_sooner_by(T1, T2, {days, N}) ->
+ case day_difference(T1, T2) of
+ N1 when N > N1 ->
+ true;
+ _ ->
+ false
+ end
+.
+
+is_time_older_than({Date, Time}, Mark) ->
+ is_time_older_than(calendar:datetime_to_gregorian_seconds({Date, Time})
+ , Mark);
+is_time_older_than(Time, {DateMark, TimeMark}) ->
+ is_time_older_than(Time
+ , calendar:datetime_to_gregorian_seconds({DateMark, TimeMark}));
+is_time_older_than(Time, Mark) when is_integer(Time), is_integer(Mark) ->
+ Time < Mark
+.
+
+day_difference({D1, _}, D2) ->
+ day_difference(D1, D2);
+day_difference(D1, {D2, _}) ->
+ day_difference(D1, D2);
+day_difference(D1, D2) ->
+ Days1 = calendar:date_to_gregorian_days(D1)
+ , Days2 = calendar:date_to_gregorian_days(D2)
+ , Days1 - Days2
+.
+
+is_time_sooner_than({Date, Time}, Mark) ->
+ is_time_sooner_than(calendar:datetime_to_gregorian_seconds({Date, Time})
+ , Mark);
+is_time_sooner_than(Time, {DateMark, TimeMark}) ->
+ is_time_sooner_than(Time
+ , calendar:datetime_to_gregorian_seconds({DateMark, TimeMark}));
+is_time_sooner_than(Time, Mark) when is_integer(Time), is_integer(Mark) ->
+ Time > Mark
+.
+
+subtract(Date, {days, N}) ->
+ New = calendar:date_to_gregorian_days(Date) - N
+ , calendar:gregorian_days_to_date(New)
+.
+
+add(Date, {days, N}) ->
+ New = calendar:date_to_gregorian_days(Date) + N
+ , calendar:gregorian_days_to_date(New)
+.
155 src/rmq_control.erl
@@ -0,0 +1,155 @@
+-module(rmq_control).
+%
+% -behaviour(gen_server).
+%
+% -export([start_link/0]).
+%
+% -export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+% terminate/2, code_change/3]).
+%
+% -include("amqp_client.hrl").
+%
+% -record(state, {connection, channel, consumer_tag, exchange, routing_key, queue_name}).
+%
+% start_link() ->
+% gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+%
+% init([]) ->
+% io:format("Starting rmq_control...~n", []),
+% {ok, {Connection, Channel, Queue, ConsumerTag, Exchange, RoutingKey}} = setup_consumer(),
+% {ok, #state{connection = Connection,
+% channel = Channel,
+% queue_name = Queue,
+% consumer_tag = ConsumerTag,
+% exchange = Exchange,
+% routing_key = RoutingKey}}.
+%
+% %% callbacks
+% handle_call(hello, _From, State) ->
+% io:format("Hello from server!~n", []),
+% {reply, ok, State};
+%
+% handle_call(_Request, _From, State) ->
+% Reply = ok,
+% {reply, Reply, State}.
+%
+% handle_cast(_Msg, State) ->
+% {noreply, State}.
+%
+% handle_info({#'basic.deliver'{consumer_tag=_ConsumerTag, delivery_tag=_DeliveryTag,
+% redelivered=_Redelivered, exchange=_Exchange,
+% routing_key=_RoutingKey}, Content}, State) ->
+% #amqp_msg{payload = Payload} = Content,
+% io:format("~p~n", [Payload]),
+% {noreply, State};
+%
+% handle_info(Info, State) ->
+% io:format("~p~n", [Info]),
+% {noreply, State}.
+%
+% terminate(_Reason, _State) ->
+% ok.
+%
+% code_change(_OldVsn, State, _Extra) ->
+% {ok, State}.
+%
+% setup_consumer() ->
+% {ok, Connection} = amqp_connection:start(network, get_record(amqp_params)),
+% {ok, Channel} = amqp_connection:open_channel(Connection),
+%
+% #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, get_record('queue.declare')),
+%
+% Exchange = <<"amq.rabbitmq.log">>,
+% % Declare = #'exchange.declare'{exchange = Exchange,
+% % type = <<"topic">>},
+% %
+% % -record('exchange.declare', {ticket = 0, exchange, type = <<"direct">>, passive = false, durable = false, auto_delete = false, internal = false, nowait = false, arguments = []}).
+% % #'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare),
+%
+% RoutingKey = <<"#">>,
+% Binding = #'queue.bind'{queue = Queue,
+% exchange = Exchange,
+% routing_key = RoutingKey},
+% #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
+%
+% ConsumerTag = <<"my_tag">>,
+% BasicConsume = #'basic.consume'{queue = Queue,
+% consumer_tag = ConsumerTag,
+% no_ack = true},
+% #'basic.consume_ok'{consumer_tag = ConsumerTag}
+% = amqp_channel:subscribe(Channel, BasicConsume, self()),
+%
+% receive
+% #'basic.consume_ok'{consumer_tag = ConsumerTag} -> ok
+% end,
+%
+% {ok, {Connection, Channel, Queue, ConsumerTag, Exchange, RoutingKey}}.
+%
+% anon_consumer_options() ->
+% [{'queue.declare',
+%
+% [{queue, <<"">>}, {passive, false}, {durable, false}, {exclusive, true}, {auto_delete, true},
+% {nowait, false}, {arguments, []}]},
+%
+% {'exchange.declare',
+%
+% [{exchange, <<"">>}, {type, "direct"}, {passive, false}, {durable, true},
+% {auto_delete, false}, {internal, false}, {nowait, false}, {arguments, []}]},
+%
+% {'queue.bind',
+%
+% [{nowait, false}, {arguments, []}]},
+%
+% {'basic.consume',
+%
+% [{consumer_tag, "msgs_consumer"}, {no_local, false}, {no_ack, false},
+% {exclusive, false}, {nowait, false}, {ticket, []}]}].
+%
+%
+% get_opt(Call, Key) ->
+% {ok, Conf} = application:get_env(Call),
+% proplists:get_value(Key, Conf).
+%
+% get_record('queue.declare') ->
+% #'queue.declare'{ queue = get_opt(queue_declare, queue),
+% passive = get_opt(queue_declare, passive),
+% durable = get_opt(queue_declare, durable),
+% exclusive = get_opt(queue_declare, exclusive),
+% auto_delete = get_opt(queue_declare, auto_delete),
+% nowait = get_opt(queue_declare, nowait),
+% arguments = get_opt(queue_declare, arguments)};
+%
+% get_record('exchange.declare') ->
+% #'exchange.declare'{ exchange = get_opt(exchange_declare, exchange),
+% type = get_opt(exchange_declare, type),
+% passive = get_opt(exchange_declare, passive),
+% durable = get_opt(exchange_declare, durable),
+% auto_delete = get_opt(exchange_declare, auto_delete),
+% internal = get_opt(exchange_declare, internal),
+% nowait = get_opt(exchange_declare, nowait),
+% arguments = get_opt(exchange_declare, arguments)};
+%
+% get_record('queue.bind') ->
+% #'queue.bind'{ queue = get_opt(queue_bind, queue),
+% exchange = get_opt(queue_bind, exchange),
+% routing_key = get_opt(queue_bind, routing_key),
+% nowait = get_opt(queue_bind, nowait),
+% arguments = get_opt(queue_bind, arguments)};
+%
+% get_record('basic.consume') ->
+% #'basic.consume'{ queue = get_opt(basic_consume, queue),
+% consumer_tag = get_opt(basic_consume, queue),
+% no_local = get_opt(basic_consume, no_local),
+% no_ack = get_opt(basic_consume, no_ack),
+% exclusive = get_opt(basic_consume, exclusive),
+% nowait = get_opt(basic_consume, nowait)};
+%
+% get_record('amqp.params') ->
+% #amqp_params{username=get_opt(connection, user),
+% password=get_opt(connection, pass),
+% host=get_opt(connection, host),
+% port=get_opt(connection, port),
+% virtual_host=get_opt(connection, virtual_host)}.
+%
+% log(Key,Value) ->
+% io:format("~p: ~p~n",[Key,Value]).
18 src/rmq_patterns.app.src
@@ -0,0 +1,18 @@
+{application, rmq_patterns,
+ [
+ {description, ""},
+ {vsn, "1"},
+ {modules, [
+ rmq_patterns,
+ rmq_patterns_app,
+ rmq_patterns_sup,
+ rmq_control
+ ]},
+ {registered, []},
+ {applications, [
+ kernel,
+ stdlib
+ ]},
+ {mod, { rmq_patterns_app, []}},
+ {env, []}
+ ]}.
8 src/rmq_patterns.erl
@@ -0,0 +1,8 @@
+-module(rmq_patterns).
+-export([start/0, stop/0]).
+
+start() ->
+ application:start(rmq_patterns).
+
+stop() ->
+ application:stop(rmq_patterns).
16 src/rmq_patterns_app.erl
@@ -0,0 +1,16 @@
+-module(rmq_patterns_app).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+ rmq_patterns_sup:start_link().
+
+stop(_State) ->
+ ok.
30 src/rmq_patterns_sup.erl
@@ -0,0 +1,30 @@
+
+-module(rmq_patterns_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([]) ->
+ % RmqControl = ?CHILD(rmq_control, worker),
+ SmartProxy = ?CHILD(smart_proxy, worker),
+ {ok, { {one_for_one, 5, 10}, [SmartProxy]} }.
+
185 src/smart_proxy.erl
@@ -0,0 +1,185 @@
+-module(smart_proxy).
+
+-behaviour(gen_server).
+
+-export([start_link/0, consumer_options/0, stop_proxy/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-include("amqp_client.hrl").
+
+-record(state, {connection, channel, consumer_tag, exchange, routing_key, queue_name}).
+
+stop_proxy() ->
+ gen_server:call(?MODULE, stop_proxy).
+
+
+start_link() ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
+
+init([]) ->
+ io:format("Starting rmq_control...~n", []),
+ {ok, {Connection, Channel, Queue, ConsumerTag, Exchange, RoutingKey}} = setup_consumer(),
+ {ok, #state{connection = Connection,
+ channel = Channel,
+ queue_name = Queue,
+ consumer_tag = ConsumerTag,
+ exchange = Exchange,
+ routing_key = RoutingKey}}.
+
+%% callbacks
+handle_call(stop_proxy, _From, #state{channel = Channel, consumer_tag = Tag} = State) ->
+ log("Stoping consumer", "OK"),
+ #'basic.cancel_ok'{} =
+ amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag}),
+ {reply, ok, State};
+
+handle_call(_Request, _From, State) ->
+ Reply = ok,
+ {reply, Reply, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info(#'basic.consume_ok'{}, State) ->
+ {noreply, State};
+
+handle_info(#'basic.cancel_ok'{}, State) ->
+ {stop, normal, State};
+
+handle_info({#'basic.deliver'{consumer_tag=_ConsumerTag, delivery_tag=_DeliveryTag,
+ redelivered=_Redelivered, exchange=_Exchange,
+ routing_key=_RoutingKey}, Content}, State) ->
+ #amqp_msg{payload = Payload} = Content,
+ log("basic.deliver", Payload),
+ {noreply, State};
+
+handle_info(Info, State) ->
+ log(Info, State),
+ {noreply, State}.
+
+terminate(_Reason, #state{channel = Channel}) ->
+ amqp_channel:close(Channel),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+setup_consumer() ->
+ {ok, Connection} = amqp_connection:start(network, get_record(amqp_params)),
+ % {ok, Connection} = amqp_connection:start(network, #amqp_params{}),
+ {ok, Channel} = amqp_connection:open_channel(Connection),
+
+ log("Channel", "OK"),
+
+ QDeclare = get_record('queue.declare'),
+ #'queue.declare_ok'{} = amqp_channel:call(Channel, QDeclare),
+
+ log("QDeclare", "OK"),
+
+ ExDeclare = get_record('exchange.declare'),
+ #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare),
+
+ log("ExDeclare", "OK"),
+
+ Bind = get_record('queue.bind'),
+ #'queue.bind_ok'{} = amqp_channel:call(Channel, Bind),
+
+ log("Bind", "OK"),
+
+ BasicConsume = get_record('basic.consume'),
+ #'basic.consume_ok'{consumer_tag = ConsumerTag}
+ = amqp_channel:subscribe(Channel, BasicConsume, self()),
+
+ log("BasicConsume", "OK"),
+
+ receive
+ #'basic.consume_ok'{consumer_tag = ConsumerTag} -> ok
+ end,
+
+ log("Ready", "OK"),
+
+ {ok, {Connection, Channel,
+ QDeclare#'queue.declare'.queue,
+ BasicConsume#'basic.consume'.consumer_tag,
+ ExDeclare#'exchange.declare'.exchange, Bind#'queue.bind'.routing_key}}.
+
+consumer_options() ->
+ Ex = <<"upload_picture">>,
+ Q = <<"smart_proxy">>,
+ RK = <<"">>,
+ CTag = <<"smart_proxy_consumer">>,
+
+ [{amqp_params,
+
+ [{host, "localhost"}, {port, 5672}, {username, <<"guest">>},
+ {password, <<"guest">>}, {virtual_host, <<"/">>}]},
+
+ {'queue.declare',
+
+ [{queue, Q}, {passive, false}, {durable, true}, {exclusive, false}, {auto_delete, false},
+ {nowait, false}, {arguments, []}]},
+
+ {'exchange.declare',
+
+ [{exchange, Ex}, {type, <<"direct">>}, {passive, false}, {durable, true},
+ {auto_delete, false}, {internal, false}, {nowait, false}, {arguments, []}]},
+
+ {'queue.bind',
+
+ [{queue, Q}, {exchange, Ex}, {routing_key, RK}, {nowait, false}, {arguments, []}]},
+
+ {'basic.consume',
+
+ [{queue, Q}, {consumer_tag, CTag}, {no_local, false}, {no_ack, false},
+ {exclusive, false}, {nowait, false}]}].
+
+
+get_opt(Call, Key) ->
+ Conf = proplists:get_value(Call, consumer_options()),
+ proplists:get_value(Key, Conf).
+
+get_record('queue.declare') ->
+ #'queue.declare'{ queue = get_opt('queue.declare', queue),
+ passive = get_opt('queue.declare', passive),
+ durable = get_opt('queue.declare', durable),
+ exclusive = get_opt('queue.declare', exclusive),
+ auto_delete = get_opt('queue.declare', auto_delete),
+ nowait = get_opt('queue.declare', nowait),
+ arguments = get_opt('queue.declare', arguments)};
+
+get_record('exchange.declare') ->
+ #'exchange.declare'{ exchange = get_opt('exchange.declare', exchange),
+ type = get_opt('exchange.declare', type),
+ passive = get_opt('exchange.declare', passive),
+ durable = get_opt('exchange.declare', durable),
+ auto_delete = get_opt('exchange.declare', auto_delete),
+ internal = get_opt('exchange.declare', internal),
+ nowait = get_opt('exchange.declare', nowait),
+ arguments = get_opt('exchange.declare', arguments)};
+
+get_record('queue.bind') ->
+ #'queue.bind'{ queue = get_opt('queue.bind', queue),
+ exchange = get_opt('queue.bind', exchange),
+ routing_key = get_opt('queue.bind', routing_key),
+ nowait = get_opt('queue.bind', nowait),
+ arguments = get_opt('queue.bind', arguments)};
+
+get_record('basic.consume') ->
+ #'basic.consume'{ queue = get_opt('basic.consume', queue),
+ consumer_tag = get_opt('basic.consume', queue),
+ no_local = get_opt('basic.consume', no_local),
+ no_ack = get_opt('basic.consume', no_ack),
+ exclusive = get_opt('basic.consume', exclusive),
+ nowait = get_opt('basic.consume', nowait)};
+
+get_record(amqp_params) ->
+ #amqp_params{username = get_opt(amqp_params, username),
+ password = get_opt(amqp_params, password),
+ host = get_opt(amqp_params, host),
+ port = get_opt(amqp_params, port),
+ virtual_host = get_opt(amqp_params, virtual_host)}.
+
+log(Key,Value) ->
+ io:format("~p: ~p~n",[Key,Value]).
9 start-dev.sh
@@ -0,0 +1,9 @@
+#!/bin/sh
+
+# ./start-dev.php <config_file_name>
+
+cd `dirname $0`
+exec erl -pa $PWD/ebin -pa $PWD/deps/amqp_client/include/rabbit_common/ebin \
+-pa $PWD/deps/amqp_client/ebin \
+-sname amqp_consumer \
+-s rmq_patterns -boot start_sasl

0 comments on commit fef459a

Please sign in to comment.