Skip to content

Commit

Permalink
feat: Release (#449)
Browse files Browse the repository at this point in the history
* fix: don't update syn meta if the node is mismatched (#433)

* update multiplayer demo MAX_ROOM_USERS to 50

* fix: drop LHR and GRU (#436)

* feat: footer improvements (#429)

* feat: display the count of connected tenants in metrics (#418)

* fix: don't try to stop the not alive pid in the resolver (#417)

* fix: increase read replica pool size

* feat: default realtime-dev JWT secret to API_JWT_SECRET env var value

* chore(deps): bump json5 from 1.0.1 to 1.0.2 in /demo

Bumps [json5](https://github.com/json5/json5) from 1.0.1 to 1.0.2.
- [Release notes](https://github.com/json5/json5/releases)
- [Changelog](https://github.com/json5/json5/blob/main/CHANGELOG.md)
- [Commits](json5/json5@v1.0.1...v1.0.2)

---
updated-dependencies:
- dependency-name: json5
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>

* feat: default Repo with_dynamic_repo name to nil and pool_size to 1 (#423)

* fix: looks for the PR number after # symbol (#425)

* Add defaults to current fields to reflect db schema

* Add max_bytes_per_second and max_channels_per_client fields

* Add max_joins_per_second to schema

* Add db limits to socket

* Use db limits in channel

* fix: tests

* fix: do not block a registration process (#426)

* fix: add http checks

* feat: stop connection to a tenant's db if there are no connected users for 10 minutes. (#428)

* feat: add short_node_id helper

* Use new helper in Realtime.PromEx

* feat: add version to footer and use short node id

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: abc3 <sts@abc3.dev>
Co-authored-by: Wen Bo Xie <wenbo.xie3@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Wen Bo Xie <5532241+w3b6x9@users.noreply.github.com>

* fix: switch to aws roles

* chore: update staging branch to main and prod branch to release

* Add sjc replica (#441)

* fix: load sjc replica from DB_HOST_REPLICA_SJC env var (#443)

* Version is compiled now (#444)

* Bump version

* fix: replace replication poller process sleep with send_after (#447)

* feat: show latency on db events on inspector (#450)

* Show latency on db events on inspector

* Round to seconds for now

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: abc3 <sts@abc3.dev>
Co-authored-by: Chase Granberry <chase@logflare.app>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Inian <inian1234@gmail.com>
  • Loading branch information
5 people committed Jan 19, 2023
1 parent 9280db3 commit f6bb0c2
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 41 deletions.
18 changes: 15 additions & 3 deletions assets/js/app.js
Expand Up @@ -27,10 +27,11 @@ Hooks.payload = {
if (payload.extension === 'postgres_changes' && payload.status === 'ok') {
this.pushEventTo("#conn_info", "postgres_subscribed", {})
}

let ts = new Date();
let line =
`<tr class="bg-white border-b hover:bg-gray-50">
<td class="py-4 px-6">SYSTEM</td>
<td class="py-4 px-6">${ts.toISOString()}</td>
<td class="py-4 px-6">${JSON.stringify(payload)}</td>
</tr>`
let list = document.querySelector("#plist")
Expand All @@ -41,9 +42,11 @@ Hooks.payload = {
// The event name can by anything
// Match on specific event names to filter for only those types of events and do something with them
this.channel.on("broadcast", { event: "*" }, payload => {
let ts = new Date();
let line =
`<tr class="bg-white border-b hover:bg-gray-50">
<td class="py-4 px-6">BROADCAST</td>
<td class="py-4 px-6">${ts.toISOString()}</td>
<td class="py-4 px-6">${JSON.stringify(payload)}</td>
</tr>`
let list = document.querySelector("#plist")
Expand All @@ -53,9 +56,11 @@ Hooks.payload = {
// Listen for all (`*`) `presence` events
this.channel.on("presence", { event: "*" }, payload => {
this.pushEventTo("#conn_info", "presence_subscribed", {})
let line =
let ts = new Date();
let line =
`<tr class="bg-white border-b hover:bg-gray-50">
<td class="py-4 px-6">PRESENCE</td>
<td class="py-4 px-6">${ts.toISOString()}</td>
<td class="py-4 px-6">${JSON.stringify(payload)}</td>
</tr>`
let list = document.querySelector("#plist")
Expand All @@ -64,10 +69,17 @@ Hooks.payload = {

// Listen for all (`*`) `postgres_changes` events on tables in the `public` schema
this.channel.on("postgres_changes", { event: "*", schema: schema, table: table }, payload => {
let ts = new Date();
let payload_ts = Date.parse(payload.commit_timestamp)
let latency = (ts.getTime() / 1000) - (payload_ts / 1000)
let line =
`<tr class="bg-white border-b hover:bg-gray-50">
<td class="py-4 px-6">POSTGRES</td>
<td class="py-4 px-6">${JSON.stringify(payload)}</td>
<td class="py-4 px-6">${ts.toISOString()}</td>
<td class="py-4 px-6">
<div class="pb-3">${JSON.stringify(payload)}</div>
<div class="pt-3 border-t hover:bg-gray-50">Latency: ${Math.round(latency)} sec</div>
</td>
</tr>`
let list = document.querySelector("#plist")
list.innerHTML = line + list.innerHTML;
Expand Down
70 changes: 38 additions & 32 deletions lib/extensions/postgres_cdc_rls/replication_poller.ex
Expand Up @@ -43,7 +43,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
backoff:
Backoff.new(
backoff_min: 100,
backoff_max: 120_000,
backoff_max: 5_000,
backoff_type: :rand_exp
),
conn: conn,
Expand All @@ -56,37 +56,21 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
max_changes: args["poll_max_changes"],
max_record_bytes: args["poll_max_record_bytes"],
poll_interval_ms: args["poll_interval_ms"],
poll_ref: make_ref(),
poll_ref: nil,
publication: args["publication"],
retry_ref: nil,
slot_name: args["slot_name"] <> slot_name_suffix(),
tenant: args["id"]
}

Logger.metadata(external_id: state.tenant, project: state.tenant)

{:ok, state, {:continue, :prepare_replication}}
{:ok, state, {:continue, :prepare}}
end

@impl true
def handle_continue(
:prepare_replication,
%{
backoff: backoff,
conn: conn,
slot_name: slot_name
} = state
) do
case Replications.prepare_replication(conn, slot_name) do
{:ok, _} ->
send(self(), :poll)
{:noreply, state}

{:error, error} ->
Logger.error("Prepare replication error: #{inspect(error)}")
{timeout, backoff} = Backoff.backoff(backoff)
Process.sleep(timeout)
{:noreply, %{state | backoff: backoff}, {:continue, :prepare_replication}}
end
def handle_continue(:prepare, state) do
{:noreply, prepare_replication(state)}
end

@impl true
Expand All @@ -97,6 +81,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
poll_interval_ms: poll_interval_ms,
poll_ref: poll_ref,
publication: publication,
retry_ref: retry_ref,
slot_name: slot_name,
max_record_bytes: max_record_bytes,
max_changes: max_changes,
Expand All @@ -105,6 +90,7 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
} = state
) do
cancel_timer(poll_ref)
cancel_timer(retry_ref)

try do
Replications.list_changes(conn, slot_name, publication, max_changes, max_record_bytes)
Expand Down Expand Up @@ -167,12 +153,18 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
Logger.error("Error polling replication: #{inspect(reason, pretty: true)}")

{timeout, backoff} = Backoff.backoff(backoff)
Process.sleep(timeout)
retry_ref = Process.send_after(self(), :retry, timeout)

{:noreply, %{state | backoff: backoff}, {:continue, :prepare_replication}}
{:noreply, %{state | backoff: backoff, retry_ref: retry_ref}}
end
end

@impl true
def handle_info(:retry, %{retry_ref: retry_ref} = state) do
cancel_timer(retry_ref)
{:noreply, prepare_replication(state)}
end

def generate_record([
{"wal",
%{
Expand Down Expand Up @@ -248,11 +240,22 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do

def generate_record(_), do: nil

def slot_name_suffix() do
case System.get_env("SLOT_NAME_SUFFIX") do
nil ->
""

value ->
Logger.debug("Using slot name suffix: " <> value)
"_" <> value
end
end

defp convert_errors([_ | _] = errors), do: errors

defp convert_errors(_), do: nil

def connect_db(host, port, name, user, pass, socket_opts) do
defp connect_db(host, port, name, user, pass, socket_opts) do
{host, port, name, user, pass} = decrypt_creds(host, port, name, user, pass)

Postgrex.start_link(
Expand All @@ -269,14 +272,17 @@ defmodule Extensions.PostgresCdcRls.ReplicationPoller do
)
end

def slot_name_suffix() do
case System.get_env("SLOT_NAME_SUFFIX") do
nil ->
""
defp prepare_replication(%{backoff: backoff, conn: conn, slot_name: slot_name} = state) do
case Replications.prepare_replication(conn, slot_name) do
{:ok, _} ->
send(self(), :poll)
state

value ->
Logger.debug("Using slot name suffix: " <> value)
"_" <> value
{:error, error} ->
Logger.error("Prepare replication error: #{inspect(error)}")
{timeout, backoff} = Backoff.backoff(backoff)
retry_ref = Process.send_after(self(), :retry, timeout)
%{state | backoff: backoff, retry_ref: retry_ref}
end
end
end
13 changes: 7 additions & 6 deletions lib/realtime_web/live/inspector_live/index.html.heex
Expand Up @@ -20,7 +20,7 @@
/>
</.modal>
<% end %>

<div id="conn_info" class="mb-5">
<%= if @broadcast_subscribed do %>
<p>Connected to <code><%= @connected_to %></code></p>
Expand Down Expand Up @@ -59,10 +59,10 @@
<aside class="w-full sm:w-1/3 md:w-1/4 pr-2 mb-8">
<.h3>Broadcast an Event</.h3>
<div class="mb-6">
<.form
<.form
id="message_form"
:let={m}
for={@changeset}
:let={m}
for={@changeset}
class="bg-white rounded mt-4"
phx-submit="send_message"
>
Expand Down Expand Up @@ -109,7 +109,8 @@
<table class="table-fixed w-full text-md text-left text-gray-700">
<thead class="text-xs text-gray-700 uppercase bg-gray-50">
<tr>
<th scope="col" class="w-36 py-3 px-6">Extension</th>
<th scope="col" class="w-32 py-3 px-6">Extension</th>
<th scope="col" class="w-64 py-3 px-6">Timestamp</th>
<th scope="col" class="py-3 px-6">Payload</th>
</tr>
</thead>
Expand All @@ -118,4 +119,4 @@
</table>
</div>
</div>
</div>
</div>

0 comments on commit f6bb0c2

Please sign in to comment.