Skip to content

Commit

Permalink
add fsm
Browse files Browse the repository at this point in the history
  • Loading branch information
jkvor committed Jun 20, 2011
1 parent 455f861 commit 70608be
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 85 deletions.
139 changes: 54 additions & 85 deletions src/lockstep.erl
Expand Up @@ -34,13 +34,13 @@
-export([start_link/2, start_link/3]).

-record(state, {socket,
mod,
fsm,
tid=?MODULE,
disk=false,
order_by = <<"updated_at">>,
uri,
schema,
transfer,
contentlength,
writes=0}).

-define(RECONNECT_TIME, 3000).
Expand All @@ -62,76 +62,63 @@ start_link(Uri, Schema, Opts) when is_list(Uri),
%%====================================================================
init([Uri, Schema, Opts]) ->
State = init_state([{uri, Uri}, {schema, Schema} | Opts]),
{ok, Fsm} = lockstep_fsm:start_link(),
setup_tables(State),
erlang:send_after(?STAT_FREQ * 1000, self(), stats),
{ok, State, 0}.
Self = self(),
spawn(fun() -> timer:sleep(?STAT_FREQ * 1000), gen_server:cast(Self, stats) end),
{ok, State#state{fsm=Fsm}, 0}.

handle_call(_Message, _From, State) ->
{reply, error, State}.

handle_cast(stats, State) ->
io:format("Stats: ~p writes/sec~n",[State#state.writes/?STAT_FREQ]),
Self = self(),
spawn(fun() -> timer:sleep(?STAT_FREQ * 1000), gen_server:cast(Self, stats) end),
{noreply, State#state{writes=0}};

handle_cast(_Message, State) ->
{noreply, State}.

handle_info({http, Sock, Http}, State) ->
State2 = case Http of
{ http_response, {1,1}, 200, <<"OK">> } ->
State;
{ http_header, _, _Key = 'Content-Length', _, Size } ->
io:format("Header: ~p: ~p~n",[_Key, Size]),
State#state{ contentlength=list_to_integer(binary_to_list(Size)) };
{ http_header, _, _Key = 'Transfer-Encoding', _, _Value = <<"chunked">> } ->
io:format("Header: ~p: ~p~n",[_Key, _Value]),
State#state{ transfer=chunked };
{ http_header, _, _Key, _, _Value } ->
io:format("Header: ~p: ~p~n",[_Key, _Value]),
State;
http_eoh ->
inet:setopts(Sock, [{packet, line}]),
State
end,
inet:setopts(Sock, [{active, once}]),
{noreply, State2};
handle_info({tcp, _Sock, <<"0\r\n">> }, #state{transfer=chunked}=State) ->
{noreply, disconnect(State), ?RECONNECT_TIME};
handle_info({tcp, Sock, Line}, #state{writes=Writes, transfer=chunked}=State) ->
Size = read_chunk_size(Line),
case read_chunk(Sock, Size, State) of
State2 when is_record(State2, state) ->
{noreply, State2#state{writes=Writes+1}};
{error, closed} ->
{noreply, disconnect(State), ?RECONNECT_TIME}
end;
handle_info({tcp, Sock, Line}, #state{writes=Writes}=State) ->
Size = size(Line),
Remaining = State#state.contentlength - Size,
State2 = process_chunk(Line, State),
case Remaining < 1 of
true ->
{noreply, disconnect(State2#state{writes=Writes+1}), ?RECONNECT_TIME};
false ->
inet:setopts(Sock, [{active, once}, { packet, line }]),
{noreply, State2#state{contentlength=Remaining, writes=Writes+1}}
end;
handle_info({tcp_closed, _Sock}, State) ->
{noreply, State#state{socket=undefined}, 0};
handle_info({tcp_error, _Sock, Err}, State) ->
io:format("tcp_error/~p~n",[Err]),
{noreply, State#state{socket=undefined}, 0};
handle_info(timeout, #state{socket=undefined}=State) ->
handle_info(timeout, State) ->
(catch gen_tcp:close(State#state.socket)),
case connect(State) of
{ok, Sock} ->
{noreply, State#state{socket=Sock, transfer=undefined}};
{ok, Mod, Sock} ->
{noreply, State#state{socket=Sock, mod=Mod}};
Err ->
io:format("error/~p~n",[Err]),
{noreply, State#state{socket=undefined}, ?RECONNECT_TIME}
end;
handle_info(stats, State) ->
io:format("Stats: ~p writes/sec~n",[State#state.writes/?STAT_FREQ]),
erlang:send_after(?STAT_FREQ * 1000, self(), stats),
{noreply, State#state{writes=0}, 0};
handle_info(Message, State) ->
io:format("unhandled info: ~p~n", [Message]),
{noreply, State}.

handle_info({Closed, _Sock}, State) when Closed == tcp_closed; Closed == ssl_closed ->
{noreply, State#state{socket=undefined}, 0};

handle_info({Err, _Sock, Err}, State) when Err == tcp_error; Err == ssl_error ->
io:format("error/~p~n", [Err]),
{noreply, State#state{socket=undefined}, 0};

handle_info(Packet, #state{socket=Sock, mod=Mod, fsm=Fsm, writes=Writes}=State) ->
Packet1 =
case Packet of
{ssl, Sock, Bin} when is_binary(Bin) -> Bin;
{tcp, Bin} when is_binary(Bin) -> Bin;
{ssl, Sock, P} -> P;
P -> P
end,
case lockstep_fsm:parse(Fsm, Packet1) of
ok ->
Mod:setopts(Sock, [{active, once}]),
{noreply, State};
eoh ->
Mod:setopts(Sock, [{active, once}, {packet, line}]),
{noreply, State};
{ok, Msg} ->
process_msg(Msg, State),
Mod:setopts(Sock, [{active, once}, {packet, line}]),
{noreply, State#state{writes=Writes+1}};
closed ->
{noreply, disconnect(State), 0}
end.

terminate(_Reason, _State) -> ok.

Expand Down Expand Up @@ -161,41 +148,23 @@ init_state([{schema, Schema}|Tail], State) ->
init_state([_|Tail], State) ->
init_state(Tail, State).

read_chunk_size(Line) ->
{ ok, [Bytes], [] } = io_lib:fread("~16u\r\n", binary_to_list(Line)),
Bytes.

read_chunk(Socket, Bytes, State) ->
inet:setopts(Socket, [{active, false}, { packet, raw }]),
case gen_tcp:recv(Socket, Bytes + 2) of
{ ok, <<Data:Bytes/binary,"\r\n">> } ->
inet:setopts(Socket, [{active, once}, { packet, line }]),
process_chunk(Data, State);
_ ->
{error, closed}
end.

process_chunk(Chunk, State) ->
case mochijson2:decode(Chunk) of
{ struct, Props } -> process_proplist(Props, State)
end,
State.

process_proplist(Props, #state{schema=Schema, order_by=OrderBy}=State) ->
{ ok, Meta, Record } = analyze(OrderBy, Props, Schema),
process_msg(Msg, #state{schema=Schema, order_by=OrderBy}=State) ->
{struct, Props} = mochijson2:decode(Msg),
{ok, Meta, Record} = analyze(OrderBy, Props, Schema),
perform_update(Meta, Record, State).

analyze(OrderBy, Props, Schema) ->
BlankRecord = erlang:make_tuple( length(Schema), undefined),
BlankMeta = { set, 0 },
BlankRecord = erlang:make_tuple(length(Schema), undefined),
BlankMeta = {set, 0},
analyze(OrderBy, Props, Schema, BlankRecord, BlankMeta).

analyze(OrderBy, [ P | Props], Schema, Record, Meta) ->
NewMeta = build_meta(OrderBy, P, Meta),
NewRecord = build_record(P, Schema, Record, 1),
analyze(OrderBy, Props, Schema, NewRecord, NewMeta);

analyze(_OrderBy, [], _, Record, Meta) ->
{ ok, Meta, Record }.
{ok, Meta, Record}.

head(State) ->
case ets:lookup(State#state.tid, lockstep_head) of
Expand Down Expand Up @@ -269,7 +238,7 @@ connect(#state{uri={Proto, Pass, Host, Port, Path, _}}=State) ->
Req = req(Pass, Host, Path, integer_to_list(head(State))),
io:format("Sending ~p~n", [Req]),
ok = Mod:send(Sock1, Req),
{ok, Sock};
{ok, Mod, Sock1};
{error, Error} ->
{error, Error}
end.
Expand Down
113 changes: 113 additions & 0 deletions src/lockstep_fsm.erl
@@ -0,0 +1,113 @@
%% Copyright (c) 2011
%% Orion Henry <orion@heroku.com>
%% Jacob Vorreuter <jacob.vorreuter@gmail.com>
%%
%% Permission is hereby granted, free of charge, to any person
%% obtaining a copy of this software and associated documentation
%% files (the "Software"), to deal in the Software without
%% restriction, including without limitation the rights to use,
%% copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the
%% Software is furnished to do so, subject to the following
%% conditions:
%%
%% The above copyright notice and this permission notice shall be
%% included in all copies or substantial portions of the Software.
%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
%% EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
%% OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
%% NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
%% HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
%% WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
%% FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
%% OTHER DEALINGS IN THE SOFTWARE.
-module(lockstep_fsm).
-behaviour(gen_fsm).

-export([init/1, handle_event/3, handle_info/3,
handle_sync_event/4, code_change/4, terminate/3]).

-export([start_link/0, parse/2, request_line/3,
header/3, chunk_size/3, chunk_line/3, body/3]).

-record(state, {transfer, content_len}).

start_link() ->
gen_fsm:start_link(?MODULE, [], []).

parse(Pid, Packet) ->
gen_fsm:sync_send_event(Pid, Packet).

init(_) ->
{ok, request_line, #state{}}.

handle_event(Event, _StateName, State) ->
{stop, {unexpected_event, Event}, State}.

handle_info(Info, _StateName, State) ->
{stop, {unexpected_info, Info}, State}.

handle_sync_event(Event, _From, _StateName, State) ->
{stop, {unexpected_sync_event, Event}, State}.

code_change(_OldVsn, StateName, State, _Extra) ->
{ok, StateName, State}.

terminate(_Reason, _StateName, _State) ->
ok.

request_line({http_response, {1,1}, Success, _}, _From, State) when Success >= 200, Success < 300 ->
{reply, ok, header, State}.

header({http_header, _, 'Content-Length', _, Size}, _From, State) ->
{reply, ok, header, State#state{content_len=list_to_integer(binary_to_list(Size))}};

header({http_header, _, 'Transfer-Encoding', _, <<"chunked">>}, _From, State) ->
{reply, ok, header, State#state{transfer=chunked}};

header({http_header, _, _Key, _, _Value}, _From, State) ->
{reply, ok, header, State};

header(http_eoh, _From, #state{transfer=chunked}=State) ->
{reply, eoh, chunk_size, State};

header(http_eoh, _From, State) ->
{reply, eoh, body, State}.

chunk_size(<<"0\r\n">>, _From, _State) ->
{reply, closed, request_line, #state{}};

chunk_size(Line, _From, State) ->
{ok, Size} = parse_chunk_size(Line),
{reply, ok, chunk_line, State#state{content_len=Size}}.

chunk_line(Line, _From, #state{content_len=Size}=State) ->
case Line of
<<Body:Size/binary, Rest/binary>> ->
Rest =/= <<"\r\n">> andalso io:format("recv'd extra chunk body size=~w body=~p~n", [Size, <<Body/binary, Rest/binary>>]),
{reply, {ok, Body}, chunk_size, State#state{content_len=undefined}};
Other ->
io:format("recv'd poorly formatted chunk body size=~w body=~p~n", [Size, Other]),
{reply, closed, request_line, #state{}}
end.

body(<<"\n">>, _From, State) ->
{reply, ok, body, State};

body(Packet, _From, State) ->
{reply, {ok, Packet}, body, State}.

parse_chunk_size(Line) ->
Size0 = parse_chunk_size(Line, []),
{ok, [Size], []} = io_lib:fread("~16u", Size0),
{ok, Size}.

parse_chunk_size(<<"\r\n">>, Acc) ->
lists:reverse(Acc);

parse_chunk_size(<<$;, _/binary>>, Acc) ->
lists:reverse(Acc);

parse_chunk_size(<<C, Rest/binary>>, Acc) ->
parse_chunk_size(Rest, [C|Acc]).

0 comments on commit 70608be

Please sign in to comment.