Skip to content

Commit

Permalink
initial commit of chunk handling. Needs testing, but doesn't seem
Browse files Browse the repository at this point in the history
to break anything.
  • Loading branch information
Ville Tuulos committed May 14, 2008
1 parent afc3332 commit 31c32ac
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 73 deletions.
62 changes: 37 additions & 25 deletions ring/src/ringo_domain.erl
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,12 @@ handle_call({get_file_handle, ExtFile}, _, #domain{home = Home} = D) ->

handle_cast({new_domain, Name, Chunk, From, Params},
#domain{id = DomainID} = D) ->


% Remember to update the flags in invalid_domain messages below
% if InfoPack changes!
InfoPack = [{nrepl, proplists:get_value(nrepl, Params)},
{keycache, proplists:is_defined(keycache, Params)},
{noindex, proplists:is_defined(noindex, Params)},
{keycache, proplists:get_value(keycache, Params, false)},
{noindex, proplists:get_value(noindex, Params, false)},
{chunk, Chunk},
{name, list_to_binary(Name)},
{id, DomainID}],
Expand Down Expand Up @@ -310,7 +312,8 @@ handle_cast({put, Key, Value, Flags, From}, #domain{owner = true,
handle_cast({put, _, _, _, From},
#domain{id = DomainID, owner = true, full = true} = D) ->

From ! {ringo_reply, DomainID, {error, domain_full}},
Chunk = proplists:get_value(chunk, D#domain.info),
From ! {ringo_reply, DomainID, {error, domain_full, Chunk}},
{noreply, D};

% Put to a replica: This happens if a get request is redirected to
Expand All @@ -328,7 +331,10 @@ handle_cast({put, _, _, _, _}, #domain{owner = false} = D) ->
handle_cast({redir_put, Owner, N, {put, _, _, _, From}},
#domain{id = DomainID} = D) when Owner == node(); N > ?MAX_RING_SIZE ->

From ! {ringo_reply, DomainID, {error, invalid_domain}},
% Make sure that the Flags list matches with the InfoPack definition
% above
Flags = lists:sublist(D#domain.info, 3),
From ! {ringo_reply, DomainID, {error, invalid_domain, Flags}},
{noreply, D};

% no domain on this node, forward
Expand All @@ -351,7 +357,8 @@ handle_cast({redir_put, Owner, _, {put, Key, Value, Flags, From}},
handle_cast({redir_put, _, _, {put, _, _, _, From}},
#domain{id = DomainID, full = true} = D) ->

From ! {ringo_reply, DomainID, {error, domain_full}},
Chunk = proplists:get_value(chunk, D#domain.info),
From ! {ringo_reply, DomainID, {error, domain_full, Chunk}},
{noreply, D};

%%%
Expand Down Expand Up @@ -382,8 +389,13 @@ handle_cast({get, _, _} = P, #domain{index = none, home = Home,
handle_cast(P, D#domain{index = S});

% Normal case
handle_cast({get, Key, From}, #domain{index = Index} = D) ->
handle_cast({get, Key, From}, #domain{index = Index, info = Info} = D) ->
%error_logger:info_report({"normal get", Key}),
if D#domain.full == true ->
Chunk = proplists:get_value(chunk, Info),
From ! {ringo_get, full, Chunk};
true -> ok
end,
gen_server:cast(Index, {get, Key, From}),
{noreply, D};

Expand Down Expand Up @@ -727,24 +739,24 @@ replicate({DServer, DomainID, EntryID, Entry, Nrepl} = _R, Prev, _Tries) ->
% If receive_repl_replies is run on the same process as ringo_domain, note that
% receive may have to select repl_reply replies amongst are large number of
% incoming messages, which is expensive.
receive_repl_replies(_, _, _, 0) -> ok;
receive_repl_replies({_, _, EntryID, _, _} = R, Prev, Tries, _N) ->
receive
{repl_reply, {ring_too_small, EntryID}} ->
ok;
{repl_reply, {ok, EntryID}} ->
% Opportunistic replication: If at least
% one replica succeeds, we are happy (and
% assume that quite likely more than one have
% succeeded)
ok
% For less opportunistic replication, uncomment
% the following line to wait for more replies.
%receive_repl_replies(R, Tries, N - 1)
after ?REPL_TIMEOUT ->
% Re-send policy: Disable this for faster operation.
replicate(R, Prev, Tries + 1)
end.
%receive_repl_replies(_, _, _, 0) -> ok;
%receive_repl_replies({_, _, EntryID, _, _} = R, Prev, Tries, _N) ->
% receive
% {repl_reply, {ring_too_small, EntryID}} ->
% ok;
% {repl_reply, {ok, EntryID}} ->
% % Opportunistic replication: If at least
% % one replica succeeds, we are happy (and
% % assume that quite likely more than one have
% % succeeded)
% ok
% % For less opportunistic replication, uncomment
% % the following line to wait for more replies.
% %receive_repl_replies(R, Tries, N - 1)
% after ?REPL_TIMEOUT ->
% % Re-send policy: Disable this for faster operation.
% replicate(R, Prev, Tries + 1)
% end.

open_or_clone(Req, From, D) ->
case catch open_domain(D) of
Expand Down
5 changes: 3 additions & 2 deletions ring/src/ringo_indexdomain.erl
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,14 @@ send_entries(Offsets, From, DB, Home, Key) ->
% buffering and page caching.
lists:foreach(fun(Offset) ->
case ringo_index:fetch_entry(DB, Home, Key, Offset) of
{_Time, _Key, Value} -> From ! {entry, Value};
{_Time, _Key, Value} ->
From ! {ringo_get, {entry, Value}};
% ignore corruped entries -- might not be wise
invalid_entry -> ok;
ignore -> ok
end
end, Offsets),
From ! done.
From ! {ringo_get, done}.

%%%
%%% Iblock becomes full
Expand Down
120 changes: 78 additions & 42 deletions ringogw/src/handle_data.erl
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ op([C|_] = Domain, Params, _Data) when is_integer(C) ->
error_logger:info_report({"CREATE", Domain, "WITH", PParams}),
V = proplists:is_defined(create, PParams),
if V ->
{ok, DomainID} = ringo_send(Domain,
DomainID = ringo_util:domain_id(Domain, 0),
ok = ringo_send(DomainID,
{new_domain, Domain, 0, self(), Flags}),
T = proplists:get_value(timeout, PParams),
case ringo_receive(DomainID, T) of
Expand All @@ -57,26 +58,9 @@ op([_Domain, Key], _Params, _Value) when length(Key) > ?KEY_MAX ->
op([Domain, Key], Params, Value) ->
PParams = parse_params(Params, ?PUT_DEFAULTS),
Flags = parse_flags(PParams, ?PUT_FLAGS),
Msg = {put, list_to_binary(Key), Value, Flags, self()},
chunk_put(Domain, Msg, proplists:get_value(timeout, PParams));

% CHUNK FIX: If chunk 0 fails, try chunk C + 1 etc.
{ok, DomainID} = ringo_send(Domain, {put,
list_to_binary(Key), Value, Flags, self()}),

T = proplists:get_value(timeout, PParams),
case ringo_receive(DomainID, T) of
{ok, {Node, EntryID}} ->
{json, {ok, Node, formatid(DomainID),
formatid(EntryID)}};
{error, invalid_domain} ->
{json, {error, <<"Domain doesn't exist">>}};
{error, domain_full} ->
% CHUNK FIX: Try next chunk
{json, {error, <<"Domain full">>}};
Error ->
error_logger:warning_report(
{"Unknown put reply", Error}),
throw({'EXIT', Error})
end;

op(_, _, _) ->
throw({http_error, 400, <<"Invalid request">>}).
Expand All @@ -93,52 +77,97 @@ op(_, _, _) ->
% prefix, which makes it possible to view the value directly in the browser.
% (Consider supporting different mime-types, given a proper parameter).
op([Domain, Key], Params) ->
Msg = {get, list_to_binary(Key), self()},
Req = fun(Chunk) ->
DomainID = ringo_util:domain_id(Domain, Chunk),
ok = ringo_send(DomainID, Msg)
end,
Req(0),
PParams = parse_params(Params, ?GET_DEFAULTS),
{ok, _} = ringo_send(Domain,
{get, list_to_binary(Key), self()}),
proplists:get_value(timeout, PParams),
Single = proplists:get_value(single, PParams),
T = proplists:get_value(timeout, PParams),
if Single ->
ringo_receive_chunked(single, T);
ringo_receive_chunked(single, Req, 0, T);
true ->
ringo_receive_chunked(many, T)
ringo_receive_chunked(many, Req, 0, T)
end;

op(_, _) ->
throw({http_error, 400, <<"Invalid request">>}).

%%%
%%% Chunked put
%%%

chunk_put(Domain, Msg, T) ->
{Chunk, DomainID} = chunk_id(Domain),
ok = ringo_send(DomainID, Msg),
case ringo_receive(DomainID, T) of
% Entry put ok
{ok, {Node, EntryID}} ->
{json, {ok, Node, formatid(DomainID),
formatid(EntryID)}};
% Domain doesn't exist
{error, invalid_domain, _} when Chunk == 0 ->
{json, {error, <<"Domain doesn't exist">>}};
% Previous chunk was full and the next chunk is still
% non-existing. Create it.
{error, invalid_domain, Flags} ->
ok = ringo_send(DomainID,
{new_domain, Domain, Chunk, self(), Flags}),
{ok, _} = ringo_receive(DomainID, T),
chunk_put(Domain, Msg, T);
% Chunk is full. Try the next chunk.
{error, domain_full, Chunk} ->
chunk_full(Domain, Chunk),
chunk_put(Domain, Msg, T);
Error ->
error_logger:warning_report(
{"Unknown put reply", Error}),
throw({'EXIT', Error})
end.

%%%
%%% Ringo communication
%%%

ringo_send(Domain, Msg) ->
DomainID = get_chunk(ringo_util:domain_id(Domain, 0)),
ringo_send(DomainID, Msg) ->
case ringo_util:best_matching_node(DomainID, get_active_nodes()) of
{ok, Node} ->
gen_server:cast({ringo_node, Node}, {match,
DomainID, domain, self(), Msg});
{error, no_nodes} ->
throw({http_error, 503, <<"Empty ring">>});
Error -> throw({'EXIT', Error})
end,
{ok, DomainID}.
end, ok.

ringo_receive_chunked(Mode, Timeout) when Timeout > 60000 ->
ringo_receive_chunked(Mode, 60000);
ringo_receive_chunked(Mode, Req, N, Timeout) when Timeout > 60000 ->
ringo_receive_chunked(Mode, Req, N, 60000);

ringo_receive_chunked(single, Timeout) ->
ringo_receive_chunked(single, Req, N, Timeout) ->
receive
{entry, E} -> {data, E}
{ringo_get, {entry, E}} ->
{data, E};
{ringo_get, done} when N == 0 ->
throw({http_error, 404, <<"Not found">>});
{ringo_get, done} ->
ringo_receive_chunked(single, Req, N - 1, Timeout);
{ringo_get, full, Chunk} ->
Req(Chunk + 1),
ringo_receive_chunked(single, Req, N + 1, Timeout)
after Timeout ->
throw({http_error, 408, <<"Request timeout">>})
end;

ringo_receive_chunked(many, Timeout) ->
{chunked, fun() ->
ringo_receive_chunked(many, Req, _, Timeout) ->
{chunked, fun(N) ->
receive
{entry, _} = E -> E;
done -> done
{ringo_get, {entry, _} = E} -> E;
{ringo_get, done} when N == 0 -> done;
{ringo_get, done} -> {next, N - 1};
{ringo_get, full, Chunk} ->
Req(Chunk + 1),
{next, N + 1}
after Timeout -> timeout
end
end}.
Expand Down Expand Up @@ -210,14 +239,21 @@ update_active_nodes(Nodes) ->
active_nodes() ->
ringo_util:sort_nodes(ringo_util:ringo_nodes()).

get_chunk(DomainID) ->
%%%
%%% Chunk cache
%%%

chunk_id(Domain) ->
DomainID = ringo_util:domain_id(Domain, 0),
case ets:lookup(chunk_cache, DomainID) of
[] -> DomainID;
[{_, Chunk}] -> Chunk
[] -> {0, DomainID};
[{_, Chunk, ChunkID}] -> {Chunk, ChunkID}
end.

set_chunk(DomainID, Chunk) ->
ets:insert(chunk_cache, {DomainID, Chunk}).
chunk_full(Domain, Chunk) ->
DomainID = ringo_util:domain_id(Domain, 0),
ChunkID = ringo_util:domain_id(Domain, Chunk + 1),
ets:insert(chunk_cache, {DomainID, Chunk + 1, ChunkID}).

start_chunk_cache() ->
{ok, spawn_link(fun() ->
Expand Down
1 change: 0 additions & 1 deletion ringogw/src/mochi_dispatch.erl
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
-define(MAX_RECV_BODY, (10 * 1024 * 1024)).

request(Req) ->
T = now(),
{ok, Dyn} = application:get_env(dynroot),
{ok, Doc} = application:get_env(docroot),
P = Req:get(path),
Expand Down
9 changes: 6 additions & 3 deletions ringogw/src/ringogw_util.erl
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
-module(ringogw_util).
-export([chunked_reply/2, flush_inbox/0]).

chunked_reply(Sender, ReplyGen) ->
case catch ReplyGen() of
chunked_reply(Sender, ReplyGen) -> chunked_reply(Sender, ReplyGen, 0).
chunked_reply(Sender, ReplyGen, N) ->
case catch ReplyGen(N) of
{entry, Entry} ->
Sender(encode_chunk(Entry, <<"ok">>)),
chunked_reply(Sender, ReplyGen);
chunked_reply(Sender, ReplyGen, N);
{next, N0} ->
chunked_reply(Sender, ReplyGen, N0);
done ->
Sender(encode_chunk(done));
timeout ->
Expand Down

0 comments on commit 31c32ac

Please sign in to comment.