From 74609e6294a344173fc3ed5e12502a504a24ce65 Mon Sep 17 00:00:00 2001 From: Valery Tikhonov Date: Thu, 29 May 2014 17:29:14 +0300 Subject: [PATCH] change orddict to ets, refactor process requests --- src/connection/mc_worker.erl | 53 ++++++++++++++++++------------------ 1 file changed, 27 insertions(+), 26 deletions(-) diff --git a/src/connection/mc_worker.erl b/src/connection/mc_worker.erl index 8b86a98b..743c9520 100644 --- a/src/connection/mc_worker.erl +++ b/src/connection/mc_worker.erl @@ -17,7 +17,7 @@ -record(state, { socket :: gen_tcp:socket(), - requests :: orddict:orddict(), %TODO ets me? + ets, buffer :: binary(), conn_state }). @@ -40,7 +40,8 @@ init([{Host, Port}, Options]) -> {ok, Socket} = connect_to_database(Host, Port, Options), State = proplists:get_value(state, Options), process_flag(trap_exit, true), - {ok, #state{socket = Socket, requests = [], buffer = <<>>, conn_state = State}}. + RequestStorage = ets:new(requests, [private]), + {ok, #state{socket = Socket, ets = RequestStorage, buffer = <<>>, conn_state = State}}. %% @hidden handle_call(NewState = #conn_state, _, State = #state{conn_state = OldState}) -> % update state, return old @@ -78,11 +79,12 @@ handle_call(Request, _, State = #state{socket = Socket, conn_state = ConnState#c {Packet, _Id} = encode_request(ConnState#conn_state.database, Request), ok = gen_tcp:send(Socket, Packet), {reply, ok, State}; -handle_call(Request, From, State = #state{socket = Socket, conn_state = ConnState}) - when is_record(Request, 'query'); is_record(Request, getmore) -> % read requests - {ok, Id} = make_request(Socket, ConnState#conn_state.database, Request), %TODO how to get data from mongo? +handle_call(Request, From, State = #state{socket = Socket, conn_state = ConnState, ets = Ets}) + when is_record(Request, 'query'); is_record(Request, getmore) -> % read requests + {ok, Id} = make_request(Socket, ConnState#conn_state.database, Request), inet:setopts(Socket, [{active, once}]), - {noreply, State#state{requests = [{Id, From} | State#state.requests]}}; % TODO saving big data in record? %TODO why only read? + true = ets:insert_new(Ets, {Id, From}), + {noreply, State}; handle_call({request, Request}, _, State = #state{socket = Socket, conn_state = ConnState}) -> % ordinary requests {ok, _} = make_request(Socket, ConnState#conn_state.database, Request), {reply, ok, State}; @@ -94,17 +96,15 @@ handle_cast(_, State) -> {noreply, State}. %% @hidden -handle_info({tcp, _Socket, Data}, State) -> +handle_info({tcp, _Socket, Data}, State = #state{ets = Ets}) -> Buffer = <<(State#state.buffer)/binary, Data/binary>>, {Responses, Pending} = decode_responses(Buffer), - Request = case process_responses(Responses, State#state.requests) of - [] -> - []; - Requests -> - inet:setopts(State#state.socket, [{active, once}]), - Requests - end, - {noreply, State#state{requests = Request, buffer = Pending}}; + process_responses(Responses, Ets), + case proplists:get_value(size, ets:info(Ets)) of + 0 -> ok; + _ -> inet:setopts(State#state.socket, [{active, once}]) + end, + {noreply, State#state{buffer = Pending}}; handle_info({tcp_closed, _Socket}, State) -> {stop, tcp_closed, State}; handle_info({tcp_error, _Socket, Reason}, State) -> @@ -126,7 +126,7 @@ encode_request(Database, Request) -> {<<(byte_size(Payload) + 4):32/little, Payload/binary>>, RequestId}. %% @private -decode_responses(Data) -> %TODO move me to mc_connection_man? +decode_responses(Data) -> decode_responses(Data, []). %% @private @@ -139,16 +139,17 @@ decode_responses(Data, Acc) -> {lists:reverse(Acc), Data}. %% @private -process_responses([], Requests) -> Requests; -process_responses([{Id, Response} | RemainingResponses], Requests) -> - case lists:keytake(Id, 1, Requests) of - false -> - % TODO: close any cursor that might be linked to this request ? - process_responses(RemainingResponses, Requests); - {value, {_Id, From}, RemainingRequests} -> - gen_server:reply(From, Response), - process_responses(RemainingResponses, RemainingRequests) - end. +process_responses(Responses, Ets) -> + lists:foreach( + fun({Id, Response}) -> + case ets:lookup(Ets, Id) of + [] -> % TODO: close any cursor that might be linked to this request ? + ok; + [{Id, From}] -> + ets:delete(Ets, Id), + gen_server:reply(From, Response) + end + end, Responses). %% @private connect_to_database(Port, Host, Options) ->