From 3b268217eb0af7121204952ef64cb6d69008cbf8 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Tue, 21 Jan 2025 16:22:39 -0300 Subject: [PATCH 01/13] Add sse dependency and config --- config/config.exs | 6 ++++++ mix.exs | 5 ++++- 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/config/config.exs b/config/config.exs index d109b7f05..43379a39d 100644 --- a/config/config.exs +++ b/config/config.exs @@ -52,3 +52,9 @@ if System.get_env("RUSTLER_SKIP_COMPILE") do config :lambda_ethereum_consensus, Snappy, skip_compilation?: true config :lambda_ethereum_consensus, Ssz, skip_compilation?: true end + +config :sse, + keep_alive: {:system, "SSE_KEEP_ALIVE_IN_MS", 1000} + +config :event_bus, + topics: [:finalized_checkpoint] diff --git a/mix.exs b/mix.exs index d993bbed9..63877121c 100644 --- a/mix.exs +++ b/mix.exs @@ -64,7 +64,10 @@ defmodule LambdaEthereumConsensus.MixProject do {:sentry, "~> 10.8.0"}, {:prom_ex, "~> 1.11.0"}, {:flama, git: "https://github.com/lambdaclass/ht1223_tracer"}, - {:uuid, "~> 1.1"} + {:uuid, "~> 1.1"}, + # TODO: We might want to use phoenix_pubsub instead and do our implementation of SSE. + {:sse, "~> 0.4"}, + {:event_bus, ">= 1.6.0"} ] end From adc9c6c1a171891c6b4c4930c54ff4133731d472 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Tue, 21 Jan 2025 19:06:39 -0300 Subject: [PATCH 02/13] Initial SSE testing --- config/config.exs | 4 +++ lib/beacon_api/controllers/v1/events.ex | 35 +++++++++++++++++++++++++ lib/beacon_api/helpers.ex | 3 +++ lib/beacon_api/router.ex | 6 ++++- mix.lock | 2 ++ 5 files changed, 49 insertions(+), 1 deletion(-) create mode 100644 lib/beacon_api/controllers/v1/events.ex diff --git a/config/config.exs b/config/config.exs index 43379a39d..1a0b9a6e4 100644 --- a/config/config.exs +++ b/config/config.exs @@ -58,3 +58,7 @@ config :sse, config :event_bus, topics: [:finalized_checkpoint] + +config :mime, :types, %{ + "text/event-stream" => ["sse"] +} diff --git a/lib/beacon_api/controllers/v1/events.ex b/lib/beacon_api/controllers/v1/events.ex new file mode 100644 index 000000000..919994c3d --- /dev/null +++ b/lib/beacon_api/controllers/v1/events.ex @@ -0,0 +1,35 @@ +defmodule BeaconApi.V1.Events do + use BeaconApi, :controller + + alias BeaconApi.ApiSpec + alias BeaconApi.Helpers + alias SSE.Chunk + + require Logger + + @topic :finalized_checkpoint + + def open_api_operation(:subscribe), + do: ApiSpec.spec().paths["/eth/v1/events"].get + + @spec subscribe(Plug.Conn.t(), any) :: Plug.Conn.t() + def subscribe(conn, _params) do + Logger.info("Subscribing to finalized checkpoint events") + _finalized_checkpoint = get_current_finalized_checkpoint() + + Logger.info("Sending SSE stream") + chunk = %Chunk{data: %{} |> Jason.encode!()} + + Logger.info(chunk |> inspect()) + conn + |> SSE.stream({[@topic], chunk}) + end + + defp get_current_finalized_checkpoint() do + x = Helpers.finalized_checkpoint() + + Logger.info(x |> inspect()) + + x + end +end diff --git a/lib/beacon_api/helpers.ex b/lib/beacon_api/helpers.ex index fc5511bc5..9fa9658a8 100644 --- a/lib/beacon_api/helpers.ex +++ b/lib/beacon_api/helpers.ex @@ -161,6 +161,9 @@ defmodule BeaconApi.Helpers do end end + @spec finalized_checkpoint() :: Types.Checkpoint.t() + def finalized_checkpoint(), do: ForkChoice.get_finalized_checkpoint() + @spec get_state_root(Types.root()) :: Types.root() | nil def get_state_root(root) do with %{} = block <- Blocks.get_block(root) do diff --git a/lib/beacon_api/router.ex b/lib/beacon_api/router.ex index df36dda97..02c701ccb 100644 --- a/lib/beacon_api/router.ex +++ b/lib/beacon_api/router.ex @@ -2,7 +2,7 @@ defmodule BeaconApi.Router do use BeaconApi, :router pipeline :api do - plug(:accepts, ["json"]) + plug(:accepts, ["json", "sse"]) plug(OpenApiSpex.Plug.PutApiSpec, module: BeaconApi.ApiSpec) end @@ -22,6 +22,10 @@ defmodule BeaconApi.Router do get("/identity", NodeController, :identity) get("/version", NodeController, :version) end + + scope "/events" do + get("/", Events, :subscribe) + end end # Ethereum API Version 2 diff --git a/mix.lock b/mix.lock index de3f807d8..403fa727f 100644 --- a/mix.lock +++ b/mix.lock @@ -19,6 +19,7 @@ "elixir_make": {:hex, :elixir_make, "0.8.4", "4960a03ce79081dee8fe119d80ad372c4e7badb84c493cc75983f9d3bc8bde0f", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:certifi, "~> 2.0", [hex: :certifi, repo: "hexpm", optional: true]}], "hexpm", "6e7f1d619b5f61dfabd0a20aa268e575572b542ac31723293a4c1a567d5ef040"}, "erlex": {:hex, :erlex, "0.2.7", "810e8725f96ab74d17aac676e748627a07bc87eb950d2b83acd29dc047a30595", [:mix], [], "hexpm", "3ed95f79d1a844c3f6bf0cea61e0d5612a42ce56da9c03f01df538685365efb0"}, "escape": {:hex, :escape, "0.1.0", "548edab75e6e6938b1e199ef59cb8e504bcfd3bcf83471d4ae9a3c7a7a3c7d45", [:mix], [], "hexpm", "a5d8e92db4677155df54bc1306d401b5233875d570d474201db03cb3047491cd"}, + "event_bus": {:hex, :event_bus, "1.7.0", "29a36fc09e8c4463c82206b6a300fa1d61cf4baf9a7b4e7cf0c3efb99c73998e", [:mix], [], "hexpm", "e556470f49f53060a0696c4bad81341252685011afc69eda25032c8a3a86eb2e"}, "ex2ms": {:hex, :ex2ms, "1.7.0", "45b9f523d0b777667ded60070d82d871a37e294f0b6c5b8eca86771f00f82ee1", [:mix], [], "hexpm", "2589eee51f81f1b1caa6d08c990b1ad409215fe6f64c73f73c67d36ed10be827"}, "exleveldb": {:hex, :exleveldb, "0.14.0", "8e9353bbce38482d6971d254c6b98ceb50f3f179c94732b5d17db1be426fca18", [:mix], [{:eleveldb, "~> 2.2.20", [hex: :eleveldb, repo: "hexpm", optional: false]}], "hexpm", "803cd3b4c826a1e17e7e28f6afe224837a743b475e1a48336f186af3dd8636ad"}, "expo": {:hex, :expo, "0.5.2", "beba786aab8e3c5431813d7a44b828e7b922bfa431d6bfbada0904535342efe2", [:mix], [], "hexpm", "8c9bfa06ca017c9cb4020fabe980bc7fdb1aaec059fd004c2ab3bff03b1c599c"}, @@ -68,6 +69,7 @@ "sentry": {:hex, :sentry, "10.8.1", "aa45309785e1521416225adb16e0b4d8b957578804527f3c7babb6fefbc5e456", [:mix], [{:hackney, "~> 1.8", [hex: :hackney, repo: "hexpm", optional: true]}, {:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:nimble_options, "~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_ownership, "~> 0.3.0 or ~> 1.0", [hex: :nimble_ownership, repo: "hexpm", optional: false]}, {:phoenix, "~> 1.6", [hex: :phoenix, repo: "hexpm", optional: true]}, {:phoenix_live_view, "~> 0.20 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.6", [hex: :plug, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: true]}], "hexpm", "495b3cdadad90ba72eef973aa3dec39b3b8b2a362fe87e2f4ef32133ac3b4097"}, "snappyer": {:hex, :snappyer, "1.2.10", "023e9ae00e969b0997208b5de7d3b12bb46ec6bc5411e8dc53e7b3f435b8f0fd", [:rebar3], [], "hexpm", "f55bd9ed147e7163cb3acd1e431a7ff2c9e31ceacbb8308786094fb64551c284"}, "sourceror": {:hex, :sourceror, "1.5.0", "3e65d5fbb1a8e2864ad6411262c8018fee73474f5789dda12285c82999253d5d", [:mix], [], "hexpm", "4a32b5d189d8453f73278c15712f8731b89e9211e50726b798214b303b51bfc7"}, + "sse": {:hex, :sse, "0.4.0", "f17affacbc4618bac07590eec7bff849aa27d1f71bb3d41da3fd3cb255d16910", [:mix], [{:event_bus, ">= 1.6.0", [hex: :event_bus, repo: "hexpm", optional: false]}, {:plug, ">= 1.4.5", [hex: :plug, repo: "hexpm", optional: false]}], "hexpm", "2dfb9923725b9d5292763c3de9b7798713f5771522823e961a250204917d7efb"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "statistex": {:hex, :statistex, "1.0.0", "f3dc93f3c0c6c92e5f291704cf62b99b553253d7969e9a5fa713e5481cd858a5", [:mix], [], "hexpm", "ff9d8bee7035028ab4742ff52fc80a2aa35cece833cf5319009b52f1b5a86c27"}, "stream_data": {:hex, :stream_data, "1.1.2", "05499eaec0443349ff877aaabc6e194e82bda6799b9ce6aaa1aadac15a9fdb4d", [:mix], [], "hexpm", "129558d2c77cbc1eb2f4747acbbea79e181a5da51108457000020a906813a1a9"}, From ae3aa566ad5a67b8c63da48d20a4a2e5d9015052 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Thu, 23 Jan 2025 17:04:36 -0300 Subject: [PATCH 03/13] Initial Event implementation for finalized_checkpoint --- config/config.exs | 2 +- .../v1/{events.ex => events_controller.ex} | 13 ++----- lib/beacon_api/event_listener.ex | 37 +++++++++++++++++++ lib/beacon_api/router.ex | 2 +- .../fork_choice/handlers.ex | 7 +++- network_params.yaml | 3 ++ 6 files changed, 52 insertions(+), 12 deletions(-) rename lib/beacon_api/controllers/v1/{events.ex => events_controller.ex} (66%) create mode 100644 lib/beacon_api/event_listener.ex diff --git a/config/config.exs b/config/config.exs index 1a0b9a6e4..9b4527743 100644 --- a/config/config.exs +++ b/config/config.exs @@ -54,7 +54,7 @@ if System.get_env("RUSTLER_SKIP_COMPILE") do end config :sse, - keep_alive: {:system, "SSE_KEEP_ALIVE_IN_MS", 1000} + keep_alive: {:system, "SSE_KEEP_ALIVE_IN_MS", 5000} config :event_bus, topics: [:finalized_checkpoint] diff --git a/lib/beacon_api/controllers/v1/events.ex b/lib/beacon_api/controllers/v1/events_controller.ex similarity index 66% rename from lib/beacon_api/controllers/v1/events.ex rename to lib/beacon_api/controllers/v1/events_controller.ex index 919994c3d..f85b52511 100644 --- a/lib/beacon_api/controllers/v1/events.ex +++ b/lib/beacon_api/controllers/v1/events_controller.ex @@ -1,28 +1,23 @@ -defmodule BeaconApi.V1.Events do +defmodule BeaconApi.V1.EventsController do use BeaconApi, :controller alias BeaconApi.ApiSpec + alias BeaconApi.EventPubSub alias BeaconApi.Helpers - alias SSE.Chunk require Logger @topic :finalized_checkpoint def open_api_operation(:subscribe), - do: ApiSpec.spec().paths["/eth/v1/events"].get + do: ApiSpec.spec().paths["/eth/v1/events"].get @spec subscribe(Plug.Conn.t(), any) :: Plug.Conn.t() def subscribe(conn, _params) do Logger.info("Subscribing to finalized checkpoint events") _finalized_checkpoint = get_current_finalized_checkpoint() - Logger.info("Sending SSE stream") - chunk = %Chunk{data: %{} |> Jason.encode!()} - - Logger.info(chunk |> inspect()) - conn - |> SSE.stream({[@topic], chunk}) + EventPubSub.sse_subscribe(conn, @topic) end defp get_current_finalized_checkpoint() do diff --git a/lib/beacon_api/event_listener.ex b/lib/beacon_api/event_listener.ex new file mode 100644 index 000000000..5cf0f869f --- /dev/null +++ b/lib/beacon_api/event_listener.ex @@ -0,0 +1,37 @@ +defmodule BeaconApi.EventPubSub do + @moduledoc """ + Event listener for aggregating and sending events for SSE subscribers. + + This depends on `event_bus` and `sse`, but it could be easily switched later. + + The idea is to have a single place to publish events, and then a method for a connection to subscribe to them. + """ + + alias EventBus.Model.Event + alias SSE.Chunk + + @type topic() :: atom() + @type event_data() :: any() + + @doc """ + Publish an event to the event bus. + + TODO: We might want a noop if there are no subscribers for a topic. + """ + @spec publish(topic(), event_data()) :: :ok | {:error, atom()} + def publish(:finalized_checkpoint = topic, %{root: root, epoch: epoch}) do + data = %{root: BeaconApi.Utils.hex_encode(root), epoch: epoch} + chunk = %Chunk{data: [Jason.encode!(data)]} + event = %Event{id: UUID.uuid4(), data: chunk, topic: topic} + + EventBus.notify(event) + end + + def publish(_topic, _event_data), do: {:error, :unsupported_topic} + + @doc """ + Subscribe to a topic for stream events in an sse connection. + """ + @spec sse_subscribe(Plug.Conn.t(), topic(), event_data()) :: Plug.Conn.t() + def sse_subscribe(conn, topic), do: SSE.stream(conn, {[topic], %Chunk{data: []}}) +end diff --git a/lib/beacon_api/router.ex b/lib/beacon_api/router.ex index 02c701ccb..526c0d7da 100644 --- a/lib/beacon_api/router.ex +++ b/lib/beacon_api/router.ex @@ -24,7 +24,7 @@ defmodule BeaconApi.Router do end scope "/events" do - get("/", Events, :subscribe) + get("/", EventsController, :subscribe) end end diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex index 6384ed48a..df12d606f 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do """ require Logger + alias BeaconApi.EventPubSub alias LambdaEthereumConsensus.Execution.ExecutionClient alias LambdaEthereumConsensus.ForkChoice alias LambdaEthereumConsensus.StateTransition @@ -281,7 +282,11 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do |> if_then_update( finalized_checkpoint.epoch > store.finalized_checkpoint.epoch, # Update finalized checkpoint - &%Store{&1 | finalized_checkpoint: finalized_checkpoint} + fn store -> + EventPubSub.publish(:finalized_checkpoint, finalized_checkpoint) + + %Store{store | finalized_checkpoint: finalized_checkpoint} + end ) end diff --git a/network_params.yaml b/network_params.yaml index 76f0325e1..13618b8b9 100644 --- a/network_params.yaml +++ b/network_params.yaml @@ -1,9 +1,12 @@ participants: - el_type: geth + el_image: ethereum/client-go:v1.14.12 cl_type: lighthouse + cl_image: sigp/lighthouse:v5.3.0 count: 2 validator_count: 32 - el_type: geth + el_image: ethereum/client-go:v1.14.12 cl_type: lambda cl_image: lambda_ethereum_consensus:latest use_separate_vc: false From 7c4481eb78d290c1581273dc01a8f0a03a44d119 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Thu, 23 Jan 2025 19:09:26 -0300 Subject: [PATCH 04/13] Added no-cache to sse testing --- lib/beacon_api/event_listener.ex | 36 +++++++++++++++++++++++++------- 1 file changed, 28 insertions(+), 8 deletions(-) diff --git a/lib/beacon_api/event_listener.ex b/lib/beacon_api/event_listener.ex index 5cf0f869f..1017e8942 100644 --- a/lib/beacon_api/event_listener.ex +++ b/lib/beacon_api/event_listener.ex @@ -7,8 +7,11 @@ defmodule BeaconApi.EventPubSub do The idea is to have a single place to publish events, and then a method for a connection to subscribe to them. """ + require Logger alias EventBus.Model.Event + alias LambdaEthereumConsensus.Store alias SSE.Chunk + alias Types.StateInfo @type topic() :: atom() @type event_data() :: any() @@ -19,12 +22,25 @@ defmodule BeaconApi.EventPubSub do TODO: We might want a noop if there are no subscribers for a topic. """ @spec publish(topic(), event_data()) :: :ok | {:error, atom()} - def publish(:finalized_checkpoint = topic, %{root: root, epoch: epoch}) do - data = %{root: BeaconApi.Utils.hex_encode(root), epoch: epoch} - chunk = %Chunk{data: [Jason.encode!(data)]} - event = %Event{id: UUID.uuid4(), data: chunk, topic: topic} - - EventBus.notify(event) + def publish(:finalized_checkpoint = topic, %{root: block_root, epoch: epoch}) do + with %StateInfo{root: state_root} = Store.BlockStates.get_state_info(block_root) do + data = %{ + block: BeaconApi.Utils.hex_encode(block_root), + state: BeaconApi.Utils.hex_encode(state_root), + epoch: epoch, + execution_optimistic: false + } + + chunk = %Chunk{event: topic, data: [Jason.encode!(data)]} + event = %Event{id: UUID.uuid4(), data: chunk, topic: topic} + + EventBus.notify(event) + else + nil -> + Logger.error("State not available for block", root: block_root) + + {:error, :state_not_available} + end end def publish(_topic, _event_data), do: {:error, :unsupported_topic} @@ -32,6 +48,10 @@ defmodule BeaconApi.EventPubSub do @doc """ Subscribe to a topic for stream events in an sse connection. """ - @spec sse_subscribe(Plug.Conn.t(), topic(), event_data()) :: Plug.Conn.t() - def sse_subscribe(conn, topic), do: SSE.stream(conn, {[topic], %Chunk{data: []}}) + @spec sse_subscribe(Plug.Conn.t(), topic()) :: Plug.Conn.t() + def sse_subscribe(conn, topic) do + conn + |> Plug.Conn.put_resp_header("cache-control", "no-cache") + |> SSE.stream({[topic], %Chunk{data: []}}) + end end From c4f022eb822cbac6fabd5201e3418b15e2bf7cdd Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Thu, 23 Jan 2025 23:36:17 -0300 Subject: [PATCH 05/13] Fixed idle timeout for sse connections --- config/runtime.exs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/config/runtime.exs b/config/runtime.exs index 66852bc25..58e242c5e 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -150,7 +150,9 @@ config :lambda_ethereum_consensus, EngineApi, # Beacon API config :lambda_ethereum_consensus, BeaconApi.Endpoint, server: enable_beacon_api, - http: [port: beacon_api_port || 4000], + # We use an infinit idle timeout to avoid closing sse connections, if needed we can + # create a separate endpoint for them. + http: [port: beacon_api_port || 4000, protocol_options: [idle_timeout: :infinity]], url: [host: "localhost"], render_errors: [ formats: [json: BeaconApi.ErrorJSON], From 4f4f94b13ea0f9076e8251ff20c669ab253d8102 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Fri, 24 Jan 2025 14:51:07 -0300 Subject: [PATCH 06/13] All around fixes + error codes --- .../controllers/v1/events_controller.ex | 41 +++++++++++++------ .../{event_listener.ex => event_pubsub.ex} | 40 +++++++++++------- 2 files changed, 54 insertions(+), 27 deletions(-) rename lib/beacon_api/{event_listener.ex => event_pubsub.ex} (53%) diff --git a/lib/beacon_api/controllers/v1/events_controller.ex b/lib/beacon_api/controllers/v1/events_controller.ex index f85b52511..ead2ac927 100644 --- a/lib/beacon_api/controllers/v1/events_controller.ex +++ b/lib/beacon_api/controllers/v1/events_controller.ex @@ -3,28 +3,45 @@ defmodule BeaconApi.V1.EventsController do alias BeaconApi.ApiSpec alias BeaconApi.EventPubSub - alias BeaconApi.Helpers + import BeaconApi.EventPubSub, only: [is_implemented_topic: 1] require Logger - @topic :finalized_checkpoint - def open_api_operation(:subscribe), do: ApiSpec.spec().paths["/eth/v1/events"].get @spec subscribe(Plug.Conn.t(), any) :: Plug.Conn.t() - def subscribe(conn, _params) do - Logger.info("Subscribing to finalized checkpoint events") - _finalized_checkpoint = get_current_finalized_checkpoint() - - EventPubSub.sse_subscribe(conn, @topic) + def subscribe(conn, %{"topics" => topic}) when is_implemented_topic(topic), + do: EventPubSub.sse_subscribe(conn, topic) + + def subscribe(conn, %{"topics" => not_implemented_topic}) do + error = + Jason.encode!(%{ + code: 400, + message: + "Invalid topic: #{not_implemented_topic}. For now we just support: #{inspect(EventPubSub.implemented_topics())}" + }) + + send_chunked_error(conn, error) end - defp get_current_finalized_checkpoint() do - x = Helpers.finalized_checkpoint() + def subscribe(conn, _params) do + error = + Jason.encode!(%{ + code: 400, + message: "Missing field topics" + }) - Logger.info(x |> inspect()) + send_chunked_error(conn, error) + end - x + defp send_chunked_error(conn, error) do + conn + |> Plug.Conn.send_chunked(400) + |> Plug.Conn.chunk(error) + |> case do + {:ok, conn} -> Plug.Conn.halt(conn) + {:error, _reason} -> Plug.Conn.halt(conn) + end end end diff --git a/lib/beacon_api/event_listener.ex b/lib/beacon_api/event_pubsub.ex similarity index 53% rename from lib/beacon_api/event_listener.ex rename to lib/beacon_api/event_pubsub.ex index 1017e8942..1749d0830 100644 --- a/lib/beacon_api/event_listener.ex +++ b/lib/beacon_api/event_pubsub.ex @@ -2,7 +2,9 @@ defmodule BeaconApi.EventPubSub do @moduledoc """ Event listener for aggregating and sending events for SSE subscribers. - This depends on `event_bus` and `sse`, but it could be easily switched later. + TODO: This depends on `event_bus` and `sse`, but it could be easily switched later: + - `event_bus` we could move to phoenix pubsub + - `sse` we could just implement it ourselves using Plug.Conn.chunk and Plug.Conn.send_chunked The idea is to have a single place to publish events, and then a method for a connection to subscribe to them. """ @@ -16,6 +18,14 @@ defmodule BeaconApi.EventPubSub do @type topic() :: atom() @type event_data() :: any() + # This is also dependant on the already needed event_bus compile time config + @implemented_topics Application.compile_env!(:event_bus, :topics) + + defguard is_implemented_topic(topic) when topic in @implemented_topics + + @spec implemented_topics() :: list(topic()) + def implemented_topics(), do: @implemented_topics + @doc """ Publish an event to the event bus. @@ -23,19 +33,20 @@ defmodule BeaconApi.EventPubSub do """ @spec publish(topic(), event_data()) :: :ok | {:error, atom()} def publish(:finalized_checkpoint = topic, %{root: block_root, epoch: epoch}) do - with %StateInfo{root: state_root} = Store.BlockStates.get_state_info(block_root) do - data = %{ - block: BeaconApi.Utils.hex_encode(block_root), - state: BeaconApi.Utils.hex_encode(state_root), - epoch: epoch, - execution_optimistic: false - } - - chunk = %Chunk{event: topic, data: [Jason.encode!(data)]} - event = %Event{id: UUID.uuid4(), data: chunk, topic: topic} - - EventBus.notify(event) - else + case Store.BlockStates.get_state_info(block_root) do + %StateInfo{root: state_root} -> + data = %{ + block: BeaconApi.Utils.hex_encode(block_root), + state: BeaconApi.Utils.hex_encode(state_root), + epoch: epoch, + execution_optimistic: false + } + + chunk = %Chunk{event: topic, data: [Jason.encode!(data)]} + event = %Event{id: UUID.uuid4(), data: chunk, topic: topic} + + EventBus.notify(event) + nil -> Logger.error("State not available for block", root: block_root) @@ -51,7 +62,6 @@ defmodule BeaconApi.EventPubSub do @spec sse_subscribe(Plug.Conn.t(), topic()) :: Plug.Conn.t() def sse_subscribe(conn, topic) do conn - |> Plug.Conn.put_resp_header("cache-control", "no-cache") |> SSE.stream({[topic], %Chunk{data: []}}) end end From 135a74107dd98488817e8d085d1c7b4a279c6f00 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Fri, 24 Jan 2025 17:15:05 -0300 Subject: [PATCH 07/13] Patch EventPubsub on upgrades checkpoint test --- test/unit/fork_choice/handlers_test.exs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/test/unit/fork_choice/handlers_test.exs b/test/unit/fork_choice/handlers_test.exs index 19de64872..c1dfdc68b 100644 --- a/test/unit/fork_choice/handlers_test.exs +++ b/test/unit/fork_choice/handlers_test.exs @@ -1,6 +1,8 @@ defmodule Unit.ForkChoice.HandlersTest do use ExUnit.Case + use Patch + alias LambdaEthereumConsensus.ForkChoice.Handlers alias LambdaEthereumConsensus.Utils.Diff alias Types.Store @@ -44,6 +46,8 @@ defmodule Unit.ForkChoice.HandlersTest do end test "upgrades unrealized checkpoints" do + patch(BeaconApi.EventPubSub, :publish, fn _, _ -> :ok end) + start_time = 0 end_time = start_time + ChainSpec.get("SECONDS_PER_SLOT") * ChainSpec.get("SLOTS_PER_EPOCH") From 55b4cc76b1e3ec0152969105c3154b8cf65ba7c6 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Fri, 24 Jan 2025 17:17:32 -0300 Subject: [PATCH 08/13] Fixed another test reliying in the checkpoints update --- test/unit/beacon_api/beacon_api_v1_test.exs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/test/unit/beacon_api/beacon_api_v1_test.exs b/test/unit/beacon_api/beacon_api_v1_test.exs index 3a9fbfe50..75e4323e5 100644 --- a/test/unit/beacon_api/beacon_api_v1_test.exs +++ b/test/unit/beacon_api/beacon_api_v1_test.exs @@ -158,6 +158,8 @@ defmodule Unit.BeaconApiTest.V1 do test "node identity" do alias LambdaEthereumConsensus.Libp2pPort alias LambdaEthereumConsensus.P2P.Metadata + + patch(BeaconApi.EventPubSub, :publish, fn _, _ -> :ok end) patch(ForkChoice, :get_fork_version, fn -> ChainSpec.get("DENEB_FORK_VERSION") end) start_link_supervised!({Libp2pPort, genesis_time: :os.system_time(:second), store: %Store{}}) From a15e0046ed9bbeb2a95304cdfdd3728bd818f0e3 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Fri, 24 Jan 2025 17:26:12 -0300 Subject: [PATCH 09/13] Added issue to TODO comment --- lib/beacon_api/event_pubsub.ex | 2 +- mix.exs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/beacon_api/event_pubsub.ex b/lib/beacon_api/event_pubsub.ex index 1749d0830..5d6cd1ff5 100644 --- a/lib/beacon_api/event_pubsub.ex +++ b/lib/beacon_api/event_pubsub.ex @@ -2,7 +2,7 @@ defmodule BeaconApi.EventPubSub do @moduledoc """ Event listener for aggregating and sending events for SSE subscribers. - TODO: This depends on `event_bus` and `sse`, but it could be easily switched later: + TODO: (#1368) This depends on `event_bus` and `sse`, but it could be easily switched later: - `event_bus` we could move to phoenix pubsub - `sse` we could just implement it ourselves using Plug.Conn.chunk and Plug.Conn.send_chunked diff --git a/mix.exs b/mix.exs index 63877121c..30efc3fbf 100644 --- a/mix.exs +++ b/mix.exs @@ -65,7 +65,7 @@ defmodule LambdaEthereumConsensus.MixProject do {:prom_ex, "~> 1.11.0"}, {:flama, git: "https://github.com/lambdaclass/ht1223_tracer"}, {:uuid, "~> 1.1"}, - # TODO: We might want to use phoenix_pubsub instead and do our implementation of SSE. + # TODO: (#1368) We might want to use phoenix_pubsub instead and do our implementation of SSE. {:sse, "~> 0.4"}, {:event_bus, ">= 1.6.0"} ] From 924844961f551cb787506199d474a73e8bb8f5d9 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Mon, 27 Jan 2025 15:38:52 -0300 Subject: [PATCH 10/13] Added block events to the avaliable events to subscribe --- config/config.exs | 4 +- .../controllers/v1/events_controller.ex | 33 +++++++++------ lib/beacon_api/event_pubsub.ex | 40 +++++++++++++------ .../fork_choice/fork_choice.ex | 2 + 4 files changed, 53 insertions(+), 26 deletions(-) diff --git a/config/config.exs b/config/config.exs index 9b4527743..9478220ac 100644 --- a/config/config.exs +++ b/config/config.exs @@ -54,10 +54,10 @@ if System.get_env("RUSTLER_SKIP_COMPILE") do end config :sse, - keep_alive: {:system, "SSE_KEEP_ALIVE_IN_MS", 5000} + keep_alive: {:system, "SSE_KEEP_ALIVE_IN_MS", 55000} config :event_bus, - topics: [:finalized_checkpoint] + topics: [:finalized_checkpoint, :block] config :mime, :types, %{ "text/event-stream" => ["sse"] diff --git a/lib/beacon_api/controllers/v1/events_controller.ex b/lib/beacon_api/controllers/v1/events_controller.ex index ead2ac927..cfe4033f2 100644 --- a/lib/beacon_api/controllers/v1/events_controller.ex +++ b/lib/beacon_api/controllers/v1/events_controller.ex @@ -4,25 +4,20 @@ defmodule BeaconApi.V1.EventsController do alias BeaconApi.ApiSpec alias BeaconApi.EventPubSub - import BeaconApi.EventPubSub, only: [is_implemented_topic: 1] require Logger def open_api_operation(:subscribe), do: ApiSpec.spec().paths["/eth/v1/events"].get @spec subscribe(Plug.Conn.t(), any) :: Plug.Conn.t() - def subscribe(conn, %{"topics" => topic}) when is_implemented_topic(topic), - do: EventPubSub.sse_subscribe(conn, topic) + def subscribe(conn, %{"topics" => topics}) do + case parse_topics(topics) do + {:ok, topics} -> + EventPubSub.sse_subscribe(conn, topics) - def subscribe(conn, %{"topics" => not_implemented_topic}) do - error = - Jason.encode!(%{ - code: 400, - message: - "Invalid topic: #{not_implemented_topic}. For now we just support: #{inspect(EventPubSub.implemented_topics())}" - }) - - send_chunked_error(conn, error) + {:error, error} -> + send_chunked_error(conn, error) + end end def subscribe(conn, _params) do @@ -35,6 +30,20 @@ defmodule BeaconApi.V1.EventsController do send_chunked_error(conn, error) end + defp parse_topics(topics_string) do + # topics is a string list of topics like "finalized_checkpoint, block" we need to split it + topics = topics_string |> String.split(",") |> Enum.map(&String.trim/1) + + if Enum.all?(topics, &EventPubSub.implemented_topic?/1) do + {:ok, topics} + else + not_implemented_topics = Enum.reject(topics, &EventPubSub.implemented_topic?/1) + + {:error, + "Invalid topic/s #{inspect(not_implemented_topics)}. For now, only #{inspect(EventPubSub.implemented_topics())} are supported."} + end + end + defp send_chunked_error(conn, error) do conn |> Plug.Conn.send_chunked(400) diff --git a/lib/beacon_api/event_pubsub.ex b/lib/beacon_api/event_pubsub.ex index 5d6cd1ff5..616c79d49 100644 --- a/lib/beacon_api/event_pubsub.ex +++ b/lib/beacon_api/event_pubsub.ex @@ -15,30 +15,34 @@ defmodule BeaconApi.EventPubSub do alias SSE.Chunk alias Types.StateInfo - @type topic() :: atom() + # TODO: We need to use atoms here probably, so i need to make a couple of changes yet before merging this PR + @type topic() :: String.t() + @type topics() :: list(topic()) @type event_data() :: any() # This is also dependant on the already needed event_bus compile time config - @implemented_topics Application.compile_env!(:event_bus, :topics) + @implemented_topics Application.compile_env!(:event_bus, :topics) |> Enum.map(&Atom.to_string/1) - defguard is_implemented_topic(topic) when topic in @implemented_topics - - @spec implemented_topics() :: list(topic()) + @spec implemented_topics() :: topics() def implemented_topics(), do: @implemented_topics + @spec implemented_topic?(topic()) :: boolean() + def implemented_topic?(topic), do: topic in @implemented_topics + @doc """ Publish an event to the event bus. TODO: We might want a noop if there are no subscribers for a topic. """ - @spec publish(topic(), event_data()) :: :ok | {:error, atom()} + @spec publish(atom(), event_data()) :: :ok | {:error, atom()} def publish(:finalized_checkpoint = topic, %{root: block_root, epoch: epoch}) do case Store.BlockStates.get_state_info(block_root) do %StateInfo{root: state_root} -> data = %{ block: BeaconApi.Utils.hex_encode(block_root), state: BeaconApi.Utils.hex_encode(state_root), - epoch: epoch, + epoch: Integer.to_string(epoch), + # TODO: this is a placeholder, we need to get if the execution is optimistic or not execution_optimistic: false } @@ -54,14 +58,26 @@ defmodule BeaconApi.EventPubSub do end end + def publish(:block = topic, %{root: block_root, slot: slot}) do + data = %{ + block: BeaconApi.Utils.hex_encode(block_root), + slot: Integer.to_string(slot), + # TODO: this is a placeholder, we need to get if the execution is optimistic or not + execution_optimistic: false + } + + chunk = %Chunk{event: topic, data: [Jason.encode!(data)]} + event = %Event{id: UUID.uuid4(), data: chunk, topic: topic} + + EventBus.notify(event) + end + def publish(_topic, _event_data), do: {:error, :unsupported_topic} @doc """ Subscribe to a topic for stream events in an sse connection. """ - @spec sse_subscribe(Plug.Conn.t(), topic()) :: Plug.Conn.t() - def sse_subscribe(conn, topic) do - conn - |> SSE.stream({[topic], %Chunk{data: []}}) - end + @spec sse_subscribe(Plug.Conn.t(), topics()) :: Plug.Conn.t() + def sse_subscribe(conn, topics) when is_list(topics), + do: SSE.stream(conn, {topics, %Chunk{data: []}}) end diff --git a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex index 02384db26..d473b37b6 100644 --- a/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex +++ b/lib/lambda_ethereum_consensus/fork_choice/fork_choice.ex @@ -4,6 +4,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do """ require Logger + alias BeaconApi.EventPubSub alias LambdaEthereumConsensus.Execution.ExecutionChain alias LambdaEthereumConsensus.ForkChoice.Handlers alias LambdaEthereumConsensus.ForkChoice.Head @@ -65,6 +66,7 @@ defmodule LambdaEthereumConsensus.ForkChoice do |> tap(fn store -> StoreDb.persist_store(store) Logger.info("[Fork choice] Added new block", slot: slot, root: block_root) + EventPubSub.publish(:block, %{root: block_root, slot: slot}) Logger.info("[Fork choice] Recomputed head", slot: store.head_slot, From 56ec122442c1970a73de3567d96cd89afeb4b0e5 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Mon, 27 Jan 2025 16:55:48 -0300 Subject: [PATCH 11/13] Fix a small comment regarding topics as strings or atoms --- lib/beacon_api/controllers/v1/events_controller.ex | 7 +++---- lib/beacon_api/event_pubsub.ex | 10 +++++----- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/lib/beacon_api/controllers/v1/events_controller.ex b/lib/beacon_api/controllers/v1/events_controller.ex index cfe4033f2..c52ea5154 100644 --- a/lib/beacon_api/controllers/v1/events_controller.ex +++ b/lib/beacon_api/controllers/v1/events_controller.ex @@ -31,14 +31,13 @@ defmodule BeaconApi.V1.EventsController do end defp parse_topics(topics_string) do - # topics is a string list of topics like "finalized_checkpoint, block" we need to split it + # topics is a string list in the form of: "finalized_checkpoint, block" we need to split it topics = topics_string |> String.split(",") |> Enum.map(&String.trim/1) + not_implemented_topics = Enum.reject(topics, &EventPubSub.implemented_topic?/1) - if Enum.all?(topics, &EventPubSub.implemented_topic?/1) do + if Enum.empty?(not_implemented_topics) do {:ok, topics} else - not_implemented_topics = Enum.reject(topics, &EventPubSub.implemented_topic?/1) - {:error, "Invalid topic/s #{inspect(not_implemented_topics)}. For now, only #{inspect(EventPubSub.implemented_topics())} are supported."} end diff --git a/lib/beacon_api/event_pubsub.ex b/lib/beacon_api/event_pubsub.ex index 616c79d49..cd66405e7 100644 --- a/lib/beacon_api/event_pubsub.ex +++ b/lib/beacon_api/event_pubsub.ex @@ -15,26 +15,26 @@ defmodule BeaconApi.EventPubSub do alias SSE.Chunk alias Types.StateInfo - # TODO: We need to use atoms here probably, so i need to make a couple of changes yet before merging this PR - @type topic() :: String.t() + @type topic() :: String.t() | atom() @type topics() :: list(topic()) @type event_data() :: any() - # This is also dependant on the already needed event_bus compile time config + # This is also dependant on the already needed event_bus compile time config, we maintain them as strings for convienience @implemented_topics Application.compile_env!(:event_bus, :topics) |> Enum.map(&Atom.to_string/1) @spec implemented_topics() :: topics() def implemented_topics(), do: @implemented_topics @spec implemented_topic?(topic()) :: boolean() - def implemented_topic?(topic), do: topic in @implemented_topics + def implemented_topic?(topic) when is_atom(topic), do: implemented_topic?(Atom.to_string(topic)) + def implemented_topic?(topic) when is_binary(topic), do: topic in @implemented_topics @doc """ Publish an event to the event bus. TODO: We might want a noop if there are no subscribers for a topic. """ - @spec publish(atom(), event_data()) :: :ok | {:error, atom()} + @spec publish(topic(), event_data()) :: :ok | {:error, atom()} def publish(:finalized_checkpoint = topic, %{root: block_root, epoch: epoch}) do case Store.BlockStates.get_state_info(block_root) do %StateInfo{root: state_root} -> From 5724ced1fd000302797266334f541f960476c047 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Mon, 27 Jan 2025 17:36:42 -0300 Subject: [PATCH 12/13] Fix a small credo issue --- lib/beacon_api/event_pubsub.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/beacon_api/event_pubsub.ex b/lib/beacon_api/event_pubsub.ex index cd66405e7..8f26c1875 100644 --- a/lib/beacon_api/event_pubsub.ex +++ b/lib/beacon_api/event_pubsub.ex @@ -19,7 +19,8 @@ defmodule BeaconApi.EventPubSub do @type topics() :: list(topic()) @type event_data() :: any() - # This is also dependant on the already needed event_bus compile time config, we maintain them as strings for convienience + # This is also dependant on the already needed event_bus compile time config, + # we maintain them as strings for convienience @implemented_topics Application.compile_env!(:event_bus, :topics) |> Enum.map(&Atom.to_string/1) @spec implemented_topics() :: topics() From 77a6f3153b90554df0b396fe95275933451771b9 Mon Sep 17 00:00:00 2001 From: Rodrigo Oliveri Date: Mon, 27 Jan 2025 17:52:56 -0300 Subject: [PATCH 13/13] Remove unneeded diff --- lib/beacon_api/helpers.ex | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/beacon_api/helpers.ex b/lib/beacon_api/helpers.ex index 9fa9658a8..fc5511bc5 100644 --- a/lib/beacon_api/helpers.ex +++ b/lib/beacon_api/helpers.ex @@ -161,9 +161,6 @@ defmodule BeaconApi.Helpers do end end - @spec finalized_checkpoint() :: Types.Checkpoint.t() - def finalized_checkpoint(), do: ForkChoice.get_finalized_checkpoint() - @spec get_state_root(Types.root()) :: Types.root() | nil def get_state_root(root) do with %{} = block <- Blocks.get_block(root) do