diff --git a/lib/livebook/session.ex b/lib/livebook/session.ex index fdb8395920e..1012294f495 100644 --- a/lib/livebook/session.ex +++ b/lib/livebook/session.ex @@ -222,6 +222,16 @@ defmodule Livebook.Session do GenServer.call(pid, {:register_client, client_pid, user}, @timeout) end + @doc """ + Resets the auto shutdown timer, if ticking. + + When the session has connected clients, nothing changes. + """ + @spec reset_auto_shutdown(pid()) :: :ok + def reset_auto_shutdown(pid) do + GenServer.cast(pid, :reset_auto_shutdown) + end + @doc """ Returns data of the given session. """ @@ -959,25 +969,31 @@ defmodule Livebook.Session do defp schedule_auto_shutdown(state) do client_count = map_size(state.data.clients_map) - case {client_count, state.auto_shutdown_timer_ref} do - {0, nil} when state.auto_shutdown_ms != nil -> + cond do + client_count == 0 and state.auto_shutdown_timer_ref == nil and state.auto_shutdown_ms != nil -> timer_ref = Process.send_after(self(), :close, state.auto_shutdown_ms) %{state | auto_shutdown_timer_ref: timer_ref} - {client_count, timer_ref} when client_count > 0 and timer_ref != nil -> - if Process.cancel_timer(timer_ref) == false do - receive do - :close -> :ok - end - end - - %{state | auto_shutdown_timer_ref: nil} + client_count > 0 -> + cancel_auto_shutdown_timer(state) - _ -> + true -> state end end + defp cancel_auto_shutdown_timer(%{auto_shutdown_timer_ref: nil} = state), do: state + + defp cancel_auto_shutdown_timer(state) do + if Process.cancel_timer(state.auto_shutdown_timer_ref) == false do + receive do + :close -> :ok + end + end + + %{state | auto_shutdown_timer_ref: nil} + end + @impl true def handle_continue(:app_init, state) do cell_ids = Data.cell_ids_for_full_evaluation(state.data, []) @@ -1091,6 +1107,13 @@ defmodule Livebook.Session do end @impl true + def handle_cast(:reset_auto_shutdown, state) do + {:noreply, + state + |> cancel_auto_shutdown_timer() + |> schedule_auto_shutdown()} + end + def handle_cast({:set_notebook_attributes, client_pid, attrs}, state) do client_id = client_id(state, client_pid) operation = {:set_notebook_attributes, client_id, attrs} @@ -2466,7 +2489,7 @@ defmodule Livebook.Session do status = state.data.app_data.status send(state.app_pid, {:app_status_changed, state.session_id, status}) - notify_update(state) + state end defp handle_action(state, :app_recover) do diff --git a/lib/livebook_web/errors.ex b/lib/livebook_web/errors.ex new file mode 100644 index 00000000000..11f6326c771 --- /dev/null +++ b/lib/livebook_web/errors.ex @@ -0,0 +1,7 @@ +defmodule LivebookWeb.NotFoundError do + defexception [:message, plug_status: 404] +end + +defmodule LivebookWeb.BadRequestError do + defexception [:message, plug_status: 400] +end diff --git a/lib/livebook_web/errors/not_found_error.ex b/lib/livebook_web/errors/not_found_error.ex deleted file mode 100644 index 414880c2ec0..00000000000 --- a/lib/livebook_web/errors/not_found_error.ex +++ /dev/null @@ -1,3 +0,0 @@ -defmodule LivebookWeb.NotFoundError do - defexception [:message, plug_status: 404] -end diff --git a/lib/livebook_web/plugs/proxy_plug.ex b/lib/livebook_web/plugs/proxy_plug.ex index 5f1d266c0fd..4491be81660 100644 --- a/lib/livebook_web/plugs/proxy_plug.ex +++ b/lib/livebook_web/plugs/proxy_plug.ex @@ -11,24 +11,43 @@ defmodule LivebookWeb.ProxyPlug do @impl true def call(%{path_info: ["sessions", id, "proxy" | path_info]} = conn, _opts) do session = fetch_session!(id) + Livebook.Session.reset_auto_shutdown(session.pid) proxy_handler_spec = fetch_proxy_handler_spec!(session) conn = prepare_conn(conn, path_info, ["sessions", id, "proxy"]) - conn = call_proxy_handler(proxy_handler_spec, conn) - halt(conn) + call_proxy_handler(proxy_handler_spec, conn) end def call(%{path_info: ["apps", slug, id, "proxy" | path_info]} = conn, _opts) do app = fetch_app!(slug) unless Enum.any?(app.sessions, &(&1.id == id)) do - raise NotFoundError, "could not find an app session matching #{inspect(id)}" + raise NotFoundError, "could not find an app session with id #{inspect(id)}" end session = fetch_session!(id) + Livebook.Session.reset_auto_shutdown(session.pid) + await_app_session_ready(app, session.id) + proxy_handler_spec = fetch_proxy_handler_spec!(session) + conn = prepare_conn(conn, path_info, ["apps", slug, id, "proxy"]) + call_proxy_handler(proxy_handler_spec, conn) + end + + def call(%{path_info: ["apps", slug, "proxy" | path_info]} = conn, _opts) do + app = fetch_app!(slug) + + if app.multi_session do + raise LivebookWeb.BadRequestError, + "the requested app is multi-session. In order to send requests to this app," <> + " you need to start a session and use its specific URL" + end + + session_id = Livebook.App.get_session_id(app.pid) + {:ok, session} = Livebook.Sessions.fetch_session(session_id) + Livebook.Session.reset_auto_shutdown(session.pid) + await_app_session_ready(app, session.id) proxy_handler_spec = fetch_proxy_handler_spec!(session) conn = prepare_conn(conn, path_info, ["apps", slug, "proxy"]) - conn = call_proxy_handler(proxy_handler_spec, conn) - halt(conn) + call_proxy_handler(proxy_handler_spec, conn) end def call(conn, _opts) do @@ -38,21 +57,27 @@ defmodule LivebookWeb.ProxyPlug do defp fetch_app!(slug) do case Livebook.Apps.fetch_app(slug) do {:ok, app} -> app - :error -> raise NotFoundError, "could not find an app matching #{inspect(slug)}" + :error -> raise NotFoundError, "could not find an app with slug #{inspect(slug)}" end end defp fetch_session!(id) do case Livebook.Sessions.fetch_session(id) do {:ok, session} -> session - {:error, _} -> raise NotFoundError, "could not find a session matching #{id}" + {:error, _} -> raise NotFoundError, "could not find a session with id #{id}" end end defp fetch_proxy_handler_spec!(session) do case Livebook.Session.fetch_proxy_handler_spec(session.pid) do - {:ok, pid} -> pid - {:error, _} -> raise NotFoundError, "could not find a kino proxy running" + {:ok, pid} -> + pid + + {:error, _} -> + raise NotFoundError, + "the session does not listen to proxied requests." <> + " See the Kino.Proxy documentation to learn about defining" <> + " custom request handlers" end end @@ -62,6 +87,32 @@ defmodule LivebookWeb.ProxyPlug do defp call_proxy_handler(proxy_handler_spec, conn) do {module, function, args} = proxy_handler_spec - apply(module, function, args ++ [conn]) + conn = apply(module, function, args ++ [conn]) + halt(conn) + end + + defp await_app_session_ready(app, session_id) do + unless session_ready?(app, session_id) do + Livebook.App.subscribe(app.slug) + await_session_execution_loop(app, session_id) + Livebook.App.unsubscribe(app.slug) + end + end + + defp await_session_execution_loop(%{slug: slug}, session_id) do + receive do + {:app_updated, %{slug: ^slug} = app} -> + unless session_ready?(app, session_id) do + await_session_execution_loop(app, session_id) + end + end + end + + defp session_ready?(app, session_id) do + if session = Enum.find(app.sessions, &(&1.id == session_id)) do + session.app_status.execution != :executing + else + false + end end end diff --git a/test/livebook/proxy_test.exs b/test/livebook/proxy_test.exs index ae533db415f..71056e8fd4e 100644 --- a/test/livebook/proxy_test.exs +++ b/test/livebook/proxy_test.exs @@ -78,6 +78,58 @@ defmodule Livebook.ProxyTest do assert text_response(put(conn, url), 200) == "used PUT method" assert text_response(patch(conn, url), 200) == "used PATCH method" assert text_response(delete(conn, url), 200) == "used DELETE method" + + # Generic path also works for single-session apps + url = "/apps/#{slug}/proxy/" + + assert text_response(get(conn, url), 200) == "used GET method" + end + + test "waits for the session to be executed before attempting the request", %{conn: conn} do + slug = Livebook.Utils.random_short_id() + + app_settings = %{ + Notebook.AppSettings.new() + | slug: slug, + access_type: :public, + auto_shutdown_ms: 5_000 + } + + notebook = %{proxy_notebook() | app_settings: app_settings} + + Livebook.Apps.subscribe() + pid = deploy_notebook_sync(notebook) + + assert_receive {:app_created, %{pid: ^pid, slug: ^slug, sessions: []}} + + # The app is configured with auto shutdown, so the session will + # start only once requested. We should wait until it executes + # and then proxy the request as usual + url = "/apps/#{slug}/proxy/" + + assert text_response(get(conn, url), 200) == "used GET method" + end + + test "returns error when requesting generic path for multi-session app", %{conn: conn} do + slug = Livebook.Utils.random_short_id() + + app_settings = %{ + Notebook.AppSettings.new() + | slug: slug, + access_type: :public, + multi_session: true + } + + notebook = %{proxy_notebook() | app_settings: app_settings} + + Livebook.Apps.subscribe() + pid = deploy_notebook_sync(notebook) + + assert_receive {:app_created, %{pid: ^pid, slug: ^slug, sessions: []}} + + assert_error_sent 400, fn -> + get(conn, "/apps/#{slug}/proxy/foo/bar") + end end end