Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Re-write

  • Loading branch information...
commit 65000f453ec5a0c8f904f90733e0183053f7fda9 1 parent dbd6d8c
@lpgauth authored
View
24 Makefile
@@ -0,0 +1,24 @@
+.PHONY: deps doc
+
+all: deps compile
+
+compile:
+ ./rebar compile
+
+deps:
+ ./rebar get-deps
+
+clean:
+ ./rebar clean
+
+distclean: clean
+ ./rebar delete-deps
+
+test:
+ ./rebar eunit
+
+dialyzer: compile
+ @dialyzer -Wno_return -c ebin
+
+doc :
+ @./rebar doc skip_deps=true
View
8 rebar.config
@@ -1,7 +1,7 @@
{erl_opts, [debug_info]}.
{deps, [
- {cassandra_thrift, "19.4.0",
- {git, "https://github.com/lpgauth/cassandra-thrift-erlang.git", "19.4.0"}},
- {thrift, "0.6.0",
- {git, "https://github.com/lpgauth/thrift-erlang.git", "master"}}
+ {cassandra, "19.10.0",
+ {git, "https://github.com/lpgauth/cassandra-thrift-erlang.git", "0.7-dev"}},
+ {thrift, "0.7.0",
+ {git, "https://github.com/lpgauth/thrift-erlang.git", "0.7-dev"}}
]}.
View
17 src/cassanderl.app.src
@@ -1,13 +1,18 @@
{application, cassanderl, [
{description, "Cassandra client"},
- {vsn, "0.2"},
+ {vsn, "0.3"},
{registered, []},
- {applications, [kernel, stdlib]},
- {mod, {cassanderl_app, []}},
+ {applications, [
+ kernel,
+ stdlib
+ ]},
+ {mod, { cassanderl_app, []}},
{env, [
{hostname, "127.0.0.1"},
{port, 9160},
- {keyspace, "Keyspace1"},
- {worker_pool_size, 10}
+ {default_keyspace, "AdGear"},
+ {worker_pool_size, 400},
+ {low_pending, 3000},
+ {high_pending, 6000}
]}
-]}.
+]}.
View
189 src/cassanderl.erl
@@ -1,51 +1,123 @@
-module(cassanderl).
-
-behaviour(gen_server).
+-include("cassanderl.hrl").
+
+-define(SERVER, ?MODULE).
+-define(DEFAULT_TIMEOUT, 500).
+
+-record(state, {
+ hostname,
+ port,
+ client
+}).
-%% API
--export([start_link/1]).
+%% ------------------------------------------------------------------
+%% API Function Exports
+%% ------------------------------------------------------------------
+
+-export([start_link/0, call/3, set_keyspace/1, set_keyspace/2, get/5,
+ get/6, insert/8, insert/9, describe_keyspace/1, describe_keyspace/2]).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Exports
+%% ------------------------------------------------------------------
-%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(state, {conn}).
--define(SERVER, ?MODULE).
-
-%%====================================================================
-%% API
-%%====================================================================
+%% ------------------------------------------------------------------
+%% API Function Definitions
+%% ------------------------------------------------------------------
-start_link(Name) ->
- gen_server:start_link({local, Name}, ?MODULE, [], []).
+start_link() ->
+ gen_server:start_link(?MODULE, [], []).
+
+call(Function, Args, Timeout) ->
+ case red() of
+ true ->
+ {error, too_busy};
+ false ->
+ increase_pending_requests(),
+ Worker = cassanderl_sup:pick_worker(),
+ Response =
+ try gen_server:call(Worker, {call, Function, Args}, Timeout) of
+ Reply -> Reply
+ catch
+ exit:_ ->
+ {error, timeout}
+ end,
+ decrease_pending_requests(),
+ Response
+ end.
+
+set_keyspace(Keyspace) ->
+ set_keyspace(Keyspace, ?DEFAULT_TIMEOUT).
-%%====================================================================
-%% gen_server callbacks
-%%====================================================================
+set_keyspace(Keyspace, Timeout) ->
+ call(set_keyspace, [Keyspace], Timeout).
+
+get(Key, ColumnFamily, SuperColumn, Column, ConsistencyLevel) ->
+ get(Key, ColumnFamily, SuperColumn, Column, ConsistencyLevel, ?DEFAULT_TIMEOUT).
+
+get(Key, ColumnFamily, SuperColumn, Column, ConsistencyLevel, Timeout) ->
+ ColumnPath = #columnPath {
+ column_family = ColumnFamily,
+ super_column = SuperColumn,
+ column = Column
+ },
+ call(get, [Key, ColumnPath, ConsistencyLevel], Timeout).
+
+insert(Key, ColumnFamily, SuperColumn, Name, Value, Timestamp, Ttl, ConsistencyLevel) ->
+ insert(Key, ColumnFamily, SuperColumn, Name, Value, Timestamp, Ttl, ConsistencyLevel, ?DEFAULT_TIMEOUT).
+
+insert(Key, ColumnFamily, SuperColumn, Name, Value, Timestamp, Ttl, ConsistencyLevel, Timeout) ->
+ ColumnParent = #columnParent {
+ column_family = ColumnFamily,
+ super_column = SuperColumn
+ },
+ Column = #column {
+ name = Name,
+ value = Value,
+ timestamp = Timestamp,
+ ttl = Ttl
+ },
+ call(insert, [Key, ColumnParent, Column, ConsistencyLevel], Timeout).
+
+describe_keyspace(Keyspace) ->
+ describe_keyspace(Keyspace, ?DEFAULT_TIMEOUT).
+
+describe_keyspace(Keyspace, Timeout) ->
+ call(describe_keyspace, [Keyspace], Timeout).
+
+%% ------------------------------------------------------------------
+%% gen_server Function Definitions
+%% ------------------------------------------------------------------
-init([]) ->
- %% Open Connection
+init(_Args) ->
{ok, Hostname} = application:get_env(cassanderl, hostname),
{ok, Port} = application:get_env(cassanderl, port),
- {ok, Conn} = thrift_client_util:new(Hostname, Port, cassandra_thrift, [{framed, true}]), ok,
-
- %% Set KeySpace
- {ok, KeySpace} = application:get_env(keyspace),
- {Conn2, {ok, ok}} = thrift_client:call(Conn, set_keyspace, [KeySpace]),
- {ok, #state{conn = Conn2}}.
-
-handle_call({call, Function, Args}, _From, #state{conn=Conn}=State) ->
- try thrift_client:call(Conn, Function, Args) of
- {NewConn, Response} ->
- NewState = State#state{conn=NewConn},
- {reply, {ok, Response}, NewState}
- catch
- {NewConn, {exception, {Exception}}} ->
- NewState = State#state{conn=NewConn},
- {reply, {exception, Exception}, NewState}
+ State = #state {
+ hostname = Hostname,
+ port = Port
+ },
+ {ok, State}.
+
+handle_call(Msg, From, #state{hostname=Hostname, port=Port, client=undefined}=State) ->
+ case new_thrift_client(Hostname, Port) of
+ undefined ->
+ {reply, {error, econnrefused}, State};
+ Client ->
+ handle_call(Msg, From, State#state{client=Client})
+ end;
+handle_call({call, Function, Args}, _From, #state{client=Client}=State) ->
+ case thrift_client:call(Client, Function, Args) of
+ {error, Reason} ->
+ {reply, {error, Reason}, State#state{client=undefined}};
+ {Client2, Response} ->
+ {reply, Response, State#state{client=Client2}}
end;
handle_call(_Request, _From, State) ->
- {reply, ok, State}.
+ {noreply, ok, State}.
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -53,9 +125,52 @@ handle_cast(_Msg, State) ->
handle_info(_Info, State) ->
{noreply, State}.
-terminate(_Reason, #state{conn=Conn}) ->
- thrift_client:close(Conn),
+terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
+ {ok, State}.
+
+%% ------------------------------------------------------------------
+%% Internal Function Definitions
+%% ------------------------------------------------------------------
+
+new_thrift_client(Hostname, Port) ->
+ case thrift_client_util:new(Hostname, Port, cassandra_thrift, [{framed, true}]) of
+ {ok, Client} ->
+ {Client2, {ok, ok}} = thrift_client:call(Client, set_keyspace, ["AdGear"]),
+ Client2;
+ {error, _} ->
+ undefined
+ end.
+
+red() ->
+ Requests = ets:lookup_element(cassanderl, pending_requests, 2),
+ Low = ets:lookup_element(cassanderl, low_pending, 2),
+ High = ets:lookup_element(cassanderl, high_pending, 2),
+ case Requests of
+ _ when Low >= Requests ->
+ false;
+ _ when Requests >= High ->
+ true;
+ _ ->
+ random_drop(High - Requests)
+ end.
+
+random_drop(Distribution) ->
+ case erlang:phash2({self(), now()}, Distribution) + 1 of
+ Distribution ->
+ true;
+ _ ->
+ false
+ end.
+
+increase_pending_requests() ->
+ ets:update_counter(cassanderl, pending_requests, 1).
+
+decrease_pending_requests() ->
+ ets:update_counter(cassanderl, pending_requests, -1).
+
+
+
+
View
70 src/cassanderl_sup.erl
@@ -1,10 +1,11 @@
-module(cassanderl_sup).
-
-behaviour(supervisor).
+-define(SUPERVISOR, ?MODULE).
+
%% API
--export([start_link/0, pick_worker/0, call/2]).
+-export([start_link/0, pick_worker/0, expire_workers_cache/0]).
%% Supervisor callbacks
-export([init/1]).
@@ -14,39 +15,56 @@
%% ===================================================================
start_link() ->
+ ets:new(cassanderl, [set, public, named_table, {read_concurrency, true}]),
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+pick_worker() ->
+ Workers = get_workers(),
+ Size = ets:lookup_element(cassanderl, worker_pool_size, 2),
+ Index0 = erlang:phash2({self(), now()}, Size),
+ element(Index0 + 1, Workers).
+
+expire_workers_cache() ->
+ ets:delete(cassanderl, workers).
+
%% ===================================================================
%% Supervisor callbacks
%% ===================================================================
init([]) ->
- {ok, Size} = application:get_env(cassanderl, worker_pool_size),
- Workers = [ worker_spec(I) || I <- lists:seq(1, Size) ],
+ bootstrap_ets(),
+ WorkerPoolSize = ets:lookup_element(cassanderl, worker_pool_size, 2),
+ Workers = [ worker_spec(I) || I <- lists:seq(1, WorkerPoolSize) ],
{ok, {{one_for_one, 10, 1}, Workers}}.
-
-%% ===================================================================
-%% Internal API
-%% ===================================================================
-
+
+%% ------------------------------------------------------------------
+%% Internal Function Definitions
+%% ------------------------------------------------------------------
+
+bootstrap_ets() ->
+ {ok, WorkerPoolSize} = application:get_env(cassanderl, worker_pool_size),
+ {ok, Low} = application:get_env(cassanderl, low_pending),
+ {ok, High} = application:get_env(cassanderl, high_pending),
+ ets:insert(cassanderl, {worker_pool_size, WorkerPoolSize}),
+ ets:insert(cassanderl, {low_pending, Low}),
+ ets:insert(cassanderl, {high_pending, High}),
+ ets:insert(cassanderl, {pending_requests, 0}).
+
worker_spec(N) ->
- Name = list_to_atom("cassanderl_" ++ integer_to_list(N)),
- {Name,
- {cassanderl, start_link, [Name]},
+ {{cassanderl, N},
+ {cassanderl, start_link, []},
permanent, 1000, worker,
[cassanderl]
}.
-
-pick_worker() ->
- random:seed(erlang:now()),
- {ok, Size} = application:get_env(cassanderl, worker_pool_size),
- RandomN = random:uniform(Size),
- list_to_existing_atom("cassanderl_" ++ integer_to_list(RandomN)).
-
-call(Function, Args) ->
- Worker = pick_worker(),
- gen_server:call(Worker, {call, Function, Args}).
-
-
-
-
+
+get_workers() ->
+ try
+ ets:lookup_element(cassanderl, workers, 2)
+ catch
+ error:badarg ->
+ L = supervisor:which_children(?SUPERVISOR),
+ Pids = [ Child || {_Id, Child, _Type, _Modules} <- L, is_pid(Child) ],
+ PidsTuple = list_to_tuple(Pids),
+ ets:insert(cassanderl, {workers, PidsTuple}),
+ PidsTuple
+ end.
Please sign in to comment.
Something went wrong with that request. Please try again.