Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Tree: 33d6c9eb5c
Fetching contributors…

Cannot retrieve contributors at this time

120 lines (105 sloc) 3.694 kB
-module(cassanderl_dispatch).
-behaviour(dispcount).
-include("cassanderl.hrl").
-export([init/1, checkout/2, checkin/2, dead/1, handle_info/2,
code_change/3, terminate/2]).
-define(RECYCLE_COUNT, 150). %% This is picked somewhat arbitrarily.
-record(state, { keyspace,
client,
recycle_count,
usage_credits }).
%%%%%%%%%%%%%%%%%%%%%%%%
%%% CALLBACK MODULES %%%
%%%%%%%%%%%%%%%%%%%%%%%%
init([]) ->
Keyspace = env(cassanderl, default_keyspace, undefined),
Recycle = env(cassanderl, recycle_count, ?RECYCLE_COUNT),
Client =
case start_client(Keyspace) of
{ok, Client2} ->
Client2;
{error, _Error} ->
undefined
end,
State = #state {
keyspace = Keyspace,
client = Client,
recycle_count = Recycle,
usage_credits = Recycle
},
{ok, State}.
checkout(_From, State = #state{keyspace=Keyspace,
client=undefined,
recycle_count = RC}) ->
case start_client(Keyspace) of
{ok, Client} ->
{ok, Client, State#state{client=Client, usage_credits = RC }};
{error, Error} ->
{error, Error, State}
end;
checkout(_From, State = #state{client=Client}) ->
{ok, Client, State}.
%% The caller killed the client and tells us
checkin(died, State) ->
dead(State);
%% The client is alive.
checkin(Client, State) ->
{ok, recycle(State#state{client=Client})}.
dead(State) ->
{ok, State#state{client=undefined}}.
handle_info(_Msg, State) ->
{ok, State}.
terminate(_,_) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%%%%%%%%%%%%%%%%%%%%%
%%% PRIVATE/HELPERS %%%
%%%%%%%%%%%%%%%%%%%%%%%
start_client(Keyspace) ->
start_client(Keyspace, 3).
start_client(_Keyspace, 0) ->
{error, no_cluster_connection};
start_client(Keyspace, N) when N > 0 ->
{ok, {Hostname, Port} = HP} = cassanderl_watchdog:request_resource(),
case thrift_client_util:new(Hostname, Port, cassandra_thrift, [{framed, true}]) of
{ok, Client} ->
case Keyspace of
undefined ->
{ok, Client};
_ ->
case cassanderl:call(Client, set_keyspace, [Keyspace]) of
{undefined, {error, _Reason}} ->
%% Assume the Cassandra Node is down/unavailable
cassanderl_watchdog:fail_resource(HP),
%% Recycle the client
thrift_client:close(Client),
start_client(Keyspace, N-1);
{undefined, Response} ->
Response;
{Client2, {ok, ok}} ->
{ok, Client2};
{Client2, Otherwise} ->
{ok, Client2}
end
end;
{error, econnrefused} ->
cassanderl_watchdog:fail_resource(HP),
start_client(Keyspace, N-1);
{error, Error} ->
{error, Error}
end.
recycle(#state { usage_credits = 0, client = Client } = State) ->
case Client of
undefined -> State;
TC ->
{_, ok} = thrift_client:close(TC),
State#state { client = undefined }
end;
recycle(#state { usage_credits = C } = State) when C > 0 ->
State#state { usage_credits = C-1 }.
env(Application, Par, Default) ->
case application:get_env(Application, Par) of
{ok, Value} -> Value;
undefined -> Default
end.
Jump to Line
Something went wrong with that request. Please try again.