Skip to content

Commit

Permalink
Dispatch behaviour (#142)
Browse files Browse the repository at this point in the history
  • Loading branch information
visciang committed Oct 23, 2022
1 parent 858d3fe commit 9e8d713
Show file tree
Hide file tree
Showing 14 changed files with 57 additions and 37 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ jobs:
runs-on: ubuntu-latest

container:
image: elixir:1.13.3
image: elixir:1.14.1

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v3

- name: Retrieve Cached Dependencies
uses: actions/cache@v2
uses: actions/cache@v3
id: mix-cache
with:
path: |
Expand Down
2 changes: 1 addition & 1 deletion config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ if config_env() == :test do

config :telegram,
api_base_url: "http://test:8000",
get_updates_poll_timeout: 1
get_updates_poll_timeout_s: 1
end
3 changes: 2 additions & 1 deletion lib/bot.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,14 +43,15 @@ defmodule Telegram.Bot do
require Logger

@behaviour Telegram.Bot
@behaviour Telegram.Bot.Dispatch

@spec child_spec(Types.bot_opts()) :: Supervisor.child_spec()
def child_spec(token: token, max_bot_concurrency: max_bot_concurrency) do
supervisor_name = Utils.name(__MODULE__, token)
Supervisor.child_spec({Task.Supervisor, name: supervisor_name, max_children: max_bot_concurrency}, [])
end

@spec dispatch_update(Types.update(), Types.token()) :: :ok
@impl Telegram.Bot.Dispatch
def dispatch_update(update, token) do
supervisor_name = Utils.name(__MODULE__, token)

Expand Down
4 changes: 2 additions & 2 deletions lib/bot/chat_bot/chat/session/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Telegram.Bot.ChatBot.Chat.Session.Server do
defstruct @enforce_keys
end

@spec start_link({module(), Types.token(), ChatBot.chat()}) :: GenServer.on_start()
@spec start_link({ChatBot.t(), Types.token(), ChatBot.chat()}) :: GenServer.on_start()
def start_link({chatbot_behaviour, token, %{"id" => chat_id} = chat}) do
GenServer.start_link(
__MODULE__,
Expand All @@ -22,7 +22,7 @@ defmodule Telegram.Bot.ChatBot.Chat.Session.Server do
)
end

@spec handle_update(module(), Types.token(), Types.update()) :: any()
@spec handle_update(ChatBot.t(), Types.token(), Types.update()) :: any()
def handle_update(chatbot_behaviour, token, update) do
with {:get_chat, {:ok, chat}} <- {:get_chat, Utils.get_chat(update)},
{:get_chat_session_server, {:ok, server}} <-
Expand Down
11 changes: 11 additions & 0 deletions lib/bot/dispatch.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
defmodule Telegram.Bot.Dispatch do
@moduledoc """
Dispatch behaviour
"""

alias Telegram.Types

@type t :: module()

@callback dispatch_update(Types.update(), Types.token()) :: :ok
end
5 changes: 4 additions & 1 deletion lib/chat_bot.ex
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ defmodule Telegram.ChatBot do
alias Telegram.Bot.ChatBot.Chat.Session
alias Telegram.Types

@type t :: module()

@type chat :: map()
@type chat_state :: any()

Expand Down Expand Up @@ -93,6 +95,7 @@ defmodule Telegram.ChatBot do
defmacro __using__(_use_opts) do
quote location: :keep do
@behaviour Telegram.ChatBot
@behaviour Telegram.Bot.Dispatch

@impl Telegram.ChatBot
def handle_timeout(token, chat_id, chat_state) do
Expand All @@ -106,7 +109,7 @@ defmodule Telegram.ChatBot do
Supervisor.child_spec({Chat.Supervisor, {token, max_bot_concurrency}}, [])
end

@spec dispatch_update(Types.update(), Types.token()) :: :ok
@impl Telegram.Bot.Dispatch
def dispatch_update(update, token) do
Session.Server.handle_update(__MODULE__, token, update)

Expand Down
39 changes: 20 additions & 19 deletions lib/poller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,40 +50,41 @@ end
defmodule Telegram.Poller.Task do
@moduledoc false

alias Telegram.{Poller, Types}
alias Telegram.Bot.Dispatch
alias Telegram.Types
require Logger

use Task, restart: :permanent
use Retry

@type dispatch_update :: (Types.update(), Types.token() -> any())

defmodule Context do
@moduledoc false
defstruct [:dispatch_update, :token, :offset]

@enforce_keys [:dispatch, :token, :offset]
defstruct @enforce_keys

@type t :: %__MODULE__{
dispatch_update: Poller.Task.dispatch_update(),
dispatch: Dispatch.t(),
token: Types.token(),
offset: integer()
offset: nil | integer()
}
end

@spec start_link({module(), Types.token()}) :: {:ok, pid()}
def start_link({bot_behaviour_mod, token}) do
Task.start_link(__MODULE__, :run, [bot_behaviour_mod, token])
@spec start_link({Dispatch.t(), Types.token()}) :: {:ok, pid()}
def start_link({bot_dispatch_behaviour, token}) do
Task.start_link(__MODULE__, :run, [bot_dispatch_behaviour, token])
end

@doc false
@spec run(module(), Types.token()) :: no_return()
def run(bot_behaviour_module, token) do
Logger.metadata(bot: bot_behaviour_module)
@spec run(Dispatch.t(), Types.token()) :: no_return()
def run(bot_dispatch_behaviour, token) do
Logger.metadata(bot: bot_dispatch_behaviour)
Logger.info("Running in polling mode")

set_polling(token)

context = %Context{
dispatch_update: &bot_behaviour_module.dispatch_update/2,
dispatch: bot_dispatch_behaviour,
token: token,
offset: nil
}
Expand All @@ -101,14 +102,14 @@ defmodule Telegram.Poller.Task do
end
end

defp loop(context) do
defp loop(%Context{} = context) do
updates = wait_updates(context)

next_offset = process_updates(updates, context)
loop(%Context{context | offset: next_offset})
end

defp wait_updates(context) do
defp wait_updates(%Context{} = context) do
opts_offset = if context.offset != nil, do: [offset: context.offset], else: []
opts = [timeout: conf_get_updates_poll_timeout()] ++ opts_offset

Expand All @@ -125,19 +126,19 @@ defmodule Telegram.Poller.Task do
end
end

defp process_updates(updates, context) do
defp process_updates(updates, %Context{} = context) do
updates |> Enum.reduce(nil, &process_update(&1, &2, context))
end

defp process_update(update, _acc, context) do
defp process_update(update, _acc, %Context{} = context) do
Logger.debug("process_update: #{inspect(update)}")

context.dispatch_update.(update, context.token)
context.dispatch.dispatch_update(update, context.token)
update["update_id"] + 1
end

defp conf_get_updates_poll_timeout do
# timeout configuration opts unit: seconds
Application.get_env(:telegram, :get_updates_poll_timeout, 30)
Application.get_env(:telegram, :get_updates_poll_timeout_s, 30)
end
end
4 changes: 3 additions & 1 deletion lib/types.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ defmodule Telegram.Types do
Telegram types
"""

alias Telegram.Bot.Dispatch

@type token :: String.t()
@type method :: String.t()
@type update :: map()

@type max_bot_concurrency :: pos_integer() | :infinity
@type bot_opts :: [token: token(), max_bot_concurrency: max_bot_concurrency()]
@type bot_spec :: {module(), bot_opts()}
@type bot_spec :: {Dispatch.t(), bot_opts()}
end
8 changes: 4 additions & 4 deletions lib/webhook.ex
Original file line number Diff line number Diff line change
Expand Up @@ -138,14 +138,14 @@ defmodule Telegram.Webhook.Router do

post "/:token" do
update = conn.body_params
bot_behaviour_mod = Map.get(opts[:bot_routing_map], token)
bot_dispatch_behaviour = Map.get(opts[:bot_routing_map], token)

Logger.debug("received update: #{inspect(update)}", bot: bot_behaviour_mod, token: token)
Logger.debug("received update: #{inspect(update)}", bot: bot_dispatch_behaviour, token: token)

if bot_behaviour_mod == nil do
if bot_dispatch_behaviour == nil do
Plug.Conn.send_resp(conn, :not_found, "")
else
bot_behaviour_mod.dispatch_update(update, token)
bot_dispatch_behaviour.dispatch_update(update, token)
Plug.Conn.send_resp(conn, :ok, "")
end
end
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Telegram.Mixfile do
def project do
[
app: :telegram,
version: "0.21.0",
version: "0.22.0",
elixir: "~> 1.12",
start_permanent: Mix.env() == :prod,
preferred_cli_env: [
Expand Down
6 changes: 3 additions & 3 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@
"cowlib": {:hex, :cowlib, "2.11.0", "0b9ff9c346629256c42ebe1eeb769a83c6cb771a6ee5960bd110ab0b9b872063", [:make, :rebar3], [], "hexpm", "2b3e9da0b21c4565751a6d4901c20d1b4cc25cbb7fd50d91d2ab6dd287bc86a9"},
"credo": {:hex, :credo, "1.6.7", "323f5734350fd23a456f2688b9430e7d517afb313fbd38671b8a4449798a7854", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "41e110bfb007f7eda7f897c10bf019ceab9a0b269ce79f015d54b0dcf4fc7dd3"},
"dialyxir": {:hex, :dialyxir, "1.2.0", "58344b3e87c2e7095304c81a9ae65cb68b613e28340690dfe1a5597fd08dec37", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "61072136427a851674cab81762be4dbeae7679f85b1272b6d25c3a839aff8463"},
"earmark_parser": {:hex, :earmark_parser, "1.4.26", "f4291134583f373c7d8755566122908eb9662df4c4b63caa66a0eabe06569b0a", [:mix], [], "hexpm", "48d460899f8a0c52c5470676611c01f64f3337bad0b26ddab43648428d94aabc"},
"earmark_parser": {:hex, :earmark_parser, "1.4.29", "149d50dcb3a93d9f3d6f3ecf18c918fb5a2d3c001b5d3305c926cddfbd33355b", [:mix], [], "hexpm", "4902af1b3eb139016aed210888748db8070b8125c2342ce3dcae4f38dcc63503"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.28.5", "3e52a6d2130ce74d096859e477b97080c156d0926701c13870a4e1f752363279", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d2c4b07133113e9aa3e9ba27efb9088ba900e9e51caa383919676afdf09ab181"},
"excoveralls": {:hex, :excoveralls, "0.14.6", "610e921e25b180a8538229ef547957f7e04bd3d3e9a55c7c5b7d24354abbba70", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "0eceddaa9785cfcefbf3cd37812705f9d8ad34a758e513bb975b081dce4eb11e"},
"ex_doc": {:hex, :ex_doc, "0.29.0", "4a1cb903ce746aceef9c1f9ae8a6c12b742a5461e6959b9d3b24d813ffbea146", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "f096adb8bbca677d35d278223361c7792d496b3fc0d0224c9d4bc2f651af5db1"},
"excoveralls": {:hex, :excoveralls, "0.15.0", "ac941bf85f9f201a9626cc42b2232b251ad8738da993cf406a4290cacf562ea4", [:mix], [{:hackney, "~> 1.16", [hex: :hackney, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "9631912006b27eca30a2f3c93562bc7ae15980afb014ceb8147dc5cdd8f376f1"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~>2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~>6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~>1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~>1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~>1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~>0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
Expand Down
1 change: 1 addition & 0 deletions test/poller_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Test.Telegram.Poller do
defmodule TestBotBehaviour do
use Telegram.Bot

@impl Telegram.Bot
def handle_update(update, _token) do
assert %{"message" => %{"text" => "/test"}} = update
end
Expand Down
1 change: 1 addition & 0 deletions test/support/test_bot.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule Test.Bot do

use Telegram.Bot

@impl Telegram.Bot
def handle_update(_update, token) do
Telegram.Api.request(token, "testResponse")
end
Expand Down
2 changes: 1 addition & 1 deletion test/support/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ defmodule Test.Utils.Mock do

alias Test.Utils.Const

@retry_wait_period Application.compile_env(:telegram, :get_updates_poll_timeout) * 1_000 + 500
@retry_wait_period Application.compile_env(:telegram, :get_updates_poll_timeout_s) * 1_000 + 500

def tesla_mock_global_async do
Tesla.Mock.mock_global(fn %{url: url} = request ->
Expand Down

0 comments on commit 9e8d713

Please sign in to comment.