Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 45 additions & 105 deletions lib/realtime/adapters/postgres/decoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -136,44 +136,36 @@ defmodule Realtime.Adapters.Postgres.Decoder do

@pg_epoch DateTime.from_iso8601("2000-01-01T00:00:00Z")

alias Messages.{
Begin,
Commit,
Origin,
Relation,
Relation.Column,
Insert,
Update,
Delete,
Truncate,
Type,
Unsupported
}
alias Messages.Begin
alias Messages.Commit
alias Messages.Origin
alias Messages.Relation
alias Messages.Relation.Column
alias Messages.Insert
alias Messages.Type
alias Messages.Unsupported

alias Realtime.Adapters.Postgres.OidDatabase

@doc """
Parses logical replication messages from Postgres

## Examples

iex> decode_message(<<73, 0, 0, 96, 0, 78, 0, 2, 116, 0, 0, 0, 3, 98, 97, 122, 116, 0, 0, 0, 3, 53, 54, 48>>)
%Realtime.Adapters.Postgres.Decoder.Messages.Insert{relation_id: 24576, tuple_data: {"baz", "560"}}

"""
def decode_message(message) when is_binary(message) do
decode_message_impl(message)
def decode_message(message, relations) when is_binary(message) do
decode_message_impl(message, relations)
end

defp decode_message_impl(<<"B", lsn::binary-8, timestamp::integer-64, xid::integer-32>>) do
defp decode_message_impl(<<"B", lsn::binary-8, timestamp::integer-64, xid::integer-32>>, _relations) do
%Begin{
final_lsn: decode_lsn(lsn),
commit_timestamp: pgtimestamp_to_timestamp(timestamp),
xid: xid
}
end

defp decode_message_impl(<<"C", _flags::binary-1, lsn::binary-8, end_lsn::binary-8, timestamp::integer-64>>) do
defp decode_message_impl(
<<"C", _flags::binary-1, lsn::binary-8, end_lsn::binary-8, timestamp::integer-64>>,
_relations
) do
%Commit{
flags: [],
lsn: decode_lsn(lsn),
Expand All @@ -183,14 +175,14 @@ defmodule Realtime.Adapters.Postgres.Decoder do
end

# TODO: Verify this is correct with real data from Postgres
defp decode_message_impl(<<"O", lsn::binary-8, name::binary>>) do
defp decode_message_impl(<<"O", lsn::binary-8, name::binary>>, _relations) do
%Origin{
origin_commit_lsn: decode_lsn(lsn),
name: name
}
end

defp decode_message_impl(<<"R", id::integer-32, rest::binary>>) do
defp decode_message_impl(<<"R", id::integer-32, rest::binary>>, _relations) do
[
namespace
| [name | [<<replica_identity::binary-1, _number_of_columns::integer-16, columns::binary>>]]
Expand All @@ -214,70 +206,17 @@ defmodule Realtime.Adapters.Postgres.Decoder do
}
end

defp decode_message_impl(<<"I", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>) do
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)

%Insert{relation_id: relation_id, tuple_data: decoded_tuple_data}
end

defp decode_message_impl(<<"U", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>) do
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)

%Update{relation_id: relation_id, tuple_data: decoded_tuple_data}
end

defp decode_message_impl(
<<"U", relation_id::integer-32, key_or_old::binary-1, number_of_columns::integer-16, tuple_data::binary>>
)
when key_or_old == "O" or key_or_old == "K" do
{<<"N", new_number_of_columns::integer-16, new_tuple_binary::binary>>, old_decoded_tuple_data} =
decode_tuple_data(tuple_data, number_of_columns)

{<<>>, decoded_tuple_data} = decode_tuple_data(new_tuple_binary, new_number_of_columns)

base_update_msg = %Update{relation_id: relation_id, tuple_data: decoded_tuple_data}

case key_or_old do
"K" -> Map.put(base_update_msg, :changed_key_tuple_data, old_decoded_tuple_data)
"O" -> Map.put(base_update_msg, :old_tuple_data, old_decoded_tuple_data)
end
end

defp decode_message_impl(
<<"D", relation_id::integer-32, key_or_old::binary-1, number_of_columns::integer-16, tuple_data::binary>>
)
when key_or_old == "K" or key_or_old == "O" do
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns)

base_delete_msg = %Delete{relation_id: relation_id}

case key_or_old do
"K" -> Map.put(base_delete_msg, :changed_key_tuple_data, decoded_tuple_data)
"O" -> Map.put(base_delete_msg, :old_tuple_data, decoded_tuple_data)
end
end

defp decode_message_impl(<<"T", number_of_relations::integer-32, options::integer-8, column_ids::binary>>) do
truncated_relations =
for relation_id_bin <- column_ids |> :binary.bin_to_list() |> Enum.chunk_every(4),
do: relation_id_bin |> :binary.list_to_bin() |> :binary.decode_unsigned()

decoded_options =
case options do
0 -> []
1 -> [:cascade]
2 -> [:restart_identity]
3 -> [:cascade, :restart_identity]
end
<<"I", relation_id::integer-32, "N", number_of_columns::integer-16, tuple_data::binary>>,
relations
) do
relation = relations |> Map.get(relation_id) |> Map.get(:columns)
{<<>>, decoded_tuple_data} = decode_tuple_data(tuple_data, number_of_columns, relation)

%Truncate{
number_of_relations: number_of_relations,
options: decoded_options,
truncated_relations: truncated_relations
}
%Insert{relation_id: relation_id, tuple_data: decoded_tuple_data}
end

defp decode_message_impl(<<"Y", data_type_id::integer-32, namespace_and_name::binary>>) do
defp decode_message_impl(<<"Y", data_type_id::integer-32, namespace_and_name::binary>>, _relations) do
[namespace, name_with_null] = :binary.split(namespace_and_name, <<0>>)
name = String.slice(name_with_null, 0..-2//1)

Expand All @@ -288,52 +227,53 @@ defmodule Realtime.Adapters.Postgres.Decoder do
}
end

defp decode_message_impl(binary), do: %Unsupported{data: binary}
defp decode_message_impl(binary, _relations), do: %Unsupported{data: binary}

defp decode_tuple_data(binary, columns_remaining, accumulator \\ [])
defp decode_tuple_data(binary, columns_remaining, relations, accumulator \\ [])

defp decode_tuple_data(remaining_binary, 0, accumulator) when is_binary(remaining_binary),
defp decode_tuple_data(remaining_binary, 0, _relations, accumulator) when is_binary(remaining_binary),
do: {remaining_binary, accumulator |> Enum.reverse() |> List.to_tuple()}

defp decode_tuple_data(<<"n", rest::binary>>, columns_remaining, accumulator),
do: decode_tuple_data(rest, columns_remaining - 1, [nil | accumulator])
defp decode_tuple_data(<<"n", rest::binary>>, columns_remaining, [_ | relations], accumulator),
do: decode_tuple_data(rest, columns_remaining - 1, relations, [nil | accumulator])

defp decode_tuple_data(<<"u", rest::binary>>, columns_remaining, accumulator),
do: decode_tuple_data(rest, columns_remaining - 1, [:unchanged_toast | accumulator])
defp decode_tuple_data(<<"u", rest::binary>>, columns_remaining, [_ | relations], accumulator),
do: decode_tuple_data(rest, columns_remaining - 1, relations, [:unchanged_toast | accumulator])

@start_date "2000-01-01T00:00:00Z"
defp decode_tuple_data(
<<"b", column_length::integer-32, rest::binary>>,
columns_remaining,
[%Column{type: type} | relations],
accumulator
) do
data = :erlang.binary_part(rest, {0, column_length})
remainder = :erlang.binary_part(rest, {byte_size(rest), -(byte_size(rest) - column_length)})

data =
case data do
<<1>> ->
true
case type do
"bool" ->
data == <<1>>

<<0>> ->
false
"jsonb" ->
<<1, rest::binary>> = data
rest

<<uuid_binary::binary-16>> ->
UUID.binary_to_string!(uuid_binary)
"timestamp" ->
<<microseconds::signed-big-64>> = data

<<microseconds::signed-big-64>> ->
@start_date
|> NaiveDateTime.from_iso8601!()
|> NaiveDateTime.add(microseconds, :microsecond)

<<1, binary::binary-size(column_length - 1)>> ->
binary

data when is_binary(data) ->
"text" ->
data

"uuid" ->
UUID.binary_to_string!(data)
end

decode_tuple_data(remainder, columns_remaining - 1, [data | accumulator])
decode_tuple_data(remainder, columns_remaining - 1, relations, [data | accumulator])
end

defp decode_columns(binary, accumulator \\ [])
Expand Down
17 changes: 9 additions & 8 deletions lib/realtime/tenants/replication_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -267,8 +267,7 @@ defmodule Realtime.Tenants.ReplicationConnection do

def handle_data(data, state) when is_write(data) do
%Write{message: message} = parse(data)
message |> decode_message() |> then(&send(self(), &1))
{:noreply, [], state}
message |> decode_message(state.relations) |> then(&handle_message(&1, state))
end

def handle_data(e, state) do
Expand All @@ -277,12 +276,16 @@ defmodule Realtime.Tenants.ReplicationConnection do
end

@impl true
def handle_info(%Decoder.Messages.Begin{commit_timestamp: commit_timestamp}, state) do

def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :shutdown}
def handle_info(_, state), do: {:noreply, state}

defp handle_message(%Decoder.Messages.Begin{commit_timestamp: commit_timestamp}, state) do
latency_committed_at = NaiveDateTime.utc_now() |> NaiveDateTime.diff(commit_timestamp, :millisecond)
{:noreply, %{state | latency_committed_at: latency_committed_at}}
end

def handle_info(%Decoder.Messages.Relation{} = msg, state) do
defp handle_message(%Decoder.Messages.Relation{} = msg, state) do
%Decoder.Messages.Relation{id: id, namespace: namespace, name: name, columns: columns} = msg
%{relations: relations} = state
relation = %{name: name, columns: columns, namespace: namespace}
Expand All @@ -298,7 +301,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
{:noreply, state}
end

def handle_info(%Decoder.Messages.Insert{} = msg, state) do
defp handle_message(%Decoder.Messages.Insert{} = msg, state) do
%Decoder.Messages.Insert{relation_id: relation_id, tuple_data: tuple_data} = msg
%{relations: relations, tenant_id: tenant_id, latency_committed_at: latency_committed_at} = state

Expand Down Expand Up @@ -351,9 +354,7 @@ defmodule Realtime.Tenants.ReplicationConnection do
{:noreply, state}
end

def handle_info({:DOWN, _, :process, _, _}, _), do: {:disconnect, :shutdown}
def handle_info(_, state), do: {:noreply, state}

defp handle_message(_, state), do: {:noreply, state}
@impl true
def handle_disconnect(state) do
Logger.warning("Disconnecting broadcast changes handler in the step : #{inspect(state.step)}")
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Realtime.MixProject do
def project do
[
app: :realtime,
version: "2.65.2",
version: "2.65.3",
elixir: "~> 1.18",
elixirc_paths: elixirc_paths(Mix.env()),
start_permanent: Mix.env() == :prod,
Expand Down
Loading