Skip to content

Commit

Permalink
Merge pull request #38 from lpgauth/feature/basic-auth
Browse files Browse the repository at this point in the history
Add support for PasswordAuthenticator
  • Loading branch information
lpgauth committed Jul 10, 2018
2 parents b0a6d5e + e9da14d commit 1ef9022
Show file tree
Hide file tree
Showing 11 changed files with 229 additions and 66 deletions.
12 changes: 12 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ High-Performance Erlang Cassandra CQL Client
<td>undefined</td>
<td>default keyspace</td>
</tr>
<tr>
<td>password</td>
<td>binary()</td>
<td>undefined</td>
<td>password for authentication</td>
</tr>
<tr>
<td>pool_size</td>
<td>pos_integer()</td>
Expand Down Expand Up @@ -81,6 +87,12 @@ High-Performance Erlang Cassandra CQL Client
<td>token_aware</td>
<td>load balancing strategy across nodes</td>
</tr>
<tr>
<td>username</td>
<td>binary()</td>
<td>undefined</td>
<td>username for authentication</td>
</tr>
</table>

### Bootstraping
Expand Down
2 changes: 1 addition & 1 deletion doc/marina_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ frame_flag() = 0..1


<pre><code>
state() = #state{buffer = <a href="#type-buffer">buffer()</a>, frame_flags = <a href="#type-frame_flag">frame_flag()</a>, keyspace = binary() | undefined, requests = non_neg_integer()}
state() = #state{buffer = <a href="#type-buffer">buffer()</a>, frame_flags = <a href="#type-frame_flag">frame_flag()</a>, requests = non_neg_integer()}
</code></pre>

<a name="index"></a>
Expand Down
11 changes: 10 additions & 1 deletion doc/marina_request.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,13 +94,22 @@ values() = [<a href="#type-value">value()</a>]
## Function Index ##


<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#execute-4">execute/4</a></td><td></td></tr><tr><td valign="top"><a href="#prepare-3">prepare/3</a></td><td></td></tr><tr><td valign="top"><a href="#query-4">query/4</a></td><td></td></tr><tr><td valign="top"><a href="#startup-1">startup/1</a></td><td></td></tr></table>
<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#auth_response-3">auth_response/3</a></td><td></td></tr><tr><td valign="top"><a href="#execute-4">execute/4</a></td><td></td></tr><tr><td valign="top"><a href="#prepare-3">prepare/3</a></td><td></td></tr><tr><td valign="top"><a href="#query-4">query/4</a></td><td></td></tr><tr><td valign="top"><a href="#startup-1">startup/1</a></td><td></td></tr></table>


<a name="functions"></a>

## Function Details ##

<a name="auth_response-3"></a>

### auth_response/3 ###

<pre><code>
auth_response(FrameFlags::<a href="#type-frame_flag">frame_flag()</a>, Username::binary(), Password::binary()) -&gt; iolist()
</code></pre>
<br />

<a name="execute-4"></a>

### execute/4 ###
Expand Down
48 changes: 47 additions & 1 deletion doc/marina_utils.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ consistency_level() = 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10



### <a name="type-frame_flag">frame_flag()</a> ###


<pre><code>
frame_flag() = 0..1
</code></pre>




### <a name="type-query_opts">query_opts()</a> ###


Expand Down Expand Up @@ -54,13 +64,22 @@ values() = [<a href="#type-value">value()</a>]
## Function Index ##


<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#connect-2">connect/2</a></td><td></td></tr><tr><td valign="top"><a href="#pack-1">pack/1</a></td><td></td></tr><tr><td valign="top"><a href="#query-2">query/2</a></td><td></td></tr><tr><td valign="top"><a href="#query_opts-2">query_opts/2</a></td><td></td></tr><tr><td valign="top"><a href="#sync_msg-2">sync_msg/2</a></td><td></td></tr><tr><td valign="top"><a href="#timeout-2">timeout/2</a></td><td></td></tr><tr><td valign="top"><a href="#unpack-1">unpack/1</a></td><td></td></tr></table>
<table width="100%" border="1" cellspacing="0" cellpadding="2" summary="function index"><tr><td valign="top"><a href="#authenticate-1">authenticate/1</a></td><td></td></tr><tr><td valign="top"><a href="#connect-2">connect/2</a></td><td></td></tr><tr><td valign="top"><a href="#frame_flags-0">frame_flags/0</a></td><td></td></tr><tr><td valign="top"><a href="#pack-1">pack/1</a></td><td></td></tr><tr><td valign="top"><a href="#query-2">query/2</a></td><td></td></tr><tr><td valign="top"><a href="#query_opts-2">query_opts/2</a></td><td></td></tr><tr><td valign="top"><a href="#startup-1">startup/1</a></td><td></td></tr><tr><td valign="top"><a href="#sync_msg-2">sync_msg/2</a></td><td></td></tr><tr><td valign="top"><a href="#timeout-2">timeout/2</a></td><td></td></tr><tr><td valign="top"><a href="#unpack-1">unpack/1</a></td><td></td></tr><tr><td valign="top"><a href="#use_keyspace-1">use_keyspace/1</a></td><td></td></tr></table>


<a name="functions"></a>

## Function Details ##

<a name="authenticate-1"></a>

### authenticate/1 ###

<pre><code>
authenticate(Socket::<a href="inet.md#type-socket">inet:socket()</a>) -&gt; ok | {error, atom()}
</code></pre>
<br />

<a name="connect-2"></a>

### connect/2 ###
Expand All @@ -70,6 +89,15 @@ connect(Ip::<a href="inet.md#type-socket_address">inet:socket_address()</a> | <a
</code></pre>
<br />

<a name="frame_flags-0"></a>

### frame_flags/0 ###

<pre><code>
frame_flags() -&gt; <a href="#type-frame_flag">frame_flag()</a>
</code></pre>
<br />

<a name="pack-1"></a>

### pack/1 ###
Expand Down Expand Up @@ -97,6 +125,15 @@ query_opts(X1::atom(), QueryOpts::<a href="#type-query_opts">query_opts()</a>) -
</code></pre>
<br />

<a name="startup-1"></a>

### startup/1 ###

<pre><code>
startup(Socket::<a href="inet.md#type-socket">inet:socket()</a>) -&gt; {ok, binary() | undefined} | {error, atom()}
</code></pre>
<br />

<a name="sync_msg-2"></a>

### sync_msg/2 ###
Expand Down Expand Up @@ -124,3 +161,12 @@ unpack(X1::binary()) -&gt; {ok, binary()} | {error, term()}
</code></pre>
<br />

<a name="use_keyspace-1"></a>

### use_keyspace/1 ###

<pre><code>
use_keyspace(Socket::<a href="inet.md#type-socket">inet:socket()</a>) -&gt; ok | {error, atom()}
</code></pre>
<br />

2 changes: 1 addition & 1 deletion elvis.config
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
{elvis_style, macro_module_names},
{elvis_style, macro_names},
{elvis_style, module_naming_convention, #{regex => "^([a-z][a-z0-9]*_?)*(_SUITE)?$", ignore => []}},
{elvis_style, nesting_level, #{level => 3}},
{elvis_style, nesting_level, #{level => 4}},
{elvis_style, no_behavior_info},
{elvis_style, no_if_expression},
{elvis_style, no_spec_with_records},
Expand Down
11 changes: 8 additions & 3 deletions src/marina_body.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,15 @@ decode(#frame {flags = 1, body = Body, opcode = Opcode}) ->
decode(Opcode, Body2).

%% private
decode(?OP_READY, _) ->
{ok, undefined};
decode(?OP_ERROR, Body) ->
{Code, Rest} = marina_types:decode_int(Body),
{Msg, _Rest2} = marina_types:decode_string(Rest),
{error, {Code, Msg}};
decode(?OP_READY, _) ->
{ok, undefined};
decode(?OP_AUTHENTICATE, Body) ->
{Authenticator, <<>>} = marina_types:decode_string(Body),
{ok, Authenticator};
decode(?OP_RESULT, <<1:32/integer>>) ->
{ok, undefined};
decode(?OP_RESULT, <<2:32/integer, Rest/binary>>) ->
Expand Down Expand Up @@ -63,7 +66,9 @@ decode(?OP_RESULT, <<5:32/integer, Rest/binary>>) ->
{Option, Option2}
end,

{ok, {ChangeType, Target, Options}}.
{ok, {ChangeType, Target, Options}};
decode(?OP_AUTH_SUCCESS, _) ->
{ok, undefined}.

decode_columns(Bin, Count) ->
decode_columns(Bin, Count, []).
Expand Down
62 changes: 27 additions & 35 deletions src/marina_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,31 +16,45 @@
-record(state, {
buffer = marina_buffer:new() :: buffer(),
frame_flags = 0 :: frame_flag(),
keyspace = undefined :: binary() | undefined,
requests = 0 :: non_neg_integer()
}).

-type state() :: #state {}.

%% shackle_server callbacks
-spec init() -> {ok, state()}.
-spec init() ->
{ok, state()}.

init() ->
Keyspace = ?GET_ENV(keyspace, undefined),

{ok, #state {
frame_flags = frame_flags(),
keyspace = Keyspace
frame_flags = marina_utils:frame_flags()
}}.

-spec setup(inet:socket(), state()) -> {ok, state()} |
-spec setup(inet:socket(), state()) ->
{ok, state()} |
{error, atom(), state()}.

setup(Socket, #state {frame_flags = FrameFlags} = State) ->
Msg = marina_request:startup(FrameFlags),
case marina_utils:sync_msg(Socket, Msg) of
setup(Socket, State) ->
case marina_utils:startup(Socket) of
{ok, undefined} ->
set_keyspace(Socket, State);
case marina_utils:use_keyspace(Socket) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
{ok, <<"org.apache.cassandra.auth.PasswordAuthenticator">>} ->
case marina_utils:authenticate(Socket) of
ok ->
case marina_utils:use_keyspace(Socket) of
ok ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end;
{error, Reason} ->
{error, Reason, State}
end;
{error, Reason} ->
{error, Reason, State}
end.
Expand Down Expand Up @@ -82,30 +96,8 @@ handle_data(Data, #state {
buffer = Buffer2
}}.

-spec terminate(state()) -> ok.
-spec terminate(state()) ->
ok.

terminate(_State) ->
ok.

%% private
frame_flags() ->
case ?GET_ENV(compression, false) of
true -> 1;
_ -> 0
end.

set_keyspace(_Socket, #state {keyspace = undefined} = State) ->
{ok, State};
set_keyspace(Socket, #state {
frame_flags = FrameFlags,
keyspace = Keyspace
} = State) ->

Query = <<"USE \"", Keyspace/binary, "\"">>,
Msg = marina_request:query(0, FrameFlags, Query, #{}),
case marina_utils:sync_msg(Socket, Msg) of
{ok, Keyspace} ->
{ok, State};
{error, Reason} ->
{error, Reason, State}
end.
45 changes: 31 additions & 14 deletions src/marina_pool_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,28 @@ terminate(_Reason, #state {node_count = NodeCount}) ->
ok.

%% private
connect(Ip, Port) ->
case marina_utils:connect(Ip, Port) of
{ok, Socket} ->
case marina_utils:startup(Socket) of
{ok, undefined} ->
{ok, Socket};
{ok, <<"org.apache.cassandra.auth.PasswordAuthenticator">>} ->
case marina_utils:authenticate(Socket) of
ok ->
{ok, Socket};
{error, Reason} ->
gen_tcp:close(Socket),
{error, Reason}
end;
{error, Reason} ->
gen_tcp:close(Socket),
{error, Reason}
end;
{error, Reason} ->
{error, Reason}
end.

filter_datacenter([], _Datacenter) ->
[];
filter_datacenter([[RpcAddress, _Datacenter, Tokens] | T], undefined) ->
Expand Down Expand Up @@ -106,20 +128,15 @@ nodes([Ip | T], Port) ->
end.

peers(Ip, Port) ->
case marina_utils:connect(Ip, Port) of
case connect(Ip, Port) of
{ok, Socket} ->
Msg = marina_request:startup(0),
case marina_utils:sync_msg(Socket, Msg) of
{ok, undefined} ->
{ok, {result, _ , _, Rows}} =
marina_utils:query(Socket, ?LOCAL_QUERY),
[[_RpcAddress, Datacenter, _Tokens]] = Rows,
{ok, {result, _ , _, Rows2}} =
marina_utils:query(Socket, ?PEERS_QUERY),
{ok, Rows ++ Rows2, Datacenter};
{error, Reason} ->
{error, Reason}
end;
peers_query(Socket);
{error, Reason} ->
{error, Reason}
end.
end.

peers_query(Socket) ->
{ok, {result, _ , _, Rows}} = marina_utils:query(Socket, ?LOCAL_QUERY),
[[_RpcAddress, Datacenter, _Tokens]] = Rows,
{ok, {result, _ , _, Rows2}} = marina_utils:query(Socket, ?PEERS_QUERY),
{ok, Rows ++ Rows2, Datacenter}.
14 changes: 14 additions & 0 deletions src/marina_request.erl
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,27 @@
-compile({inline_size, 512}).

-export([
auth_response/3,
execute/4,
prepare/3,
query/4,
startup/1
]).

%% public
-spec auth_response(frame_flag(), binary(), binary()) -> iolist().

auth_response(FrameFlags, Username, Password) ->
Body = <<0, Username/binary, 0, Password/binary>>,
Body2 = encode_body(FrameFlags, [marina_types:encode_bytes(Body)]),

marina_frame:encode(#frame {
stream = ?DEFAULT_STREAM,
opcode = ?OP_AUTH_RESPONSE,
flags = FrameFlags,
body = Body2
}).

-spec execute(stream(), frame_flag(), statement_id(), query_opts()) -> iolist().

execute(Stream, FrameFlags, StatementId, QueryOpts) ->
Expand Down
Loading

0 comments on commit 1ef9022

Please sign in to comment.