Skip to content

Commit

Permalink
Feat: simpler callback registration directly on SocketState
Browse files Browse the repository at this point in the history
Problem: 
- current :telemetry inspired way to handle live query callbacks makes handling reconnections much harder, than it should be
- also we have quite some modules just for this feature

Solution: 
- just store the callback function directly in SocketState struct
- reconnection becomes much simpler, since all the info is directly there, and we also do not have to deal with garbage cleanup on process termination in a manual way
  • Loading branch information
mindreframer committed Nov 11, 2023
1 parent b190089 commit c87fe9b
Show file tree
Hide file tree
Showing 12 changed files with 67 additions and 208 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ Surrealix.query(pid, "SELECT * FROM type::table($table);", %{table: "person"})

```elixir
## Example with live query callbacks
Surrealix.live_query(pid, "LIVE DIFF SELECT * FROM user;", fn event, data, config ->
IO.inspect({event, data, config}, label: "callback")
Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, query_id ->
IO.inspect({data, query_id}, label: "callback")
end)

## Example with live query with DIFF
Surrealix.live_query(pid, "LIVE SELECT DIFF FROM user;", fn event, data, config ->
IO.inspect({event, data, config}, label: "callback")
Surrealix.live_query(pid, "LIVE SELECT DIFF FROM user;", fn data, query_id ->
IO.inspect({data, query_id}, label: "callback")
end)


Expand Down
4 changes: 1 addition & 3 deletions lib/surrealix/api.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ defmodule Surrealix.Api do
with {:sql_live_check, true} <- {:sql_live_check, Util.is_live_query_stmt(sql)},
{:ok, res} <- query(pid, sql, vars),
%{"result" => [%{"result" => lq_id}]} <- res do
event = [:live_query, lq_id]
:ok = Surrealix.Dispatch.attach("#{lq_id}_main", event, callback)
:ok = WebSockex.cast(pid, {:register_lq, sql, lq_id})
:ok = WebSockex.cast(pid, {:register_lq, sql, lq_id, callback})
{:ok, res}
else
{:sql_live_check, false} -> {:error, "Not a live query: `#{sql}`!"}
Expand Down
4 changes: 1 addition & 3 deletions lib/surrealix/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ defmodule Surrealix.Application do

@impl true
def start(_type, _args) do
children = [
{Surrealix.HandlerTable, []}
]
children = []

opts = [strategy: :one_for_one, name: Surrealix.Supervisor]
Supervisor.start_link(children, opts)
Expand Down
9 changes: 0 additions & 9 deletions lib/surrealix/attach_error.ex

This file was deleted.

44 changes: 0 additions & 44 deletions lib/surrealix/dispatch.ex

This file was deleted.

50 changes: 0 additions & 50 deletions lib/surrealix/dispatch_test.exs

This file was deleted.

69 changes: 0 additions & 69 deletions lib/surrealix/handler_table.ex

This file was deleted.

10 changes: 7 additions & 3 deletions lib/surrealix/socket.ex
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,8 @@ defmodule Surrealix.Socket do
exit(:normal)
end

def handle_cast({:register_lq, sql, query_id}, state) do
state = SocketState.add_lq(state, sql, query_id)
def handle_cast({:register_lq, sql, query_id, callback}, state) do
state = SocketState.add_lq(state, sql, query_id, callback)
{:ok, state}
end

Expand All @@ -73,7 +73,11 @@ defmodule Surrealix.Socket do
if is_nil(task) do
# No registered task for this ID, must be a live query update
lq_id = get_in(json, ["result", "id"])
Surrealix.Dispatch.execute([:live_query, lq_id], json)
lq_item = SocketState.get_lq(state, lq_id)

if(!is_nil(lq_item)) do
lq_item.callback.(json, lq_id)
end
else
if Process.alive?(task.pid) do
Process.send(task.pid, {:ok, json, id}, [])
Expand Down
22 changes: 13 additions & 9 deletions lib/surrealix/socket_state.ex
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ defmodule Surrealix.SocketState do
@doc """
Register a SQL statement for a particular LiveQuery ID
"""
def add_lq(state = %SocketState{}, sql, query_id) do
lq_sql = MapSet.put(state.lq_sql, sql)
item = %{sql: sql, query_id: query_id}
def add_lq(state = %SocketState{}, sql, query_id, callback) do
lq_sql = MapSet.put(state.lq_sql, {sql, callback})
item = %{sql: sql, query_id: query_id, callback: callback}

state
|> put_in([:lq_running, query_id], item)
Expand All @@ -65,7 +65,7 @@ defmodule Surrealix.SocketState do
{item, state} = pop_in(state, [:lq_running, query_id])

if item do
lq_sql = MapSet.delete(state.lq_sql, item.sql)
lq_sql = MapSet.delete(state.lq_sql, {item.sql, item.callback})

state
|> Map.put(:lq_sql, lq_sql)
Expand All @@ -78,12 +78,16 @@ defmodule Surrealix.SocketState do
Remove a LiveQuery by SQL
"""
def delete_lq_by_sql(state = %SocketState{}, sql) do
lq_running =
Enum.reject(state.lq_running, fn {_id, value} -> Map.get(value, :sql) == sql end)
|> Map.new()
found = Enum.find(state.lq_running, fn {_id, value} -> Map.get(value, :sql) == sql end)

lq_sql = MapSet.delete(state.lq_sql, sql)
Map.put(state, :lq_running, lq_running) |> Map.put(:lq_sql, lq_sql)
if !is_nil(found) do
{key, item} = found
lq_running = Map.delete(state.lq_running, key)
lq_sql = MapSet.delete(state.lq_sql, {item.sql, item.callback})
Map.put(state, :lq_running, lq_running) |> Map.put(:lq_sql, lq_sql)
else
state
end
end

@doc """
Expand Down
52 changes: 39 additions & 13 deletions lib/surrealix/socket_state_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule Surrealix.SocketStateTest do

alias Surrealix.SocketState

def dummy_callback(), do: fn -> nil end

describe "tasks" do
test "add_task / get_task / delete_task" do
state = SocketState.new()
Expand All @@ -24,40 +26,64 @@ defmodule Surrealix.SocketStateTest do
describe "lq" do
test "add_lq / get_lq" do
state = SocketState.new()
state = state |> SocketState.add_lq("select * from person", "11-22")
cb = dummy_callback()
state = state |> SocketState.add_lq("select * from person", "11-22", cb)

assert SocketState.get_lq(state, "11-22") == %{
callback: cb,
query_id: "11-22",
sql: "select * from person"
}
end

test "all_lq" do
state = SocketState.new()
state = state |> SocketState.add_lq("select * from person", "11-22")
state = state |> SocketState.add_lq("select * from user", "11-23")
assert SocketState.all_lq(state) == ["select * from person", "select * from user"]
cb = dummy_callback()
state = state |> SocketState.add_lq("select * from person", "11-22", cb)
state = state |> SocketState.add_lq("select * from user", "11-23", cb)

assert SocketState.all_lq(state) == [
{"select * from person", cb},
{"select * from user", cb}
]
end

test "delete_lq_by_id" do
state = SocketState.new()
state = state |> SocketState.add_lq("select * from person", "11-22")
state = state |> SocketState.add_lq("select * from user", "11-23")
assert SocketState.all_lq(state) == ["select * from person", "select * from user"]
cb = dummy_callback()
state = state |> SocketState.add_lq("select * from person", "11-22", cb)
state = state |> SocketState.add_lq("select * from user", "11-23", cb)

assert SocketState.all_lq(state) == [
{"select * from person", cb},
{"select * from user", cb}
]

state = state |> SocketState.delete_lq_by_id("11-23")
assert SocketState.all_lq(state) == ["select * from person"]
assert SocketState.all_lq(state) == [{"select * from person", cb}]
assert SocketState.get_lq(state, "11-23") == nil
end

test "delete_lq_by_sql" do
state = SocketState.new()
state = state |> SocketState.add_lq("select * from person", "11-22")
state = state |> SocketState.add_lq("select * from user", "11-23")
assert SocketState.all_lq(state) == ["select * from person", "select * from user"]
cb = dummy_callback()
state = state |> SocketState.add_lq("select * from person", "11-22", cb)
state = state |> SocketState.add_lq("select * from user", "11-23", cb)

assert SocketState.all_lq(state) == [
{"select * from person", cb},
{"select * from user", cb}
]

state = SocketState.delete_lq_by_sql(state, "select * from person")
assert SocketState.all_lq(state) == ["select * from user"]
assert SocketState.all_lq(state) == [{"select * from user", cb}]
assert SocketState.get_lq(state, "11-22") == nil
assert SocketState.get_lq(state, "11-23") == %{query_id: "11-23", sql: "select * from user"}

assert SocketState.get_lq(state, "11-23") == %{
query_id: "11-23",
sql: "select * from user",
callback: cb
}
end
end
end
2 changes: 1 addition & 1 deletion lib/surrealix_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule Surrealix.Test do
testpid = self()

{:ok, _} =
Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn _event, data, _config ->
Surrealix.live_query(pid, "LIVE SELECT * FROM user;", fn data, _query_id ->
send(testpid, {:lq, data})
end)

Expand Down
1 change: 1 addition & 0 deletions lib/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
# ExUnit.start(trace: true) ## -> verbose output, great for debugging!
ExUnit.start()
Mneme.start(restart: true)

Expand Down

0 comments on commit c87fe9b

Please sign in to comment.