Skip to content

Commit

Permalink
nif_SUITE send3 stress test
Browse files Browse the repository at this point in the history
  • Loading branch information
sverker committed Jul 8, 2010
1 parent 299e223 commit 8edd186
Show file tree
Hide file tree
Showing 2 changed files with 245 additions and 37 deletions.
168 changes: 164 additions & 4 deletions erts/emulator/test/nif_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
fin_per_testcase/2, basic/1, reload/1, upgrade/1, heap_frag/1,
types/1, many_args/1, binaries/1, get_string/1, get_atom/1, api_macros/1,
from_array/1, iolist_as_binary/1, resource/1, resource_binary/1, resource_takeover/1,
threading/1, send/1, send2/1, send_threaded/1, neg/1, is_checks/1,
threading/1, send/1, send2/1, send3/1, send_threaded/1, neg/1, is_checks/1,
get_length/1, make_atom/1, make_string/1]).

-export([many_args_100/100]).
Expand All @@ -51,7 +51,7 @@
all(suite) ->
[basic, reload, upgrade, heap_frag, types, many_args, binaries, get_string,
get_atom, api_macros, from_array, iolist_as_binary, resource, resource_binary,
resource_takeover, threading, send, send2, send_threaded, neg, is_checks,
resource_takeover, threading, send, send2, send3, send_threaded, neg, is_checks,
get_length, make_atom, make_string].

%%init_per_testcase(_Case, Config) ->
Expand Down Expand Up @@ -916,7 +916,164 @@ forwarder(To, N) ->

other_term() ->
{fun(X,Y) -> X*Y end, make_ref()}.


send3(doc) -> ["Message sending stress test"];
send3(Config) when is_list(Config) ->
%% Let a number of processes send random message blobs between each other
%% using enif_send. Kill and spawn new ones randomly to keep a ~constant
%% number of workers running.
Seed = now(),
io:format("seed: ~p\n",[Seed]),
random:seed(Seed),
ets:new(nif_SUITE,[named_table,public]),
?line true = ets:insert(nif_SUITE,{send3,0,0,0,0}),
timer:send_after(10000, timeout), % Run for 10 seconds
SpawnCnt = send3_controller(0, [], [], 20),
?line [{_,Rcv,SndOk,SndFail,Balance}] = ets:lookup(nif_SUITE,send3),
io:format("spawns=~p received=~p, sent=~p send-failure=~p balance=~p\n",
[SpawnCnt,Rcv,SndOk,SndFail,Balance]),
ets:delete(nif_SUITE).

send3_controller(SpawnCnt, [], _, infinity) ->
SpawnCnt;
send3_controller(SpawnCnt0, Mons0, Pids0, Tick) ->
receive
timeout ->
io:format("Timeout. Sending 'halt' to ~p\n",[Pids0]),
lists:foreach(fun(P) -> P ! {halt,self()} end, Pids0),
lists:foreach(fun(P) -> receive {halted,P} -> ok end end, Pids0),
QTot = lists:foldl(fun(P,QSum) ->
{message_queue_len,QLen} =
erlang:process_info(P,message_queue_len),
QSum + QLen
end, 0, Pids0),
io:format("Total queue length ~p\n",[QTot]),
lists:foreach(fun(P) -> P ! die end, Pids0),
send3_controller(SpawnCnt0, Mons0, [], infinity);
{'DOWN', MonRef, process, _Pid, _} ->
Mons1 = lists:delete(MonRef, Mons0),
%%io:format("Got DOWN from ~p. Monitors left: ~p\n",[Pid,Mons1]),
send3_controller(SpawnCnt0, Mons1, Pids0, Tick)
after Tick ->
Max = 20,
N = length(Pids0),
PidN = random:uniform(Max),
%%io:format("N=~p PidN=~p Pids0=~p\n", [N,PidN,Pids0]),
case PidN > N of
true ->
{NewPid,Mon} = spawn_opt(fun send3_proc/0, [link,monitor]),
lists:foreach(fun(P) -> P ! {is_born,NewPid} end, Pids0),
?line Balance = ets:lookup_element(nif_SUITE,send3,5),
Inject = (Balance =< 0),
case Inject of
true -> ok;
false -> ets:update_element(nif_SUITE,send3,{5,-1})
end,
NewPid ! {pids,Pids0,Inject},
send3_controller(SpawnCnt0+1, [Mon|Mons0], [NewPid|Pids0], Tick);
false ->
KillPid = lists:nth(PidN,Pids0),
KillPid ! die,
Pids1 = lists:delete(KillPid, Pids0),
lists:foreach(fun(P) -> P ! {is_dead,KillPid} end, Pids1),
send3_controller(SpawnCnt0, Mons0, Pids1, Tick)
end
end.

send3_proc() ->
%%io:format("Process ~p spawned\n",[self()]),
send3_proc([self()], {0,0,0}, {1,2,3,4,5}).
send3_proc(Pids0, Counters={Rcv,SndOk,SndFail}, State0) ->
%%io:format("~p: Pids0=~p", [self(), Pids0]),
%%timer:sleep(10),
receive
{pids, Pids1, Inject} ->
%%io:format("~p: got ~p Inject=~p\n", [self(), Pids1, Inject]),
?line Pids0 = [self()],
Pids2 = [self() | Pids1],
case Inject of
true -> send3_proc_send(Pids2, Counters, State0);
false -> send3_proc(Pids2, Counters, State0)
end;
{is_born, Pid} ->
%%io:format("~p: is_born ~p, got ~p\n", [self(), Pid, Pids0]),
send3_proc([Pid | Pids0], Counters, State0);
{is_dead, Pid} ->
Pids1 = lists:delete(Pid,Pids0),
%%io:format("~p: is_dead ~p, got ~p\n", [self(), Pid, Pids1]),
send3_proc(Pids1, Counters, State0);
{blob, Blob0} ->
%%io:format("~p: blob ~p\n", [self(), Blob0]),
State1 = send3_new_state(State0, Blob0),
send3_proc_send(Pids0, {Rcv+1,SndOk,SndFail}, State1);
die ->
%%io:format("Process ~p terminating, stats = ~p\n",[self(),Counters]),
{message_queue_len,Dropped} = erlang:process_info(self(),message_queue_len),
_R = ets:update_counter(nif_SUITE,send3,
[{2,Rcv},{3,SndOk},{4,SndFail},{5,1-Dropped}]),
%%io:format("~p: dies R=~p\n", [self(), R]),
ok;
{halt,Papa} ->
Papa ! {halted,self()},
io:format("~p halted\n",[self()]),
receive die -> ok end,
io:format("~p dying\n",[self()])
end.

send3_proc_send(Pids, {Rcv,SndOk,SndFail}, State0) ->
To = lists:nth(random:uniform(length(Pids)),Pids),
Blob = send3_make_blob(),
State1 = send3_new_state(State0,Blob),
case send3_send(To, Blob) of
true ->
send3_proc(Pids, {Rcv,SndOk+1,SndFail}, State1);
false ->
send3_proc(Pids, {Rcv,SndOk,SndFail+1}, State1)
end.


send3_make_blob() ->
case random:uniform(20)-1 of
0 -> {term,[]};
N ->
MsgEnv = alloc_msgenv(),
repeat(N bsr 1,
fun(_) -> grow_blob(MsgEnv,other_term(),random:uniform(1 bsl 20))
end, void),
case (N band 1) of
0 -> {term,copy_blob(MsgEnv)};
1 -> {msgenv,MsgEnv}
end
end.

send3_send(Pid, Msg) ->
%% 90% enif_send and 10% normal bang
case random:uniform(10) of
1 -> send3_send_bang(Pid,Msg);
_ -> send3_send_nif(Pid,Msg)
end.
send3_send_nif(Pid, {term,Blob}) ->
%%io:format("~p send term nif\n",[self()]),
send_term(Pid, {blob, Blob}) =:= 1;
send3_send_nif(Pid, {msgenv,MsgEnv}) ->
%%io:format("~p send blob nif\n",[self()]),
send3_blob(MsgEnv, Pid, blob) =:= 1.

send3_send_bang(Pid, {term,Blob}) ->
%%io:format("~p send term bang\n",[self()]),
Pid ! {blob, Blob},
true;
send3_send_bang(Pid, {msgenv,MsgEnv}) ->
%%io:format("~p send blob bang\n",[self()]),
Pid ! {blob, copy_blob(MsgEnv)},
true.

send3_new_state(State, Blob) ->
case random:uniform(5+2) of
N when N =< 5-> setelement(N, State, Blob);
_ -> State % Don't store blob
end.

neg(doc) -> ["Negative testing of load_nif"];
neg(Config) when is_list(Config) ->
TmpMem = tmpmem(),
Expand Down Expand Up @@ -1070,10 +1227,13 @@ send_new_blob(_,_) -> ?nif_stub.
alloc_msgenv() -> ?nif_stub.
clear_msgenv(_) -> ?nif_stub.
grow_blob(_,_) -> ?nif_stub.
grow_blob(_,_,_) -> ?nif_stub.
send_blob(_,_) -> ?nif_stub.
send3_blob(_,_,_) -> ?nif_stub.
send_blob_thread(_,_,_) -> ?nif_stub.
join_send_thread(_) -> ?nif_stub.

copy_blob(_) -> ?nif_stub.
send_term(_,_) -> ?nif_stub.

nif_stub_error(Line) ->
exit({nif_not_loaded,module,?MODULE,line,Line}).
114 changes: 81 additions & 33 deletions erts/emulator/test/nif_SUITE_data/nif_SUITE.c
Original file line number Diff line number Diff line change
Expand Up @@ -1038,36 +1038,41 @@ static ERL_NIF_TERM make_term_copy(struct make_term_info* mti, int n)
{
return enif_make_copy(mti->dst_env, mti->other_term);
}

typedef ERL_NIF_TERM Make_term_Func(struct make_term_info*, int);
static Make_term_Func* make_funcs[] = {
make_term_binary,
make_term_int,
make_term_ulong,
make_term_double,
make_term_atom,
make_term_existing_atom,
make_term_string,
//make_term_ref,
make_term_sub_binary,
make_term_uint,
make_term_long,
make_term_tuple0,
make_term_list0,
make_term_resource,
make_term_new_binary,
make_term_caller_pid,
make_term_tuple,
make_term_list,
make_term_list_cell,
make_term_tuple_from_array,
make_term_list_from_array,
make_term_garbage,
make_term_copy
};
static unsigned num_of_make_funcs()
{
return sizeof(make_funcs)/sizeof(*make_funcs);
}
static int make_term_n(struct make_term_info* mti, int n, ERL_NIF_TERM* res)
{
typedef ERL_NIF_TERM Make_term_Func(struct make_term_info*, int);
static Make_term_Func* funcs[] = {
make_term_binary,
make_term_int,
make_term_ulong,
make_term_double,
make_term_atom,
make_term_existing_atom,
make_term_string,
//make_term_ref,
make_term_sub_binary,
make_term_uint,
make_term_long,
make_term_tuple0,
make_term_list0,
make_term_resource,
make_term_new_binary,
make_term_caller_pid,
make_term_tuple,
make_term_list,
make_term_list_cell,
make_term_tuple_from_array,
make_term_list_from_array,
make_term_garbage,
make_term_copy
};
if (n < sizeof(funcs)/sizeof(*funcs)) {
*res = funcs[n](mti, n);
if (n < num_of_make_funcs()) {
*res = make_funcs[n](mti, n);
push_term(mti, *res);
return 1;
}
Expand Down Expand Up @@ -1167,14 +1172,14 @@ static ERL_NIF_TERM grow_blob(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[
{
union { void* vp; struct make_term_info* p; }mti;
ERL_NIF_TERM term;
if (!enif_get_resource(env, argv[0], msgenv_resource_type, &mti.vp)) {
if (!enif_get_resource(env, argv[0], msgenv_resource_type, &mti.vp)
|| (argc>2 && !enif_get_uint(env,argv[2], &mti.p->n))) {
return enif_make_badarg(env);
}
mti.p->caller_env = env;
mti.p->other_term = argv[1];
while (!make_term_n(mti.p, mti.p->n++, &term)) {
mti.p->n = 0;
}
mti.p->n %= num_of_make_funcs();
make_term_n(mti.p, mti.p->n++, &term);
mti.p->blob = enif_make_list_cell(mti.p->dst_env, term, mti.p->blob);
return atom_ok;
}
Expand All @@ -1194,6 +1199,23 @@ static ERL_NIF_TERM send_blob(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[
return enif_make_tuple3(env, atom_ok, enif_make_int(env,res), copy);
}

static ERL_NIF_TERM send3_blob(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
union { void* vp; struct make_term_info* p; }mti;
ErlNifPid to;
ERL_NIF_TERM copy;
int res;
if (!enif_get_resource(env, argv[0], msgenv_resource_type, &mti.vp)
|| !enif_get_local_pid(env, argv[1], &to)) {
return enif_make_badarg(env);
}
mti.p->blob = enif_make_tuple2(mti.p->dst_env,
enif_make_copy(mti.p->dst_env, argv[2]),
mti.p->blob);
res = enif_send(env, &to, mti.p->dst_env, mti.p->blob);
return enif_make_int(env,res);
}

void* threaded_sender(void *arg)
{

Expand Down Expand Up @@ -1253,6 +1275,28 @@ static ERL_NIF_TERM join_send_thread(ErlNifEnv* env, int argc, const ERL_NIF_TER
return enif_make_tuple2(env, atom_ok, enif_make_int(env, mti.p->send_res));
}

static ERL_NIF_TERM copy_blob(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
union { void* vp; struct make_term_info* p; }mti;
if (!enif_get_resource(env, argv[0], msgenv_resource_type, &mti.vp)) {
return enif_make_badarg(env);
}
return enif_make_copy(env, mti.p->blob);
}

static ERL_NIF_TERM send_term(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[])
{
ErlNifEnv* menv;
ErlNifPid pid;
int ret;
if (!enif_get_local_pid(env, argv[0], &pid)) {
return enif_make_badarg(env);
}
menv = enif_alloc_env();
ret = enif_send(env, &pid, menv, enif_make_copy(menv, argv[1]));
enif_free_env(menv);
return enif_make_int(env, ret);
}

static ErlNifFunc nif_funcs[] =
{
Expand Down Expand Up @@ -1291,9 +1335,13 @@ static ErlNifFunc nif_funcs[] =
{"alloc_msgenv", 0, alloc_msgenv},
{"clear_msgenv", 1, clear_msgenv},
{"grow_blob", 2, grow_blob},
{"grow_blob", 3, grow_blob},
{"send_blob", 2, send_blob},
{"send3_blob", 3, send3_blob},
{"send_blob_thread", 3, send_blob_thread},
{"join_send_thread", 1, join_send_thread}
{"join_send_thread", 1, join_send_thread},
{"copy_blob", 1, copy_blob},
{"send_term", 2, send_term}
};

ERL_NIF_INIT(nif_SUITE,nif_funcs,load,reload,upgrade,unload)
Expand Down

0 comments on commit 8edd186

Please sign in to comment.