Skip to content

Commit

Permalink
Switch to recv() size 0 in receive_data - increases throughput
Browse files Browse the repository at this point in the history
In my simple testing, it increases the throughput of receiving large
emails by between 33 and 50%.
  • Loading branch information
Vagabond committed Apr 25, 2011
1 parent e743e3a commit 82cd234
Showing 1 changed file with 7 additions and 36 deletions.
43 changes: 7 additions & 36 deletions src/gen_smtp_server_session.erl
Expand Up @@ -187,7 +187,7 @@ handle_info({receive_data, Body, Rest}, #state{socket = Socket, readmessage = tr
end;
handle_info({_SocketType, Socket, Packet}, State) ->
case handle_request(parse_request(Packet), State) of
{ok, #state{envelope = Envelope, extensions = Extensions, options = Options, readmessage = true} = NewState} ->
{ok, #state{extensions = Extensions, options = Options, readmessage = true} = NewState} ->
MaxSize = case has_extension(Extensions, "SIZE") of
{true, Value} ->
list_to_integer(Value);
Expand All @@ -198,7 +198,7 @@ handle_info({_SocketType, Socket, Packet}, State) ->
Size = 0,
socket:setopts(Socket, [{packet, raw}]),
spawn_opt(fun() -> receive_data([],
Socket, {0, Envelope#envelope.expectedsize div 2}, Size, MaxSize, Session, Options) end,
Socket, 0, Size, MaxSize, Session, Options) end,
[link, {fullsweep_after, 0}]),
{noreply, NewState, ?TIMEOUT};
{ok, NewState} ->
Expand Down Expand Up @@ -709,24 +709,7 @@ try_auth(AuthType, Username, Credential, #state{module = Module, socket = Socket
receive_data(_Acc, _Socket, _, Size, MaxSize, Session, _Options) when MaxSize > 0, Size > MaxSize ->
io:format("message body size ~B exceeded maximum allowed ~B~n", [Size, MaxSize]),
Session ! {receive_data, {error, size_exceeded}};
receive_data(Acc, Socket, {OldCount, OldRecvSize}, Size, MaxSize, Session, Options) ->
{Count, RecvSize} = case Size of
Size when OldCount > 2, OldRecvSize =:= 262144 ->
%io:format("increasing receive size to ~B~n", [1048576]),
{0, 1048576};% 1m
Size when OldCount > 5, OldRecvSize =:= 65536 ->
%io:format("increasing receive size to ~B~n", [262144]),
{0, 262144};% 256k
Size when OldCount > 5, OldRecvSize =:= 8192 ->
%io:format("increasing receive size to ~B~n", [65536]),
{0, 65536};% 64k
Size when OldCount > 2, Size > 8192, OldRecvSize =:= 0 ->
%io:format("increasing receive size to ~B~n", [8192]),
{0, 8192}; % 8k
_ ->
{OldCount + 1, OldRecvSize} % don't change anything
end,
%socket:setopts(Socket, [{packet, raw}]),
receive_data(Acc, Socket, RecvSize, Size, MaxSize, Session, Options) ->
case socket:recv(Socket, RecvSize, 1000) of
{ok, Packet} when Acc == [] ->
case check_bare_crlf(Packet, <<>>, proplists:get_value(allow_bare_newlines, Options, false), 0) of
Expand All @@ -737,7 +720,7 @@ receive_data(Acc, Socket, {OldCount, OldRecvSize}, Size, MaxSize, Session, Optio
0 ->
%io:format("received ~B bytes; size is now ~p~n", [RecvSize, Size + size(Packet)]),
%io:format("memory usage: ~p~n", [erlang:process_info(self(), memory)]),
receive_data([FixedPacket | Acc], Socket, {Count, RecvSize}, Size + byte_size(FixedPacket), MaxSize, Session, Options);
receive_data([FixedPacket | Acc], Socket, RecvSize, Size + byte_size(FixedPacket), MaxSize, Session, Options);
Index ->
String = binstr:substr(FixedPacket, 1, Index - 1),
Rest = binstr:substr(FixedPacket, Index+5),
Expand All @@ -757,7 +740,7 @@ receive_data(Acc, Socket, {OldCount, OldRecvSize}, Size, MaxSize, Session, Optio
0 ->
%io:format("received ~B bytes; size is now ~p~n", [RecvSize, Size + size(Packet)]),
%io:format("memory usage: ~p~n", [erlang:process_info(self(), memory)]),
receive_data([FixedPacket | Acc], Socket, {Count, RecvSize}, Size + byte_size(FixedPacket), MaxSize, Session, Options);
receive_data([FixedPacket | Acc], Socket, RecvSize, Size + byte_size(FixedPacket), MaxSize, Session, Options);
Index ->
String = binstr:substr(FixedPacket, 1, Index - 1),
Rest = binstr:substr(FixedPacket, Index+5),
Expand All @@ -776,7 +759,7 @@ receive_data(Acc, Socket, {OldCount, OldRecvSize}, Size, MaxSize, Session, Optio
% uh-oh
%io:format("no data on socket, and no DATA terminator, retrying ~p~n", [Session]),
% eventually we'll either get data or a different error, just keep retrying
receive_data(Acc, Socket, {Count - 1, RecvSize}, Size, MaxSize, Session, Options);
receive_data(Acc, Socket, 0, Size, MaxSize, Session, Options);
Index ->
String = binstr:substr(Packet, 1, Index - 1),
Rest = binstr:substr(Packet, Index+5),
Expand All @@ -786,24 +769,12 @@ receive_data(Acc, Socket, {OldCount, OldRecvSize}, Size, MaxSize, Session, Optio
Session ! {receive_data, Result, Rest}
end;
{error, timeout} ->
NewRecvSize = adjust_receive_size_down(Size, RecvSize),
%io:format("timeout when trying to read ~B bytes, lowering receive size to ~B~n", [RecvSize, NewRecvSize]),
receive_data(Acc, Socket, {-5, NewRecvSize}, Size, MaxSize, Session, Options);
receive_data(Acc, Socket, 0, Size, MaxSize, Session, Options);
{error, Reason} ->
io:format("receive error: ~p~n", [Reason]),
exit(receive_error)
end.


adjust_receive_size_down(_Size, RecvSize) when RecvSize > 262144 ->
262144;
adjust_receive_size_down(_Size, RecvSize) when RecvSize > 65536 ->
65536;
adjust_receive_size_down(_Size, RecvSize) when RecvSize > 8192 ->
8192;
adjust_receive_size_down(_Size, _RecvSize) ->
0.

check_for_bare_crlf(Bin, Offset) ->
case {re:run(Bin, "(?<!\r)\n", [{capture, none}, {offset, Offset}]), re:run(Bin, "\r(?!\n)", [{capture, none}, {offset, Offset}])} of
{match, _} -> true;
Expand Down

0 comments on commit 82cd234

Please sign in to comment.