Skip to content

Commit

Permalink
Added HDFS support.
Browse files Browse the repository at this point in the history
  • Loading branch information
xslogic committed Nov 4, 2010
1 parent 7c43146 commit dc7a2af
Show file tree
Hide file tree
Showing 8 changed files with 352 additions and 139 deletions.
2 changes: 2 additions & 0 deletions ebin/phoebus.app
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@
[{description,[]},
{vsn,"1"},
{modules,[algos,
serde,
external_store,
external_store_file,
external_store_hdfs,
hadoopfs_types,
msg_buffer,
phoebus_app,
Expand Down
2 changes: 2 additions & 0 deletions include/phoebus.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
-author('Arun Suresh <arun.suresh@gmail.com>').

-define(DEBUG(Str, Args), error_logger:info_report([Str|Args])).
-define(ERROR(Str, Args), error_logger:error_report([Str|Args])).
-define(WARN(Str, Args), error_logger:warning_report([Str|Args])).

-define(BASE_DIR(), phoebus_utils:get_env(store_dir, "/tmp/phoebus/")).
-define(JOB_DIR(JobId, WId),
Expand Down
3 changes: 2 additions & 1 deletion src/algos.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ create_binary_tree(Dir, NumFiles, NumRecs) ->
fun(N, [F|Rest]) ->
Line =
lists:concat(
[Fn(N),"\t",Fn(N),"\t1\t",Fn(N*2),"\t1\t",Fn((N*2)+1),"\t\n"]),
[Fn(N),"\t",Fn(N),"\t",
"1\t",Fn(N*2),"\t1\t",Fn((N*2)+1),"\t\r\n"]),
file:write(F, Line),
Rest ++ [F]
end, FDs, lists:seq(1, NumRecs)),
Expand Down
38 changes: 4 additions & 34 deletions src/external_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
read_vertices/2,
store_vertices/2,
destroy/1,
check_dir/1,
deserialize/1,
serialize/1
check_dir/1
]).

behaviour_info(callbacks) ->
Expand Down Expand Up @@ -62,7 +60,9 @@ init(URI) ->
end
end, {0, []}, RegisteredStores),
case StoreMod of
0 -> {error, enostores};
0 ->
?ERROR("No registered store for URI", [{uri, URI}]),
{error, enostores};
_ -> {ok, State}
end.

Expand All @@ -84,17 +84,6 @@ store_vertices(StoreState, Vertices) ->
destroy(StoreState) ->
Mod = proplists:get_value(store_module, StoreState),
Mod:destroy(StoreState).


%% TODO : make the serializer/deserializer a separate module
%% and handle it properly..
serialize(V) ->
worker_store:serialize_rec(vertex, V).

deserialize(Line) when is_list(Line) ->
deserialize(Line, #vertex{}, [], [], vname);
deserialize(Line) when is_binary(Line) ->
deserialize(binary_to_list(Line), #vertex{}, [], [], vname).


%%--------------------------------------------------------------------
Expand All @@ -111,22 +100,3 @@ check_dir(URI) ->
$/ -> true;
_ -> false
end.

%% deserialize/5 -- character-at-a-time parser for one tab-separated
%% vertex line of the form:
%%   "VName\tVValue\t[EValue\tTargetVName\t]*\n"
%% Arguments: remaining input chars, the vertex record being built,
%% the edge list accumulator, the current-field buffer (held reversed),
%% and the token-state atom (vname | vval | eval | {tvname, EVal}).

%% Newline before any field was completed: not a vertex line.
deserialize([$\n | _], #vertex{vertex_id = nil}, _, _, _) -> nil;
%% Newline after a complete record: emit {Name, Value, Edges}.
deserialize([$\n | _], V, EList, _, _) ->
  {V#vertex.vertex_name, V#vertex.vertex_value, EList};
%% End of the vertex-name field: also derive a 32-bit id
%% (phash2 range 4294967296 = 2^32) from the name.
deserialize([$\t | Rest], V, EList, Buffer, vname) ->
  VName = lists:reverse(Buffer),
  VId = erlang:phash2(VName, 4294967296),
  deserialize(Rest, V#vertex{vertex_id = VId,
                             vertex_name = VName}, EList, [], vval);
%% End of the vertex-value field; edge fields follow.
deserialize([$\t | Rest], V, EList, Buffer, vval) ->
  deserialize(Rest, V#vertex{vertex_value = lists:reverse(Buffer)},
              EList, [], eval);
%% Edge value captured; the next field is the target vertex name.
deserialize([$\t | Rest], V, EList, Buffer, eval) ->
  deserialize(Rest, V, EList, [], {tvname, lists:reverse(Buffer)});
%% Target vertex name captured: push {EValue, TargetName} onto the
%% edge list and loop back for the next edge pair.
deserialize([$\t | Rest], V, EList, Buffer, {tvname, EVal}) ->
  VName = lists:reverse(Buffer),
  deserialize(Rest, V, [{EVal, VName}|EList], [], eval);
%% Ordinary character: prepend to the (reversed) field buffer.
deserialize([X | Rest], V, EList, Buffer, Token) ->
  deserialize(Rest, V, EList, [X|Buffer], Token).
58 changes: 36 additions & 22 deletions src/external_store_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ partition_input(State) ->
Base = proplists:get_value(uri, State),
{ok, [Base ++ F || F <- Files], State};
_ ->
{error, State}
?ERROR("URI not a directory", [{uri, proplists:get_value(uri, State)}]),
destroy(State),
{error, enotdir}
end.

read_vertices(State, Recvr) ->
Expand All @@ -57,7 +59,9 @@ read_vertices(State, Recvr) ->
start_reading(proplists:get_value(type, State),
proplists:get_value(abs_path, State), Recvr, State);
_ ->
{error, State}
?ERROR("URI is a directory", [{uri, proplists:get_value(uri, State)}]),
destroy(State),
{error, eisdir}
end.

store_vertices(State, Vertices) ->
Expand All @@ -72,16 +76,27 @@ store_vertices(State, Vertices) ->
F -> {F, State}
end,
lists:foreach(
fun(V) -> file:write(FD, external_store:serialize(V)) end, Vertices),
fun(V) ->
file:write(FD, serde:serialize_rec(vertex, V)) end, Vertices),
NewState;
_ ->
{error, State}
?ERROR("URI is a directory", [{uri, proplists:get_value(uri, State)}]),
destroy(State),
{error, eisdir}
end.

%% Releases the file handle recorded under open_file_ref in the
%% store-state proplist, if any.  Close failures are logged and
%% swallowed: teardown must never crash the caller.
destroy(State) ->
  case proplists:get_value(open_file_ref, State) of
    undefined ->
      ok;
    Handle ->
      try file:close(Handle)
      catch
        Class:Reason ->
          ?WARN("Error while closing file handle..",
                [{error, Class}, {reason, Reason}]),
          ok
      end
  end.

%%--------------------------------------------------------------------
Expand All @@ -94,27 +109,26 @@ destroy(State) ->
%%% Internal functions
%%%===================================================================
start_reading(file, File, Recvr, State) ->
RPid = spawn(fun() -> reader_loop({init, File}, Recvr, {State, []}) end),
RPid = spawn(fun() -> reader_loop({init, File}, Recvr,
{State, <<>>, []}) end),
{ok, RPid, State}.


reader_loop({init, File}, Pid, State) ->
reader_loop({init, File}, Pid, {StoreState, X, Y}) ->
{ok, FD} = file:open(File, [raw, read_ahead, binary]),
reader_loop(FD, Pid, State);
reader_loop(FD, Pid, {State, Buffer}) ->
case file:read_line(FD) of
{ok, Line} ->
case external_store:deserialize(Line) of
nil -> reader_loop(FD, Pid, {State, Buffer});
V ->
case length(Buffer) > 100 of
true ->
gen_fsm:send_event(
Pid, {vertices, [V|Buffer], self(), State}),
reader_loop(FD, Pid, {State, []});
_ ->
reader_loop(FD, Pid, {State, [V|Buffer]})
end
reader_loop(FD, Pid, {[{open_file_ref, FD} | StoreState], X, Y});
reader_loop(FD, Pid, {State, Rem, Buffer}) ->
case file:read(FD, 16384) of
{ok, Data} ->
{Vs, NewRem} = serde:deserialize_stream(vertex, Rem, Data),
NewBuffer = Vs ++ Buffer,
case length(NewBuffer) > 100 of
true ->
gen_fsm:send_event(
Pid, {vertices, NewBuffer, self(), State}),
reader_loop(FD, Pid, {State, NewRem, []});
_ ->
reader_loop(FD, Pid, {State, NewRem, NewBuffer})
end;
eof ->
gen_fsm:send_event(Pid, {vertices_done, Buffer, self(), State}),
Expand Down
195 changes: 195 additions & 0 deletions src/external_store_hdfs.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
%% -------------------------------------------------------------------
%%
%% Phoebus: A distributed framework for large scale graph processing.
%%
%% Copyright (c) 2010 Arun Suresh. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(external_store_hdfs).
-author('Arun Suresh <arun.suresh@gmail.com>').

-behaviour(external_store).
-include("phoebus.hrl").
-include("thriftHadoopFileSystem_thrift.hrl").

%% API
-export([init/1,
partition_input/1,
read_vertices/2,
store_vertices/2,
destroy/1]).

%%%===================================================================
%%% API
%%%===================================================================
%% Initialises an HDFS-backed store for URIs of the form
%% "hdfs://namenode:port/path".  Connects to the thrift HDFS gateway
%% (host/port from app env thriftfs_hostname / thriftfs_port) and
%% returns {true, StoreState} with the connection and parsed path
%% recorded in a proplist.  Any other URI scheme yields {false, []}.
init("hdfs://" ++ Rest = URI) ->
  IsDir = external_store:check_dir(URI),
  GatewayHost = phoebus_utils:get_env(thriftfs_hostname, "localhost"),
  GatewayPort = phoebus_utils:get_env(thriftfs_port, 8899),
  {ok, TransportFactory} =
    thrift_socket_transport:new_transport_factory(GatewayHost,
                                                  GatewayPort, []),
  {ok, ProtocolFactory} =
    thrift_binary_protocol:new_protocol_factory(TransportFactory, []),
  {ok, Protocol} = ProtocolFactory(),
  {ok, Client} = thrift_client:new(Protocol, thriftHadoopFileSystem_thrift),
  {NameNode, Port, AbsPath} = parse_path(Rest),
  %% TODO : check if there is a registered thrift server for this NN
  {true, [{uri, URI}, {namenode, NameNode}, {port, Port},
          {abs_path, AbsPath}, {type, hdfs}, {is_dir, IsDir},
          {thrift_client, Client}]};
init(_) -> {false, []}.


%% Lists the plain files under the directory this store points at,
%% via a thrift listStatus call.  Directory entries (isdir = true)
%% and paths that are neither binaries nor lists are skipped.
%% Returns {ok, Files, State}, or {error, enotdir} (after tearing the
%% store down) when the URI is not a directory.
partition_input(State) ->
  case proplists:get_value(is_dir, State) of
    true ->
      Client = proplists:get_value(thrift_client, State),
      Dir = proplists:get_value(abs_path, State),
      {_, {ok, Statuses}} =
        thrift_client:call(Client, listStatus,
                           [#pathname{pathname = Dir}]),
      %% reverse/1 keeps the same ordering the original foldl
      %% (prepend accumulator) produced.
      Files =
        lists:reverse(
          [hdfs_path_to_list(P) ||
            #fileStatus{isdir = false, path = P} <- Statuses,
            is_binary(P) orelse is_list(P)]),
      {ok, Files, State};
    _ ->
      ?ERROR("URI not a directory", [{uri, proplists:get_value(uri, State)}]),
      destroy(State),
      {error, enotdir}
  end.

%% Normalises a thrift-supplied path to a flat character list.
hdfs_path_to_list(P) when is_binary(P) -> binary_to_list(P);
hdfs_path_to_list(P) when is_list(P) -> P.

%% Starts an asynchronous reader for a single HDFS file, delivering
%% vertices to Recvr.  Directories cannot be read: in that case the
%% store is destroyed and {error, eisdir} is returned.
read_vertices(State, Recvr) ->
  case proplists:get_value(is_dir, State) of
    false ->
      StoreType = proplists:get_value(type, State),
      AbsPath = proplists:get_value(abs_path, State),
      start_reading(StoreType, AbsPath, Recvr, State);
    _ ->
      ?ERROR("URI is a directory", [{uri, proplists:get_value(uri, State)}]),
      destroy(State),
      {error, eisdir}
  end.

%% Serialises Vertices and writes them to the HDFS file this store
%% points at, via thrift.  On first use the target file is removed
%% (best-effort; the rm result is ignored) and re-created, and the
%% resulting write handle is cached in the returned state under
%% thrift_handle -- callers must thread the returned state through
%% subsequent calls to reuse the handle.  Returns {error, eisdir}
%% (after destroying the store) when the URI is a directory.
store_vertices(State, Vertices) ->
  case proplists:get_value(is_dir, State) of
    false ->
      {TC, TH, NewState} =
        case proplists:get_value(thrift_handle, State) of
          undefined ->
            %% First write: clear any previous output, then create.
            Client = proplists:get_value(thrift_client, State),
            AbsPath = proplists:get_value(abs_path, State),
            thrift_client:call(Client, rm,
                               [#pathname{pathname = AbsPath}, false]),
            {_, {ok, THandle}} =
              thrift_client:call(Client, create,
                                 [#pathname{pathname = AbsPath}]),
            {Client, THandle, [{thrift_handle, THandle}|State]};
          THandle ->
            %% Handle already open: reuse it, state unchanged.
            {proplists:get_value(thrift_client, State), THandle, State}
        end,
      %% One thrift write per vertex, in input order.
      lists:foreach(
        fun(V) ->
            thrift_client:call(TC, write,
                               [TH, serde:serialize_rec(vertex, V)])
        end, Vertices),
      NewState;
    _ ->
      ?ERROR("URI is a directory", [{uri, proplists:get_value(uri, State)}]),
      destroy(State),
      {error, eisdir}
  end.

%% Tears down the store: closes the remote HDFS file handle (when one
%% was opened) and then the thrift client connection.  Both steps are
%% best-effort -- failures are logged with ?WARN, never raised.
destroy(State) ->
  case proplists:get_value(thrift_client, State) of
    undefined -> ok;
    Client ->
      close_hdfs_handle(Client, proplists:get_value(thrift_handle, State)),
      close_thrift_client(Client)
  end.

%% Closes the remote HDFS file handle via a thrift 'close' call.
close_hdfs_handle(_Client, undefined) -> void;
close_hdfs_handle(Client, THandle) ->
  try
    thrift_client:call(Client, close, [THandle])
  catch
    Class:Reason ->
      ?WARN("Error while closing hdfs file handle store..",
            [{error, Class}, {reason, Reason}]),
      ok
  end.

%% Shuts down the thrift client connection itself.
close_thrift_client(Client) ->
  try
    thrift_client:close(Client)
  catch
    Class:Reason ->
      ?WARN("Error while closing hdfs store..",
            [{error, Class}, {reason, Reason}]),
      ok
  end.

%%--------------------------------------------------------------------
%% @doc
%% @spec
%% @end
%%--------------------------------------------------------------------

%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Spawns the reader process for one HDFS file.  The loop starts in
%% its {init, File} state with an empty remainder binary and an empty
%% vertex buffer.  Returns {ok, ReaderPid, StoreState}.
start_reading(hdfs, File, Recvr, StoreState) ->
  Reader =
    spawn(fun() ->
              reader_loop({init, File}, Recvr, {StoreState, <<>>, []})
          end),
  {ok, Reader, StoreState}.


%% Reader-process body: streams an HDFS file through the thrift
%% gateway in hdfs_read_size chunks (default 16384 bytes), feeds each
%% chunk to serde:deserialize_stream/3 (Rem carries the partial record
%% left over between chunks) and sends vertices to the receiving FSM
%% in batches of more than 100.  A short read signals EOF.
%%
%% First clause: opens the remote file and seeds the loop state.
reader_loop({init, AbsPath}, Pid, {StoreState, Rem, Buffer}) ->
  Client = proplists:get_value(thrift_client, StoreState),
  {_, {ok, THandle}} =
    thrift_client:call(Client, open, [#pathname{pathname = AbsPath}]),
  NewSState = [{thrift_handle, THandle} | StoreState],
  reader_loop({Client, THandle, 0}, Pid, {NewSState, Rem, Buffer});
reader_loop({Client, THandle, OffSet}, Pid, {StoreState, Rem, Buffer}) ->
  ByteSize = phoebus_utils:get_env(hdfs_read_size, 16384),
  {_, {ok, Data}} =
    thrift_client:call(Client, read, [THandle, OffSet, ByteSize]),
  {Vs, NewRem} = serde:deserialize_stream(vertex, Rem, Data),
  NewBuffer = Vs ++ Buffer,
  case size(Data) < ByteSize of
    true ->
      %% Short read => EOF.  Flush whatever is buffered, then close
      %% the remote file handle.
      gen_fsm:send_event(Pid, {vertices_done, NewBuffer, self(), StoreState}),
      %% BUG FIX: was thrift_client:close(Client, close, [THandle]),
      %% but thrift_client:close takes only the client and would have
      %% crashed with undef here.  The HDFS handle is closed with a
      %% thrift 'close' call, mirroring destroy/1.
      thrift_client:call(Client, close, [THandle]);
    _ ->
      case length(NewBuffer) > 100 of
        true ->
          %% Batch full: ship it and continue with an empty buffer.
          gen_fsm:send_event(
            Pid, {vertices, NewBuffer, self(), StoreState}),
          reader_loop({Client, THandle, OffSet + ByteSize}, Pid,
                      {StoreState, NewRem, []});
        _ ->
          reader_loop({Client, THandle, OffSet + ByteSize}, Pid,
                      {StoreState, NewRem, NewBuffer})
      end
  end.

%% Splits the remainder of an "hdfs://" URI, "NameNode:Port/Abs/Path",
%% into {NameNode, Port :: integer(), AbsPath}.  Crashes (badmatch /
%% badarg) on input without a "host:port" prefix, as before.
parse_path(Path) ->
  {NameNode, [$: | AfterColon]} =
    lists:splitwith(fun(C) -> C =/= $: end, Path),
  {PortStr, AbsPath} =
    lists:splitwith(fun(C) -> C =/= $/ end, AfterColon),
  {NameNode, list_to_integer(PortStr), AbsPath}.

Loading

0 comments on commit dc7a2af

Please sign in to comment.