Skip to content

Commit

Permalink
Merge pull request #23 from poanetwork/ferigis.22.send_inactive_if_ag…
Browse files Browse the repository at this point in the history
…ent_gets_lost

[#22] [REST Handler] sending inactive when a client is no sending pings
  • Loading branch information
Dzol committed Jul 9, 2018
2 parents 7cfd4cc + b48a80b commit ca20d15
Show file tree
Hide file tree
Showing 16 changed files with 315 additions and 21 deletions.
Binary file added assets/REST_architecture.png
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
25 changes: 23 additions & 2 deletions lib/poa_backend/custom_handler.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
defmodule POABackend.CustomHandler do

alias POABackend.Protocol.Message
alias POABackend.Metric

@moduledoc """
A Custom Handler is responsible of handling data sent from Agents (i.e. REST over HTTP, WebSockets...) "speaking" the POA Protocol.
Expand Down Expand Up @@ -38,6 +41,13 @@ defmodule POABackend.CustomHandler do
is the id for that handler, the second one is the Elixir module which implements the CustomHandler behaviour and the third one is a list for arguments
which will be passed to the `child_spec/1` function as a parameter
### Helpful functions
This module also define some helpful functions:
- send_to_receivers/1: This function will publish the incomming message to the appropiate metric type (Data Type). A Custom Handler must call it when it wants to dispatch a message.
- publish_inactive/1: Will publish an inactive message to all the metrics in the system. A Custom Handler must call it when detects if a client is disconnected or/and inactive
"""

@doc """
Expand All @@ -53,12 +63,23 @@ defmodule POABackend.CustomHandler do
end

@doc """
This function dispatches the given Message to the appropiate receivers based on the Data Type.
This function dispatches the given Message to the appropiate receivers based on the Data Type (ie :ethereum_metric).
The mapping between Data Types and Receivers is done in the config file.
_Note_ the message must be a [POABackend.Protocol.Message](POABackend.Protocol.Message.html) struct
"""
@spec send_to_receivers(Message.t) :: :ok
def send_to_receivers(%Message{} = message) do
POABackend.Metric.add(message.data_type, message)
Metric.add(message.data_type, message)
end

@doc """
Publish an inactive message to all the metrics defined in the config file.
A Custom Handler must call this explicity when detecting if a client is inactive for a period of time
"""
def publish_inactive(agent_id) do
Metric.broadcast({:inactive, agent_id})
end
end
58 changes: 51 additions & 7 deletions lib/poa_backend/custom_handler/rest.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,24 @@ defmodule POABackend.CustomHandler.REST do
@moduledoc """
This module implements the REST Custom Handler over HTTP/1.1.
It defines the endpoints needed to use the POA Protocol.
# Plugin Architecture
This plugin involves many processes. When the `POABackend.CustomHandler.Supervisor` calls the
`child_spec/2` function it will create its own supervision tree under that supervisor
![REST plugin Architecture](./REST_architecture.png)
- `POABackend.CustomHandler.REST.Supervisor` is the main supervisor. It is in charge of supervise its three
children.
- The `Ranch/Cowboy` branch is managed by Ranch and Cowboy apps. They are in charge of expose the REST endpoints on top
of http.
- The Registry is an Elixir Registry in charge of track/untrack Activity Monitor Servers, created by the next child
- `POABackend.CustomHandler.REST.Monitor.Supervisor` is a Supervisor with `:simple_one_for_one` strategy. It will start
dynamically `GenServer`'s implemented by `POABackend.CustomHandler.REST.Monitor.Server` module.
# REST endpoints
This Pluting also defines the endpoints needed to use the POA Protocol.
## _hello_ endpoint
Expand Down Expand Up @@ -290,10 +307,10 @@ defmodule POABackend.CustomHandler.REST do
alias POABackend.Protocol.DataType
alias POABackend.Protocol.Message

plug REST.AcceptPlug, "application/json"
plug REST.Plugs.Accept, "application/json"
plug Plug.Parsers, parsers: [:json], json_decoder: Poison
plug REST.RequiredFieldsPlug, ~w(id secret)
plug REST.AuthorizationPlug
plug REST.Plugs.RequiredFields, ~w(id secret)
plug REST.Plugs.Authorization
plug :match
plug :dispatch

Expand All @@ -304,13 +321,15 @@ defmodule POABackend.CustomHandler.REST do
end

post "/ping" do
:ok = REST.ping_monitor(conn.params["id"])

conn
|> put_resp_content_type("application/json")
|> send_success_resp()
end

post "/latency" do
conn = REST.RequiredFieldsPlug.call(conn, ~w(latency))
conn = REST.Plugs.RequiredFields.call(conn, ~w(latency))

with false <- conn.halted,
true <- is_float(conn.params["latency"])
Expand All @@ -329,7 +348,7 @@ defmodule POABackend.CustomHandler.REST do
end

post "/data" do
conn = REST.RequiredFieldsPlug.call(conn, ~w(type data))
conn = REST.Plugs.RequiredFields.call(conn, ~w(type data))

with false <- conn.halted,
true <- is_map(conn.params["data"]),
Expand Down Expand Up @@ -379,10 +398,35 @@ defmodule POABackend.CustomHandler.REST do
end
end

@doc """
This function will initialize an Activity Monitor Server for a given Agent ID if it doesn't
exist already. If it exist this function will send a ping message to the Monitor Server in order to
restart the timeout countdown.
The Activity Monitor Server is a `GenServer` which will be initialized under the
`POABackend.CustomHandler.REST.Monitor.Supervisor` supervisor.
"""
def ping_monitor(agent_id) when is_binary(agent_id) do
case Registry.lookup(:rest_activity_monitor_registry, agent_id) do
[{pid, _}] ->
GenServer.cast(pid, :ping)
[] ->
{:ok, _} = start_monitor(agent_id)
end

:ok
end

@doc false
def start_monitor(agent_id) when is_binary(agent_id) do
Supervisor.start_child(POABackend.CustomHandler.REST.Monitor.Supervisor, [agent_id])
end

# Custom Handler Callbacks

@doc false
def child_spec(options) do
Plug.Adapters.Cowboy.child_spec(scheme: options[:scheme], plug: POABackend.CustomHandler.REST.Router, options: [port: options[:port]])
POABackend.CustomHandler.REST.Supervisor.child_spec(options)
end

end
48 changes: 48 additions & 0 deletions lib/poa_backend/custom_handler/rest/monitor/server.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
defmodule POABackend.CustomHandler.REST.Monitor.Server do
@moduledoc false

use GenServer

@timeout 7000
@registry :rest_activity_monitor_registry

def start_link(args) do
agent_id = via_tuple(args)
GenServer.start_link(__MODULE__, args, name: agent_id)
end

def child_spec do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
type: :worker,
restart: :temporary,
shutdown: 500
}
end

def init(args) do
{:ok, %{agent_id: args}, @timeout}
end

def handle_cast(:ping, state) do
{:noreply, state, @timeout}
end

def handle_info(:timeout, state) do
{:stop, :normal, state}
end

def terminate(_, state) do
set_inactive(state.agent_id)
Registry.unregister(@registry, state.agent_id)
end

defp via_tuple(agent_id) do
{:via, Registry, {@registry, agent_id}}
end

defp set_inactive(agent_id) do
POABackend.CustomHandler.publish_inactive(agent_id)
end
end
29 changes: 29 additions & 0 deletions lib/poa_backend/custom_handler/rest/monitor/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
defmodule POABackend.CustomHandler.REST.Monitor.Supervisor do
@moduledoc false

alias POABackend.CustomHandler.REST

def start_link do
Supervisor.start_link(__MODULE__, :noargs, name: __MODULE__)
end

def child_spec do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, []},
type: :supervisor,
restart: :permanent,
shutdown: 500
}
end

def init(:noargs) do
children = [
REST.Monitor.Server.child_spec()
]

opts = [strategy: :simple_one_for_one]
Supervisor.init(children, opts)
end

end
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule POABackend.CustomHandler.REST.AcceptPlug do
defmodule POABackend.CustomHandler.REST.Plugs.Accept do
@moduledoc false

@behaviour Plug
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule POABackend.CustomHandler.REST.AuthorizationPlug do
defmodule POABackend.CustomHandler.REST.Plugs.Authorization do
@moduledoc false

@behaviour Plug
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule POABackend.CustomHandler.REST.RequiredFieldsPlug do
defmodule POABackend.CustomHandler.REST.Plugs.RequiredFields do
@moduledoc false

@behaviour Plug
Expand Down
33 changes: 33 additions & 0 deletions lib/poa_backend/custom_handler/rest/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule POABackend.CustomHandler.REST.Supervisor do
@moduledoc false

alias POABackend.CustomHandler.REST

def start_link(rest_options) do
Supervisor.start_link(__MODULE__, rest_options, name: __MODULE__)
end

def child_spec(rest_options) do
%{
id: __MODULE__,
start: {__MODULE__, :start_link, [rest_options]},
type: :supervisor,
restart: :permanent,
shutdown: 500
}
end

def init(rest_options) do
import Supervisor.Spec

children = [
Plug.Adapters.Cowboy.child_spec(scheme: rest_options[:scheme], plug: REST.Router, options: [port: rest_options[:port]]),
supervisor(Registry, [:unique, :rest_activity_monitor_registry]),
REST.Monitor.Supervisor.child_spec()
]

opts = [strategy: :one_for_one]
Supervisor.init(children, opts)
end

end
8 changes: 8 additions & 0 deletions lib/poa_backend/metric.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ defmodule POABackend.Metric do
GenStage.start_link(__MODULE__, :ok, name: name)
end

def broadcast(message) do
metrics = Application.get_env(:poa_backend, :metrics)

for metric <- metrics do
add(metric, message)
end
end

def init(:ok) do
{:producer, [], dispatcher: GenStage.BroadcastDispatcher}
end
Expand Down
21 changes: 21 additions & 0 deletions lib/poa_backend/receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ defmodule POABackend.Receiver do
- `init_receiver/1`: Called only once when the process starts
- `metrics_received/3`: This function is called eveytime the Producer (metric type) receives a message.
- `handle_message/1`: This is called when the Receiver process receives an Erlang message
- `handle_inactive/2`: This function is called when one client has been disconnected or is not active for a period of time.
- `terminate/1`: Called just before stopping the process
This is a simple example of custom Receiver Plugin
Expand All @@ -84,6 +85,10 @@ defmodule POABackend.Receiver do
{:ok, state}
end
def handle_inactive(agent_id, state) do
{:ok, state}
end
def terminate(_state) do
:ok
end
Expand Down Expand Up @@ -114,6 +119,15 @@ defmodule POABackend.Receiver do
"""
@callback handle_message(msg :: any(), state :: any()) :: {:ok, state :: any()}


@doc """
This function is called when a Custom Handler detects a client is inactive.
The Custom Handler must to call explicity to `POABackend.CustomHandler.publish_inactive/1` and it will publish the
`inactive` message to all the metrics in the system (defined in the config file).
"""
@callback handle_inactive(agent_id :: binary(), state :: any()) :: {:ok, state :: any()}

@doc """
This callback is called just before the Process goes down. This is a good place for closing connections.
"""
Expand All @@ -138,6 +152,13 @@ defmodule POABackend.Receiver do
end

@doc false
def handle_events([inactive: agent_id], _from, state) do

{:ok, internal_state} = handle_inactive(agent_id, state.internal_state)

{:noreply, [], %{state | internal_state: internal_state}}
end

def handle_events(events, from, state) do

{:ok, internal_state} = metrics_received(events, from, state.internal_state)
Expand Down

0 comments on commit ca20d15

Please sign in to comment.