Skip to content

Commit

Permalink
Wait for uninstrumented messages to be processed
Browse files Browse the repository at this point in the history
  • Loading branch information
iliastsi committed May 15, 2013
1 parent 9ed31a1 commit 0e44926
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 42 deletions.
34 changes: 24 additions & 10 deletions src/concuerror_rep.erl
Expand Up @@ -56,6 +56,8 @@

-export([debug_print/1, debug_print/2, debug_apply/3]).

-export_type([dest/0]).

-include("gen.hrl").

%%%----------------------------------------------------------------------
Expand Down Expand Up @@ -408,7 +410,9 @@ rep_register(RegName, P) ->
rep_send(Dest, Msg) ->
check_unknown_process(),
send_center(Dest, Msg),
Dest ! Msg.
Result = Dest ! Msg,
concuerror_util:wait_messages(find_pid(Dest)),
Result.

%% @spec rep_send(dest(), term(), ['nosuspend' | 'noconnect']) ->
%% 'ok' | 'nosuspend' | 'noconnect'
Expand All @@ -420,7 +424,9 @@ rep_send(Dest, Msg) ->
rep_send(Dest, Msg, Opt) ->
check_unknown_process(),
send_center(Dest, Msg),
erlang:send(Dest, Msg, Opt).
Result = erlang:send(Dest, Msg, Opt),
concuerror_util:wait_messages(find_pid(Dest)),
Result.

send_center(Dest, Msg) ->
PlanLid = ?LID_FROM_PID(find_pid(Dest)),
Expand Down Expand Up @@ -662,7 +668,9 @@ rep_send_after(Time, Dest, Msg) ->
rep_exit(Pid, Reason) ->
check_unknown_process(),
concuerror_sched:notify(exit_2, {?LID_FROM_PID(Pid), Reason}),
exit(Pid, Reason).
exit(Pid, Reason),
concuerror_util:wait_messages(find_pid(Pid)),
true.

%% @spec: rep_unlink(pid() | port()) -> 'true'
%% @doc: Replacement for `unlink/1'.
Expand Down Expand Up @@ -704,25 +712,29 @@ rep_whereis(RegName) ->
concuerror_sched:notify(whereis, {RegName, Value}, prev),
R.

%% @spec rep_port_command(port(), term()) -> term()
%% @spec rep_port_command(port(), term()) -> true
%% @doc: Replacement for `port_command/2'.
%%
%% Just yield before calling port_command/2.
-spec rep_port_command(port, term()) -> term().
-spec rep_port_command(port, term()) -> true.
rep_port_command(Port, Data) ->
check_unknown_process(),
concuerror_sched:notify(port_command, Port),
port_command(Port, Data).
port_command(Port, Data),
concuerror_util:wait_messages(not_found),
true.

%% @spec rep_port_command(port(), term(), [force | nosuspend]) -> term()
%% @spec rep_port_command(port(), term(), [force | nosuspend]) -> boolean()
%% @doc: Replacement for `port_command/3'.
%%
%% Just yield before calling port_command/3.
-spec rep_port_command(port, term(), [force | nosuspend]) -> term().
-spec rep_port_command(port, term(), [force | nosuspend]) -> boolean().
rep_port_command(Port, Data, OptionList) ->
check_unknown_process(),
concuerror_sched:notify(port_command, Port),
port_command(Port, Data, OptionList).
Result = port_command(Port, Data, OptionList),
concuerror_util:wait_messages(not_found),
Result.

%% @spec rep_port_control(port(), integer(), term()) -> term()
%% @doc: Replacement for `port_control/3'.
Expand All @@ -732,7 +744,9 @@ rep_port_command(Port, Data, OptionList) ->
rep_port_control(Port, Operation, Data) ->
check_unknown_process(),
concuerror_sched:notify(port_control, Port),
port_control(Port, Operation, Data).
Result = port_control(Port, Operation, Data),
concuerror_util:wait_messages(not_found),
Result.


%%%----------------------------------------------------------------------
Expand Down
25 changes: 0 additions & 25 deletions src/concuerror_sched.erl
Expand Up @@ -1283,11 +1283,6 @@ wait() ->
end.

replace_messages(Lid, VC) ->
%% Let "black" processes send any remaining messages.
case ets:member(?NT_OPTIONS, 'wait_messages') of
true -> wait_black_messages();
false -> ok
end,
Fun =
fun(Pid, MsgAcc) ->
Pid ! ?VECTOR_MSG(Lid, VC),
Expand All @@ -1303,26 +1298,6 @@ replace_messages(Lid, VC) ->
end,
concuerror_lid:fold_pids(Fun, []).

wait_black_messages() ->
%% Check if there is any processes able to run (apart from current)
%% thus check that there is only one processes with status
%% different than waiting.
Priority = process_flag(priority, low),
receive after 2 -> ok end,
Check =
fun() ->
Running = [P ||
P <- processes(),
process_info(P, status) =/= {status, waiting}],
case Running of
[_] -> true;
_ -> false
end
end,
concuerror_util:wait_until(Check, 2),
process_flag(priority, Priority),
ok.

-define(IS_INSTR_MSG(Msg),
(is_tuple(Msg) andalso
size(Msg) =:= 4 andalso
Expand Down
36 changes: 29 additions & 7 deletions src/concuerror_util.erl
Expand Up @@ -14,7 +14,7 @@

-module(concuerror_util).
-export([doc/1, test/0, flat_format/2, flush_mailbox/0, get_module_name/1,
is_erl_source/1, funs/1, funs/2, funLine/3, pmap/2, wait_until/2,
is_erl_source/1, funs/1, funs/2, funLine/3, pmap/2, wait_messages/1,
timer_init/0, timer_start/1, timer/1, timer_stop/1, timer_destroy/0,
init_state/0, progress_bar/2, to_elapsed_time/1, to_elapsed_time/2]).

Expand Down Expand Up @@ -283,10 +283,32 @@ pmap(Fun, List) ->


%% -------------------------------------------------------------------
%% Wait for something to happen
-spec wait_until(fun(() -> boolean()), non_neg_integer()) -> ok.
wait_until(Fun, Time) ->
case Fun() of
true -> ok;
false -> receive after Time -> wait_until(Fun, Time) end
%% Wait for uninstrumented messages to be processed.
-spec wait_messages(concuerror_rep:dest()) -> ok.
wait_messages(Dest) ->
WaitFlag = ets:member(?NT_OPTIONS, 'wait_messages'),
NotInstr = concuerror_lid:from_pid(Dest) =:= 'not_found',
case (WaitFlag andalso NotInstr) of
true ->
Self = self(),
Pid = spawn(fun() -> trace(Self) end),
receive {Pid, ok} -> ok end;
false ->
ok
end.

trace(Pid) ->
%% Wait until Pid receives a message
{message_queue_len, MsgQueueLen} = process_info(Pid, message_queue_len),
traceLoop(Pid, MsgQueueLen, 5),
Pid ! {self(), ok}.

traceLoop(_Pid, _MsgQueueLen, 0) ->
ok;
traceLoop(Pid, MsgQueueLen, I) ->
{message_queue_len, NewLen} = process_info(Pid, message_queue_len),
case NewLen > MsgQueueLen of
true -> ok;
false ->
receive after 2 -> traceLoop(Pid, MsgQueueLen, I-1) end
end.

0 comments on commit 0e44926

Please sign in to comment.