Skip to content

Commit

Permalink
Make sure addresses are never charlists
Browse files Browse the repository at this point in the history
  • Loading branch information
whatyouhide committed Oct 10, 2023
1 parent 44c831b commit 1fc8011
Show file tree
Hide file tree
Showing 13 changed files with 107 additions and 79 deletions.
21 changes: 20 additions & 1 deletion lib/xandra.ex
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,25 @@ defmodule Xandra do
and its supported options. See `Xandra.Authenticator`.
"""
],
backoff_min: [
type: :non_neg_integer,
default: 1000,
doc: "The minimum backoff interval (in milliseconds)."
],
backoff_max: [
type: :non_neg_integer,
default: 30_000,
doc: "The maximum backoff interval (in milliseconds)."
],
backoff_type: [
type: {:in, [:stop, :exp, :rand, :rand_exp]},
default: :rand_exp,
doc: """
The backoff strategy. `:stop` means the connection will stop when a disconnection happens,
`:exp` means exponential backoff, `:rand` is random backoff, and `:rand_exp` is random
exponential backoff.
"""
],
compressor: [
type: {:custom, Xandra.OptionsValidators, :validate_module, ["compressor"]},
type_doc: "`t:module/0`",
Expand Down Expand Up @@ -476,7 +495,7 @@ defmodule Xandra do
{[], _opts} ->
raise ArgumentError, "the :nodes option can't be an empty list"

{[node], opts} ->
{[{_address, _port} = node], opts} ->
{node, opts}

{nodes, _opts} ->
Expand Down
19 changes: 12 additions & 7 deletions lib/xandra/cluster/control_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,8 @@ defmodule Xandra.Cluster.ControlConnection do
connection_opts = Keyword.fetch!(options, :connection_options)
{transport, connection_opts} = transport_from_connection_opts(connection_opts)

%Host{} =
contact_node =
case Keyword.fetch!(options, :contact_node) do
{host, port} when is_list(host) -> %Host{address: host, port: port}
{host, port} when is_tuple(host) -> %Host{address: host, port: port}
end
{contact_host, contact_port} = Keyword.fetch!(options, :contact_node)
contact_node = %Host{address: contact_host, port: contact_port}

state = %__MODULE__{
cluster_pid: Keyword.fetch!(options, :cluster_pid),
Expand Down Expand Up @@ -135,6 +131,10 @@ defmodule Xandra.Cluster.ControlConnection do

{:stop, {:shutdown, reason}}
end
catch
kind, reason ->
execute_telemetry(state, [:control_connection, :failed_to_connect], %{}, %{reason: reason})
{:stop, {:__caught__, kind, reason, __STACKTRACE__}}
end

@impl true
Expand Down Expand Up @@ -198,7 +198,12 @@ defmodule Xandra.Cluster.ControlConnection do
# A nil :protocol_version means "negotiate". A non-nil one means "enforce".
proto_vsn = Keyword.get(options, :protocol_version)

case Transport.connect(transport, host.address, host.port, @default_connect_timeout) do
case Transport.connect(
transport,
if(is_tuple(host.address), do: host.address, else: String.to_charlist(host.address)),
host.port,
@default_connect_timeout
) do
{:ok, transport} ->
state = %__MODULE__{state | transport: transport}

Expand Down
16 changes: 10 additions & 6 deletions lib/xandra/cluster/host.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Xandra.Cluster.Host do
* `:address` - the address of the host. It can be either an IP address
or a hostname. If Xandra managed to *connect* to this host, then the `:address` will
be the actual IP peer (see `:inet.peername/1`). Otherwise, the `:address` will be
the parsed IP or the charlist hostname. For example, if you pass `~c"10.0.2.1"` as
the parsed IP or the charlist hostname. For example, if you pass `"10.0.2.1"` as
the address, Xandra will normalize it to `{10, 0, 2, 1}`.
* `:port` - the port of the host.
Expand All @@ -41,7 +41,7 @@ defmodule Xandra.Cluster.Host do
"""
@typedoc since: "0.15.0"
@type t() :: %__MODULE__{
address: :inet.ip_address() | :inet.hostname(),
address: :inet.ip_address() | String.t(),
port: :inet.port_number(),
data_center: String.t(),
host_id: String.t(),
Expand Down Expand Up @@ -77,6 +77,10 @@ defmodule Xandra.Cluster.Host do
iex> Xandra.Cluster.Host.format_address(host)
"127.0.0.1:9042"
iex> host = %Xandra.Cluster.Host{address: "example.com", port: 9042}
iex> Xandra.Cluster.Host.format_address(host)
"example.com:9042"
"""
@doc since: "0.15.0"
@spec format_address(t()) :: String.t()
Expand All @@ -88,20 +92,20 @@ defmodule Xandra.Cluster.Host do

@doc false
@doc since: "0.15.0"
@spec to_peername(t()) :: {:inet.ip_address() | :inet.hostname(), :inet.port_number()}
@spec to_peername(t()) :: {:inet.ip_address() | String.t(), :inet.port_number()}
def to_peername(%__MODULE__{address: address, port: port}) do
{address, port}
end

@doc false
@doc since: "0.15.0"
@spec format_peername({:inet.ip_address() | :inet.hostname(), :inet.port_number()}) ::
@spec format_peername({:inet.ip_address() | String.t(), :inet.port_number()}) ::
String.t()
def format_peername({address, port}) do
def format_peername({address, port}) when is_integer(port) and port in 0..65_535 do
if ip_address?(address) do
"#{:inet.ntoa(address)}:#{port}"
else
"#{address}:#{port}"
address <> ":#{port}"
end
end

Expand Down
10 changes: 8 additions & 2 deletions lib/xandra/cluster/pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -640,8 +640,14 @@ defmodule Xandra.Cluster.Pool do
]

case data.control_conn_mod.start_link(control_conn_opts) do
{:ok, control_conn} -> {:ok, control_conn}
{:error, _reason} -> start_control_connection(data, hosts)
{:ok, control_conn} ->
{:ok, control_conn}

{:error, {:__caught__, kind, reason, stacktrace}} ->
:erlang.raise(kind, reason, stacktrace)

{:error, _reason} ->
start_control_connection(data, hosts)
end
end

Expand Down
8 changes: 4 additions & 4 deletions lib/xandra/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ defmodule Xandra.Connection do

# This type is just for documentation.
@type t() :: %__MODULE__{
address: term(),
address: String.t(),
atom_keys?: boolean(),
backoff: Backoff.t(),
buffer: binary(),
Expand All @@ -278,7 +278,7 @@ defmodule Xandra.Connection do
in_flight_requests: %{optional(stream_id()) => term()},
options: keyword(),
original_options: keyword(),
peername: term(),
peername: {:inet.ip_address(), :inet.port_number()},
port: term(),
prepared_cache: term(),
protocol_module: module(),
Expand Down Expand Up @@ -391,7 +391,7 @@ defmodule Xandra.Connection do
Backoff.new(Keyword.take(options, [:backoff_type, :backoff_min, :backoff_max]))
}

case Transport.connect(data.transport, data.address, data.port, data.connect_timeout) do
case Transport.connect(transport, String.to_charlist(address), port, data.connect_timeout) do
{:ok, transport} ->
{:ok, peername} = Transport.address_and_port(transport)
data = %__MODULE__{data | transport: transport, peername: peername}
Expand Down Expand Up @@ -461,7 +461,7 @@ defmodule Xandra.Connection do

def disconnected(:internal, {:failed_to_connect, reason}, %__MODULE__{} = data) do
ipfied_address =
case :inet.parse_address(data.address) do
case data.address |> String.to_charlist() |> :inet.parse_address() do
{:ok, ip} -> ip
{:error, _reason} -> data.address
end
Expand Down
16 changes: 10 additions & 6 deletions lib/xandra/options_validators.ex
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ defmodule Xandra.OptionsValidators do
{:error, "expected :authentication to be a {module, options} tuple, got: #{inspect(other)}"}
end

@spec validate_node(term()) :: {:ok, {charlist(), integer()}} | {:error, String.t()}
@spec validate_node(term()) :: {:ok, {String.t(), integer()}} | {:error, String.t()}
def validate_node(value) when is_binary(value) do
case String.split(value, ":", parts: 2) do
[address, port] ->
case Integer.parse(port) do
{port, ""} -> {:ok, {String.to_charlist(address), port}}
{port, ""} -> {:ok, {address, port}}
_ -> {:error, "invalid node: #{inspect(value)}"}
end

[address] ->
{:ok, {String.to_charlist(address), 9042}}
{:ok, {address, 9042}}
end
end

Expand All @@ -60,16 +60,20 @@ defmodule Xandra.OptionsValidators do
end
end

def validate_contact_node({hostname, port}) when is_list(hostname) do
case :inet.parse_address(hostname) do
def validate_contact_node({hostname, port}) when is_binary(hostname) do
case hostname |> String.to_charlist() |> :inet.parse_address() do
{:ok, ip} -> {:ok, {ip, port}}
{:error, :einval} -> {:ok, {hostname, port}}
end
end

def validate_contact_node({other, _port}) do
{:error,
"expected address in Host to be an IP tuple or a hostname charlist, got: #{inspect(other)}"}
"expected address in Host to be an IP tuple or a hostname string, got: #{inspect(other)}"}
end

def validate_contact_node(other) do
{:error, "expected :contact_node to be a {address_or_ip, port} tuple, got: #{inspect(other)}"}
end

@spec validate_binary(term(), atom()) :: {:ok, binary()} | {:error, String.t()}
Expand Down
1 change: 1 addition & 0 deletions test/integration/retry_strategies_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ defmodule Xandra.RetryStrategiesTest do

Process.register(self(), :all_nodes_strategy_test_pid)

start_options = Keyword.merge(start_options, sync_connect: 1000)
cluster = start_supervised!({Xandra.Cluster, start_options})

assert {:error, %Xandra.Error{reason: :invalid}} =
Expand Down
18 changes: 9 additions & 9 deletions test/integration/telemetry_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ defmodule TelemetryTest do

assert measurements == %{}
assert metadata.connection_name == :telemetry_test_connection
assert metadata.address == ~c"127.0.0.1"
assert metadata.address == "127.0.0.1"
assert metadata.port == @port
end
end
Expand All @@ -39,7 +39,7 @@ defmodule TelemetryTest do

assert metadata.query.statement == statement
assert metadata.connection_name == nil
assert metadata.address == ~c"127.0.0.1"
assert metadata.address == "127.0.0.1"
assert metadata.port == @port

# Successive call to prepare uses cache.
Expand All @@ -49,7 +49,7 @@ defmodule TelemetryTest do

assert metadata.query.statement == statement
assert metadata.connection_name == nil
assert metadata.address == ~c"127.0.0.1"
assert metadata.address == "127.0.0.1"
assert metadata.port == @port

assert {:ok, ^prepared} = Xandra.prepare(conn, statement, force: true)
Expand All @@ -58,7 +58,7 @@ defmodule TelemetryTest do

assert metadata.query.statement == statement
assert metadata.connection_name == nil
assert metadata.address == ~c"127.0.0.1"
assert metadata.address == "127.0.0.1"
assert metadata.port == @port
end

Expand All @@ -77,7 +77,7 @@ defmodule TelemetryTest do

assert metadata.query.statement == statement
assert metadata.connection_name == nil
assert metadata.address == ~c"127.0.0.1"
assert metadata.address == "127.0.0.1"
assert metadata.port == @port
assert metadata.extra_metadata == %{foo: :bar}
assert is_integer(system_time)
Expand All @@ -86,7 +86,7 @@ defmodule TelemetryTest do

assert metadata.query.statement == statement
assert metadata.connection_name == nil
assert metadata.address == ~c"127.0.0.1"
assert metadata.address == "127.0.0.1"
assert metadata.port == @port
assert metadata.extra_metadata == %{foo: :bar}
assert metadata.reprepared == false
Expand All @@ -99,7 +99,7 @@ defmodule TelemetryTest do

assert metadata.query.statement == statement
assert metadata.connection_name == nil
assert metadata.address == ~c"127.0.0.1"
assert metadata.address == "127.0.0.1"
assert metadata.port == @port
assert metadata.extra_metadata == %{foo: :bar}
assert metadata.reprepared == true
Expand All @@ -123,7 +123,7 @@ defmodule TelemetryTest do

assert metadata.query.statement == statement
assert metadata.connection_name == nil
assert metadata.address == ~c"127.0.0.1"
assert metadata.address == "127.0.0.1"
assert metadata.port == @port
assert metadata.extra_metadata == %{foo: :bar}
assert is_integer(system_time)
Expand All @@ -132,7 +132,7 @@ defmodule TelemetryTest do

assert metadata.query.statement == statement
assert metadata.connection_name == nil
assert metadata.address == ~c"127.0.0.1"
assert metadata.address == "127.0.0.1"
assert metadata.port == @port
assert metadata.extra_metadata == %{foo: :bar}
assert is_integer(duration)
Expand Down
6 changes: 3 additions & 3 deletions test/xandra/cluster/control_connection_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do
refresh_topology_interval: 60_000,
autodiscovered_nodes_port: @port,
connection_options: [protocol_version: @protocol_version],
contact_node: {~c"127.0.0.1", @port}
contact_node: {"127.0.0.1", @port}
]

%{mirror_ref: mirror_ref, mirror: mirror, start_options: start_options}
Expand Down Expand Up @@ -86,7 +86,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do
[:xandra, :cluster, :control_connection, :connected]
])

options = Keyword.put(start_options, :contact_node, {~c"localhost", @port})
options = Keyword.put(start_options, :contact_node, {"localhost", @port})
start_control_connection!(options)

assert_receive {^mirror_ref, {:discovered_hosts, [_local_peer]}}
Expand All @@ -102,7 +102,7 @@ defmodule Xandra.Cluster.ControlConnectionTest do
telemetry_event = [:xandra, :cluster, :control_connection, :failed_to_connect]
telemetry_ref = :telemetry_test.attach_event_handlers(self(), [telemetry_event])

options = Keyword.put(start_options, :contact_node, {~c"127.0.0.1", 9039})
options = Keyword.put(start_options, :contact_node, {"127.0.0.1", 9039})
assert {:error, _} = start_supervised({ControlConnection, options})

assert_receive {^telemetry_event, ^telemetry_ref, %{}, metadata}
Expand Down
2 changes: 1 addition & 1 deletion test/xandra/cluster/host_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ defmodule Xandra.Cluster.HostTest do
end

test "formats a hostname" do
host = %Host{address: ~c"cassandra.example.net", port: 9042}
host = %Host{address: "cassandra.example.net", port: 9042}
assert Host.format_address(host) == "cassandra.example.net:9042"
end
end
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,10 @@ defmodule Xandra.Cluster.LoadBalancingPolicy.DCAwareRoundRobinTest do

defp host(address_and_port, dc) do
[address, port] = String.split(address_and_port, ":", parts: 2)
{:ok, ip} = address |> String.to_charlist() |> :inet.parse_address()

%Host{
address: address |> String.to_charlist() |> :inet.parse_address(),
address: ip,
port: String.to_integer(port),
data_center: dc
}
Expand Down
Loading

0 comments on commit 1fc8011

Please sign in to comment.