Skip to content

Commit

Permalink
Revise replicator and repairer#1
Browse files Browse the repository at this point in the history
  • Loading branch information
yosukehara committed Dec 17, 2013
1 parent 6fc1ce3 commit f761979
Show file tree
Hide file tree
Showing 5 changed files with 335 additions and 172 deletions.
13 changes: 13 additions & 0 deletions include/leo_storage.hrl
Expand Up @@ -81,6 +81,19 @@
-define(ERROR_COULD_MATCH, "Could not match").


%% @doc request parameter for READ
%%
-record(read_parameter, {
ref :: reference(),
addr_id :: integer(),
key :: string(),
etag = [] :: string(),
start_pos = 0 :: integer(),
end_pos = 0 :: integer(),
quorum :: integer(),
req_id :: integer()
}).

%% @doc Queue's Message.
%%
-record(inconsistent_data_message, {
Expand Down
242 changes: 151 additions & 91 deletions src/leo_storage_handler_object.erl
Expand Up @@ -36,7 +36,7 @@
-include_lib("leo_statistics/include/leo_statistics.hrl").
-include_lib("eunit/include/eunit.hrl").

-export([get/1, get/3, get/4, get/5,
-export([get/1, get/2, get/3, get/4, get/5,
put/1, put/2, put/3,
delete/1, delete/2, head/2,
copy/3,
Expand All @@ -50,19 +50,10 @@

-define(DEF_DELIMITER, <<"/">>).

-record(read_parameter, {
addr_id :: integer(),
key :: string(),
start_pos = 0 :: integer(),
end_pos = 0 :: integer(),
quorum :: integer(),
req_id :: integer()
}).

%%--------------------------------------------------------------------
%% API - GET
%%--------------------------------------------------------------------
%% @doc get object (from storage-node).
%% @doc get object (from storage-node#1).
%%
-spec(get({reference(), string()}) ->
{ok, reference(), binary(), binary(), binary()} |
Expand All @@ -82,13 +73,34 @@ get({Ref, Key}) ->
{error, Ref, ?ERROR_COULD_NOT_GET_REDUNDANCY}
end.

%% @doc get object (from storage-node#2).
%%
-spec(get(#read_parameter{}, list(#redundant_node{})) ->
{ok, reference(), binary(), binary(), binary()} |
{error, reference(), any()}).
get(#read_parameter{addr_id = AddrId} = ReadParameter, []) ->
case leo_redundant_manager_api:get_redundancies_by_addr_id(get, AddrId) of
{ok, #redundancies{nodes = Redundancies,
r = ReadQuorum}} ->
get(ReadParameter#read_parameter{quorum = ReadQuorum},
Redundancies);
_Error ->
{error, ?ERROR_COULD_NOT_GET_REDUNDANCY}
end;
get(ReadParameter, Redundancies) ->
_ = leo_statistics_req_counter:increment(?STAT_REQ_GET),
read_and_repair(ReadParameter, Redundancies).

%% @doc Retrieve an object which is requested from gateway.
%%
-spec(get(integer(), string(), integer()) ->
{ok, #metadata{}, binary()} |
{error, any()}).
get(AddrId, Key, ReqId) ->
get(AddrId, Key, 0, 0, ReqId).
get(#read_parameter{ref = make_ref(),
addr_id = AddrId,
key = Key,
req_id = ReqId}, []).

%% @doc Retrieve an object which is requested from gateway w/etag.
%%
Expand All @@ -97,49 +109,24 @@ get(AddrId, Key, ReqId) ->
{ok, match} |
{error, any()}).
get(AddrId, Key, ETag, ReqId) ->
case leo_object_storage_api:head({AddrId, Key}) of
{ok, MetaBin} ->
Metadata = binary_to_term(MetaBin),
case (Metadata#metadata.checksum == ETag) of
true ->
{ok, match};
false ->
get(AddrId, Key, ReqId)
end;
not_found = Cause ->
{error, Cause};
Error ->
Error
end.
get(#read_parameter{ref = make_ref(),
addr_id = AddrId,
key = Key,
etag = ETag,
req_id = ReqId}, []).

%% @doc Retrieve a part of an object.
%%
-spec(get(integer(), string(), integer(), integer(), integer()) ->
{ok, #metadata{}, binary()} |
{error, any()}).
get(AddrId, Key, StartPos, EndPos, ReqId) ->
_ = leo_statistics_req_counter:increment(?STAT_REQ_GET),

Ret = case leo_redundant_manager_api:get_redundancies_by_addr_id(get, AddrId) of
{ok, #redundancies{nodes = Redundancies,
r = ReadQuorum}} ->
ReadParameter = #read_parameter{addr_id = AddrId,
key = Key,
start_pos = StartPos,
end_pos = EndPos,
quorum = ReadQuorum,
req_id = ReqId},
read_and_repair(ReadParameter, Redundancies);
_Error ->
{error, ?ERROR_COULD_NOT_GET_REDUNDANCY}
end,

case Ret of
{ok, NewMeta, #object{data = Bin}} ->
{ok, NewMeta, Bin};
Error ->
Error
end.
get(#read_parameter{ref = make_ref(),
addr_id = AddrId,
key = Key,
start_pos = StartPos,
end_pos = EndPos,
req_id = ReqId}, []).


%% @doc read data (common).
Expand Down Expand Up @@ -513,67 +500,140 @@ find_uploaded_objects_by_key(OriginalKey) ->
read_and_repair(_, []) ->
{error, ?ERROR_COULD_NOT_GET_DATA};

read_and_repair(#read_parameter{addr_id = AddrId,
read_and_repair(#read_parameter{ref = Ref,
addr_id = AddrId,
key = Key,
etag = [],
start_pos = StartPos,
end_pos = EndPos,
quorum = ReadQuorum,
req_id = ReqId} = ReadParameter, [_|T] = Redundancies) ->
Ref = make_ref(),

case get_fun(Ref, AddrId, Key, StartPos, EndPos) of
{ok, Ref, Metadata, Object} when T =:= [] ->
{ok, Metadata, Object};
{ok, Ref, Metadata, Object} when T =/= [] ->
F = fun(ok) ->
{ok, Metadata, Object};
({error,_Cause}) ->
{error, ?ERROR_RECOVER_FAILURE}
end,
leo_storage_read_repairer:repair(
ReadQuorum -1, Redundancies, Metadata, ReqId, F);

{error, Ref, not_found = Cause} ->
{error, Cause};
{error, Ref, _Cause} ->
case (erlang:length(T) >= ReadQuorum) of
true ->
read_and_repair(ReadParameter, T);
false ->
{error, ?ERROR_COULD_NOT_GET_DATA}
end
end.
end_pos = EndPos} = ReadParameter,
[#redundant_node{node = Node,
available = true}|T]
) when Node == erlang:node() ->
read_and_repair_1(
get_fun(Ref, AddrId, Key, StartPos, EndPos), ReadParameter, T);

read_and_repair(#read_parameter{ref = Ref,
addr_id = AddrId,
key = Key,
etag = ETag,
start_pos = StartPos,
end_pos = EndPos} = ReadParameter,
[#redundant_node{node = Node,
available = true}|T]
) when Node == erlang:node(),
ETag /= [] ->
%% Retrieve an head of object,
%% then compare it with requested 'Etag'
Ret = case leo_object_storage_api:head({AddrId, Key}) of
{ok, MetaBin} ->
Metadata = binary_to_term(MetaBin),
case Metadata#metadata.checksum of
ETag ->
{ok, match};
_ ->
[]
end;
_ ->
[]
end,

%% If the result is 'match', then response it,
%% not the case, retrieve an object by key
case Ret of
{ok, match} = Reply ->
Reply;
_ ->
read_and_repair_1(
get_fun(Ref, AddrId, Key, StartPos, EndPos), ReadParameter, T)
end;

read_and_repair(#read_parameter{ref = Ref} = ReadParameter,
[#redundant_node{node = Node,
available = true}|_] = Redundancies) ->
RPCKey = rpc:async_call(Node, ?MODULE, get, [ReadParameter, Redundancies]),
Reply = case rpc:nb_yield(RPCKey, ?DEF_REQ_TIMEOUT) of
{value, {ok, Ref,_,_} = Ret} ->
Ret;
{value, {error, Cause}} ->
{error, Ref, Cause};
{value, {badrpc, Cause}} ->
{error, Ref, Cause};
timeout = Cause ->
{error, Ref, Cause};
{badrpc, Cause} ->
{error, Ref, Cause}
end,
read_and_repair_1(Reply, ReadParameter, []).


%% @private
read_and_repair_1({ok, Ref, Metadata, Object},
#read_parameter{ref = Ref}, []) ->
{ok, Metadata, Object};

read_and_repair_1({ok, Ref, Metadata, Object},
#read_parameter{ref = Ref,
quorum = Quorum} = ReadParameter, Redundancies) ->
Fun = fun(ok) ->
{ok, Metadata, Object};
({error,_Cause}) ->
{error, ?ERROR_RECOVER_FAILURE}
end,
ReadParameter_1 = ReadParameter#read_parameter{quorum = Quorum - 1},
leo_storage_read_repairer:repair(ReadParameter_1, Redundancies, Metadata, Fun);

read_and_repair_1({error, Ref, timeout = Cause},
#read_parameter{ref = Ref}, _Redundancies) ->
{error, Cause};
read_and_repair_1({error, Ref, Cause},
#read_parameter{ref = Ref}, []) ->
{error, Cause};

read_and_repair_1({error, Ref,_Cause},
#read_parameter{ref = Ref,
quorum = ReadQuorum} = ReadParameter, Redundancies) ->
NumOfNodes = erlang:length([N || #redundant_node{node = N,
can_read_repair = true}
<- Redundancies]),
case (NumOfNodes >= ReadQuorum) of
true ->
read_and_repair(ReadParameter, Redundancies);
false ->
{error, ?ERROR_COULD_NOT_GET_DATA}
end;
read_and_repair_1(_,_,_) ->
{error, invalid_request}.


%% @doc Replicate an object from local-node to remote node
%% @private
-spec(replicate(replication(), put | delete, integer(), #object{}) ->
ok | {error, any()}).
replicate(?REP_LOCAL, Method, AddrId, Object0) ->
replicate(?REP_LOCAL, Method, AddrId, Object_1) ->
case leo_redundant_manager_api:get_redundancies_by_addr_id(put, AddrId) of
{ok, #redundancies{nodes = Redundancies,
w = WriteQuorum,
d = DeleteQuorum,
ring_hash = RingHash}} ->
Object1 = Object0#object{ring_hash = RingHash},
Object_2 = Object_1#object{ring_hash = RingHash},
Quorum = case Method of
?CMD_PUT -> WriteQuorum;
?CMD_DELETE -> DeleteQuorum
end,

F = fun({ok, ETag}) when Method == ?CMD_PUT ->
{ok, ETag};
({ok,_ETag}) when Method == ?CMD_DELETE ->
ok;
({error, Cause}) ->
case lists:keyfind(not_found, 2, Cause) of
false ->
{error, ?ERROR_REPLICATE_FAILURE};
_ ->
{error, not_found}
end
end,
leo_storage_replicator:replicate(Quorum, Redundancies, Object1, F);
Fun = fun({ok, ETag}) when Method == ?CMD_PUT ->
{ok, ETag};
({ok,_ETag}) when Method == ?CMD_DELETE ->
ok;
({error, Cause}) ->
case lists:keyfind(not_found, 2, Cause) of
false ->
{error, ?ERROR_REPLICATE_FAILURE};
_ ->
{error, not_found}
end
end,
leo_storage_replicator:replicate(Quorum, Redundancies, Object_2, Fun);
_Error ->
{error, ?ERROR_META_NOT_FOUND}
end;
Expand Down

0 comments on commit f761979

Please sign in to comment.