Skip to content

Commit

Permalink
Refactor a few functions into Xandra.Protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
whatyouhide committed Oct 23, 2023
1 parent 5aa2a0f commit b006240
Show file tree
Hide file tree
Showing 5 changed files with 176 additions and 436 deletions.
151 changes: 151 additions & 0 deletions lib/xandra/protocol/protocol.ex
Original file line number Diff line number Diff line change
Expand Up @@ -430,4 +430,155 @@ defmodule Xandra.Protocol do
@spec encode_paging_state(binary() | nil) :: iodata()
def encode_paging_state(value) when is_binary(value), do: [<<byte_size(value)::32>>, value]
def encode_paging_state(nil), do: []

@spec rewrite_type({atom(), list()} | atom(), keyword()) :: {atom(), list()}
def rewrite_type(type, options)

def rewrite_type({parent_type, types}, options) do
{parent_type, Enum.map(types, &rewrite_type(&1, options))}
end

def rewrite_type(:date, options) do
{:date, [Keyword.get(options, :date_format, :date)]}
end

def rewrite_type(:time, options) do
{:time, [Keyword.get(options, :time_format, :time)]}
end

def rewrite_type(:timestamp, options) do
{:timestamp, [Keyword.get(options, :timestamp_format, :datetime)]}
end

def rewrite_type(:decimal, options) do
{:decimal, [Keyword.get(options, :decimal_format, :tuple)]}
end

def rewrite_type(:uuid, options) do
{:uuid, [Keyword.get(options, :uuid_format, :string)]}
end

def rewrite_type(:timeuuid, options) do
{:timeuuid, [Keyword.get(options, :timeuuid_format, :string)]}
end

def rewrite_type(type, _options), do: type

@spec encode_batch_type(:logged | :unlogged | :counter) :: non_neg_integer()
def encode_batch_type(:logged), do: 0
def encode_batch_type(:unlogged), do: 1
def encode_batch_type(:counter), do: 2

error_codes = %{
0x0000 => :server_failure,
0x000A => :protocol_violation,
0x0100 => :invalid_credentials,
0x1000 => :unavailable,
0x1001 => :overloaded,
0x1002 => :bootstrapping,
0x1003 => :truncate_failure,
0x1100 => :write_timeout,
0x1200 => :read_timeout,
# Only present in native protocol v4+
0x1300 => :read_failure,
# Only present in native protocol v4+
0x1400 => :function_failure,
# Only present in native protocol v4+
0x1500 => :write_failure,
# Only present in native protocol v4+
0x1600 => :cdc_write_failure,
# Only present in native protocol v4+
0x1700 => :cas_write_unknown,
0x2000 => :invalid_syntax,
0x2100 => :unauthorized,
0x2200 => :invalid,
0x2300 => :invalid_config,
0x2400 => :already_exists,
0x2500 => :unprepared
}

@spec decode_error_reason(binary()) :: {atom(), binary()}
def decode_error_reason(binary)

for {code, reason} <- error_codes do
def decode_error_reason(<<unquote(code)::32-signed, buffer::bytes>>) do
{unquote(reason), buffer}
end
end

@spec decode_uuid(binary(), :binary | :string) :: binary()
def decode_uuid(value, format)

def decode_uuid(value, :binary), do: value

def decode_uuid(<<part1::32, part2::16, part3::16, part4::16, part5::48>>, :string) do
IO.iodata_to_binary([
Base.encode16(<<part1::32>>, case: :lower),
?-,
Base.encode16(<<part2::16>>, case: :lower),
?-,
Base.encode16(<<part3::16>>, case: :lower),
?-,
Base.encode16(<<part4::16>>, case: :lower),
?-,
Base.encode16(<<part5::48>>, case: :lower)
])
end

@spec encode_uuid(binary()) :: binary()
def encode_uuid(value)

def encode_uuid(value) when byte_size(value) == 16, do: value

def encode_uuid(value) when byte_size(value) == 36 do
<<
part1::8-bytes,
?-,
part2::4-bytes,
?-,
part3::4-bytes,
?-,
part4::4-bytes,
?-,
part5::12-bytes
>> = value

<<
Base.decode16!(part1, case: :mixed)::4-bytes,
Base.decode16!(part2, case: :mixed)::2-bytes,
Base.decode16!(part3, case: :mixed)::2-bytes,
Base.decode16!(part4, case: :mixed)::2-bytes,
Base.decode16!(part5, case: :mixed)::6-bytes
>>
end

@spec varint_byte_size(integer()) :: pos_integer()
def varint_byte_size(value) when value > 127, do: 1 + varint_byte_size(value >>> 8)
def varint_byte_size(value) when value < -128, do: varint_byte_size(-value - 1)
def varint_byte_size(_value), do: 1

@spec decode_paging_state(bitstring(), Xandra.Page.t(), 0 | 1) :: {Xandra.Page.t(), bitstring()}
def decode_paging_state(buffer, page, has_more_pages)

def decode_paging_state(<<buffer::bits>>, page, _has_more_pages = 0) do
{page, buffer}
end

def decode_paging_state(<<buffer::bits>>, page, _has_more_pages = 1) do
<<size::32, paging_state::size(size)-bytes, buffer::bits>> = buffer
{%{page | paging_state: paging_state}, buffer}
end

# Only supported in native protocol v4+.
# pk = partition key
@spec decode_pk_index(bitstring(), non_neg_integer()) :: {indexes :: list(), bitstring()}
def decode_pk_index(buffer, pk_count, acc \\ [])

def decode_pk_index(buffer, 0, acc) do
{Enum.reverse(acc), buffer}
end

def decode_pk_index(<<index::16-unsigned, buffer::bits>>, pk_count, acc) do
decode_pk_index(buffer, pk_count - 1, [index | acc])
end
end
143 changes: 7 additions & 136 deletions lib/xandra/protocol/v3.ex
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
defmodule Xandra.Protocol.V3 do
@moduledoc false

import Bitwise

import Xandra.Protocol,
only: [decode_from_proto_type: 2, decode_from_proto_type: 3, encode_to_type: 2, is_decimal: 1]

Expand Down Expand Up @@ -100,7 +98,7 @@ defmodule Xandra.Protocol.V3 do
encoded_queries = [<<length(queries)::16>>] ++ Enum.map(queries, &encode_query_in_batch/1)

body = [
encode_batch_type(type),
Proto.encode_batch_type(type),
encoded_queries,
encode_to_type(consistency, "[consistency]"),
flags,
Expand All @@ -120,10 +118,6 @@ defmodule Xandra.Protocol.V3 do
%Frame{frame | body: Proto.encode_event(event)}
end

defp encode_batch_type(:logged), do: 0
defp encode_batch_type(:unlogged), do: 1
defp encode_batch_type(:counter), do: 2

defp encode_params(columns, values, options, default_consistency, skip_metadata?) do
consistency = Keyword.get(options, :consistency, default_consistency)
page_size = Keyword.get(options, :page_size, 10_000)
Expand Down Expand Up @@ -347,90 +341,22 @@ defmodule Xandra.Protocol.V3 do
end

defp encode_value(type, value) when type in [:uuid, :timeuuid] and is_binary(value) do
case byte_size(value) do
16 ->
value

36 ->
<<
part1::8-bytes,
?-,
part2::4-bytes,
?-,
part3::4-bytes,
?-,
part4::4-bytes,
?-,
part5::12-bytes
>> = value

<<
decode_base16(part1)::4-bytes,
decode_base16(part2)::2-bytes,
decode_base16(part3)::2-bytes,
decode_base16(part4)::2-bytes,
decode_base16(part5)::6-bytes
>>
end
Proto.encode_uuid(value)
end

defp encode_value(type, value) when type in [:varchar, :text] and is_binary(value) do
value
end

defp encode_value(:varint, value) when is_integer(value) do
size = varint_byte_size(value)
size = Proto.varint_byte_size(value)
<<value::size(size)-unit(8)>>
end

defp encode_value({:tuple, types}, value) when length(types) == tuple_size(value) do
for {type, item} <- Enum.zip(types, Tuple.to_list(value)), do: encode_query_value(type, item)
end

defp varint_byte_size(value) when value > 127 do
1 + varint_byte_size(value >>> 8)
end

defp varint_byte_size(value) when value < -128 do
varint_byte_size(-value - 1)
end

defp varint_byte_size(_value), do: 1

@compile {:inline, decode_base16: 1}
defp decode_base16(value) do
Base.decode16!(value, case: :mixed)
end

@compile {:inline, encode_base16: 1}
defp encode_base16(value) do
Base.encode16(value, case: :lower)
end

error_codes = %{
0x0000 => :server_failure,
0x000A => :protocol_violation,
0x0100 => :invalid_credentials,
0x1000 => :unavailable,
0x1001 => :overloaded,
0x1002 => :bootstrapping,
0x1003 => :truncate_failure,
0x1100 => :write_timeout,
0x1200 => :read_timeout,
0x2000 => :invalid_syntax,
0x2100 => :unauthorized,
0x2200 => :invalid,
0x2300 => :invalid_config,
0x2400 => :already_exists,
0x2500 => :unprepared
}

for {code, reason} <- error_codes do
defp decode_error_reason(<<unquote(code)::32-signed, buffer::bytes>>) do
{unquote(reason), buffer}
end
end

defp decode_error_message(_reason, buffer) do
decode_from_proto_type(message <- buffer, "[string]")
_ = buffer
Expand All @@ -445,7 +371,7 @@ defmodule Xandra.Protocol.V3 do
def decode_response(frame, query \\ nil, options \\ [])

def decode_response(%Frame{kind: :error, body: body}, _query, _options) do
{reason, buffer} = decode_error_reason(body)
{reason, buffer} = Proto.decode_error_reason(body)
# Warnings are not supported in native protocol v3.
Error.new(reason, decode_error_message(reason, buffer), _warnings = [])
end
Expand Down Expand Up @@ -544,40 +470,10 @@ defmodule Xandra.Protocol.V3 do

defp rewrite_column_types(columns, options) do

Check warning on line 471 in lib/xandra/protocol/v3.ex

View workflow job for this annotation

GitHub Actions / Lint

unused_fun

Function rewrite_column_types/2 will never be called.
Enum.map(columns, fn {_, _, _, type} = column ->
put_elem(column, 3, rewrite_type(type, options))
put_elem(column, 3, Proto.rewrite_type(type, options))
end)
end

defp rewrite_type({parent_type, types}, options) do
{parent_type, Enum.map(types, &rewrite_type(&1, options))}
end

defp rewrite_type(:date, options) do
{:date, [Keyword.get(options, :date_format, :date)]}
end

defp rewrite_type(:time, options) do
{:time, [Keyword.get(options, :time_format, :time)]}
end

defp rewrite_type(:timestamp, options) do
{:timestamp, [Keyword.get(options, :timestamp_format, :datetime)]}
end

defp rewrite_type(:decimal, options) do
{:decimal, [Keyword.get(options, :decimal_format, :tuple)]}
end

defp rewrite_type(:uuid, options) do
{:uuid, [Keyword.get(options, :uuid_format, :string)]}
end

defp rewrite_type(:timeuuid, options) do
{:timeuuid, [Keyword.get(options, :timeuuid_format, :string)]}
end

defp rewrite_type(type, _options), do: type

defp decode_change_options(<<buffer::bits>>, "KEYSPACE") do
decode_from_proto_type(keyspace <- buffer, "[string]")

Expand Down Expand Up @@ -613,7 +509,7 @@ defmodule Xandra.Protocol.V3 do
atom_keys?
) do
<<_::29, no_metadata::1, has_more_pages::1, global_table_spec::1>> = flags
{page, buffer} = decode_paging_state(buffer, page, has_more_pages)
{page, buffer} = Proto.decode_paging_state(buffer, page, has_more_pages)

Check warning on line 512 in lib/xandra/protocol/v3.ex

View workflow job for this annotation

GitHub Actions / Lint

call

The function call decode_paging_state will not succeed.

cond do
no_metadata == 1 ->
Expand All @@ -634,15 +530,6 @@ defmodule Xandra.Protocol.V3 do
end
end

defp decode_paging_state(<<buffer::bits>>, page, 0) do
{page, buffer}
end

defp decode_paging_state(<<buffer::bits>>, page, 1) do
<<size::32, paging_state::size(size)-bytes, buffer::bits>> = buffer
{%{page | paging_state: paging_state}, buffer}
end

defp decode_page_content(<<row_count::32-signed, buffer::bits>>, columns) do

Check warning on line 533 in lib/xandra/protocol/v3.ex

View workflow job for this annotation

GitHub Actions / Lint

unused_fun

Function decode_page_content/2 will never be called.
decode_page_content(buffer, row_count, columns, columns, [[]])
end
Expand Down Expand Up @@ -712,23 +599,7 @@ defmodule Xandra.Protocol.V3 do

defp decode_value(<<value::16-bytes>>, {uuid_type, [format]})
when uuid_type in [:uuid, :timeuuid] do
case format do
:binary ->
value

:string ->
<<part1::32, part2::16, part3::16, part4::16, part5::48>> = value

encode_base16(<<part1::32>>) <>
"-" <>
encode_base16(<<part2::16>>) <>
"-" <>
encode_base16(<<part3::16>>) <>
"-" <>
encode_base16(<<part4::16>>) <>
"-" <>
encode_base16(<<part5::48>>)
end
Proto.decode_uuid(value, format)
end

defp decode_value(<<scale::32-signed, data::bits>>, {:decimal, [format]}) do
Expand Down
Loading

0 comments on commit b006240

Please sign in to comment.