Skip to content

Commit

Permalink
better handled lattice prefix
Browse files Browse the repository at this point in the history
Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
  • Loading branch information
brooksmtownsend committed May 16, 2023
1 parent 2afd300 commit d7050b3
Show file tree
Hide file tree
Showing 13 changed files with 79 additions and 104 deletions.
57 changes: 27 additions & 30 deletions host_core/lib/host_core/actors/actor_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,11 @@ defmodule HostCore.Actors.ActorSupervisor do
@spec start_actor(
bytes :: binary(),
host_id :: String.t(),
lattice_prefix :: String.t(),
oci :: String.t(),
count :: integer(),
annotations :: map()
) :: {:error, any} | {:ok, [pid()]}
def start_actor(bytes, host_id, lattice_prefix, oci \\ "", count \\ 1, annotations \\ %{})
def start_actor(bytes, host_id, oci \\ "", count \\ 1, annotations \\ %{})
when is_binary(bytes) do
Tracer.with_span "Starting Actor" do
Tracer.set_attribute("actor_ref", oci)
Expand All @@ -45,9 +44,18 @@ defmodule HostCore.Actors.ActorSupervisor do
source = HostCore.Policy.Manager.default_source()
claims_result = get_claims(bytes, oci)

with {:ok, {pid, _}} <- VirtualHost.lookup(host_id),
config <- VirtualHost.config(pid),
labels <- VirtualHost.labels(pid),
# If we can't lookup the host ID, we can't start the actor. Shouldn't
# reach this point but just in case.
host_pid =
case VirtualHost.lookup(host_id) do
{:ok, {pid, _config}} -> pid
:error -> nil
end

config = VirtualHost.config(host_pid)
labels = VirtualHost.labels(host_pid)

with true <- is_pid(host_pid),
{:ok, claims} <- claims_result,
target <- %{
publicKey: claims.public_key,
Expand All @@ -71,27 +79,16 @@ defmodule HostCore.Actors.ActorSupervisor do
annotations,
pids |> length(),
host_id,
lattice_prefix
config.lattice_prefix
)

{:ok, pids}
else
# Could not lookup host by ID
:error ->
false ->
error = "Host not found"
Tracer.set_status(:error, error)

public_key = public_key_from_claims_result(claims_result)

publish_actors_start_failed(
public_key,
oci,
annotations,
host_id,
lattice_prefix,
error
)

{:error, "Failed to find host #{host_id}"}

# Policy server denied starting actor
Expand All @@ -106,7 +103,7 @@ defmodule HostCore.Actors.ActorSupervisor do
oci,
annotations,
host_id,
lattice_prefix,
config.lattice_prefix,
error
)

Expand All @@ -124,7 +121,7 @@ defmodule HostCore.Actors.ActorSupervisor do
oci,
annotations,
host_id,
lattice_prefix,
config.lattice_prefix,
error
)

Expand Down Expand Up @@ -213,20 +210,20 @@ defmodule HostCore.Actors.ActorSupervisor do
end)
end

def start_actor_from_ref(host_id, ref, lattice_prefix, count \\ 1, annotation \\ %{}) do
def start_actor_from_ref(host_id, ref, count \\ 1, annotation \\ %{}) do
cond do
String.starts_with?(ref, "bindle://") ->
start_actor_from_bindle(host_id, ref, lattice_prefix, count, annotation)
start_actor_from_bindle(host_id, ref, count, annotation)

String.starts_with?(ref, "file://") ->
start_actor_from_file(host_id, ref, lattice_prefix, count, annotation)
start_actor_from_file(host_id, ref, count, annotation)

true ->
start_actor_from_oci(host_id, ref, lattice_prefix, count, annotation)
start_actor_from_oci(host_id, ref, count, annotation)
end
end

def start_actor_from_oci(host_id, ref, lattice_prefix, count \\ 1, annotations \\ %{}) do
def start_actor_from_oci(host_id, ref, count \\ 1, annotations \\ %{}) do
Tracer.with_span "Starting Actor from OCI", kind: :server do
Tracer.set_attribute("host_id", host_id)
Tracer.set_attribute("oci_ref", ref)
Expand Down Expand Up @@ -256,12 +253,12 @@ defmodule HostCore.Actors.ActorSupervisor do

bytes
|> IO.iodata_to_binary()
|> start_actor(host_id, lattice_prefix, ref, count, annotations)
|> start_actor(host_id, ref, count, annotations)
end
end
end

def start_actor_from_bindle(host_id, bindle_id, lattice_prefix, count \\ 1, annotations \\ %{}) do
def start_actor_from_bindle(host_id, bindle_id, count \\ 1, annotations \\ %{}) do
Tracer.with_span "Starting Actor from Bindle", kind: :server do
creds = VirtualHost.get_creds(host_id, :bindle, bindle_id)

Expand All @@ -284,12 +281,12 @@ defmodule HostCore.Actors.ActorSupervisor do

bytes
|> IO.iodata_to_binary()
|> start_actor(host_id, lattice_prefix, bindle_id, count, annotations)
|> start_actor(host_id, bindle_id, count, annotations)
end
end
end

def start_actor_from_file(host_id, fileref, lattice_prefix, count \\ 1, annotations \\ %{}) do
def start_actor_from_file(host_id, fileref, count \\ 1, annotations \\ %{}) do
config = VirtualHost.config(host_id)

if config.enable_actor_from_fs do
Expand All @@ -307,7 +304,7 @@ defmodule HostCore.Actors.ActorSupervisor do

{:ok, binary} ->
binary
|> start_actor(host_id, lattice_prefix, fileref, count, annotations)
|> start_actor(host_id, fileref, count, annotations)
end
end
else
Expand Down
7 changes: 3 additions & 4 deletions host_core/lib/host_core/control_interface/host_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ defmodule HostCore.ControlInterface.HostServer do
Tracer.set_attribute("host_id", host_id)
Tracer.set_attribute("lattice_id", prefix)

res =
ActorSupervisor.start_actor_from_ref(host_id, actor_ref, prefix, count, annotations)
res = ActorSupervisor.start_actor_from_ref(host_id, actor_ref, count, annotations)

case res do
{:ok, _pid} ->
Expand Down Expand Up @@ -135,7 +134,7 @@ defmodule HostCore.ControlInterface.HostServer do

# Scale Actor
# input: #{"actor_id" => "...", "actor_ref" => "...", "count" => 5}
defp handle_request({"cmd", host_id, "scale"}, body, _reply_to, prefix) do
defp handle_request({"cmd", host_id, "scale"}, body, _reply_to, _prefix) do
with {:ok, scale_request} <- Jason.decode(body),
true <- has_values(scale_request, ["actor_id", "actor_ref", "count"]) do
actor_id = scale_request["actor_id"]
Expand All @@ -148,7 +147,7 @@ defmodule HostCore.ControlInterface.HostServer do
Tracer.set_current_span(ctx)

Tracer.with_span "Handle Scale Actor Request (ctl)", kind: :server do
case ActorSupervisor.scale_actor(host_id, prefix, actor_id, count, actor_ref) do
case ActorSupervisor.scale_actor(host_id, actor_id, count, actor_ref) do
{:error, err} ->
Logger.error("Error scaling actor #{actor_id}: #{err}", actor_id: actor_id)

Expand Down
21 changes: 12 additions & 9 deletions host_core/lib/host_core/vhost/virtual_host.ex
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ defmodule HostCore.Vhost.VirtualHost do
to look the configuration up in the ETS cache. This function no longer makes
a GenServer call
"""
@spec config(pid()) :: nil | HostCore.Vhost.Configuration.t()
@spec config(pid() | String.t() | nil) :: nil | HostCore.Vhost.Configuration.t()
def config(pid) when is_pid(pid) do
case Registry.keys(Registry.HostRegistry, pid) do
[host_id] ->
Expand All @@ -252,7 +252,6 @@ defmodule HostCore.Vhost.VirtualHost do
end
end

@spec config(String.t()) :: nil | HostCore.Vhost.Configuration.t()
def config(host_id) when is_binary(host_id) do
case :ets.lookup(:vhost_config_table, host_id) do
[{_, config}] ->
Expand All @@ -264,12 +263,16 @@ defmodule HostCore.Vhost.VirtualHost do
end
end

def config(_unknown) do
nil
end

@doc """
Returns the labels associated with the vhost by looking it up on the
ETS cache. Does NOT make a GenServer call
"""
@spec labels(pid()) :: map()
def labels(pid) do
@spec labels(pid() | nil) :: map()
def labels(pid) when is_pid(pid) do
case config(pid) do
nil ->
%{}
Expand All @@ -279,6 +282,10 @@ defmodule HostCore.Vhost.VirtualHost do
end
end

def labels(_unknown) do
nil
end

def generate_ping_reply(pid) do
GenServer.call(pid, :generate_ping_reply)
end
Expand Down Expand Up @@ -568,10 +575,6 @@ defmodule HostCore.Vhost.VirtualHost do
end

defp start_autostart_actor(actor, state) do
ActorSupervisor.start_actor_from_ref(
state.config.host_key,
state.config.lattice_prefix,
actor
)
ActorSupervisor.start_actor_from_ref(state.config.host_key, actor)
end
end
2 changes: 1 addition & 1 deletion host_core/test/e2e/controlinterface_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule HostCore.E2E.ControlInterfaceTest do
on_exit(fn -> cleanup(pid, config) end)

{:ok, bytes} = File.read(@echo_path)
{:ok, _pid} = ActorSupervisor.start_actor(bytes, config.host_key, config.lattice_prefix)
{:ok, _pid} = ActorSupervisor.start_actor(bytes, config.host_key)

prefix = config.lattice_prefix
topic = "wasmbus.ctl.#{prefix}.get.claims"
Expand Down
4 changes: 2 additions & 2 deletions host_core/test/e2e/echo_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule HostCore.E2E.EchoTest do
on_exit(fn -> cleanup(pid, config) end)

{:ok, bytes} = File.read(@echo_path)
{:ok, _pid} = ActorSupervisor.start_actor(bytes, config.host_key, config.lattice_prefix)
{:ok, _pid} = ActorSupervisor.start_actor(bytes, config.host_key)

:ok = HostCoreTest.EventWatcher.wait_for_actor_start(evt_watcher, @echo_key)

Expand Down Expand Up @@ -89,7 +89,7 @@ defmodule HostCore.E2E.EchoTest do
on_exit(fn -> cleanup(pid, config) end)

{:ok, bytes} = File.read(@echo_unpriv_path)
{:ok, _pid} = ActorSupervisor.start_actor(bytes, config.host_key, config.lattice_prefix)
{:ok, _pid} = ActorSupervisor.start_actor(bytes, config.host_key)
:ok = HostCoreTest.EventWatcher.wait_for_actor_start(evt_watcher, @echo_unpriv_key)

actor_count = actor_count(config.host_key, @echo_unpriv_key)
Expand Down
4 changes: 2 additions & 2 deletions host_core/test/e2e/kvcounter_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ defmodule HostCore.E2E.KVCounterTest do
# :timer.sleep(6000)

{:ok, bytes} = File.read(@kvcounter_path)
{:ok, _pid} = ActorSupervisor.start_actor(bytes, config.host_key, config.lattice_prefix)
{:ok, _pid} = ActorSupervisor.start_actor(bytes, config.host_key)

:ok = HostCoreTest.EventWatcher.wait_for_actor_start(evt_watcher, @kvcounter_key)

Expand Down Expand Up @@ -141,7 +141,7 @@ defmodule HostCore.E2E.KVCounterTest do
on_exit(fn -> cleanup(pid, config) end)

{:ok, bytes} = File.read(@kvcounter_unpriv_path)
{:ok, _pid} = ActorSupervisor.start_actor(bytes, config.host_key, config.lattice_prefix)
{:ok, _pid} = ActorSupervisor.start_actor(bytes, config.host_key)
:ok = HostCoreTest.EventWatcher.wait_for_actor_start(evt_watcher, @kvcounter_unpriv_key)

# NOTE: Link definitions are put _before_ providers are started so that they receive
Expand Down

0 comments on commit d7050b3

Please sign in to comment.