Skip to content

Commit

Permalink
Added BashoBench driver and config file in priv directory for perform…
Browse files Browse the repository at this point in the history
…ance benchmarking
  • Loading branch information
Vasco committed Oct 26, 2014
1 parent 515e980 commit ef88991
Show file tree
Hide file tree
Showing 12 changed files with 209 additions and 33 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Expand Up @@ -22,3 +22,5 @@ etc/*.erl
out
data
data/*
.rebar
_rel
64 changes: 64 additions & 0 deletions priv/basho_bench_driver_rivus.erl
@@ -0,0 +1,64 @@
-module(basho_bench_driver_rivus).
-compile([{parse_transform, lager_transform}]).

-export([new/1, run/4]).

-include("basho_bench.hrl").

-record(state, {
host,
port,
socket
}).

new(1) ->
{Host, Port, Socket} = connect(),
deploy_queries(Socket),

{ok, #state{
host = Host,
port = Port,
socket = Socket}};
new(_Id) ->
{Host, Port, Socket} = connect(),

{ok, #state{
host = Host,
port = Port,
socket = Socket}}.

connect() ->
{Host, Port} = basho_bench_config:get(rivus_cep_tcp_serv, {"127.0.0.1", 5775}),
{ok, Socket} = gen_tcp:connect(Host, Port, [{active, false}, {nodelay, true}, {packet, 4}, binary]),
{Host, Port, Socket}.

deploy_queries(Socket) ->
Queries = basho_bench_config:get(rivus_cep_queries, []),
lists:foreach(fun({Type, Query}) ->
lager:info("Deploying query type: ~p, stmt: ~p", [Type, Query]),
execute(Type, Query, Socket) end, Queries).

execute(query, Query, Socket) ->
ok = gen_tcp:send(Socket, term_to_binary({load_query, {Query, [benchmark_test], [], []}}));
execute(event, Query, Socket) ->
ok = gen_tcp:send(Socket, term_to_binary({load_query, {Query, [benchmark_event], [], []}})).

send_event(Event, #state{socket = Socket} = State) ->
case gen_tcp:send(Socket, term_to_binary({event, benchmark_test, Event})) of
ok -> {ok, State};
Error -> Error
end.

create_event(_KeyGen, ValueGen) ->
ValueGen.

run(notify, KeyGen, ValueGen, State) ->
Event = create_event(KeyGen, ValueGen),
% Send message
case send_event(Event, State) of
{error, E} ->
{error, E, State};
ok ->
{ok, State};
{ok, State} -> {ok, State}
end.
18 changes: 18 additions & 0 deletions priv/rivus.config
@@ -0,0 +1,18 @@
{mode, {rate, 100000}}.

{duration, 1}.

{concurrent, 2}.

{driver, basho_bench_driver_rivus}.

{operations, [{notify, 1}]}.

{key_generator, {uniform_int, 10000}}.
{value_generator, {function, rivus_cep_event_gen, generate_event, [event11, 4]}}.

{rivus_cep_queries,[{event, "define event11 as (attr1, attr2, attr3, attr4); "},
{query, "define correlation11 as
select sum(ev1.attr1)
from event11 as ev1
within 60 seconds; "}]}.
14 changes: 14 additions & 0 deletions priv/rivus_cep_event_gen.erl
@@ -0,0 +1,14 @@

-module(rivus_cep_event_gen).

%% API
-export([generate_event/3]).

generate_event(_Id, EventName, ParamCount) when is_integer(ParamCount) andalso ParamCount>0 ->
generate(ParamCount, [EventName]).

generate(0, Acc) ->
list_to_tuple(Acc);
generate(ParamCount, Acc) ->
generate(ParamCount-1, Acc ++ [random:uniform(1000)]).

Binary file modified rebar
Binary file not shown.
Binary file added relx
Binary file not shown.
8 changes: 8 additions & 0 deletions relx.config
@@ -0,0 +1,8 @@
{release, {rivus_cep, "0.1"},
[gproc,
lager,
folsom,
gen_leader,
rivus_cep]}.

{extended_start_script, true}.
File renamed without changes.
2 changes: 1 addition & 1 deletion src/rivus_cep.app.src
Expand Up @@ -11,7 +11,7 @@
lager
]},
{mod, { rivus_cep_app, []}},
{env, []}
{env, [{rivus_tcp_serv, {"127.0.0.1", 5775}}]}
]}.


48 changes: 24 additions & 24 deletions src/rivus_cep.erl
Expand Up @@ -130,7 +130,7 @@ handle_call({execute, [QueryStr, _Producers, Subscribers, Options]}, _From, Stat

case QueryDetails of
{module, _} -> {reply, ok, State};
_ -> lager:debug("Query sup, Args: ~p~n", [QueryDetails]),
_ -> lager:info("Query sup, Args: ~p~n", [QueryDetails]),
{ok, Pid} = supervisor:start_child(QuerySup, [QueryDetails]),
{reply, {ok, Pid, QueryDetails}, State#state{win_register = QueryDetails#query_details.window_register}}
end;
Expand Down Expand Up @@ -171,29 +171,29 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%--------------------------------------------------------------------
get_query_details([QueryStr, _Producers, Subscribers, Options], WinReg) ->
QueryClauses = parse_query(QueryStr),

case QueryClauses of
{event, EventDef} -> rivus_cep_event_creator:load_event_mod(EventDef);
_ -> Producers = case _Producers of
[] -> [any];
_ -> _Producers
end,

{{EventWindow, EvWinPid}, {FsmWindow, FsmWinPid}, NewWinReg} = register_windows(QueryClauses, Options, WinReg),

#query_details{
clauses = QueryClauses,
producers = Producers,
subscribers = Subscribers,
options = Options,
event_window = EventWindow,
fsm_window = FsmWindow,
window_register = NewWinReg,
event_window_pid = EvWinPid,
fsm_window_pid = FsmWinPid
}
end.
QueryClauses = parse_query(QueryStr),

case QueryClauses of
{event, EventDef} -> rivus_cep_event_creator:load_event_mod(EventDef);
_ -> Producers = case _Producers of
[] -> [any];
_ -> _Producers
end,

{{EventWindow, EvWinPid}, {FsmWindow, FsmWinPid}, NewWinReg} = register_windows(QueryClauses, Options, WinReg),

#query_details{
clauses = QueryClauses,
producers = Producers,
subscribers = Subscribers,
options = Options,
event_window = EventWindow,
fsm_window = FsmWindow,
window_register = NewWinReg,
event_window_pid = EvWinPid,
fsm_window_pid = FsmWinPid
}
end.

parse_query(QueryStr) ->
{ok, Tokens, _} = rivus_cep_scanner:string(QueryStr, 1),
Expand Down
10 changes: 8 additions & 2 deletions src/rivus_cep_server.erl
Expand Up @@ -121,5 +121,11 @@ process_event(Provider, Event) ->

load_query(Query, Socket) ->
{QueryStr, Providers, UpdateListners, Options} = Query,
{ok, QueryPid, _} = rivus_cep:load_query(QueryStr, Providers, UpdateListners, Options),
ok = gen_tcp:send(Socket, term_to_binary(QueryPid)).

case rivus_cep:execute(QueryStr, Providers, UpdateListners, Options) of
{ok, QueryPid, _} -> ok = gen_tcp:send(Socket, term_to_binary(QueryPid));
ok -> ok = gen_tcp:send(Socket, term_to_binary(ok));
_ -> lager:error("Cannot execute statement:: ~p",[Query]),
ok = gen_tcp:send(Socket, term_to_binary({error,cannot_execute_query}))
end.

76 changes: 70 additions & 6 deletions test/rivus_cep_server_tests.erl
Expand Up @@ -23,10 +23,16 @@ query_worker_test_() ->
application:stop(gproc),
application:stop(rivus_cep)
end,

[{"Test query via TCP connection",
timeout, 60*60,
fun load_query_tcp/0}
[
%% {"Test query via TCP connection",
%% timeout, 60*60,
%% fun load_query_tcp/0},
%% {"Test event module load via TCP connection",
%% timeout, 60*60,
%% fun load_event_tcp/0},
{"Test query and event module load via TCP connection",
timeout, 60*60,
fun load_query_and_event/0}
]
}.

Expand Down Expand Up @@ -71,6 +77,64 @@ load_query_tcp() ->

gen_server:call(QueryPid, stop),
gen_server:call(Pid, stop).



load_event_tcp() ->
EventDefStr = "define event11 as (attr1, attr2, attr3, attr4);",
{ok, {Host, Port}} = application:get_env(rivus_cep, rivus_tcp_serv),
{ok, Socket} = gen_tcp:connect(Host, Port, [{active, false}, {nodelay, true}, {packet, 4}, binary]),
?assertEqual(ok, gen_tcp:send(Socket, term_to_binary({load_query, {EventDefStr, [], [], []}}))),
Result = gen_tcp:recv(Socket, 0),
?assertMatch({ok, _}, Result),
QueryPid = binary_to_term(element(2, Result)).



load_query_and_event() ->
EventDefStr = "define event11 as (attr1, attr2, attr3, attr4);",
Query = "define correlation1 as
select sum(ev1.attr1)
from event11 as ev1
within 60 seconds; ",
{ok, {Host, Port}} = application:get_env(rivus_cep, rivus_tcp_serv),
{ok, Socket} = gen_tcp:connect(Host, Port, [{active, false}, {nodelay, true}, {packet, 4}, binary]),
ok = gen_tcp:send(Socket, term_to_binary({load_query, {EventDefStr, [benchmark_event], [], []}})),
?assertMatch({ok, _}, gen_tcp:recv(Socket, 0)),


ok = gen_tcp:send(Socket, term_to_binary({load_query, {Query, [benchmark_test], [], []}})),

?assertMatch({ok, _}, gen_tcp:recv(Socket, 0)).

%% {ok, {Host, Port}} = application:get_env(rivus_cep, rivus_tcp_serv).
%% {ok, Socket} = gen_tcp:connect(Host, Port, [{active, false}, {nodelay, true}, {packet, 4}, binary]).
%%
%%
%% Event1 = {event11, 10,b,c}.
%% Event2 = {event11, 15,bbb,c}.
%% Event3 = {event11, 20,b,c}.
%% Event4 = {event11, 30,b,cc,d}.
%% Event5 = {event11, 40,bb,cc,dd}.
%%
%% rivus_cep:notify(test1, Event1).
%% rivus_cep:notify(test1, Event2).
%% rivus_cep:notify(test1, Event3).
%% rivus_cep:notify(test1, Event4).
%% rivus_cep:notify(test1, Event5).
%%
%%
%% {ok,Pid} = result_subscriber:start_link().
%% EventDefStr = "define event11 as (attr1, attr2, attr3, attr4);".
%% QueryStr = "define correlation1 as
%% select sum(ev1.attr1)
%% from event11 as ev1
%% within 60 seconds; ".
%% ok = rivus_cep:execute(EventDefStr).
%% {ok, QueryPid, _} = rivus_cep:execute(QueryStr, [test1], [Pid], []).
%%
%%
%%
%% gen_tcp:send(Socket, term_to_binary({event, test1, Event1})).
%% gen_tcp:send(Socket, term_to_binary({event, test1, Event2})).
%% gen_tcp:send(Socket, term_to_binary({event, test1, Event3})).
%% gen_tcp:send(Socket, term_to_binary({event, test1, Event4})).
%% gen_tcp:send(Socket, term_to_binary({event, test1, Event5})).

0 comments on commit ef88991

Please sign in to comment.