Skip to content

Commit

Permalink
Parse the code base with "mix format"
Browse files Browse the repository at this point in the history
  • Loading branch information
whatyouhide committed Oct 12, 2017
1 parent 2a2e02b commit 63eb7f9
Show file tree
Hide file tree
Showing 17 changed files with 303 additions and 183 deletions.
7 changes: 7 additions & 0 deletions .formatter.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[
inputs: [
"mix.exs",
"lib/**/*.ex",
"test/**/*.exs"
]
]
42 changes: 27 additions & 15 deletions lib/redix.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ defmodule Redix do
end

def child_spec([uri_or_redis_opts, connection_opts] = args)
when (is_binary(uri_or_redis_opts) or is_list(uri_or_redis_opts)) and is_list(connection_opts) do
when (is_binary(uri_or_redis_opts) or is_list(uri_or_redis_opts)) and
is_list(connection_opts) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, args},
type: :worker,
type: :worker
}
end

Expand Down Expand Up @@ -172,7 +173,7 @@ defmodule Redix do
{:ok, #PID<...>}
"""
@spec start_link(binary | Keyword.t, Keyword.t) :: GenServer.on_start
@spec start_link(binary | Keyword.t(), Keyword.t()) :: GenServer.on_start()
def start_link(uri_or_redis_opts \\ [], connection_opts \\ [])

def start_link(uri, other_opts) when is_binary(uri) and is_list(other_opts) do
Expand All @@ -197,7 +198,7 @@ defmodule Redix do
:ok
"""
@spec stop(GenServer.server, timeout) :: :ok
@spec stop(GenServer.server(), timeout) :: :ok
def stop(conn, timeout \\ :infinity) do
Redix.Connection.stop(conn, timeout)
end
Expand Down Expand Up @@ -245,8 +246,8 @@ defmodule Redix do
:closed
"""
@spec pipeline(GenServer.server, [command], Keyword.t) ::
{:ok, [Redix.Protocol.redis_value]} | {:error, atom}
@spec pipeline(GenServer.server(), [command], Keyword.t()) ::
{:ok, [Redix.Protocol.redis_value()]} | {:error, atom}
def pipeline(conn, commands, opts \\ []) do
assert_valid_pipeline_commands(commands)
Redix.Connection.pipeline(conn, commands, opts[:timeout] || @default_timeout)
Expand Down Expand Up @@ -289,11 +290,13 @@ defmodule Redix do
** (Redix.ConnectionError) :closed
"""
@spec pipeline!(GenServer.server, [command], Keyword.t) :: [Redix.Protocol.redis_value] | no_return
def pipeline!(conn, commands, opts \\ []) do
@spec pipeline!(GenServer.server(), [command], Keyword.t()) ::
[Redix.Protocol.redis_value()] | no_return
def pipeline!(conn, commands, opts \\ []) do
case pipeline(conn, commands, opts) do
{:ok, resp} ->
resp

{:error, error} ->
raise error
end
Expand Down Expand Up @@ -343,14 +346,16 @@ defmodule Redix do
:closed
"""
@spec command(GenServer.server, command, Keyword.t) ::
{:ok, Redix.Protocol.redis_value} | {:error, atom | Redix.Error.t}
@spec command(GenServer.server(), command, Keyword.t()) ::
{:ok, Redix.Protocol.redis_value()} | {:error, atom | Redix.Error.t()}
def command(conn, command, opts \\ []) do
case pipeline(conn, [command], opts) do
{:ok, [%Redix.Error{} = error]} ->
raise error

{:ok, [resp]} ->
{:ok, resp}

{:error, _reason} = error ->
error
end
Expand Down Expand Up @@ -391,11 +396,13 @@ defmodule Redix do
** (Redix.ConnectionError) :closed
"""
@spec command!(GenServer.server, command, Keyword.t) :: Redix.Protocol.redis_value | no_return
@spec command!(GenServer.server(), command, Keyword.t()) ::
Redix.Protocol.redis_value() | no_return
def command!(conn, command, opts \\ []) do
case command(conn, command, opts) do
{:ok, resp} ->
resp

{:error, error} ->
raise error
end
Expand All @@ -409,14 +416,19 @@ defmodule Redix do
Enum.each(commands, fn
[] ->
raise ArgumentError, "got an empty command ([]), which is not a valid Redis command"

[first | _] = command when first in ~w(SUBSCRIBE PSUBSCRIBE UNSUBSCRIBE PUNSUBSCRIBE) ->
raise ArgumentError, "Redix doesn't support Pub/Sub commands; use redix_pubsub " <>
"(https://github.com/whatyouhide/redix_pubsub) for Pub/Sub " <>
"functionality support. Offending command: #{inspect(command)}"
raise ArgumentError,
"Redix doesn't support Pub/Sub commands; use redix_pubsub " <>
"(https://github.com/whatyouhide/redix_pubsub) for Pub/Sub " <>
"functionality support. Offending command: #{inspect(command)}"

command when is_list(command) ->
:ok

other ->
raise ArgumentError, "expected a list of binaries as each Redis command, got: #{inspect(other)}"
raise ArgumentError,
"expected a list of binaries as each Redis command, got: #{inspect(other)}"
end)
end

Expand Down
99 changes: 68 additions & 31 deletions lib/redix/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,37 +13,35 @@ defmodule Redix.Connection do

@type state :: %__MODULE__{}

defstruct [
# The TCP socket that holds the connection to Redis
socket: nil,
# Options passed when the connection is started
opts: nil,
# The receiver process
receiver: nil,
# The shared state store process
shared_state: nil,
# The current backoff (used to compute the next backoff when reconnecting
# with exponential backoff)
backoff_current: nil,
]
# socket: The TCP socket that holds the connection to Redis
# opts: Options passed when the connection is started
# receiver: The receiver process
# shared_state: The shared state store process
# backoff_current: The current backoff (used to compute the next backoff when reconnecting
# with exponential backoff)
defstruct socket: nil,
opts: nil,
receiver: nil,
shared_state: nil,
backoff_current: nil

@backoff_exponent 1.5

## Public API

@spec start_link(Keyword.t, Keyword.t) :: GenServer.on_start
@spec start_link(Keyword.t(), Keyword.t()) :: GenServer.on_start()
def start_link(redis_opts, other_opts) do
{redix_opts, connection_opts} = Utils.sanitize_starting_opts(redis_opts, other_opts)
Connection.start_link(__MODULE__, redix_opts, connection_opts)
end

@spec stop(GenServer.server, timeout) :: :ok
@spec stop(GenServer.server(), timeout) :: :ok
def stop(conn, timeout) do
GenServer.stop(conn, :normal, timeout)
end

@spec pipeline(GenServer.server, [Redix.command], timeout) ::
{:ok, [Redix.Protocol.redis_value]} | {:error, atom}
@spec pipeline(GenServer.server(), [Redix.command()], timeout) ::
{:ok, [Redix.Protocol.redis_value()]} | {:error, atom}
def pipeline(conn, commands, timeout) do
request_id = make_ref()

Expand Down Expand Up @@ -103,31 +101,48 @@ defmodule Redix.Connection do
end

{:ok, %{state | shared_state: shared_state, receiver: receiver}}

{:error, reason} ->
log(state, :failed_connection, [
"Failed to connect to Redis (", Utils.format_host(state), "): ",
Exception.message(%ConnectionError{reason: reason}),
"Failed to connect to Redis (",
Utils.format_host(state),
"): ",
Exception.message(%ConnectionError{reason: reason})
])

next_backoff = calc_next_backoff(state.backoff_current || state.opts[:backoff_initial], state.opts[:backoff_max])
next_backoff =
calc_next_backoff(
state.backoff_current || state.opts[:backoff_initial],
state.opts[:backoff_max]
)

if state.opts[:exit_on_disconnection] do
{:stop, reason, state}
else
{:backoff, next_backoff, %{state | backoff_current: next_backoff}}
end
end

{:error, reason} ->
log(state, :failed_connection, [
"Failed to connect to Redis (", Utils.format_host(state), "): ",
Exception.message(%ConnectionError{reason: reason}),
"Failed to connect to Redis (",
Utils.format_host(state),
"): ",
Exception.message(%ConnectionError{reason: reason})
])

next_backoff = calc_next_backoff(state.backoff_current || state.opts[:backoff_initial], state.opts[:backoff_max])
next_backoff =
calc_next_backoff(
state.backoff_current || state.opts[:backoff_initial],
state.opts[:backoff_max]
)

if state.opts[:exit_on_disconnection] do
{:stop, reason, state}
else
{:backoff, next_backoff, %{state | backoff_current: next_backoff}}
end

{:stop, reason} ->
# {:stop, error} may be returned by Redix.Utils.connect/1 in case
# AUTH or SELECT fail (in that case, we don't want to try to reconnect
Expand All @@ -141,7 +156,10 @@ defmodule Redix.Connection do

def disconnect({:error, %ConnectionError{} = error} = _error, state) do
log(state, :disconnection, [
"Disconnected from Redis (", Utils.format_host(state), "): ", Exception.message(error),
"Disconnected from Redis (",
Utils.format_host(state),
"): ",
Exception.message(error)
])

:ok = :gen_tcp.close(state.socket)
Expand All @@ -165,7 +183,13 @@ defmodule Redix.Connection do
# the shared_state process.
:ok = SharedState.disconnect_clients_and_stop(state.shared_state)

state = %{state | socket: nil, shared_state: nil, backoff_current: state.opts[:backoff_initial]}
state = %{
state
| socket: nil,
shared_state: nil,
backoff_current: state.opts[:backoff_initial]
}

if state.opts[:exit_on_disconnection] do
{:stop, error, state}
else
Expand All @@ -187,9 +211,11 @@ defmodule Redix.Connection do
:ok = SharedState.enqueue(state.shared_state, {:commands, request_id, from, length(commands)})

data = Enum.map(commands, &Protocol.pack/1)

case :gen_tcp.send(state.socket, data) do
:ok ->
{:noreply, state}

{:error, reason} ->
{:disconnect, {:error, %ConnectionError{reason: reason}}, state}
end
Expand Down Expand Up @@ -219,12 +245,18 @@ defmodule Redix.Connection do
# because if we're receiving this message, it means the receiver died
# peacefully by itself (so we don't want to communicate with it anymore, in
# any way, before reconnecting and restarting it).
def handle_info({:receiver, pid, {:tcp_closed, socket}}, %{receiver: pid, socket: socket} = state) do
def handle_info(
{:receiver, pid, {:tcp_closed, socket}},
%{receiver: pid, socket: socket} = state
) do
state = %{state | receiver: nil}
{:disconnect, {:error, %ConnectionError{reason: :tcp_closed}}, state}
end

def handle_info({:receiver, pid, {:tcp_error, socket, reason}}, %{receiver: pid, socket: socket} = state) do
def handle_info(
{:receiver, pid, {:tcp_error, socket, reason}},
%{receiver: pid, socket: socket} = state
) do
state = %{state | receiver: nil}
{:disconnect, {:error, %ConnectionError{reason: reason}}, state}
end
Expand All @@ -243,25 +275,29 @@ defmodule Redix.Connection do
{:ok, socket} ->
state = %{state | socket: socket}
{:ok, shared_state} = SharedState.start_link()

case start_receiver_and_hand_socket(state.socket, shared_state) do
{:ok, receiver} ->
state = %{state | shared_state: shared_state, receiver: receiver}
{:ok, state}

{:error, reason} ->
{:stop, %ConnectionError{reason: reason}}
end

{error_or_stop, reason} when error_or_stop in [:error, :stop] ->
{:stop, %ConnectionError{reason: reason}}
end
end

defp start_receiver_and_hand_socket(socket, shared_state) do
{:ok, receiver} = Receiver.start_link(sender: self(), socket: socket, shared_state: shared_state)
{:ok, receiver} =
Receiver.start_link(sender: self(), socket: socket, shared_state: shared_state)

# We activate the socket after transferring control to the receiver
# process, so that we don't get any :tcp_closed messages before
# transferring control.
with :ok <- :gen_tcp.controlling_process(socket, receiver),
# We activate the socket after transferring control to the receiver
# process, so that we don't get any :tcp_closed messages before
# transferring control.
:ok <- :inet.setopts(socket, active: :once),
do: {:ok, receiver}
end
Expand Down Expand Up @@ -289,6 +325,7 @@ defmodule Redix.Connection do
state.opts
|> Keyword.fetch!(:log)
|> Keyword.fetch!(action)

Logger.log(level, message)
end
end
Loading

0 comments on commit 63eb7f9

Please sign in to comment.