Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Major refactor #1

Open
wants to merge 7 commits into
base: db_connection_2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions bench/query_performance.exs
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,13 @@ Benchee.run(%{
end,
"Insert nullable null" => fn ->
insert.("nullable_u64_val", nil)
end,
"Insert date" => fn ->
insert.("date_val", date)
end,
"Insert datetime" => fn ->
insert.("datetime_val", date_time)
end
# "Insert date" => fn ->
# {:ok, _, _} = CH.query(client, "INSERT INTO #{table} (date_val) VALUES (?)", [date])
# end,
# "Insert datetime" => fn ->
# {:ok, _, _} = CH.query(client, "INSERT INTO #{table} (datetime_val) VALUES (?)", [date_time])
# end
})

Benchee.run(%{
Expand All @@ -74,6 +74,13 @@ Benchee.run(%{
"Select strings" => fn ->
select.("string_val", "5050505050")
end,
"selecting nulls" => fn ->
select.("nullable_u64_val", nil)
end,
"selecting non null" => fn ->
{:ok, _, _} =
CH.query(client, "SELECT * from #{table} WHERE nullable_u64_val IS NOT NULL", [])
end,
"selecting all" => fn ->
{:ok, _, _} = CH.query(client, "SELECT * from #{table}", [])
end
Expand Down
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
use Mix.Config

config :clickhousex, codec: Clickhousex.Codec.RowBinary
1 change: 1 addition & 0 deletions lib/clickhousex.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ defmodule Clickhousex do

@spec start_link(Keyword.t()) :: {:ok, pid} | {:error, term}
def start_link(opts \\ []) do
opts = Keyword.put(opts, :show_sensitive_data_on_connection_error, true)
DBConnection.start_link(Clickhousex.Protocol, opts)
end

Expand Down
12 changes: 12 additions & 0 deletions lib/clickhousex/codec.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
defmodule Clickhousex.Codec do
@type row :: tuple
@type query :: Clickhousex.Query.t()
@type param :: any
@type param_replacements :: iodata
@type select_response :: %{column_names: [String.t()], rows: [row], row_count: non_neg_integer}

@callback response_format() :: String.t()
@callback request_format() :: String.t()
@callback decode(any) :: {:ok, select_response} | {:error, any}
@callback encode(query, param_replacements, [param]) :: iodata
end
182 changes: 182 additions & 0 deletions lib/clickhousex/codec/binary.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
defmodule Clickhousex.Codec.Binary do
@compile {:bin_opt_info, true}
use Bitwise

def encode(:varint, num) when num < 128, do: <<num>>
def encode(:varint, num), do: <<1::1, num::7, encode(:varint, num >>> 7)::binary>>

def encode(:string, str) when is_bitstring(str) do
[encode(:varint, byte_size(str)), str]
end

def encode(:u8, i) when is_integer(i) do
<<i::little-unsigned-size(8)>>
end

def encode(:u16, i) do
<<i::little-unsigned-size(16)>>
end

def encode(:u32, i) do
<<i::little-unsigned-size(32)>>
end

def encode(:u64, i) do
<<i::little-unsigned-size(64)>>
end

def encode(:i8, i) do
<<i::little-signed-size(8)>>
end

def encode(:i16, i) do
<<i::little-signed-size(16)>>
end

def encode(:i32, i) do
<<i::little-signed-size(32)>>
end

def encode(:i64, i) do
<<i::little-signed-size(64)>>
end

def encode(:boolean, true) do
encode(:u8, 1)
end

def encode(:boolean, false) do
encode(:u8, 0)
end

def decode(bytes, {:nullable, type}) do
case decode(bytes, :u8) do
{:ok, 0, rest} -> decode(rest, type)
{:ok, 1, rest} -> {:ok, nil, rest}
end
end

def decode(bytes, :struct, struct_module) do
decode_struct(bytes, struct_module.decode_spec(), struct(struct_module))
end

def decode(bytes, :varint) do
decode_varint(bytes, 0, 0)
end

def decode(bytes, :string) do
with {:ok, byte_count, rest} <- decode(bytes, :varint),
true <- byte_size(rest) >= byte_count do
<<decoded_str::binary-size(byte_count), rest::binary>> = rest
{:ok, decoded_str, rest}
end
end

def decode(<<1::little-unsigned-size(8), rest::binary>>, :boolean) do
{:ok, true, rest}
end

def decode(<<0::little-unsigned-size(8), rest::binary>>, :boolean) do
{:ok, false, rest}
end

def decode(bytes, {:list, data_type}) do
{:ok, count, rest} = decode(bytes, :varint)
decode_list(rest, data_type, count, [])
end

def decode(<<decoded::little-signed-size(64), rest::binary>>, :i64) do
{:ok, decoded, rest}
end

def decode(<<decoded::little-signed-size(32), rest::binary>>, :i32) do
{:ok, decoded, rest}
end

def decode(<<decoded::little-signed-size(16), rest::binary>>, :i16) do
{:ok, decoded, rest}
end

def decode(<<decoded::little-signed-size(8), rest::binary>>, :i8) do
{:ok, decoded, rest}
end

def decode(<<decoded::little-signed-size(64), rest::binary>>, :u64) do
{:ok, decoded, rest}
end

def decode(<<decoded::little-signed-size(32), rest::binary>>, :u32) do
{:ok, decoded, rest}
end

def decode(<<decoded::little-signed-size(16), rest::binary>>, :u16) do
{:ok, decoded, rest}
end

def decode(<<decoded::little-signed-size(8), rest::binary>>, :u8) do
{:ok, decoded, rest}
end

def decode(<<days_since_epoch::little-unsigned-size(16), rest::binary>>, :date) do
{:ok, date} = Date.new(1970, 01, 01)
date = Date.add(date, days_since_epoch)

{:ok, date, rest}
end

def decode(<<seconds_since_epoch::little-unsigned-size(32), rest::binary>>, :datetime) do
{:ok, date_time} = NaiveDateTime.new(1970, 1, 1, 0, 0, 0)
date_time = NaiveDateTime.add(date_time, seconds_since_epoch)

{:ok, date_time, rest}
end

def decode(<<0, rest::binary>>, :boolean) do
{:ok, false, rest}
end

def decode(<<1, rest::binary>>, :boolean) do
{:ok, true, rest}
end

def decode(<<decoded::little-signed-float-size(64), rest::binary>>, :f64) do
{:ok, decoded, rest}
end

def decode(<<decoded::little-signed-float-size(32), rest::binary>>, :f32) do
{:ok, decoded, rest}
end

defp decode_list(rest, _, 0, accum) do
{:ok, Enum.reverse(accum), rest}
end

defp decode_list(bytes, data_type, count, accum) do
case decode(bytes, data_type) do
{:ok, decoded, rest} -> decode_list(rest, data_type, count - 1, [decoded | accum])
other -> other
end
end

defp decode_varint(<<0::size(1), byte::size(7), rest::binary>>, result, shift) do
{:ok, result ||| byte <<< shift, rest}
end

defp decode_varint(<<1::1, byte::7, rest::binary>>, result, shift) do
decode_varint(rest, result ||| byte <<< shift, shift + 7)
end

defp decode_struct(rest, [], struct) do
{:ok, struct, rest}
end

defp decode_struct(rest, [{field_name, type} | specs], struct) do
case decode(rest, type) do
{:ok, decoded, rest} ->
decode_struct(rest, specs, Map.put(struct, field_name, decoded))

{:error, _} = err ->
err
end
end
end
101 changes: 101 additions & 0 deletions lib/clickhousex/codec/json.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
defmodule Clickhousex.Codec.JSON do
@behaviour Clickhousex.Codec

defdelegate encode(query, replacements, params), to: Clickhousex.Codec.Values

@impl Clickhousex.Codec
def request_format do
"Values"
end

@impl Clickhousex.Codec
def response_format do
"JSONCompact"
end

@impl Clickhousex.Codec
def decode(response) do
case Jason.decode(response) do
{:ok, %{"meta" => meta, "data" => data, "rows" => row_count}} ->
column_names = Enum.map(meta, & &1["name"])
column_types = Enum.map(meta, & &1["type"])

rows =
for row <- data do
for {raw_value, column_type} <- Enum.zip(row, column_types) do
to_native(column_type, raw_value)
end
|> List.to_tuple()
end

{:ok, %{column_names: column_names, rows: rows, count: row_count}}
end
end

defp to_native(_, nil) do
nil
end

defp to_native(<<"Nullable(", type::binary>>, value) do
type = String.replace_suffix(type, ")", "")
to_native(type, value)
end

defp to_native(<<"Array(", type::binary>>, value) do
type = String.replace_suffix(type, ")", "")
Enum.map(value, &to_native(type, &1))
end

defp to_native("Float" <> _, value) when is_integer(value) do
1.0 * value
end

defp to_native("Int64", value) do
String.to_integer(value)
end

defp to_native("Date", value) do
{:ok, date} = to_date(value)
date
end

defp to_native("DateTime", value) do
[date, time] = String.split(value, " ")

with {:ok, date} <- to_date(date),
{:ok, time} <- to_time(time),
{:ok, naive} <- NaiveDateTime.new(date, time) do
naive
end
end

defp to_native("UInt" <> _, value) when is_bitstring(value) do
String.to_integer(value)
end

defp to_native("Int" <> _, value) when is_bitstring(value) do
String.to_integer(value)
end

defp to_native(_, value) do
value
end

defp to_date(date_string) do
[year, month, day] =
date_string
|> String.split("-")
|> Enum.map(&String.to_integer/1)

Date.new(year, month, day)
end

defp to_time(time_string) do
[h, m, s] =
time_string
|> String.split(":")
|> Enum.map(&String.to_integer/1)

Time.new(h, m, s)
end
end
Loading