Skip to content
Browse files

Implement Load balancing over multiple servers for Cassanderl.

Rationale: If you run Cassandra locally on the same hosts where your program is
running, then you have no single point of failure. But if you run your system
with a load balancer in front, then the SPOF is pretty obvious. This change lets
cassanderl connect to multiple hosts automatically and also fault out hosts that
fail.

Design: A new process, the watchdog, is introduced. The watchdog is queried by dispatchers
in cassanderl. The dispatchers are handed hosts in a round-robin fashion. If a host fails,
this is reported to the watchdog, which then times out the given host for a little while.
  • Loading branch information...
1 parent da83557 commit 3d31237d64abb0bd2826dfe13352c68d47c2726b @jlouis jlouis committed Oct 3, 2012
Showing with 290 additions and 43 deletions.
  1. +8 −1 Makefile
  2. +25 −2 README.md
  3. +9 −0 integration.config
  4. +4 −0 integration.spec
  5. +1 −1 rebar.config
  6. +5 −1 src/cassanderl.erl
  7. +16 −1 src/cassanderl_app.erl
  8. +42 −24 src/cassanderl_dispatch.erl
  9. +3 −13 src/cassanderl_sup.erl
  10. +79 −0 src/cassanderl_watchdog.erl
  11. +98 −0 test/main_SUITE.erl
View
9 Makefile
@@ -20,5 +20,12 @@ test:
dialyzer: compile
@dialyzer -Wno_return -c ebin
-doc :
+doc:
@./rebar doc skip_deps=true
+
+common-test:
+ mkdir -p test_logs
+ ct_run -pa deps/*/ebin ebin \
+ -spec integration.spec \
+ -ct_hooks cth_surefire '[{path,"common_test_report.xml"}]' \
+ -erl_args -config integration.config
View
27 README.md
@@ -3,13 +3,14 @@
## Configuration ##
To start cassanderl, you will need to configure a section in your
-application config file:
+application config file. This is the basic configuration:
{cassanderl,
[{hostname, "cassandra.talented-startup.com"},
{port, 9160},
{default_keyspace, "big_data"},
- {worker_pool_size, 10}
+ {worker_pool_size, 10},
+ {recycle_count, 5000}
]},
The options are as follows:
@@ -20,12 +21,34 @@ The options are as follows:
will be used for a newly formed connection.
* `worker_pool_size` - The size of the worker pool. How many concurrent
resource-connections do you allow to the cassandra cluster.
+* `recycle_count` - After this many operations on the connection, it is recycled. This is useful because
+ it avoids memory leakage, and because in a load-balanced cluster setup you want progress,
+ so timed-out hosts get back into round-robin consideration. The default is 150, but it does not hurt
+ to raise it a lot.
Second, you will need to start up cassanderl (or make it part of your
boot script):
application:start(cassanderl).
+### Configuring for Load-balanced parallel cluster access ###
+
+Like above, you need configuration into your application:
+
+ {cassanderl,
+ [{hostnames, [{"host-001.cassandra.talented-startup.com", 9160},
+ {"host-002.cassandra.talented-startup.com", 9160},
+ ...,
+ {"host-099.cassandra.talented-startup.com", 9160}]}, % Scale to 11
+ {default_keyspace, "bigger_data"},
+ {worker_pool_size, 30},
+ {recycle_count, 1500}]},
+
+Note the omission of the `hostname` and `port` keys, but the inclusion of the key `hostnames`. This
+means that cassanderl should be configured for multiple load-balanced connections in the cluster. Note that
+the value of `recycle_count` matters. Suppose that a node has been down for a while. Then the `recycle_count` ensures
+that if the node comes up again, it will be used by the system rather than sit idle to the side.
+
## Example of Usage ##
To use Cassanderl, you must first ask it for the current
View
9 integration.config
@@ -0,0 +1,9 @@
+[{cassanderl,
+ [{hostnames, [{"host-1", 9160},
+ {"host-2", 9160},
+ {"host-3", 9160},
+ {"host-4", 9160},
+ {"host-5", 9160},
+ {"localhost", 9160}]},
+ {default_keyspace, "Keyspace"},
+ {worker_pool_size, 10}]}].
View
4 integration.spec
@@ -0,0 +1,4 @@
+{logdir, "test_logs"}.
+%% {config, "integration.config"}.
+{alias, test, "test"}.
+{suites, test, [main_SUITE]}.
View
2 rebar.config
@@ -2,7 +2,7 @@
{deps, [
{cassandra_thrift, "19\.30\.0",
{git, "https://github.com/lpgauth/cassandra-thrift-erlang.git", {tag, "19.30.0"}}},
- {dispcount, "0\.1\.1",
+ {dispcount, "0\.1\.[0-9]+",
{git, "https://github.com/ferd/dispcount.git", "r14"}},
{thrift, "0\.8\.0",
{git, "https://github.com/lpgauth/thrift-erlang.git", {tag, "0.8.0"}}}
View
6 src/cassanderl.erl
@@ -4,7 +4,7 @@
%% ------------------------------------------------------------------
%% API Function Exports
%% ------------------------------------------------------------------
--export([column_parent/1, column_parent/2,
+-export([column_parent/1, column_parent/2, column_path/2,
get_info/0,
slice/2
]).
@@ -33,6 +33,10 @@ column_parent(Super, Family) ->
#columnParent { super_column = Super,
column_family = Family }.
+column_path(Family, Column) ->
+ #columnPath { column_family = Family,
+ column = Column }.
+
%% @doc Create a slice_range object for slice queries
%% @end
slice(Start, End) ->
View
17 src/cassanderl_app.erl
@@ -11,7 +11,22 @@
start(_StartType, _StartArgs) ->
application:start(dispcount),
- cassanderl_sup:start_link().
+ {ok, Pid} = cassanderl_sup:start_link(),
+ start_dispatcher(),
+ {ok, Pid}.
stop(_State) ->
ok.
+
+%% ===================================================================
+start_dispatcher() ->
+ {ok, Num} = application:get_env(cassanderl,worker_pool_size),
+ ok = dispcount:start_dispatch(
+ cassanderl_dispatch,
+ {cassanderl_dispatch, []},
+ [{restart,permanent},
+ {shutdown,1000},
+ {maxr,10},
+ {maxt,1},
+ {resources,Num}]
+ ).
View
66 src/cassanderl_dispatch.erl
@@ -5,40 +5,40 @@
-export([init/1, checkout/2, checkin/2, dead/1, handle_info/2,
code_change/3, terminate/2]).
--record(state, {hostname,
- port,
- keyspace,
- client}).
+-define(RECYCLE_COUNT, 150). %% This is picked somewhat arbitrarily.
+
+-record(state, { keyspace,
+ client,
+ recycle_count,
+ usage_credits }).
%%%%%%%%%%%%%%%%%%%%%%%%
%%% CALLBACK MODULES %%%
%%%%%%%%%%%%%%%%%%%%%%%%
init([]) ->
- Hostname = env(cassanderl, hostname),
- Port = env(cassanderl, port),
Keyspace = env(cassanderl, default_keyspace, undefined),
+ Recycle = env(cassanderl, recycle_count, ?RECYCLE_COUNT),
Client =
- case start_client(Hostname, Port, Keyspace) of
+ case start_client(Keyspace) of
{ok, Client2} ->
Client2;
{error, _Error} ->
undefined
end,
State = #state {
- hostname = Hostname,
- port = Port,
keyspace = Keyspace,
- client = Client
+ client = Client,
+ recycle_count = Recycle,
+ usage_credits = Recycle
},
{ok, State}.
-checkout(_From, State = #state{hostname=HostName,
- port=Port,
- keyspace=Keyspace,
- client=undefined}) ->
- case start_client(HostName, Port, Keyspace) of
+checkout(_From, State = #state{keyspace=Keyspace,
+ client=undefined,
+ recycle_count = RC}) ->
+ case start_client(Keyspace) of
{ok, Client} ->
- {ok, Client, State#state{client=Client}};
+ {ok, Client, State#state{client=Client, usage_credits = RC }};
{error, Error} ->
{error, Error, State}
end;
@@ -50,7 +50,7 @@ checkin(died, State) ->
dead(State);
%% The client is alive.
checkin(Client, State) ->
- {ok, State#state{client=Client}}.
+ {ok, recycle(State#state{client=Client})}.
dead(State) ->
{ok, State#state{client=undefined}}.
@@ -67,31 +67,49 @@ code_change(_OldVsn, State, _Extra) ->
%%%%%%%%%%%%%%%%%%%%%%%
%%% PRIVATE/HELPERS %%%
%%%%%%%%%%%%%%%%%%%%%%%
-start_client(Hostname, Port, Keyspace) ->
+start_client(Keyspace) ->
+ start_client(Keyspace, 3).
+
+start_client(_Keyspace, 0) ->
+ {error, no_cluster_connection};
+start_client(Keyspace, N) when N > 0 ->
+ {ok, {Hostname, Port} = HP} = cassanderl_watchdog:request_resource(),
case thrift_client_util:new(Hostname, Port, cassandra_thrift, [{framed, true}]) of
{ok, Client} ->
case Keyspace of
undefined ->
{ok, Client};
_ ->
case cassanderl:call(Client, set_keyspace, [Keyspace]) of
+ {undefined, {error, _Reason}} ->
+ %% Assume the Cassandra Node is down/unavailable
+ cassanderl_watchdog:fail_resource(HP),
+ start_client(Keyspace, N-1);
{undefined, Response} ->
Response;
{Client2, {ok, ok}} ->
{ok, Client2};
- {Client2, _} ->
+ {Client2, Otherwise} ->
{ok, Client2}
end
end;
+ {error, econnrefused} ->
+ cassanderl_watchdog:fail_resource(HP),
+ start_client(Keyspace, N-1);
{error, Error} ->
{error, Error}
end.
-env(Application, Par) ->
- case application:get_env(Application, Par) of
- {ok, Value} -> Value
- end.
-
+recycle(#state { usage_credits = 0, client = Client } = State) ->
+ case Client of
+ undefined -> State;
+ TC ->
+ {_, ok} = thrift_client:close(TC),
+ State#state { client = undefined }
+ end;
+recycle(#state { usage_credits = C } = State) when C > 0 ->
+ State#state { usage_credits = C-1 }.
+
env(Application, Par, Default) ->
case application:get_env(Application, Par) of
{ok, Value} -> Value;
View
16 src/cassanderl_sup.erl
@@ -8,6 +8,7 @@
%% Supervisor callbacks
-export([init/1]).
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
%% ===================================================================
%% API functions
@@ -24,20 +25,9 @@ get_info() ->
%% ===================================================================
init([]) ->
- start_dispatcher(),
- {ok, {{one_for_one, 10, 1}, []}}.
+ WatchDog = ?CHILD(cassanderl_watchdog, worker),
+ {ok, {{one_for_one, 10, 1}, [WatchDog]}}.
%% ------------------------------------------------------------------
%% Internal Function Definitions
%% ------------------------------------------------------------------
-start_dispatcher() ->
- {ok, Num} = application:get_env(cassanderl,worker_pool_size),
- ok = dispcount:start_dispatch(
- cassanderl_dispatch,
- {cassanderl_dispatch, []},
- [{restart,permanent},
- {shutdown,1000},
- {maxr,10},
- {maxt,1},
- {resources,Num}]
- ).
View
79 src/cassanderl_watchdog.erl
@@ -0,0 +1,79 @@
+-module(cassanderl_watchdog).
+
+-behaviour(gen_server).
+
+-export([start_link/0, request_resource/0, fail_resource/1]).
+
+-export([init/1, handle_cast/2, handle_call/3, code_change/3, terminate/2, handle_info/2]).
+
+-define(SERVER, ?MODULE).
+-define(TIMEOUT, timer:minutes(5)).
+
+-record(state, { servers,
+ walk }).
+
+%% ----------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+
+request_resource() ->
+ gen_server:call(?SERVER, request_resource).
+
+fail_resource(Resource) ->
+ gen_server:cast(?SERVER, {fail_resource, Resource}).
+
+%% ----------------------------------------------------------------------
+init([]) ->
+ Servers = case application:get_env(cassanderl, hostnames) of
+ {ok, HNs} -> HNs;
+ undefined ->
+ {ok, Hostname} = application:get_env(cassanderl, hostname),
+ {ok, Port} = application:get_env(cassanderl, port),
+ [{Hostname, Port}]
+ end,
+
+ {ok, #state { servers = Servers, walk = Servers }}.
+
+handle_cast({fail_resource, Resource}, #state { servers = Servers, walk = Walk} = State) ->
+ error_logger:info_report([{failing_resource, Resource}, {timeout, ?TIMEOUT}]),
+ case lists:member(Resource, Servers) of
+ false ->
+ %% Resource already in time-out
+ {noreply, State};
+ true ->
+ erlang:send_after(?TIMEOUT, self(), {reinsert, Resource}),
+ {noreply,
+ State#state { servers = Servers -- [Resource],
+ walk = Walk -- [Resource] } }
+ end;
+handle_cast(Msg, State) ->
+ lager:error("Unknown Msg: ~p", [{Msg, State}]),
+ {noreply, State}.
+
+handle_call(request_resource, From, #state { walk = [], servers = Servers } = State) ->
+ case Servers of
+ [] -> {reply, {error, no_servers_alive}, State};
+ _ -> handle_call(request_resource, From, State#state { walk = Servers })
+ end;
+handle_call(request_resource, _From, #state { walk = [HP | Next] } = State) ->
+ {reply, {ok, HP}, State#state { walk = Next }};
+handle_call(Msg, _From, State) ->
+ lager:error("Unknown Msg: ~p", [{Msg, State}]),
+ {reply, {error, wrong_msg}, State}.
+
+handle_info({reinsert, Resource}, #state { servers = Servers } = State) ->
+ {noreply, State#state { servers = [Resource | Servers] }};
+handle_info(Msg, State) ->
+ lager:error("Unknown Info Msg: ~p", [{Msg, State}]),
+ {noreply, State}.
+
+terminate(_What, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+
+
View
98 test/main_SUITE.erl
@@ -0,0 +1,98 @@
+-module(main_SUITE).
+
+-export([all/0, groups/0, suite/0,
+ init_per_suite/1, end_per_suite/1,
+ init_per_group/2, end_per_group/2,
+ init_per_testcase/2, end_per_testcase/2
+ ]).
+
+-export([cassanderl_startup_test/1,
+ cassanderl_connection_test/1
+ ]).
+-include_lib("common_test/include/ct.hrl").
+
+
+%% ----------------------------------------------------------------------
+
+
+-spec suite() -> proplists:proplist().
+suite() ->
+ [{'timetrap', {'minutes', 5}}].
+
+-spec groups() -> proplists:proplist().
+groups() ->
+ [{integration_test_group, [], [cassanderl_startup_test, cassanderl_connection_test]} ].
+
+-spec all() -> proplists:proplist().
+all() ->
+ [{group, integration_test_group}].
+
+-spec init_per_group(atom(), proplists:proplist()) -> proplists:proplist().
+init_per_group(_Group, Config) ->
+ Config.
+
+-spec end_per_group(atom(), proplists:proplist()) -> 'ok'.
+end_per_group(_, _Config) ->
+ ok.
+
+-spec init_per_suite(proplists:proplist()) ->
+ proplists:proplist().
+init_per_suite(Config) ->
+ ok = application:start(dispcount),
+ ok = application:start(thrift),
+ ok = application:start(cassandra_thrift),
+ ok = application:start(cassanderl),
+ ct:log("Started all applications"),
+ timer:sleep(500),
+ Config.
+
+-spec end_per_suite(proplists:proplist()) -> 'ok'.
+end_per_suite(_) ->
+ application:stop(cassanderl),
+ application:stop(cassandra_thrift),
+ application:stop(thrift),
+ application:stop(dispcount),
+ ok.
+
+-spec init_per_testcase(atom(), proplists:proplist()) -> proplists:proplist().
+init_per_testcase(_Case, Config) ->
+ Config.
+
+-spec end_per_testcase(atom(), proplists:proplist()) -> 'ok'.
+end_per_testcase(_Case, _Config) ->
+ ok.
+
+%% ----------------------------------------------------------------------
+
+cassanderl_connection_test(_Config) ->
+ Parent = self(),
+ Pids = [spawn_link(fun () ->
+ cassandra_worker(100),
+ Parent ! {done, self()}
+ end) || _ <- lists:seq(1, 30) ],
+ collect(Pids),
+ ok.
+
+cassanderl_startup_test(_Config) ->
+ ok.
+%% ----------------------------------------------------------------------
+
+collect([]) ->
+ ok;
+collect(Pids) when is_list(Pids) ->
+ receive
+ {done, Pid} ->
+ collect(Pids -- [Pid])
+ end.
+
+cassandra_worker(0) -> ok;
+cassandra_worker(N) when N > 0 ->
+ case cassanderl:call(get, [<<"Rowkey">>,
+ cassanderl:column_path(<<"ColumnFamily">>, <<"ColumnKey">>), 1]) of
+ {ok, _Row} -> cassandra_worker(N-1);
+ {error, busy} ->
+ timer:sleep(5),
+ cassandra_worker(N)
+ end.
+
+

1 comment on commit 3d31237

@ferd
ferd commented on 3d31237 Nov 26, 2012

Haven't tried it, but read it. That's a pretty nifty patch.

Please sign in to comment.
Something went wrong with that request. Please try again.