From 70608beb2a58d36b845d93df4ede3c5350cf703a Mon Sep 17 00:00:00 2001 From: Jacob Vorreuter Date: Sun, 19 Jun 2011 15:44:53 -0700 Subject: [PATCH] add fsm --- src/lockstep.erl | 139 +++++++++++++++++-------------------------- src/lockstep_fsm.erl | 113 +++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+), 85 deletions(-) create mode 100644 src/lockstep_fsm.erl diff --git a/src/lockstep.erl b/src/lockstep.erl index 0f5da6c..011ada3 100644 --- a/src/lockstep.erl +++ b/src/lockstep.erl @@ -34,13 +34,13 @@ -export([start_link/2, start_link/3]). -record(state, {socket, + mod, + fsm, tid=?MODULE, disk=false, order_by = <<"updated_at">>, uri, schema, - transfer, - contentlength, writes=0}). -define(RECONNECT_TIME, 3000). @@ -62,76 +62,63 @@ start_link(Uri, Schema, Opts) when is_list(Uri), %%==================================================================== init([Uri, Schema, Opts]) -> State = init_state([{uri, Uri}, {schema, Schema} | Opts]), + {ok, Fsm} = lockstep_fsm:start_link(), setup_tables(State), - erlang:send_after(?STAT_FREQ * 1000, self(), stats), - {ok, State, 0}. + Self = self(), + spawn(fun() -> timer:sleep(?STAT_FREQ * 1000), gen_server:cast(Self, stats) end), + {ok, State#state{fsm=Fsm}, 0}. handle_call(_Message, _From, State) -> {reply, error, State}. +handle_cast(stats, State) -> + io:format("Stats: ~p writes/sec~n",[State#state.writes/?STAT_FREQ]), + Self = self(), + spawn(fun() -> timer:sleep(?STAT_FREQ * 1000), gen_server:cast(Self, stats) end), + {noreply, State#state{writes=0}}; + handle_cast(_Message, State) -> {noreply, State}. -handle_info({http, Sock, Http}, State) -> - State2 = case Http of - { http_response, {1,1}, 200, <<"OK">> } -> - State; - { http_header, _, _Key = 'Content-Length', _, Size } -> - io:format("Header: ~p: ~p~n",[_Key, Size]), - State#state{ contentlength=list_to_integer(binary_to_list(Size)) }; - { http_header, _, _Key = 'Transfer-Encoding', _, _Value = <<"chunked">> } -> - io:format("Header: ~p: ~p~n",[_Key, _Value]), - State#state{ transfer=chunked }; - { http_header, _, _Key, _, _Value } -> - io:format("Header: ~p: ~p~n",[_Key, _Value]), - State; - http_eoh -> - inet:setopts(Sock, [{packet, line}]), - State - end, - inet:setopts(Sock, [{active, once}]), - {noreply, State2}; -handle_info({tcp, _Sock, <<"0\r\n">> }, #state{transfer=chunked}=State) -> - {noreply, disconnect(State), ?RECONNECT_TIME}; -handle_info({tcp, Sock, Line}, #state{writes=Writes, transfer=chunked}=State) -> - Size = read_chunk_size(Line), - case read_chunk(Sock, Size, State) of - State2 when is_record(State2, state) -> - {noreply, State2#state{writes=Writes+1}}; - {error, closed} -> - {noreply, disconnect(State), ?RECONNECT_TIME} - end; -handle_info({tcp, Sock, Line}, #state{writes=Writes}=State) -> - Size = size(Line), - Remaining = State#state.contentlength - Size, - State2 = process_chunk(Line, State), - case Remaining < 1 of - true -> - {noreply, disconnect(State2#state{writes=Writes+1}), ?RECONNECT_TIME}; - false -> - inet:setopts(Sock, [{active, once}, { packet, line }]), - {noreply, State2#state{contentlength=Remaining, writes=Writes+1}} - end; -handle_info({tcp_closed, _Sock}, State) -> - {noreply, State#state{socket=undefined}, 0}; -handle_info({tcp_error, _Sock, Err}, State) -> - io:format("tcp_error/~p~n",[Err]), - {noreply, State#state{socket=undefined}, 0}; -handle_info(timeout, #state{socket=undefined}=State) -> +handle_info(timeout, State) -> + (catch gen_tcp:close(State#state.socket)), case connect(State) of - {ok, Sock} -> - {noreply, State#state{socket=Sock, transfer=undefined}}; + {ok, Mod, Sock} -> + {noreply, State#state{socket=Sock, mod=Mod}}; Err -> io:format("error/~p~n",[Err]), {noreply, State#state{socket=undefined}, ?RECONNECT_TIME} end; -handle_info(stats, State) -> - io:format("Stats: ~p writes/sec~n",[State#state.writes/?STAT_FREQ]), - erlang:send_after(?STAT_FREQ * 1000, self(), stats), - {noreply, State#state{writes=0}, 0}; -handle_info(Message, State) -> - io:format("unhandled info: ~p~n", [Message]), - {noreply, State}. + +handle_info({Closed, _Sock}, State) when Closed == tcp_closed; Closed == ssl_closed -> + {noreply, State#state{socket=undefined}, 0}; + +handle_info({Err, _Sock, Err}, State) when Err == tcp_error; Err == ssl_error -> + io:format("error/~p~n", [Err]), + {noreply, State#state{socket=undefined}, 0}; + +handle_info(Packet, #state{socket=Sock, mod=Mod, fsm=Fsm, writes=Writes}=State) -> + Packet1 = + case Packet of + {ssl, Sock, Bin} when is_binary(Bin) -> Bin; + {tcp, Bin} when is_binary(Bin) -> Bin; + {ssl, Sock, P} -> P; + P -> P + end, + case lockstep_fsm:parse(Fsm, Packet1) of + ok -> + Mod:setopts(Sock, [{active, once}]), + {noreply, State}; + eoh -> + Mod:setopts(Sock, [{active, once}, {packet, line}]), + {noreply, State}; + {ok, Msg} -> + process_msg(Msg, State), + Mod:setopts(Sock, [{active, once}, {packet, line}]), + {noreply, State#state{writes=Writes+1}}; + closed -> + {noreply, disconnect(State), 0} + end. terminate(_Reason, _State) -> ok. @@ -161,41 +148,23 @@ init_state([{schema, Schema}|Tail], State) -> init_state([_|Tail], State) -> init_state(Tail, State). -read_chunk_size(Line) -> - { ok, [Bytes], [] } = io_lib:fread("~16u\r\n", binary_to_list(Line)), - Bytes. - -read_chunk(Socket, Bytes, State) -> - inet:setopts(Socket, [{active, false}, { packet, raw }]), - case gen_tcp:recv(Socket, Bytes + 2) of - { ok, <> } -> - inet:setopts(Socket, [{active, once}, { packet, line }]), - process_chunk(Data, State); - _ -> - {error, closed} - end. - -process_chunk(Chunk, State) -> - case mochijson2:decode(Chunk) of - { struct, Props } -> process_proplist(Props, State) - end, - State. - -process_proplist(Props, #state{schema=Schema, order_by=OrderBy}=State) -> - { ok, Meta, Record } = analyze(OrderBy, Props, Schema), +process_msg(Msg, #state{schema=Schema, order_by=OrderBy}=State) -> + {struct, Props} = mochijson2:decode(Msg), + {ok, Meta, Record} = analyze(OrderBy, Props, Schema), perform_update(Meta, Record, State). analyze(OrderBy, Props, Schema) -> - BlankRecord = erlang:make_tuple( length(Schema), undefined), - BlankMeta = { set, 0 }, + BlankRecord = erlang:make_tuple(length(Schema), undefined), + BlankMeta = {set, 0}, analyze(OrderBy, Props, Schema, BlankRecord, BlankMeta). analyze(OrderBy, [ P | Props], Schema, Record, Meta) -> NewMeta = build_meta(OrderBy, P, Meta), NewRecord = build_record(P, Schema, Record, 1), analyze(OrderBy, Props, Schema, NewRecord, NewMeta); + analyze(_OrderBy, [], _, Record, Meta) -> - { ok, Meta, Record }. + {ok, Meta, Record}. head(State) -> case ets:lookup(State#state.tid, lockstep_head) of @@ -269,7 +238,7 @@ connect(#state{uri={Proto, Pass, Host, Port, Path, _}}=State) -> Req = req(Pass, Host, Path, integer_to_list(head(State))), io:format("Sending ~p~n", [Req]), ok = Mod:send(Sock1, Req), - {ok, Sock}; + {ok, Mod, Sock1}; {error, Error} -> {error, Error} end. diff --git a/src/lockstep_fsm.erl b/src/lockstep_fsm.erl new file mode 100644 index 0000000..c441654 --- /dev/null +++ b/src/lockstep_fsm.erl @@ -0,0 +1,113 @@ +%% Copyright (c) 2011 +%% Orion Henry +%% Jacob Vorreuter +%% +%% Permission is hereby granted, free of charge, to any person +%% obtaining a copy of this software and associated documentation +%% files (the "Software"), to deal in the Software without +%% restriction, including without limitation the rights to use, +%% copy, modify, merge, publish, distribute, sublicense, and/or sell +%% copies of the Software, and to permit persons to whom the +%% Software is furnished to do so, subject to the following +%% conditions: +%% +%% The above copyright notice and this permission notice shall be +%% included in all copies or substantial portions of the Software. +%% +%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR +%% OTHER DEALINGS IN THE SOFTWARE. +-module(lockstep_fsm). +-behaviour(gen_fsm). + +-export([init/1, handle_event/3, handle_info/3, + handle_sync_event/4, code_change/4, terminate/3]). + +-export([start_link/0, parse/2, request_line/3, + header/3, chunk_size/3, chunk_line/3, body/3]). + +-record(state, {transfer, content_len}). + +start_link() -> + gen_fsm:start_link(?MODULE, [], []). + +parse(Pid, Packet) -> + gen_fsm:sync_send_event(Pid, Packet). + +init(_) -> + {ok, request_line, #state{}}. + +handle_event(Event, _StateName, State) -> + {stop, {unexpected_event, Event}, State}. + +handle_info(Info, _StateName, State) -> + {stop, {unexpected_info, Info}, State}. + +handle_sync_event(Event, _From, _StateName, State) -> + {stop, {unexpected_sync_event, Event}, State}. + +code_change(_OldVsn, StateName, State, _Extra) -> + {ok, StateName, State}. + +terminate(_Reason, _StateName, _State) -> + ok. + +request_line({http_response, {1,1}, Success, _}, _From, State) when Success >= 200, Success < 300 -> + {reply, ok, header, State}. + +header({http_header, _, 'Content-Length', _, Size}, _From, State) -> + {reply, ok, header, State#state{content_len=list_to_integer(binary_to_list(Size))}}; + +header({http_header, _, 'Transfer-Encoding', _, <<"chunked">>}, _From, State) -> + {reply, ok, header, State#state{transfer=chunked}}; + +header({http_header, _, _Key, _, _Value}, _From, State) -> + {reply, ok, header, State}; + +header(http_eoh, _From, #state{transfer=chunked}=State) -> + {reply, eoh, chunk_size, State}; + +header(http_eoh, _From, State) -> + {reply, eoh, body, State}. + +chunk_size(<<"0\r\n">>, _From, _State) -> + {reply, closed, request_line, #state{}}; + +chunk_size(Line, _From, State) -> + {ok, Size} = parse_chunk_size(Line), + {reply, ok, chunk_line, State#state{content_len=Size}}. + +chunk_line(Line, _From, #state{content_len=Size}=State) -> + case Line of + <> -> + Rest =/= <<"\r\n">> andalso io:format("recv'd extra chunk body size=~w body=~p~n", [Size, <>]), + {reply, {ok, Body}, chunk_size, State#state{content_len=undefined}}; + Other -> + io:format("recv'd poorly formatted chunk body size=~w body=~p~n", [Size, Other]), + {reply, closed, request_line, #state{}} + end. + +body(<<"\n">>, _From, State) -> + {reply, ok, body, State}; + +body(Packet, _From, State) -> + {reply, {ok, Packet}, body, State}. + +parse_chunk_size(Line) -> + Size0 = parse_chunk_size(Line, []), + {ok, [Size], []} = io_lib:fread("~16u", Size0), + {ok, Size}. + +parse_chunk_size(<<"\r\n">>, Acc) -> + lists:reverse(Acc); + +parse_chunk_size(<<$;, _/binary>>, Acc) -> + lists:reverse(Acc); + +parse_chunk_size(<>, Acc) -> + parse_chunk_size(Rest, [C|Acc]).