Skip to content

Commit

Permalink
Add monitors for GC :ets. Refactor Server.
Browse files Browse the repository at this point in the history
  • Loading branch information
merqlove committed Jul 28, 2016
1 parent ca5855f commit 1cb904c
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 55 deletions.
85 changes: 62 additions & 23 deletions lib/redix/pubsub/fastlane/server.ex
Expand Up @@ -10,7 +10,7 @@ defmodule Redix.PubSub.Fastlane.Server do

defmodule Subscription do
@moduledoc false
defstruct parent: nil, options: [], channel: nil, pid: nil
defstruct parent: nil, options: [], pid: nil
end

@doc """
Expand All @@ -24,13 +24,15 @@ defmodule Redix.PubSub.Fastlane.Server do
Initializes the server.
"""
def init(opts) do
Process.flag(:trap_exit, true)
channels = :ets.new(opts[:server_name], [:named_table, :duplicate_bag, {:read_concurrency, true}, {:write_concurrency, true}])
channels = :ets.new(opts[:server_name], [:named_table, :duplicate_bag,
read_concurrency: true, write_concurrency: true])

Process.flag(:trap_exit, true)
state = %{server_name: Keyword.fetch!(opts, :server_name),
channels: channels,
opts: opts,
connected: false,
monitors: %{},
pool_name: Keyword.fetch!(opts, :pool_name),
namespace: Keyword.fetch!(opts, :namespace),
decoder: Keyword.fetch!(opts, :decoder),
Expand All @@ -56,13 +58,13 @@ defmodule Redix.PubSub.Fastlane.Server do

@doc false
def subscribe(pubsub_server, from, channel, fastlane) do
GenServer.cast(pubsub_server, {:subscribe, from, channel, fastlane, :subscribe})
GenServer.cast(pubsub_server, {:subscribe, from, channel, %{fastlane: fastlane, method: :subscribe}})
:ok
end

@doc false
def psubscribe(pubsub_server, from, pattern, fastlane) do
GenServer.cast(pubsub_server, {:subscribe, from, pattern, fastlane, :psubscribe})
GenServer.cast(pubsub_server, {:subscribe, from, pattern, %{fastlane: fastlane, method: :psubscribe}})
:ok
end

Expand All @@ -80,7 +82,7 @@ defmodule Redix.PubSub.Fastlane.Server do

@doc false
def publish(pubsub_server, channel, message) do
GenServer.cast(pubsub_server, {:publish, channel, message})
GenServer.call(pubsub_server, {:publish, channel, message})
:ok
end

Expand All @@ -100,15 +102,24 @@ defmodule Redix.PubSub.Fastlane.Server do
{:reply, result, state}
end

def handle_cast({:publish, channel, message}, state) do
include_ns(channel, state)
def handle_call({:publish, channel, message}, _from, state) do
result = include_ns(channel, state)
|> _publish(message, state.pool_name)

{:noreply, state}
{:reply, result, state}
end

def handle_cast({:monitor, pid}, state) do
{:noreply, put_new_monitor(state, pid)}
end

def handle_cast({:subscribe, from, channel, fastlane, method}, %{redix_pid: _} = state) do
_subscribe(from, channel, fastlane, state, method)
def handle_cast({:demonitor, pid}, state) do
{:noreply, drop_monitor(state, pid)}
end

def handle_cast({:subscribe, from, channel, opts}, %{redix_pid: _} = state) do
:ok = GenServer.cast(state.server_name, {:monitor, from})
_subscribe(from, channel, state, opts)
{:noreply, state}
end

Expand Down Expand Up @@ -151,6 +162,11 @@ defmodule Redix.PubSub.Fastlane.Server do
{:noreply, state}
end

def handle_info({:DOWN, _ref, _type, pid, _info}, state) do
_unsubscribe(pid, state)
{:noreply, drop_monitor(state, pid)}
end

@doc """
Connection establishment and shutdown loop
On init, an initial conection to redis is attempted when starting `:redix`
Expand Down Expand Up @@ -182,7 +198,7 @@ defmodule Redix.PubSub.Fastlane.Server do
defp _notify_all_embed(subscribers, message) do
subscribers
|> Enum.each(fn
{_from, %{pid: pid, options: options, parent: parent}} ->
{_from, {pid, parent, options}} ->
parent.fastlane(pid, message, options)
_ -> :noop
end)
Expand All @@ -195,8 +211,10 @@ defmodule Redix.PubSub.Fastlane.Server do
{^channel, {^from, _}} -> true
_ -> false
end)
|> Enum.map(fn {_, {_, subscription}} ->
%{ id: channel, from: from, subscription: subscription}
|> Enum.map(fn {_, {_, {pid, parent, options}}} ->
%{ id: channel,
from: from,
fastlane: %Subscription{pid: pid, parent: parent, options: options}}
end)

case subscriptions do
Expand Down Expand Up @@ -230,28 +248,29 @@ defmodule Redix.PubSub.Fastlane.Server do
end
end

defp _subscribe(from, channel, {pid, parent, options}, state, method) when is_atom(parent) and is_list(options) do
subscription = %Subscription{pid: pid, parent: parent, options: options, channel: channel}

true = :ets.insert(state.channels, {channel, {from, subscription}})
defp _subscribe(from, channel, state, opts) do
true = :ets.insert(state.channels, {channel, {from, opts[:fastlane]}})

case subscribe_to_channel(state, channel, method) do
case subscribe_to_channel(state, channel, opts[:method]) do
:error -> :error
_ -> :ok
end
end
defp _subscribe(_, _, _, _, _), do: :error

defp _unsubscribe(from, channel, %{channels: channels} = state, method) do
true = :ets.match_delete(channels, {channel, {from, :_}})

case :ets.select_count(channels, [{{channel, :_}, [], [true]}]) do
0 -> unsubscribe_from_channel(channel, state, method)
_ -> :noop
0 ->
unsubscribe_from_channel(channel, state, method)
:ok = GenServer.cast(state.server_name, {:demonitor, from})
_ -> :ok
end
:ok
end
defp _unsubscribe(_, _, _, _), do: :error
defp _unsubscribe(from, %{channels: channels}) do
true = :ets.match_delete(channels, {:_, {from, :_}})
end

defp _unsubscribe_all(channel, %{channels: channels}) do
true = :ets.delete(channels, channel)
Expand Down Expand Up @@ -293,6 +312,26 @@ defmodule Redix.PubSub.Fastlane.Server do
end
defp decode_payload(message, _), do: message

# Monitor handlers

defp put_new_monitor(%{monitors: monitors} = state, pid) do
case Map.fetch(monitors, pid) do
{:ok, _ref} -> state
:error -> %{state | monitors: Map.put(monitors, pid, Process.monitor(pid))}
end
end

defp drop_monitor(%{monitors: monitors} = state, pid) do
case Map.fetch(monitors, pid) do
{:ok, ref} ->
Process.demonitor(ref)
%{state | monitors: Map.delete(monitors, pid)}
:error -> state
end
end

# Connection handler

defp establish_conn(state) do
redis_opts = Keyword.take(state.opts, @redix_opts)
case Redix.PubSub.start_link(redis_opts) do
Expand Down
76 changes: 46 additions & 30 deletions test/redix/pubsub/fastlane_test.exs
Expand Up @@ -29,14 +29,18 @@ defmodule Redix.PubSub.FastlaneTest do
assert :ok = Fastlane.subscribe(ps, "foo", {fl, FastlaneNamespace, [:some_id]})
assert :ok = Fastlane.subscribe(ps, "bar", {fl, FastlaneNamespace, [:some_second_id]})

pid = self()

assert match? {:ok, [%{id: "foo",
subscription: %Subscription{channel: "foo",
options: [:some_id],
parent: FastlaneNamespace}}]}, Server.find(ps, "foo")
from: ^pid,
fastlane: %Subscription{options: [:some_id],
parent: FastlaneNamespace,
pid: ^fl}}]}, Server.find(ps, "foo")
assert match? {:ok, [%{id: "bar",
subscription: %Subscription{channel: "bar",
options: [:some_second_id],
parent: FastlaneNamespace}}]}, Server.find(ps, "bar")
from: ^pid,
fastlane: %Subscription{options: [:some_second_id],
parent: FastlaneNamespace,
pid: ^fl}}]}, Server.find(ps, "bar")
assert match? :error, Server.find(ps, "tar")

# Next we unsubscribe
Expand All @@ -45,9 +49,10 @@ defmodule Redix.PubSub.FastlaneTest do

assert match? :error, Server.find(ps, "foo")
assert match? {:ok, [%{id: "bar",
subscription: %Subscription{channel: "bar",
options: [:some_second_id],
parent: FastlaneNamespace}}]}, Server.find(ps, "bar")
from: ^pid,
fastlane: %Subscription{options: [:some_second_id],
parent: FastlaneNamespace,
pid: ^fl}}]}, Server.find(ps, "bar")

assert :ok = Fastlane.unsubscribe(ps, "bar")
assert match? :error, Server.find(ps, "bar")
Expand All @@ -58,14 +63,18 @@ defmodule Redix.PubSub.FastlaneTest do
assert :ok = Fastlane.psubscribe(ps, "foo*", {fl, FastlaneNamespace, [:some_id]})
assert :ok = Fastlane.psubscribe(ps, "bar*", {fl, FastlaneNamespace, [:some_second_id]})

pid = self()

assert match? {:ok, [%{id: "foo*",
subscription: %Subscription{channel: "foo*",
options: [:some_id],
parent: FastlaneNamespace}}]}, Server.find(ps, "foo*")
from: ^pid,
fastlane: %Subscription{options: [:some_id],
parent: FastlaneNamespace,
pid: ^fl}}]}, Server.find(ps, "foo*")
assert match? {:ok, [%{id: "bar*",
subscription: %Subscription{channel: "bar*",
options: [:some_second_id],
parent: FastlaneNamespace}}]}, Server.find(ps, "bar*")
from: ^pid,
fastlane: %Subscription{options: [:some_second_id],
parent: FastlaneNamespace,
pid: ^fl}}]}, Server.find(ps, "bar*")
assert match? :error, Server.find(ps, "tar*")

# Next we unsubscribe
Expand All @@ -74,9 +83,10 @@ defmodule Redix.PubSub.FastlaneTest do

assert match? :error, Server.find(ps, "foo*")
assert match? {:ok, [%{id: "bar*",
subscription: %Subscription{channel: "bar*",
options: [:some_second_id],
parent: FastlaneNamespace}}]}, Server.find(ps, "bar*")
from: ^pid,
fastlane: %Subscription{options: [:some_second_id],
parent: FastlaneNamespace,
pid: ^fl}}]}, Server.find(ps, "bar*")

assert :ok = Fastlane.punsubscribe(ps, "bar*")
assert match? :error, Server.find(ps, "bar*")
Expand All @@ -90,23 +100,29 @@ defmodule Redix.PubSub.FastlaneTest do
assert :ok = Fastlane.psubscribe(ps, "boo*", {fl, FastlaneNamespace, [:some_id3]})
assert :ok = Fastlane.psubscribe(ps, "boo*", {fl, FastlaneNamespace, [:some_id4]})

pid = self()

assert match? {:ok, [%{id: "foo",
subscription: %Subscription{channel: "foo",
options: [:some_id],
parent: FastlaneNamespace}},
from: ^pid,
fastlane: %Subscription{options: [:some_id],
parent: FastlaneNamespace,
pid: ^fl}},
%{id: "foo",
subscription: %Subscription{channel: "foo",
options: [:some_id2],
parent: FastlaneNamespace}}]}, Server.find(ps, "foo")
from: ^pid,
fastlane: %Subscription{options: [:some_id2],
parent: FastlaneNamespace,
pid: ^fl}}]}, Server.find(ps, "foo")

assert match? {:ok, [%{id: "boo*",
subscription: %Subscription{channel: "boo*",
options: [:some_id3],
parent: FastlaneNamespace}},
from: ^pid,
fastlane: %Subscription{options: [:some_id3],
parent: FastlaneNamespace,
pid: ^fl}},
%{id: "boo*",
subscription: %Subscription{channel: "boo*",
options: [:some_id4],
parent: FastlaneNamespace}}]}, Server.find(ps, "boo*")
from: ^pid,
fastlane: %Subscription{options: [:some_id4],
parent: FastlaneNamespace,
pid: ^fl}}]}, Server.find(ps, "boo*")
end

it "#unsubscribe :ok on not existing", %{conn: ps} do
Expand Down
4 changes: 2 additions & 2 deletions test/support/redix/pubsub/fastlane/namespace_test.exs
Expand Up @@ -14,14 +14,14 @@ defmodule Redix.PubSub.Fastlane.NamespaceTest do
def fastlane(subscribers, message) do
subscribers
|> Enum.each(fn
{_from, %{pid: pid, options: options, parent: _parent}} ->
{_from, {pid, _parent, options}} ->
__MODULE__.fastlane(pid, message, options)
_ -> :noop
end)
end

def fastlane(pid, payload, options) do
GenServer.cast({FastlaneTestNamespace, node()}, {:fastlane, pid, payload, options})
GenServer.cast(FastlaneTestNamespace, {:fastlane, pid, payload, options})
:ok
end

Expand Down

0 comments on commit 1cb904c

Please sign in to comment.