diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b0730cf --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +deps/* +ebin/* +erl_crash.dump \ No newline at end of file diff --git a/priv/log_watcher.config b/priv/log_watcher.config new file mode 100644 index 0000000..167b424 --- /dev/null +++ b/priv/log_watcher.config @@ -0,0 +1,13 @@ +[{consumer, [ + + {consumer_mod, xmpp_consumer}, + + {'amqp.params', + + [{host, <<"localhost">>}, {port, 5672}, {user, <<"guest">>}, {password, <<"guest">>}, {virtual_host, <<"/">>}]}, + + {exchange_options, + + [{exchange, <<"amq.rabbitmq.log">>}, {type, "topic"}]} + +]}]. \ No newline at end of file diff --git a/rebar b/rebar new file mode 120000 index 0000000..e3de534 --- /dev/null +++ b/rebar @@ -0,0 +1 @@ +/usr/local/bin/rebar \ No newline at end of file diff --git a/rebar.config b/rebar.config new file mode 100644 index 0000000..a0592b2 --- /dev/null +++ b/rebar.config @@ -0,0 +1,13 @@ +{sub_dirs, ["rel"]}. + +% {deps, [ +% {amqp_client, "", {hg, "rabbitmq-erlang-client", "amqp_client"}} +% ]}. + +{erl_opts, [debug_info, fail_on_warning, + {i, "include"}, + {i, "deps/amqp_client/include"}, + {i, "deps/amqp_client/ebin"}, + {i, "deps/amqp_client/include/rabbit_common/include"}, + {i, "deps/amqp_client/include/rabbit_common/ebin"} +]}. \ No newline at end of file diff --git a/rel/files/app.config b/rel/files/app.config new file mode 100644 index 0000000..bba388b --- /dev/null +++ b/rel/files/app.config @@ -0,0 +1,11 @@ +[ + %% SASL config + {sasl, [ + {sasl_error_logger, {file, "log/sasl-error.log"}}, + {errlog_type, error}, + {error_logger_mf_dir, "log/sasl"}, % Log directory + {error_logger_mf_maxbytes, 10485760}, % 10 MB max file size + {error_logger_mf_maxfiles, 5} % 5 files max + ]} +]. + diff --git a/rel/files/erl b/rel/files/erl new file mode 100755 index 0000000..e500626 --- /dev/null +++ b/rel/files/erl @@ -0,0 +1,34 @@ +#!/bin/bash + +## This script replaces the default "erl" in erts-VSN/bin. This is necessary +## as escript depends on erl and in turn, erl depends on having access to a +## bootscript (start.boot). Note that this script is ONLY invoked as a side-effect +## of running escript -- the embedded node bypasses erl and uses erlexec directly +## (as it should). +## +## Note that this script makes the assumption that there is a start_clean.boot +## file available in $ROOTDIR/release/VSN. + +# Determine the abspath of where this script is executing from. +ERTS_BIN_DIR=$(cd ${0%/*} && pwd) + +# Now determine the root directory -- this script runs from erts-VSN/bin, +# so we simply need to strip off two dirs from the end of the ERTS_BIN_DIR +# path. +ROOTDIR=${ERTS_BIN_DIR%/*/*} + +# Parse out release and erts info +START_ERL=`cat $ROOTDIR/releases/start_erl.data` +ERTS_VSN=${START_ERL% *} +APP_VSN=${START_ERL#* } + +BINDIR=$ROOTDIR/erts-$ERTS_VSN/bin +EMU=beam +PROGNAME=`echo $0 | sed 's/.*\\///'` +CMD="$BINDIR/erlexec" +export EMU +export ROOTDIR +export BINDIR +export PROGNAME + +exec $CMD -boot $ROOTDIR/releases/$APP_VSN/start_clean ${1+"$@"} \ No newline at end of file diff --git a/rel/files/nodetool b/rel/files/nodetool new file mode 100755 index 0000000..eb08fa4 --- /dev/null +++ b/rel/files/nodetool @@ -0,0 +1,138 @@ +%% -*- mode: erlang;erlang-indent-level: 4;indent-tabs-mode: nil -*- +%% ex: ft=erlang ts=4 sw=4 et +%% ------------------------------------------------------------------- +%% +%% nodetool: Helper Script for interacting with live nodes +%% +%% ------------------------------------------------------------------- + +main(Args) -> + ok = start_epmd(), + %% Extract the args + {RestArgs, TargetNode} = process_args(Args, [], undefined), + + %% See if the node is currently running -- if it's not, we'll bail + case {net_kernel:hidden_connect_node(TargetNode), net_adm:ping(TargetNode)} of + {true, pong} -> + ok; + {_, pang} -> + io:format("Node ~p not responding to pings.\n", [TargetNode]), + halt(1) + end, + + case RestArgs of + ["ping"] -> + %% If we got this far, the node already responsed to a ping, so just dump + %% a "pong" + io:format("pong\n"); + ["stop"] -> + io:format("~p\n", [rpc:call(TargetNode, init, stop, [], 60000)]); + ["restart"] -> + io:format("~p\n", [rpc:call(TargetNode, init, restart, [], 60000)]); + ["reboot"] -> + io:format("~p\n", [rpc:call(TargetNode, init, reboot, [], 60000)]); + ["rpc", Module, Function | RpcArgs] -> + case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function), + [RpcArgs], 60000) of + ok -> + ok; + {badrpc, Reason} -> + io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]), + halt(1); + _ -> + halt(1) + end; + ["rpcterms", Module, Function, ArgsAsString] -> + case rpc:call(TargetNode, list_to_atom(Module), list_to_atom(Function), + consult(ArgsAsString), 60000) of + {badrpc, Reason} -> + io:format("RPC to ~p failed: ~p\n", [TargetNode, Reason]), + halt(1); + Other -> + io:format("~p\n", [Other]) + end; + Other -> + io:format("Other: ~p\n", [Other]), + io:format("Usage: nodetool {ping|stop|restart|reboot}\n") + end, + net_kernel:stop(). + +process_args([], Acc, TargetNode) -> + {lists:reverse(Acc), TargetNode}; +process_args(["-setcookie", Cookie | Rest], Acc, TargetNode) -> + erlang:set_cookie(node(), list_to_atom(Cookie)), + process_args(Rest, Acc, TargetNode); +process_args(["-name", TargetName | Rest], Acc, _) -> + ThisNode = append_node_suffix(TargetName, "_maint_"), + {ok, _} = net_kernel:start([ThisNode, longnames]), + process_args(Rest, Acc, nodename(TargetName)); +process_args(["-sname", TargetName | Rest], Acc, _) -> + ThisNode = append_node_suffix(TargetName, "_maint_"), + {ok, _} = net_kernel:start([ThisNode, shortnames]), + process_args(Rest, Acc, nodename(TargetName)); +process_args([Arg | Rest], Acc, Opts) -> + process_args(Rest, [Arg | Acc], Opts). + + +start_epmd() -> + [] = os:cmd(epmd_path() ++ " -daemon"), + ok. + +epmd_path() -> + ErtsBinDir = filename:dirname(escript:script_name()), + Name = "epmd", + case os:find_executable(Name, ErtsBinDir) of + false -> + case os:find_executable(Name) of + false -> + io:format("Could not find epmd.~n"), + halt(1); + GlobalEpmd -> + GlobalEpmd + end; + Epmd -> + Epmd + end. + + +nodename(Name) -> + case string:tokens(Name, "@") of + [_Node, _Host] -> + list_to_atom(Name); + [Node] -> + [_, Host] = string:tokens(atom_to_list(node()), "@"), + list_to_atom(lists:concat([Node, "@", Host])) + end. + +append_node_suffix(Name, Suffix) -> + case string:tokens(Name, "@") of + [Node, Host] -> + list_to_atom(lists:concat([Node, Suffix, os:getpid(), "@", Host])); + [Node] -> + list_to_atom(lists:concat([Node, Suffix, os:getpid()])) + end. + + +%% +%% Given a string or binary, parse it into a list of terms, ala file:consult/0 +%% +consult(Str) when is_list(Str) -> + consult([], Str, []); +consult(Bin) when is_binary(Bin)-> + consult([], binary_to_list(Bin), []). + +consult(Cont, Str, Acc) -> + case erl_scan:tokens(Cont, Str, 0) of + {done, Result, Remaining} -> + case Result of + {ok, Tokens, _} -> + {ok, Term} = erl_parse:parse_term(Tokens), + consult([], Remaining, [Term | Acc]); + {eof, _Other} -> + lists:reverse(Acc); + {error, Info, _} -> + {error, Info} + end; + {more, Cont1} -> + consult(Cont1, eof, Acc) + end. diff --git a/rel/files/rmq_patterns b/rel/files/rmq_patterns new file mode 100755 index 0000000..18fa951 --- /dev/null +++ b/rel/files/rmq_patterns @@ -0,0 +1,155 @@ +#!/bin/bash +# -*- tab-width:4;indent-tabs-mode:nil -*- +# ex: ts=4 sw=4 et + +RUNNER_SCRIPT_DIR=$(cd ${0%/*} && pwd) + +RUNNER_BASE_DIR=${RUNNER_SCRIPT_DIR%/*} +RUNNER_ETC_DIR=$RUNNER_BASE_DIR/etc +RUNNER_LOG_DIR=$RUNNER_BASE_DIR/log +# Note the trailing slash on $PIPE_DIR/ +PIPE_DIR=/tmp/$RUNNER_BASE_DIR/ +RUNNER_USER= + +# Make sure this script is running as the appropriate user +if [ ! -z "$RUNNER_USER" ] && [ `whoami` != "$RUNNER_USER" ]; then + exec sudo -u $RUNNER_USER -i $0 $@ +fi + +# Make sure CWD is set to runner base dir +cd $RUNNER_BASE_DIR + +# Make sure log directory exists +mkdir -p $RUNNER_LOG_DIR + +# Extract the target node name from node.args +NAME_ARG=`grep -e '-[s]*name' $RUNNER_ETC_DIR/vm.args` +if [ -z "$NAME_ARG" ]; then + echo "vm.args needs to have either -name or -sname parameter." + exit 1 +fi + +# Extract the target cookie +COOKIE_ARG=`grep -e '-setcookie' $RUNNER_ETC_DIR/vm.args` +if [ -z "$COOKIE_ARG" ]; then + echo "vm.args needs to have a -setcookie parameter." + exit 1 +fi + +# Identify the script name +SCRIPT=`basename $0` + +# Parse out release and erts info +START_ERL=`cat $RUNNER_BASE_DIR/releases/start_erl.data` +ERTS_VSN=${START_ERL% *} +APP_VSN=${START_ERL#* } + +# Add ERTS bin dir to our path +ERTS_PATH=$RUNNER_BASE_DIR/erts-$ERTS_VSN/bin + +# Setup command to control the node +NODETOOL="$ERTS_PATH/escript $ERTS_PATH/nodetool $NAME_ARG $COOKIE_ARG" + +# Check the first argument for instructions +case "$1" in + start) + # Make sure there is not already a node running + RES=`$NODETOOL ping` + if [ "$RES" = "pong" ]; then + echo "Node is already running!" + exit 1 + fi + HEART_COMMAND="$RUNNER_BASE_DIR/bin/$SCRIPT start" + export HEART_COMMAND + mkdir -p $PIPE_DIR + $ERTS_PATH/run_erl -daemon $PIPE_DIR $RUNNER_LOG_DIR "exec $RUNNER_BASE_DIR/bin/$SCRIPT console" 2>&1 + ;; + + stop) + # Wait for the node to completely stop... + case `uname -s` in + Linux|Darwin|FreeBSD|DragonFly|NetBSD|OpenBSD) + # PID COMMAND + PID=`ps ax -o pid= -o command=|\ + grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $1}'` + ;; + SunOS) + # PID COMMAND + PID=`ps -ef -o pid= -o args=|\ + grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $1}'` + ;; + CYGWIN*) + # UID PID PPID TTY STIME COMMAND + PID=`ps -efW|grep "$RUNNER_BASE_DIR/.*/[b]eam"|awk '{print $2}'` + ;; + esac + $NODETOOL stop + while `kill -0 $PID 2>/dev/null`; + do + sleep 1 + done + ;; + + restart) + ## Restart the VM without exiting the process + $NODETOOL restart + ;; + + reboot) + ## Restart the VM completely (uses heart to restart it) + $NODETOOL reboot + ;; + + ping) + ## See if the VM is alive + $NODETOOL ping + ;; + + attach) + # Make sure a node IS running + RES=`$NODETOOL ping` + if [ "$RES" != "pong" ]; then + echo "Node is not running!" + exit 1 + fi + + shift + $ERTS_PATH/to_erl $PIPE_DIR + ;; + + console|console_clean) + # .boot file typically just $SCRIPT (ie, the app name) + # however, for debugging, sometimes start_clean.boot is useful: + case "$1" in + console) BOOTFILE=$SCRIPT ;; + console_clean) BOOTFILE=start_clean ;; + esac + # Setup beam-required vars + ROOTDIR=$RUNNER_BASE_DIR + BINDIR=$ROOTDIR/erts-$ERTS_VSN/bin + EMU=beam + PROGNAME=`echo $0 | sed 's/.*\\///'` + CMD="$BINDIR/erlexec -boot $RUNNER_BASE_DIR/releases/$APP_VSN/$BOOTFILE -embedded -config $RUNNER_ETC_DIR/app.config -args_file $RUNNER_ETC_DIR/vm.args -- ${1+"$@"}" + export EMU + export ROOTDIR + export BINDIR + export PROGNAME + + # Dump environment info for logging purposes + echo "Exec: $CMD" + echo "Root: $ROOTDIR" + + # Log the startup + logger -t "$SCRIPT[$$]" "Starting up" + + # Start the VM + exec $CMD + ;; + + *) + echo "Usage: $SCRIPT {start|stop|restart|reboot|ping|console|attach}" + exit 1 + ;; +esac + +exit 0 diff --git a/rel/files/vm.args b/rel/files/vm.args new file mode 100644 index 0000000..29fb907 --- /dev/null +++ b/rel/files/vm.args @@ -0,0 +1,22 @@ + +## Name of the node +## -name rmq_patterns@127.0.0.1 +-sname rmq_patterns + +## Cookie for distributed erlang +-setcookie rmq_patterns + +## Heartbeat management; auto-restarts VM if it dies or becomes unresponsive +## (Disabled by default..use with caution!) +##-heart + +## Enable kernel poll and a few async threads ++K true ++A 5 + +## Increase number of concurrent ports/sockets +-env ERL_MAX_PORTS 4096 + +## Tweak GC to run more often +-env ERL_FULLSWEEP_AFTER 10 + diff --git a/rel/reltool.config b/rel/reltool.config new file mode 100644 index 0000000..2194d7b --- /dev/null +++ b/rel/reltool.config @@ -0,0 +1,34 @@ +{sys, [ + {lib_dirs, ["../../"]}, + {rel, "rmq_patterns", "1", + [ + kernel, + stdlib, + sasl, + rmq_patterns + ]}, + {rel, "start_clean", "", + [ + kernel, + stdlib + ]}, + {boot_rel, "rmq_patterns"}, + {profile, embedded}, + {excl_sys_filters, ["^bin/.*", + "^erts.*/bin/(dialyzer|typer)"]}, + {app, rmq_patterns, [{incl_cond, include}]}, + {app, sasl, [{incl_cond, include}]} + ]}. + +{target_dir, "rmq_patterns"}. + +{overlay, [ + {mkdir, "log/sasl"}, + {copy, "files/erl", "{{erts_vsn}}/bin/erl"}, + {copy, "files/nodetool", "{{erts_vsn}}/bin/nodetool"}, + {copy, "files/rmq_patterns", "bin/rmq_patterns"}, + {copy, "files/app.config", "etc/app.config"}, + {copy, "files/vm.args", "etc/vm.args"}, + {copy, "deps/amqp_client.ez", "lib/amqp_client.ez"}, + {copy, "deps/rabbit_common.ez", "lib/rabbit_common.ez "} + ]}. diff --git a/src/amqp_smart_proxy.erl b/src/amqp_smart_proxy.erl new file mode 100644 index 0000000..04bef91 --- /dev/null +++ b/src/amqp_smart_proxy.erl @@ -0,0 +1,203 @@ +%% @doc This is a utility module that is used to expose an arbitrary function +%% via an asynchronous RPC over AMQP mechanism. It frees the implementor of +%% a simple function from having to plumb this into AMQP. Note that the +%% RPC server does not handle any data encoding, so it is up to the callback +%% function to marshall and unmarshall message payloads accordingly. +-module(amqp_smart_proxy). + +-behaviour(gen_server). + +-include("amqp_client.hrl"). + +-export([init/1, terminate/2, code_change/3, handle_call/3, + handle_cast/2, handle_info/2]). +-export([start/5]). +-export([stop/1]). +-export([stats/1]). + +-record(state, {channel, + handler, + proxy_queue, + proxy_exchange, + rpc_exchange, + correlation_id = 0, + continuations = dict:new(), + stats}). + +-record(proxied_req, {correlation_id, + reply_queue, + req_time}). + +-record(stats, {msgs_nb = 0, + time_total = 0}). + +%%-------------------------------------------------------------------------- +%% API +%%-------------------------------------------------------------------------- + +%% @spec (Connection, Queue, RpcHandler) -> RpcServer +%% where +%% Connection = pid() +%% ProxyEx = binary() +%% RpcHandler = binary() +%% RpcHandler = function() +%% RpcServer = pid() +%% @doc Starts a new RPC server instance that receives requests via a +%% specified queue and dispatches them to a specified handler function. This +%% function returns the pid of the RPC server that can be used to stop the +%% server. +start(Connection, ProxyEx, RpcExchange, Fun, Opts) -> + {ok, Pid} = gen_server:start(?MODULE, [Connection, ProxyEx, RpcExchange, Fun], Opts), + Pid. + +%% @spec (RpcServer) -> ok +%% where +%% RpcServer = pid() +%% @doc Stops an exisiting RPC server. +stop(Pid) -> + gen_server:call(Pid, stop, infinity). + +stats(Pid) -> + gen_server:call(Pid, stats). + +%%-------------------------------------------------------------------------- +%% gen_server callbacks +%%-------------------------------------------------------------------------- + +%% @private +init([Connection, ProxyEx, RpcExchange, Fun]) -> + {ok, Channel} = amqp_connection:open_channel(Connection), + + %%setup reply queue to get messages from RPC Server. + #'queue.declare_ok'{queue = ReplyQ} + = amqp_channel:call(Channel, #'queue.declare'{}), + log("queue.declare", "ok"), + amqp_channel:subscribe(Channel, #'basic.consume'{queue = ReplyQ}, self()), + log("basic.consume", "ok"), + + %%setup proxy exchange + ExDeclare = #'exchange.declare'{exchange=ProxyEx, durable = true}, + #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare), + + %%setup proxy queue to get messages from RPC Client + #'queue.declare_ok'{queue = ProxyQ} + = amqp_channel:call(Channel, #'queue.declare'{}), + log("queue.declare", "ok2"), + QueueBind = #'queue.bind'{queue = ProxyQ, + exchange = ProxyEx}, + #'queue.bind_ok'{} = amqp_channel:call(Channel, QueueBind), + log("queue.bind", "ok"), + amqp_channel:subscribe(Channel, #'basic.consume'{queue = ProxyQ}, self()), + log("subscribe", "ok"), + + {ok, #state{channel = Channel, handler = Fun, proxy_queue = ReplyQ, + proxy_exchange = ProxyEx, rpc_exchange = RpcExchange, stats = #stats{}}}. + +%% @private +handle_info(shutdown, State) -> + {stop, normal, State}; + +%% @private +handle_info(#'basic.consume_ok'{}, State) -> + {noreply, State}; + +%% @private +handle_info(#'basic.cancel_ok'{}, State) -> + {stop, normal, State}; + +%% @private +%% handles message from the RPC Client and forwards to the RPC server +handle_info({#'basic.deliver'{delivery_tag = DeliveryTag, exchange = ProxyEx}, + #amqp_msg{props = Props} = Msg}, + #state{handler = _Fun, channel = Channel, proxy_queue = ProxyQ, + proxy_exchange = ProxyEx, rpc_exchange = RpcExchange, + correlation_id = CorrelationId, continuations = Continuations, + stats = Stats} = State) -> + + %%extract original message props and store for later + #'P_basic'{correlation_id = ClientCorrelationId, + reply_to = ClientQ} = Props, + + %% add our own correlation_id and reply_to fields + Properties = Props#'P_basic'{correlation_id = <>, reply_to = ProxyQ}, + + Publish = #'basic.publish'{exchange = RpcExchange}, + + Now = date_utils:now_to_milliseconds_hires(erlang:now()), + + amqp_channel:call(Channel, Publish, Msg#amqp_msg{props = Properties}), + amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}), + + ProxiedReq = #proxied_req{correlation_id = ClientCorrelationId, + reply_queue = ClientQ, req_time = Now}, + + NewStats = Stats#stats{msgs_nb = Stats#stats.msgs_nb + 1}, + + NewState = State#state{correlation_id = CorrelationId + 1, + continuations = dict:store(CorrelationId, ProxiedReq, Continuations), + stats = NewStats}, + + {noreply, NewState}; + +%% @private +%% handles message from the RPC server and sends it back to the client +handle_info({#'basic.deliver'{delivery_tag = DeliveryTag}, + #amqp_msg{props = Props} = Msg}, + #state{handler = _Fun, channel = Channel, + continuations = Continuations, stats = Stats} = State) -> + + Now = date_utils:now_to_milliseconds_hires(erlang:now()), + + #'P_basic'{correlation_id = <>} = Props, + + #proxied_req{correlation_id = ClientCorrelationId, reply_queue = ClientQ, + req_time = ReqTime} = dict:fetch(CorrelationId, Continuations), + + Cont = dict:erase(CorrelationId, Continuations), + + Properties = Props#'P_basic'{correlation_id = ClientCorrelationId}, + Publish = #'basic.publish'{exchange = <<>>, + routing_key = ClientQ}, + + amqp_channel:call(Channel, Publish, Msg#amqp_msg{props = Properties}), + amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}), + + Elapsed = Now - ReqTime, + + log("Now", Now), + log("ReqTime", ReqTime), + log("Elapsed", Elapsed), + log("time_total", Stats#stats.time_total), + + NewStats = Stats#stats{time_total = Stats#stats.time_total + Elapsed}, + + {noreply, State#state{stats = NewStats, continuations = Cont}}. + +handle_call(stats, _From, #state{stats = Stats} = State) -> + io:format("Stats ~p~n", [Stats]), + {reply, ok, State}; + +%% @private +handle_call(stop, _From, State) -> + {stop, normal, ok, State}. + +%%-------------------------------------------------------------------------- +%% Rest of the gen_server callbacks +%%-------------------------------------------------------------------------- + +%% @private +handle_cast(_Message, State) -> + {noreply, State}. + +%% Closes the channel this gen_server instance started +%% @private +terminate(_Reason, #state{channel = Channel}) -> + amqp_channel:close(Channel), + ok. + +%% @private +code_change(_OldVsn, State, _Extra) -> + State. + +log(Key,Value) -> + io:format("~p: ~p~n",[Key,Value]). \ No newline at end of file diff --git a/src/amqp_test.erl b/src/amqp_test.erl new file mode 100644 index 0000000..fea1a8d --- /dev/null +++ b/src/amqp_test.erl @@ -0,0 +1,18 @@ +-module(amqp_test). + +-export([start/0, start_debug/0]). + +-include("amqp_client.hrl"). + +start(Opts) -> + {ok, Connection} = amqp_connection:start(network, #amqp_params{}), + ProxyEx = <<"char_count">>, + RpcExchange = <<"char_count_server">>, + Fun = undefined, + amqp_smart_proxy:start(Connection, ProxyEx, RpcExchange, Fun, Opts). + +start() -> + start([]). + +start_debug() -> + start([{debug, [trace]}]). \ No newline at end of file diff --git a/src/date_utils.erl b/src/date_utils.erl new file mode 100644 index 0000000..2268676 --- /dev/null +++ b/src/date_utils.erl @@ -0,0 +1,115 @@ +%% from https://gist.github.com/104903 +-module(date_utils). +-compile(export_all). + +epoch() -> + now_to_seconds(now()) +. + +epoch_hires() -> + now_to_seconds_hires(now()) +. + +now_to_seconds({Mega, Sec, _}) -> + (Mega * 1000000) + Sec +. + +now_to_milliseconds({Mega, Sec, Micro}) -> + now_to_seconds({Mega, Sec, Micro}) * 1000 +. + +now_to_seconds_hires({Mega, Sec, Micro}) -> + now_to_seconds({Mega, Sec, Micro}) + (Micro / 1000000) +. + +now_to_milliseconds_hires({Mega, Sec, Micro}) -> + now_to_seconds_hires({Mega, Sec, Micro}) * 1000 +. + +epoch_gregorian_seconds() -> + calendar:datetime_to_gregorian_seconds({{1970,1,1}, {0,0,0}}) +. + +now_to_gregorian_seconds() -> + epoch_to_gregorian_seconds(now()) +. + +epoch_to_gregorian_seconds({Mega, Sec, Micro}) -> + epoch_to_gregorian_seconds(now_to_seconds({Mega, Sec, Micro})); +epoch_to_gregorian_seconds(Now) -> + EpochSecs = epoch_gregorian_seconds() + , Now + EpochSecs +. + +gregorian_seconds_to_epoch(Secs) -> + EpochSecs = epoch_gregorian_seconds() + , Secs - EpochSecs +. + +date_to_epoch(Date) -> + datetime_to_epoch({Date, {0,0,0} }) +. + +datetime_to_epoch({Date, Time}) -> + gregorian_seconds_to_epoch( + calendar:datetime_to_gregorian_seconds({Date, Time})) +. + +is_older_by(T1, T2, {days, N}) -> + N1 = day_difference(T1, T2) + , case N1 of + N2 when (-N < N2) -> + true; + _ -> + false + end +. + +is_sooner_by(T1, T2, {days, N}) -> + case day_difference(T1, T2) of + N1 when N > N1 -> + true; + _ -> + false + end +. + +is_time_older_than({Date, Time}, Mark) -> + is_time_older_than(calendar:datetime_to_gregorian_seconds({Date, Time}) + , Mark); +is_time_older_than(Time, {DateMark, TimeMark}) -> + is_time_older_than(Time + , calendar:datetime_to_gregorian_seconds({DateMark, TimeMark})); +is_time_older_than(Time, Mark) when is_integer(Time), is_integer(Mark) -> + Time < Mark +. + +day_difference({D1, _}, D2) -> + day_difference(D1, D2); +day_difference(D1, {D2, _}) -> + day_difference(D1, D2); +day_difference(D1, D2) -> + Days1 = calendar:date_to_gregorian_days(D1) + , Days2 = calendar:date_to_gregorian_days(D2) + , Days1 - Days2 +. + +is_time_sooner_than({Date, Time}, Mark) -> + is_time_sooner_than(calendar:datetime_to_gregorian_seconds({Date, Time}) + , Mark); +is_time_sooner_than(Time, {DateMark, TimeMark}) -> + is_time_sooner_than(Time + , calendar:datetime_to_gregorian_seconds({DateMark, TimeMark})); +is_time_sooner_than(Time, Mark) when is_integer(Time), is_integer(Mark) -> + Time > Mark +. + +subtract(Date, {days, N}) -> + New = calendar:date_to_gregorian_days(Date) - N + , calendar:gregorian_days_to_date(New) +. + +add(Date, {days, N}) -> + New = calendar:date_to_gregorian_days(Date) + N + , calendar:gregorian_days_to_date(New) +. \ No newline at end of file diff --git a/src/rmq_control.erl b/src/rmq_control.erl new file mode 100644 index 0000000..7681b89 --- /dev/null +++ b/src/rmq_control.erl @@ -0,0 +1,155 @@ +-module(rmq_control). +% +% -behaviour(gen_server). +% +% -export([start_link/0]). +% +% -export([init/1, handle_call/3, handle_cast/2, handle_info/2, +% terminate/2, code_change/3]). +% +% -include("amqp_client.hrl"). +% +% -record(state, {connection, channel, consumer_tag, exchange, routing_key, queue_name}). +% +% start_link() -> +% gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). +% +% init([]) -> +% io:format("Starting rmq_control...~n", []), +% {ok, {Connection, Channel, Queue, ConsumerTag, Exchange, RoutingKey}} = setup_consumer(), +% {ok, #state{connection = Connection, +% channel = Channel, +% queue_name = Queue, +% consumer_tag = ConsumerTag, +% exchange = Exchange, +% routing_key = RoutingKey}}. +% +% %% callbacks +% handle_call(hello, _From, State) -> +% io:format("Hello from server!~n", []), +% {reply, ok, State}; +% +% handle_call(_Request, _From, State) -> +% Reply = ok, +% {reply, Reply, State}. +% +% handle_cast(_Msg, State) -> +% {noreply, State}. +% +% handle_info({#'basic.deliver'{consumer_tag=_ConsumerTag, delivery_tag=_DeliveryTag, +% redelivered=_Redelivered, exchange=_Exchange, +% routing_key=_RoutingKey}, Content}, State) -> +% #amqp_msg{payload = Payload} = Content, +% io:format("~p~n", [Payload]), +% {noreply, State}; +% +% handle_info(Info, State) -> +% io:format("~p~n", [Info]), +% {noreply, State}. +% +% terminate(_Reason, _State) -> +% ok. +% +% code_change(_OldVsn, State, _Extra) -> +% {ok, State}. +% +% setup_consumer() -> +% {ok, Connection} = amqp_connection:start(network, get_record(amqp_params)), +% {ok, Channel} = amqp_connection:open_channel(Connection), +% +% #'queue.declare_ok'{queue = Queue} = amqp_channel:call(Channel, get_record('queue.declare')), +% +% Exchange = <<"amq.rabbitmq.log">>, +% % Declare = #'exchange.declare'{exchange = Exchange, +% % type = <<"topic">>}, +% % +% % -record('exchange.declare', {ticket = 0, exchange, type = <<"direct">>, passive = false, durable = false, auto_delete = false, internal = false, nowait = false, arguments = []}). +% % #'exchange.declare_ok'{} = amqp_channel:call(Channel, Declare), +% +% RoutingKey = <<"#">>, +% Binding = #'queue.bind'{queue = Queue, +% exchange = Exchange, +% routing_key = RoutingKey}, +% #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding), +% +% ConsumerTag = <<"my_tag">>, +% BasicConsume = #'basic.consume'{queue = Queue, +% consumer_tag = ConsumerTag, +% no_ack = true}, +% #'basic.consume_ok'{consumer_tag = ConsumerTag} +% = amqp_channel:subscribe(Channel, BasicConsume, self()), +% +% receive +% #'basic.consume_ok'{consumer_tag = ConsumerTag} -> ok +% end, +% +% {ok, {Connection, Channel, Queue, ConsumerTag, Exchange, RoutingKey}}. +% +% anon_consumer_options() -> +% [{'queue.declare', +% +% [{queue, <<"">>}, {passive, false}, {durable, false}, {exclusive, true}, {auto_delete, true}, +% {nowait, false}, {arguments, []}]}, +% +% {'exchange.declare', +% +% [{exchange, <<"">>}, {type, "direct"}, {passive, false}, {durable, true}, +% {auto_delete, false}, {internal, false}, {nowait, false}, {arguments, []}]}, +% +% {'queue.bind', +% +% [{nowait, false}, {arguments, []}]}, +% +% {'basic.consume', +% +% [{consumer_tag, "msgs_consumer"}, {no_local, false}, {no_ack, false}, +% {exclusive, false}, {nowait, false}, {ticket, []}]}]. +% +% +% get_opt(Call, Key) -> +% {ok, Conf} = application:get_env(Call), +% proplists:get_value(Key, Conf). +% +% get_record('queue.declare') -> +% #'queue.declare'{ queue = get_opt(queue_declare, queue), +% passive = get_opt(queue_declare, passive), +% durable = get_opt(queue_declare, durable), +% exclusive = get_opt(queue_declare, exclusive), +% auto_delete = get_opt(queue_declare, auto_delete), +% nowait = get_opt(queue_declare, nowait), +% arguments = get_opt(queue_declare, arguments)}; +% +% get_record('exchange.declare') -> +% #'exchange.declare'{ exchange = get_opt(exchange_declare, exchange), +% type = get_opt(exchange_declare, type), +% passive = get_opt(exchange_declare, passive), +% durable = get_opt(exchange_declare, durable), +% auto_delete = get_opt(exchange_declare, auto_delete), +% internal = get_opt(exchange_declare, internal), +% nowait = get_opt(exchange_declare, nowait), +% arguments = get_opt(exchange_declare, arguments)}; +% +% get_record('queue.bind') -> +% #'queue.bind'{ queue = get_opt(queue_bind, queue), +% exchange = get_opt(queue_bind, exchange), +% routing_key = get_opt(queue_bind, routing_key), +% nowait = get_opt(queue_bind, nowait), +% arguments = get_opt(queue_bind, arguments)}; +% +% get_record('basic.consume') -> +% #'basic.consume'{ queue = get_opt(basic_consume, queue), +% consumer_tag = get_opt(basic_consume, queue), +% no_local = get_opt(basic_consume, no_local), +% no_ack = get_opt(basic_consume, no_ack), +% exclusive = get_opt(basic_consume, exclusive), +% nowait = get_opt(basic_consume, nowait)}; +% +% get_record('amqp.params') -> +% #amqp_params{username=get_opt(connection, user), +% password=get_opt(connection, pass), +% host=get_opt(connection, host), +% port=get_opt(connection, port), +% virtual_host=get_opt(connection, virtual_host)}. +% +% log(Key,Value) -> +% io:format("~p: ~p~n",[Key,Value]). \ No newline at end of file diff --git a/src/rmq_patterns.app.src b/src/rmq_patterns.app.src new file mode 100644 index 0000000..b535679 --- /dev/null +++ b/src/rmq_patterns.app.src @@ -0,0 +1,18 @@ +{application, rmq_patterns, + [ + {description, ""}, + {vsn, "1"}, + {modules, [ + rmq_patterns, + rmq_patterns_app, + rmq_patterns_sup, + rmq_control + ]}, + {registered, []}, + {applications, [ + kernel, + stdlib + ]}, + {mod, { rmq_patterns_app, []}}, + {env, []} + ]}. diff --git a/src/rmq_patterns.erl b/src/rmq_patterns.erl new file mode 100644 index 0000000..75806ef --- /dev/null +++ b/src/rmq_patterns.erl @@ -0,0 +1,8 @@ +-module(rmq_patterns). +-export([start/0, stop/0]). + +start() -> + application:start(rmq_patterns). + +stop() -> + application:stop(rmq_patterns). \ No newline at end of file diff --git a/src/rmq_patterns_app.erl b/src/rmq_patterns_app.erl new file mode 100644 index 0000000..909cd0b --- /dev/null +++ b/src/rmq_patterns_app.erl @@ -0,0 +1,16 @@ +-module(rmq_patterns_app). + +-behaviour(application). + +%% Application callbacks +-export([start/2, stop/1]). + +%% =================================================================== +%% Application callbacks +%% =================================================================== + +start(_StartType, _StartArgs) -> + rmq_patterns_sup:start_link(). + +stop(_State) -> + ok. diff --git a/src/rmq_patterns_sup.erl b/src/rmq_patterns_sup.erl new file mode 100644 index 0000000..e3d8433 --- /dev/null +++ b/src/rmq_patterns_sup.erl @@ -0,0 +1,30 @@ + +-module(rmq_patterns_sup). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +%% Helper macro for declaring children of supervisor +-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}). + +%% =================================================================== +%% API functions +%% =================================================================== + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +%% =================================================================== +%% Supervisor callbacks +%% =================================================================== + +init([]) -> + % RmqControl = ?CHILD(rmq_control, worker), + SmartProxy = ?CHILD(smart_proxy, worker), + {ok, { {one_for_one, 5, 10}, [SmartProxy]} }. + diff --git a/src/smart_proxy.erl b/src/smart_proxy.erl new file mode 100644 index 0000000..9572cbe --- /dev/null +++ b/src/smart_proxy.erl @@ -0,0 +1,185 @@ +-module(smart_proxy). + +-behaviour(gen_server). + +-export([start_link/0, consumer_options/0, stop_proxy/0]). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +-include("amqp_client.hrl"). + +-record(state, {connection, channel, consumer_tag, exchange, routing_key, queue_name}). + +stop_proxy() -> + gen_server:call(?MODULE, stop_proxy). + + +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +init([]) -> + io:format("Starting rmq_control...~n", []), + {ok, {Connection, Channel, Queue, ConsumerTag, Exchange, RoutingKey}} = setup_consumer(), + {ok, #state{connection = Connection, + channel = Channel, + queue_name = Queue, + consumer_tag = ConsumerTag, + exchange = Exchange, + routing_key = RoutingKey}}. + +%% callbacks +handle_call(stop_proxy, _From, #state{channel = Channel, consumer_tag = Tag} = State) -> + log("Stoping consumer", "OK"), + #'basic.cancel_ok'{} = + amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = Tag}), + {reply, ok, State}; + +handle_call(_Request, _From, State) -> + Reply = ok, + {reply, Reply, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info(#'basic.consume_ok'{}, State) -> + {noreply, State}; + +handle_info(#'basic.cancel_ok'{}, State) -> + {stop, normal, State}; + +handle_info({#'basic.deliver'{consumer_tag=_ConsumerTag, delivery_tag=_DeliveryTag, + redelivered=_Redelivered, exchange=_Exchange, + routing_key=_RoutingKey}, Content}, State) -> + #amqp_msg{payload = Payload} = Content, + log("basic.deliver", Payload), + {noreply, State}; + +handle_info(Info, State) -> + log(Info, State), + {noreply, State}. + +terminate(_Reason, #state{channel = Channel}) -> + amqp_channel:close(Channel), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +setup_consumer() -> + {ok, Connection} = amqp_connection:start(network, get_record(amqp_params)), + % {ok, Connection} = amqp_connection:start(network, #amqp_params{}), + {ok, Channel} = amqp_connection:open_channel(Connection), + + log("Channel", "OK"), + + QDeclare = get_record('queue.declare'), + #'queue.declare_ok'{} = amqp_channel:call(Channel, QDeclare), + + log("QDeclare", "OK"), + + ExDeclare = get_record('exchange.declare'), + #'exchange.declare_ok'{} = amqp_channel:call(Channel, ExDeclare), + + log("ExDeclare", "OK"), + + Bind = get_record('queue.bind'), + #'queue.bind_ok'{} = amqp_channel:call(Channel, Bind), + + log("Bind", "OK"), + + BasicConsume = get_record('basic.consume'), + #'basic.consume_ok'{consumer_tag = ConsumerTag} + = amqp_channel:subscribe(Channel, BasicConsume, self()), + + log("BasicConsume", "OK"), + + receive + #'basic.consume_ok'{consumer_tag = ConsumerTag} -> ok + end, + + log("Ready", "OK"), + + {ok, {Connection, Channel, + QDeclare#'queue.declare'.queue, + BasicConsume#'basic.consume'.consumer_tag, + ExDeclare#'exchange.declare'.exchange, Bind#'queue.bind'.routing_key}}. + +consumer_options() -> + Ex = <<"upload_picture">>, + Q = <<"smart_proxy">>, + RK = <<"">>, + CTag = <<"smart_proxy_consumer">>, + + [{amqp_params, + + [{host, "localhost"}, {port, 5672}, {username, <<"guest">>}, + {password, <<"guest">>}, {virtual_host, <<"/">>}]}, + + {'queue.declare', + + [{queue, Q}, {passive, false}, {durable, true}, {exclusive, false}, {auto_delete, false}, + {nowait, false}, {arguments, []}]}, + + {'exchange.declare', + + [{exchange, Ex}, {type, <<"direct">>}, {passive, false}, {durable, true}, + {auto_delete, false}, {internal, false}, {nowait, false}, {arguments, []}]}, + + {'queue.bind', + + [{queue, Q}, {exchange, Ex}, {routing_key, RK}, {nowait, false}, {arguments, []}]}, + + {'basic.consume', + + [{queue, Q}, {consumer_tag, CTag}, {no_local, false}, {no_ack, false}, + {exclusive, false}, {nowait, false}]}]. + + +get_opt(Call, Key) -> + Conf = proplists:get_value(Call, consumer_options()), + proplists:get_value(Key, Conf). + +get_record('queue.declare') -> + #'queue.declare'{ queue = get_opt('queue.declare', queue), + passive = get_opt('queue.declare', passive), + durable = get_opt('queue.declare', durable), + exclusive = get_opt('queue.declare', exclusive), + auto_delete = get_opt('queue.declare', auto_delete), + nowait = get_opt('queue.declare', nowait), + arguments = get_opt('queue.declare', arguments)}; + +get_record('exchange.declare') -> + #'exchange.declare'{ exchange = get_opt('exchange.declare', exchange), + type = get_opt('exchange.declare', type), + passive = get_opt('exchange.declare', passive), + durable = get_opt('exchange.declare', durable), + auto_delete = get_opt('exchange.declare', auto_delete), + internal = get_opt('exchange.declare', internal), + nowait = get_opt('exchange.declare', nowait), + arguments = get_opt('exchange.declare', arguments)}; + +get_record('queue.bind') -> + #'queue.bind'{ queue = get_opt('queue.bind', queue), + exchange = get_opt('queue.bind', exchange), + routing_key = get_opt('queue.bind', routing_key), + nowait = get_opt('queue.bind', nowait), + arguments = get_opt('queue.bind', arguments)}; + +get_record('basic.consume') -> + #'basic.consume'{ queue = get_opt('basic.consume', queue), + consumer_tag = get_opt('basic.consume', queue), + no_local = get_opt('basic.consume', no_local), + no_ack = get_opt('basic.consume', no_ack), + exclusive = get_opt('basic.consume', exclusive), + nowait = get_opt('basic.consume', nowait)}; + +get_record(amqp_params) -> + #amqp_params{username = get_opt(amqp_params, username), + password = get_opt(amqp_params, password), + host = get_opt(amqp_params, host), + port = get_opt(amqp_params, port), + virtual_host = get_opt(amqp_params, virtual_host)}. + +log(Key,Value) -> + io:format("~p: ~p~n",[Key,Value]). \ No newline at end of file diff --git a/start-dev.sh b/start-dev.sh new file mode 100755 index 0000000..8bf30e1 --- /dev/null +++ b/start-dev.sh @@ -0,0 +1,9 @@ +#!/bin/sh + +# ./start-dev.php + +cd `dirname $0` +exec erl -pa $PWD/ebin -pa $PWD/deps/amqp_client/include/rabbit_common/ebin \ +-pa $PWD/deps/amqp_client/ebin \ +-sname amqp_consumer \ +-s rmq_patterns -boot start_sasl \ No newline at end of file