Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Fixed nasty dets bug..
NOTE TO SELF : make sure owning process is alive before writing to dets
  • Loading branch information
xslogic committed Sep 27, 2010
1 parent 99b8076 commit 1013d57
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 90 deletions.
3 changes: 0 additions & 3 deletions include/phoebus.hrl
Expand Up @@ -51,6 +51,3 @@
vertex_state = active,
vertex_value,
edge_list = []}).
-record(edge, {value,
target_vid,
target_vname}).
11 changes: 6 additions & 5 deletions src/phoebus_master.erl
Expand Up @@ -21,6 +21,7 @@
post_algo/2,
check_algo_finish/2,
store_result/2,
end_state/2,
state_name/3, handle_event/3,
handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).

Expand Down Expand Up @@ -171,11 +172,11 @@ check_algo_finish(timeout, #state{step = Step, max_steps = MaxSteps,
store_result({store_result_done, WId, _WData},
#state{workers = Workers} = State) ->
{NewWorkers, NextState} =
update_workers(store_result, all_done, bla, Workers, WId),
case NextState of
all_done -> {stop, normal, State};
_ -> {next_state, NextState, State#state{workers = NewWorkers}}
end.
update_workers(store_result, end_state, bla, Workers, WId),
{next_state, NextState, State#state{workers = NewWorkers}}.

end_state(_Event, State) ->
{next_state, end_state, State}.

%%--------------------------------------------------------------------
%% @private
Expand Down
74 changes: 47 additions & 27 deletions src/phoebus_source.erl
Expand Up @@ -62,42 +62,66 @@ check_dir(URI, Conf) ->

start_reading(file, File, State) ->
MyPid = self(),
RPid = spawn(fun() -> reader_loop({init, File}, MyPid, State) end),
RPid = spawn(fun() -> reader_loop({init, File}, MyPid, {State, []}) end),
{ok, RPid, State}.


reader_loop({init, File}, Pid, State) ->
{ok, FD} = file:open(File, [read, {read_ahead, 4096}]),
{ok, FD} = file:open(File, [raw, {read_ahead, 16384}]),
reader_loop(FD, Pid, State);
reader_loop(FD, Pid, State) ->
{Recs, IsDone} =
lists:foldl(
fun(_, {Records, true}) -> {Records, true};
(_, {Records, X}) ->
case file:read_line(FD) of
{ok, Line} ->
case convert_to_rec(Line) of
#vertex{vertex_id = nil} -> {Records, X};
V -> {[V|Records], X}
end;
eof ->
file:close(FD),
{Records, true}
reader_loop(FD, Pid, {State, Buffer}) ->
X = file:read_line(FD),
%% io:format("~n~n X : [~p][~p] ~n~n", [X, State]),
case X of
{ok, Line} ->
Y = convert_to_rec(Line),
%% io:format("~n~n Y : [~p][~p] ~n~n", [Y, State]),
case Y of
nil -> reader_loop(FD, Pid, {State, Buffer});
V ->
case length(Buffer) > 100 of
true ->
gen_fsm:send_event(Pid, {vertices, [V|Buffer], self(), State}),
reader_loop(FD, Pid, {State, []});
_ ->
reader_loop(FD, Pid, {State, [V|Buffer]})
end
end, {[], false}, lists:seq(1, 10)),
case IsDone of
true -> gen_fsm:send_event(Pid, {vertices_done, Recs, self(), State});
_ -> gen_fsm:send_event(Pid, {vertices, Recs, self(), State}),
reader_loop(FD, Pid, State)
end;
eof ->
gen_fsm:send_event(Pid, {vertices_done, Buffer, self(), State}),
file:close(FD)
end.
%% {Recs, IsDone} =
%% lists:foldl(
%% fun(_, {Records, true}) -> {Records, true};
%% (_, {Records, X}) ->
%% case file:read_line(FD) of
%% {ok, Line} ->
%% case convert_to_rec(Line) of
%% nil -> {Records, X};
%% V -> {[V|Records], X}
%% end;
%% eof ->
%% file:close(FD),
%% {Records, true}
%% end
%% end, {[], false}, lists:seq(1, 100)),
%% case IsDone of
%% true -> gen_fsm:send_event(Pid, {vertices_done, Recs, self(), State});
%% _ -> gen_fsm:send_event(Pid, {vertices, Recs, self(), State}),
%% reader_loop(FD, Pid, State)
%% end.


%% {Vid, VName, VVal, VState, [{EVal, VName}]
%% vname \t vval \t [eval \t tvname \t].. \n
convert_to_rec(Line) ->
convert_to_rec(Line, #vertex{}, [], [], vname).

convert_to_rec([$\n | _], #vertex{vertex_id = nil}, _, _, _) -> nil;
convert_to_rec([$\n | _], V, EList, _, _) ->
V#vertex{edge_list = EList};
{V#vertex.vertex_id, V#vertex.vertex_name, V#vertex.vertex_value,
V#vertex.vertex_state, EList};
convert_to_rec([$\t | Rest], V, EList, Buffer, vname) ->
VName = lists:reverse(Buffer),
VId = erlang:phash2(VName),
Expand All @@ -110,10 +134,6 @@ convert_to_rec([$\t | Rest], V, EList, Buffer, eval) ->
convert_to_rec(Rest, V, EList, [], {tvname, lists:reverse(Buffer)});
convert_to_rec([$\t | Rest], V, EList, Buffer, {tvname, EVal}) ->
VName = lists:reverse(Buffer),
VId = erlang:phash2(VName),
convert_to_rec(Rest, V, [#edge{value = EVal,
target_vid = VId,
target_vname = VName}|EList],
[], eval);
convert_to_rec(Rest, V, [{EVal, VName}|EList], [], eval);
convert_to_rec([X | Rest], V, EList, Buffer, Token) ->
convert_to_rec(Rest, V, EList, [X|Buffer], Token).
22 changes: 19 additions & 3 deletions src/phoebus_worker.erl
Expand Up @@ -130,6 +130,7 @@ vsplit_phase1({vertices, Vertices, RefPid, SS},
worker_info = {JobId, WId} = WInfo,
num_workers = NumWorkers,
part_file = Partition,
table = Table,
sub_state = {reading_partition, {RefPid, _, FDs}}} =
State) ->
?DEBUG("Worker In State.. ", [{state, vsplit_phase1},
Expand All @@ -139,6 +140,12 @@ vsplit_phase1({vertices, Vertices, RefPid, SS},
NewFDs = handle_vertices(NumWorkers, WInfo, Vertices, 0, FDs),
notify_master({MNode, MPid}, {vsplit_phase1_inter, WId,
length(Vertices)}),
worker_store:sync_table(Table, vertex, false),
%% TName = worker_store:table_name(Table, vertex),
%% case dets:info(TName, size) > 1000 of
%% true -> dets:sync(TName);
%% _ -> void
%% end,
{next_state, vsplit_phase1,
State#state{sub_state = {reading_partition, {RefPid, SS, NewFDs}}},
?SOURCE_TIMEOUT()};
Expand All @@ -148,6 +155,7 @@ vsplit_phase1({vertices_done, Vertices, RefPid, SS},
worker_info = {JobId, WId} = WInfo,
num_workers = NumWorkers,
part_file = Partition,
table = Table,
sub_state = {reading_partition, {RefPid, _, FDs}}} =
State) ->
?DEBUG("Worker In State.. ", [{state, vsplit_phase1},
Expand All @@ -157,9 +165,12 @@ vsplit_phase1({vertices_done, Vertices, RefPid, SS},
{worker, WId}]),
NewFDs = handle_vertices(NumWorkers, WInfo, Vertices, 0, FDs),
phoebus_source:destroy(SS),
worker_store:sync_table(Table, vertex, true),
lists:foreach(
fun({_, FD}) -> worker_store:close_step_file(FD) end, NewFDs),
notify_master({MNode, MPid}, {vsplit_phase1_done, WId, length(Vertices)}),
notify_master({MNode, MPid}, {vsplit_phase1_inter, WId,
length(Vertices)}),
notify_master({MNode, MPid}, {vsplit_phase1_done, WId, 0}),
?DEBUG("Worker Exiting State.. ", [{state, vsplit_phase1},
{job, JobId}, {worker, WId}]),
{next_state, await_master, State#state{sub_state = none},
Expand Down Expand Up @@ -210,7 +221,7 @@ post_algo(_Event, #state{master_info = {MNode, MPid, _},
{next_state, await_master, State}.

store_result(_Event, State) ->
{stop, normal, State}.
{next_state, await_master, State}.


%% ------------------------------------------------------------------------
Expand Down Expand Up @@ -345,7 +356,7 @@ code_change(_OldVsn, StateName, State, _Extra) ->
%%%===================================================================
handle_vertices(NumWorkers, {JobId, MyWId}, Vertices, Step, FDs) ->
lists:foldl(
fun(#vertex{vertex_id = VId} = Vertex, OldFDs) ->
fun({VId, _, _, _, _} = Vertex, OldFDs) ->
{Node, WId} = phoebus_utils:vertex_owner(JobId, VId, NumWorkers),
worker_store:store_vertex(Vertex, {Node, {JobId, MyWId, WId}},
Step, OldFDs)
Expand All @@ -367,5 +378,10 @@ notify_master({MNode, MPid}, Notification) ->
%% TODO : have to implemnt.. using a table manager..
acquire_table(JobId, WId) ->
Table = list_to_atom("test_table_" ++ integer_to_list(WId)),
%% Vtable = Worker_store:table_name(Table, vertex),
%% MTable = worker_store:table_name(Table, msg),
ets:insert(table_mapping, {{JobId, WId}, Table}),
worker_store:init_step_file(vertex, JobId, WId, [write], 0),
%% dets:open_file(MTable,
%% [{file, step_data(msg, JobId, WId, Step, Idx)}]);
Table.

0 comments on commit 1013d57

Please sign in to comment.