Skip to content

Commit

Permalink
Adds configurable message_size and batch_size options.
Browse files Browse the repository at this point in the history
  • Loading branch information
schwink committed Dec 24, 2010
1 parent 721a89e commit 9518f81
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 16 deletions.
11 changes: 10 additions & 1 deletion server/kanaloa/include/kanaloa.hrl
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@

%% @hidden
-record(kanaloa_settings, {handler, http_content_type, parse_jsonrpc, batch_interval, batch_check_interval, batch_count, connection_message_timeout}).
-record(kanaloa_settings, {handler,
http_content_type,
parse_jsonrpc,
batch_interval,
batch_check_interval,
batch_count,
connection_message_timeout,
message_size,
batch_size_cutoff
}).
34 changes: 25 additions & 9 deletions server/kanaloa/src/kanaloa.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ ensure_started(App) ->
%% ===batch_count===
%% An integer() representing the number of batches to send before forcing the client to make a fresh connection.
%% This is necessary to enforce memory leak protection on the client (so they don't try to hold a big unused buffer in memory). Defaults to 32.
%% ===message_size===
%% An integer() representing the maximum message size in bytes. Defaults to 512.
%% ===batch_size===
%% A integer() representing the maximum size in bytes of a batch of messages.
%% Values above the default value of 1024 have been shown to cause batches to be split across multiple XHR onReadyStateChanged events when streaming, causing
%% parse errors on the client. Must be greater than message_size.
%%
%% batch_interval * batch_count = maximum duration of an HTTP connection between client and server.
%% (Note that HTTP keep-alive should make the TCP connection duration much longer.)
Expand Down Expand Up @@ -83,21 +89,31 @@ get_kanaloa_settings(Options) ->
BC when is_integer(BC) andalso (BC > 16) ->
BC
end,

ConnectionTimeout = case proplists:get_value(connection_message_timeout, Options, 100) of
CT when is_integer(CT) ->
CT
end,
MessageSize = case proplists:get_value(message_size, Options, 512) of
MS when is_integer(MS) ->
MS
end,
BatchSize = case proplists:get_value(batch_size, Options, 1024) of
BS when is_integer(BS) andalso BS > MS ->
BS
end,
BatchSizeCutoff = BatchSize - MessageSize,

#kanaloa_settings{
handler = HandlerFun,
http_content_type = ContentType,
parse_jsonrpc = ParseJsonRpc,
batch_interval = BatchInterval,
batch_check_interval = BatchCheckInterval,
batch_count = BatchCount,
connection_message_timeout = ConnectionTimeout
}.
handler = HandlerFun,
http_content_type = ContentType,
parse_jsonrpc = ParseJsonRpc,
batch_interval = BatchInterval,
batch_check_interval = BatchCheckInterval,
batch_count = BatchCount,
connection_message_timeout = ConnectionTimeout,
message_size = MessageSize,
batch_size_cutoff = BatchSizeCutoff
}.

%% @doc Replaces the specified entry in a proplist.
replace_option(Key, Options, Value) ->
Expand Down
13 changes: 7 additions & 6 deletions server/kanaloa/src/kanaloa_connection.erl
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ loop(Owner, Count) ->

% Accumulate messages for the batch interval.
Timeout = now_ms() + Settings#kanaloa_settings.batch_interval,
{Status, Messages} = loop_accumulate(Owner, [], Timeout),
SizeCutoff = Settings#kanaloa_settings.batch_size_cutoff,
{Status, Messages} = loop_accumulate(Owner, [], Timeout, SizeCutoff),

% Send the batch.
case catch send_batch(Messages) of
Expand Down Expand Up @@ -113,9 +114,9 @@ loop(Owner, Count) ->
exit(Status)
end.

%% @spec loop_accumulate(BatchSettings::batch_settings(), Messages::list(), Timeout::integer()) -> {Status::ok | owner_exit, Messages::list()}
%% @spec loop_accumulate(BatchSettings::batch_settings(), Messages::list(), Timeout::integer(), SizeCutoff::integer()) -> {Status::ok | owner_exit, Messages::list()}
%% @doc Loops to accumulate messages, for one timeout period.
loop_accumulate(Owner, Messages, Timeout) when is_list(Messages) andalso is_integer(Timeout)->
loop_accumulate(Owner, Messages, Timeout, SizeCutoff) when is_list(Messages) andalso is_integer(Timeout) andalso is_integer(SizeCutoff) ->
Now = now_ms(),
Size = iolist_size(Messages),
io:format("The size of the batch is ~w\n", [Size]),
Expand All @@ -125,15 +126,15 @@ loop_accumulate(Owner, Messages, Timeout) when is_list(Messages) andalso is_inte

% Observing problems with large chunks being split across multiple onReadyStateChanged callbacks, which makes parsing on the client more difficult.
% So, limit the batch size. Note that the size of each message is limited by send_json.
Size > 512 ->
Size > SizeCutoff ->
io:format("Batch size exceeds limit\n", []),
{ok, Messages};

true ->
receive
{send, Message} ->
Owner ! send_queued,
loop_accumulate(Owner, [Message | Messages], Timeout);
loop_accumulate(Owner, [Message | Messages], Timeout, SizeCutoff);

{'EXIT', Owner, _Reason} ->
{owner_exit, Messages};
Expand All @@ -142,7 +143,7 @@ loop_accumulate(Owner, Messages, Timeout) when is_list(Messages) andalso is_inte
{close, Messages}

after Settings#kanaloa_settings.batch_check_interval ->
loop_accumulate(Owner, Messages, Timeout)
loop_accumulate(Owner, Messages, Timeout, SizeCutoff)
end
end.

Expand Down

0 comments on commit 9518f81

Please sign in to comment.