Skip to content

Commit

Permalink
MB-9771 Use port stream option to read stdout response
Browse files Browse the repository at this point in the history
For native incremental index updater, use stream method to
read lines from stdout and parse view group header.
Earlier {line, N} option was used to receive stdout response.
It has limited control for receiving binary raw data (header data)
since the port driver itself tries parse lines.

Change-Id: I4104d494a7e56031e4e60cb7599d8382c10d0f3a
Reviewed-on: http://review.couchbase.org/31551
Reviewed-by: Filipe David Borba Manana <fdmanana@gmail.com>
Tested-by: Filipe David Borba Manana <fdmanana@gmail.com>
  • Loading branch information
t3rm1n4l authored and fdmanana committed Jan 20, 2014
1 parent 05d579f commit 68bc5b6
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 62 deletions.
79 changes: 45 additions & 34 deletions src/couch_set_view/src/couch_set_view_updater.erl
Original file line number Diff line number Diff line change
Expand Up @@ -1073,7 +1073,7 @@ update_btrees(WriterAcc) ->
Cmd ->
ok
end,
Options = [exit_status, use_stdio, stderr_to_stdout, {line, 4096}, binary],
Options = [exit_status, use_stdio, stderr_to_stdout, stream, binary],
Port = open_port({spawn_executable, Cmd}, Options),

true = port_command(Port, [TmpDir, $\n]),
Expand All @@ -1091,7 +1091,8 @@ update_btrees(WriterAcc) ->
end, SetViews),
true = port_command(Port, [integer_to_list(MaxBatchSize), $\n]),
ok = couch_set_view_util:send_group_header(Group, Port),
{NewGroup, Result} = try update_view_group_wait_loop(Port, Group, [], {0, 0, 0, 0, 0}) of
{NewGroup, Result} =
try update_view_group_wait_loop(Port, Group, <<>>, {0, 0, 0, 0, 0}) of
{ok, Resp} ->
Resp
catch
Expand Down Expand Up @@ -1133,29 +1134,41 @@ update_btrees(WriterAcc) ->
end, [], ViewInfos),
{ok, NewGroup, CleanupCount, NewStats, CompactFiles}.

update_view_group_wait_loop(Port, Group, Acc, Result) ->
update_view_group_wait_loop(Port, Group, Acc0, Result) ->
#set_view_group{
set_name = SetName,
name = DDocId,
type = Type
} = Group,
receive
{Port, {exit_status, 0}} ->
{ok, {Group, Result}};
{Port, {exit_status, 1}} ->
?LOG_INFO("Set view `~s`, ~s group `~s`, index updater stopped successfully.",
[SetName, Type, DDocId]),
throw(stopped);
{Port, {exit_status, Status}} ->
throw({view_group_index_updater_exit, Status});
{Port, {data, {noeol, Data}}} ->
update_view_group_wait_loop(Port, Group, [Data | Acc], Result);
{Port, {data, {eol, <<"Header Len : ", Data/binary>>}}} ->
{Line, Acc} = couch_set_view_util:try_read_line(Acc0),
case Line of
nil ->
receive
{Port, {data, Data}} ->
Acc2 = iolist_to_binary([Acc, Data]),
update_view_group_wait_loop(Port, Group, Acc2, Result);
{Port, {exit_status, 0}} ->
{ok, {Group, Result}};
{Port, {exit_status, 1}} ->
?LOG_INFO("Set view `~s`, ~s group `~s`, index updater stopped successfully.",
[SetName, Type, DDocId]),
throw(stopped);
{Port, {exit_status, Status}} ->
throw({view_group_index_updater_exit, Status});
{Port, Error} ->
throw({view_group_index_updater_error, Error});
stop ->
?LOG_INFO("Set view `~s`, ~s group `~s`, sending stop message to index updater.",
[SetName, Type, DDocId]),
true = port_command(Port, "exit"),
update_view_group_wait_loop(Port, Group, Acc, Result)
end;
<<"Header Len : ", Data/binary>> ->
% Read resulting group from stdout
{ok, [HeaderLen], []} = io_lib:fread("~d", binary_to_list(Data)),

NewGroup = case couch_set_view_util:receive_group_header(Port, HeaderLen) of
{ok, HeaderBin} ->
{NewGroup, Acc2} =
case couch_set_view_util:receive_group_header(Port, HeaderLen, Acc) of
{ok, HeaderBin, Rest} ->
#set_view_group{
id_btree = IdBtree,
views = Views
Expand All @@ -1175,17 +1188,18 @@ update_view_group_wait_loop(Port, Group, Acc, Result) ->
end,
Views, NewViewRoots),

Group#set_view_group{
NewGroup0 = Group#set_view_group{
id_btree = NewIdBtree,
views = NewViews,
index_header = Header
};
{error, Error} ->
},
{NewGroup0, Rest};
{error, Error, Rest} ->
self() ! Error,
Group
{Group, Rest}
end,
update_view_group_wait_loop(Port, NewGroup, Acc, Result);
{Port, {data, {eol, <<"Results = ", Data/binary>>}}} ->
update_view_group_wait_loop(Port, NewGroup, Acc2, Result);
<<"Results = ", Data/binary>> ->
{ok, [IdInserted, IdDeleted, ViewInserted, ViewDeleted, Cleanups], []} =
io_lib:fread("id_inserts : ~d, "
"id_deletes : ~d, "
Expand All @@ -1195,18 +1209,15 @@ update_view_group_wait_loop(Port, Group, Acc, Result) ->
binary_to_list(Data)),
Result2 = {IdInserted, IdDeleted, ViewInserted, ViewDeleted, Cleanups},
update_view_group_wait_loop(Port, Group, Acc, Result2);
{Port, {data, {eol, Data}}} ->
Msg = lists:reverse([Data | Acc]),
Msg ->
#set_view_group{
set_name = SetName,
name = DDocId,
type = Type
} = Group,
?LOG_ERROR("Set view `~s`, ~s group `~s`, received error from index updater: ~s",
[SetName, Type, DDocId, Msg]),
update_view_group_wait_loop(Port, Group, [], Result);
{Port, Error} ->
throw({view_group_index_updater_error, Error});
stop ->
?LOG_INFO("Set view `~s`, ~s group `~s`, sending stop message to index updater.",
[SetName, Type, DDocId]),
true = port_command(Port, "exit"),
update_view_group_wait_loop(Port, Group, Acc, Result)
update_view_group_wait_loop(Port, Group, <<>>, Result)
end.

update_seqs(PartIdSeqs, Seqs) ->
Expand Down
61 changes: 33 additions & 28 deletions src/couch_set_view/src/couch_set_view_util.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@
-export([check_primary_key_size/5, check_primary_value_size/5]).
-export([refresh_viewgroup_header/1]).
-export([shutdown_cleaner/2, shutdown_wait/1]).
-export([send_group_header/2, receive_group_header/2]).
-export([try_read_line/1]).
-export([send_group_header/2, receive_group_header/3]).


-include("couch_db.hrl").
Expand Down Expand Up @@ -673,6 +674,17 @@ shutdown_cleaner(#set_view_group{mod = Mod}, Pid) ->
couch_util:shutdown_sync(Pid)
end.


-spec try_read_line(binary()) -> {binary() | nil, binary()}.
try_read_line(Data) ->
case binary:split(Data, <<"\n">>) of
[Line, Rest] ->
{Line, Rest};
[Rest] ->
{nil, Rest}
end.


% Send binary group header data to a external process via stdin
-spec send_group_header(#set_view_group{}, port()) -> 'ok'.
send_group_header(Group, Port) ->
Expand All @@ -681,34 +693,27 @@ send_group_header(Group, Port) ->
true = port_command(Port, [Len, $\n, HeaderBin]),
ok.


% Read group header from stdout of external process
-spec receive_group_header(port(), integer()) -> {'ok', binary()} | {'error', term()}.
receive_group_header(Port, Len) ->
receive_group_header(Port, Len, []).

receive_group_header(_, 0, HeaderData) ->
{ok, HeaderData};
receive_group_header(Port, Len, HeaderData) ->
receive
{Port, {data, {noeol, Data}}} ->
Data2 = ?l2b([HeaderData, Data]),
receive_group_header(Port, Len - byte_size(Data), Data2);
{Port, {data, {eol, Data}}} ->
Len2 = Len - byte_size(Data),
case Len2 of
0 ->
Data2 = ?l2b([HeaderData, Data]),
Len3 = Len2;
_ ->
Data2 = ?l2b([HeaderData, Data, $\n]),
Len3 = Len2 - 1
end,
receive_group_header(Port, Len3, Data2);
{Port, {exit_status, 0}} ->
self() ! {Port, {exit_status, 0}},
receive_group_header(Port, Len, HeaderData);
Error ->
{error, Error}
-spec receive_group_header(port(), integer(), binary()) ->
{'ok', binary(), binary()} | {'error', term(), binary()}.
receive_group_header(Port, Len, HeaderAcc) ->
case byte_size(HeaderAcc) of
Sz when Sz >= Len + 1 ->
HeaderBin = binary:part(HeaderAcc, 0, Len),
% Remaining data excluding a \n character
Remaining = binary:part(HeaderAcc, Len + 1, Sz - Len - 1),
{ok, HeaderBin, Remaining};
_ ->
receive
{Port, {data, Data}} ->
receive_group_header(Port, Len, Data);
{Port, {exit_status, 0}} ->
self() ! {Port, {exit_status, 0}},
receive_group_header(Port, Len, HeaderAcc);
Error ->
{error, Error, HeaderAcc}
end
end.


Expand Down

0 comments on commit 68bc5b6

Please sign in to comment.