Skip to content
This repository has been archived by the owner on Jan 21, 2022. It is now read-only.

Commit

Permalink
change orddict to ets, refactor process requests
Browse files Browse the repository at this point in the history
  • Loading branch information
comtihon committed May 29, 2014
1 parent ca6bfeb commit 74609e6
Showing 1 changed file with 27 additions and 26 deletions.
53 changes: 27 additions & 26 deletions src/connection/mc_worker.erl
Expand Up @@ -17,7 +17,7 @@

-record(state, {
socket :: gen_tcp:socket(),
requests :: orddict:orddict(), %TODO ets me?
ets,
buffer :: binary(),
conn_state
}).
Expand All @@ -40,7 +40,8 @@ init([{Host, Port}, Options]) ->
{ok, Socket} = connect_to_database(Host, Port, Options),
State = proplists:get_value(state, Options),
process_flag(trap_exit, true),
{ok, #state{socket = Socket, requests = [], buffer = <<>>, conn_state = State}}.
RequestStorage = ets:new(requests, [private]),
{ok, #state{socket = Socket, ets = RequestStorage, buffer = <<>>, conn_state = State}}.

%% @hidden
handle_call(NewState = #conn_state, _, State = #state{conn_state = OldState}) -> % update state, return old
Expand Down Expand Up @@ -78,11 +79,12 @@ handle_call(Request, _, State = #state{socket = Socket, conn_state = ConnState#c
{Packet, _Id} = encode_request(ConnState#conn_state.database, Request),
ok = gen_tcp:send(Socket, Packet),
{reply, ok, State};
handle_call(Request, From, State = #state{socket = Socket, conn_state = ConnState})
when is_record(Request, 'query'); is_record(Request, getmore) -> % read requests
{ok, Id} = make_request(Socket, ConnState#conn_state.database, Request), %TODO how to get data from mongo?
handle_call(Request, From, State = #state{socket = Socket, conn_state = ConnState, ets = Ets})
when is_record(Request, 'query'); is_record(Request, getmore) -> % read requests
{ok, Id} = make_request(Socket, ConnState#conn_state.database, Request),
inet:setopts(Socket, [{active, once}]),
{noreply, State#state{requests = [{Id, From} | State#state.requests]}}; % TODO saving big data in record? %TODO why only read?
true = ets:insert_new(Ets, {Id, From}),
{noreply, State};
handle_call({request, Request}, _, State = #state{socket = Socket, conn_state = ConnState}) -> % ordinary requests
{ok, _} = make_request(Socket, ConnState#conn_state.database, Request),
{reply, ok, State};
Expand All @@ -94,17 +96,15 @@ handle_cast(_, State) ->
{noreply, State}.

%% @hidden
handle_info({tcp, _Socket, Data}, State) ->
handle_info({tcp, _Socket, Data}, State = #state{ets = Ets}) ->
Buffer = <<(State#state.buffer)/binary, Data/binary>>,
{Responses, Pending} = decode_responses(Buffer),
Request = case process_responses(Responses, State#state.requests) of
[] ->
[];
Requests ->
inet:setopts(State#state.socket, [{active, once}]),
Requests
end,
{noreply, State#state{requests = Request, buffer = Pending}};
process_responses(Responses, Ets),
case proplists:get_value(size, ets:info(Ets)) of
0 -> ok;
_ -> inet:setopts(State#state.socket, [{active, once}])
end,
{noreply, State#state{buffer = Pending}};
handle_info({tcp_closed, _Socket}, State) ->
{stop, tcp_closed, State};
handle_info({tcp_error, _Socket, Reason}, State) ->
Expand All @@ -126,7 +126,7 @@ encode_request(Database, Request) ->
{<<(byte_size(Payload) + 4):32/little, Payload/binary>>, RequestId}.

%% @private
decode_responses(Data) -> %TODO move me to mc_connection_man?
decode_responses(Data) ->
decode_responses(Data, []).

%% @private
Expand All @@ -139,16 +139,17 @@ decode_responses(Data, Acc) ->
{lists:reverse(Acc), Data}.

%% @private
process_responses([], Requests) -> Requests;
process_responses([{Id, Response} | RemainingResponses], Requests) ->
case lists:keytake(Id, 1, Requests) of
false ->
% TODO: close any cursor that might be linked to this request ?
process_responses(RemainingResponses, Requests);
{value, {_Id, From}, RemainingRequests} ->
gen_server:reply(From, Response),
process_responses(RemainingResponses, RemainingRequests)
end.
process_responses(Responses, Ets) ->
lists:foreach(
fun({Id, Response}) ->
case ets:lookup(Ets, Id) of
[] -> % TODO: close any cursor that might be linked to this request ?
ok;
[{Id, From}] ->
ets:delete(Ets, Id),
gen_server:reply(From, Response)
end
end, Responses).

%% @private
connect_to_database(Port, Host, Options) ->
Expand Down

0 comments on commit 74609e6

Please sign in to comment.