Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added actors started and actors stopped events #617

Merged
merged 9 commits into from
May 16, 2023
4 changes: 0 additions & 4 deletions host_core/lib/host_core/actors/actor_module.ex
Expand Up @@ -779,10 +779,6 @@ defmodule HostCore.Actors.ActorModule do
}
|> CloudEvent.new("actor_started", host_id)
|> CloudEvent.publish(lattice_prefix)

# topic = "#{@event_prefix}.#{lattice_prefix}"

# HostCore.Nats.safe_pub(HostCore.Nats.control_connection(lattice_prefix), topic, msg)
end

def publish_actor_updated(prefix, host_id, actor_pk, revision, instance_id) do
Expand Down
187 changes: 176 additions & 11 deletions host_core/lib/host_core/actors/actor_supervisor.ex
Expand Up @@ -11,6 +11,7 @@ defmodule HostCore.Actors.ActorSupervisor do

alias HostCore.Actors.ActorModule
alias HostCore.Actors.ActorRpcSupervisor
alias HostCore.CloudEvent
alias HostCore.Vhost.VirtualHost
alias HostCore.WasmCloud.Native

Expand Down Expand Up @@ -41,35 +42,91 @@ defmodule HostCore.Actors.ActorSupervisor do
Logger.debug("Start actor request received for #{oci}", oci_ref: oci)

source = HostCore.Policy.Manager.default_source()
claims_result = get_claims(bytes, oci)

# If we can't lookup the host ID, we can't start the actor. Shouldn't
# reach this point but just in case.
host_pid =
case VirtualHost.lookup(host_id) do
{:ok, {pid, _config}} -> pid
:error -> nil
end

config = VirtualHost.config(host_pid)
labels = VirtualHost.labels(host_pid)

with {:ok, {pid, _}} <- VirtualHost.lookup(host_id),
config <- VirtualHost.config(pid),
labels <- VirtualHost.labels(pid),
{:ok, claims} <- get_claims(bytes, oci),
with true <- is_pid(host_pid),
# Reversed boolean here so we can catch the error in one else block
true <- !is_nil(config),
{:ok, claims} <- claims_result,
brooksmtownsend marked this conversation as resolved.
Show resolved Hide resolved
target <- %{
publicKey: claims.public_key,
issuer: claims.issuer,
contractId: nil,
linkName: nil
},
# Validate policy
%{permitted: true} <-
HostCore.Policy.Manager.evaluate_action(config, labels, source, target, @start_actor),
# Ensure no other OCI reference is running for this actor ID
{:ok} <- check_other_oci_already_running(oci, claims.public_key, host_id),
# Start actors
pids <- start_actor_instances(claims, bytes, oci, annotations, host_id, count) do
Tracer.add_event("Actor(s) Started", [])
Tracer.set_status(:ok, "")

publish_actors_started(
claims,
oci,
annotations,
pids |> length(),
host_id,
config.lattice_prefix
)

{:ok, pids}
else
%{permitted: false, message: message, requestId: request_id} ->
Tracer.set_status(:error, "Policy denied starting actor, request: #{request_id}")
{:error, "Starting actor denied: #{message}"}
# Could not lookup host or config by ID
false ->
error = "Host not found"
Tracer.set_status(:error, error)

:error ->
Tracer.set_status(:error, "Host not found")
{:error, "Failed to find host #{host_id}"}

# Policy server denied starting actor
%{permitted: false, message: message, requestId: request_id} ->
error = "Policy denied starting actor, request: #{request_id}"
Tracer.set_status(:error, error)

public_key = public_key_from_claims_result(claims_result)

publish_actors_start_failed(
public_key,
oci,
annotations,
host_id,
config.lattice_prefix,
error
)

{:error, "Starting actor denied: #{message}"}

# Error extracting claims or starting actor after passing validation
{:error, err} ->
Tracer.set_status(:error, "#{inspect(err)}")
error = "#{inspect(err)}"
Tracer.set_status(:error, error)

public_key = public_key_from_claims_result(claims_result)

publish_actors_start_failed(
public_key,
oci,
annotations,
host_id,
config.lattice_prefix,
error
)

{:error, err}
end
end
Expand All @@ -90,6 +147,16 @@ defmodule HostCore.Actors.ActorSupervisor do
end
end

# Extracts the actor public key from the result of a claims lookup. Used when
# publishing `actors_start_failed` events, where the claims may or may not
# have been extracted successfully.
#
# Returns `claims.public_key` for `{:ok, claims}` and the placeholder "N/A"
# for `{:error, reason}`. The error reason is ignored, so it is typed as
# `term()` rather than being restricted to binaries.
@spec public_key_from_claims_result({:ok, map()} | {:error, term()}) :: binary()
defp public_key_from_claims_result({:ok, claims}), do: claims.public_key
defp public_key_from_claims_result({:error, _reason}), do: "N/A"

# Returns whether the given actor's public key has at least one
# OCI reference running _other_ than the candidate supplied.
defp check_other_oci_already_running(oci, pk, host_id) do
Expand All @@ -106,6 +173,14 @@ defmodule HostCore.Actors.ActorSupervisor do
end
end

@spec start_actor_instances(
claims :: map(),
bytes :: binary(),
oci :: binary(),
annotations :: map(),
host_id :: binary(),
count :: non_neg_integer()
) :: list()
defp start_actor_instances(claims, bytes, oci, annotations, host_id, count) do
# Start `count` instances of this actor
opts = %{
Expand Down Expand Up @@ -429,11 +504,19 @@ defmodule HostCore.Actors.ActorSupervisor do
end

# Terminate `count` instances of an actor
@spec terminate_actor(
host_id :: binary(),
public_key :: binary(),
count :: non_neg_integer(),
annotations :: map()
) :: :ok
def terminate_actor(host_id, public_key, count, annotations) when count > 0 do
remaining = halt_required_actors(host_id, public_key, annotations, count)

lattice_prefix = VirtualHost.get_lattice_for_host(host_id)
publish_actors_stopped(host_id, public_key, lattice_prefix, count, remaining, annotations)

if remaining <= 0 do
lattice_prefix = VirtualHost.get_lattice_for_host(host_id)
ActorRpcSupervisor.stop_rpc_subscriber(lattice_prefix, public_key)
end

Expand Down Expand Up @@ -470,4 +553,86 @@ defmodule HostCore.Actors.ActorSupervisor do
end

defp get_annotations(pid), do: ActorModule.annotations(pid)

@doc """
Publishes an `actors_started` cloud event for `host_id` on the lattice
identified by `lattice_prefix`.

The event payload contains the actor's public key, the image reference
(`oci`), the supplied `annotations`, the host ID, the number of instances
(`count`) and a selection of fields copied from `claims`.
"""
@spec publish_actors_started(
        claims :: %{
          call_alias: any(),
          caps: any(),
          expires_human: any(),
          issuer: any(),
          name: any(),
          not_before_human: any(),
          public_key: any(),
          revision: any(),
          tags: any(),
          version: any()
        },
        oci :: String.t(),
        annotations :: map(),
        count :: non_neg_integer(),
        host_id :: String.t(),
        lattice_prefix :: String.t()
      ) :: :ok
def publish_actors_started(claims, oci, annotations, count, host_id, lattice_prefix) do
  # Read the public key first, then the remaining claim fields, in the same
  # order the fields appear in the event payload.
  actor_key = claims.public_key

  claims_payload = %{
    call_alias: claims.call_alias,
    caps: claims.caps,
    issuer: claims.issuer,
    tags: claims.tags,
    name: claims.name,
    version: claims.version,
    revision: claims.revision,
    not_before_human: claims.not_before_human,
    expires_human: claims.expires_human
  }

  event_data = %{
    public_key: actor_key,
    image_ref: oci,
    annotations: annotations,
    host_id: host_id,
    claims: claims_payload,
    count: count
  }

  event = CloudEvent.new(event_data, "actors_started", host_id)
  CloudEvent.publish(event, lattice_prefix)
end

@doc """
Publishes an `actors_start_failed` cloud event for `host_id` on the lattice
identified by `lattice_prefix`.

The event payload contains the actor's public key, the image reference
(`oci`), the supplied `annotations`, the host ID and the `error` text.
"""
@spec publish_actors_start_failed(
        public_key :: String.t(),
        oci :: String.t(),
        annotations :: map(),
        host_id :: String.t(),
        lattice_prefix :: String.t(),
        error :: String.t()
      ) :: :ok
def publish_actors_start_failed(public_key, oci, annotations, host_id, lattice_prefix, error) do
  failure_data = %{
    public_key: public_key,
    image_ref: oci,
    annotations: annotations,
    host_id: host_id,
    error: error
  }

  failure_data
  |> CloudEvent.new("actors_start_failed", host_id)
  |> CloudEvent.publish(lattice_prefix)
end

@doc """
Publishes an `actors_stopped` cloud event for `host_id` on the lattice
identified by `lattice_prefix`.

The event payload contains the host ID, the actor's public key, the `count`
and `remaining` values as passed in, and the supplied `annotations`.
"""
@spec publish_actors_stopped(
        host_id :: String.t(),
        public_key :: String.t(),
        lattice_prefix :: String.t(),
        count :: non_neg_integer(),
        remaining :: non_neg_integer(),
        annotations :: map()
      ) :: :ok
def publish_actors_stopped(host_id, public_key, lattice_prefix, count, remaining, annotations) do
  stopped_data = %{
    host_id: host_id,
    public_key: public_key,
    count: count,
    remaining: remaining,
    annotations: annotations
  }

  stopped_event = CloudEvent.new(stopped_data, "actors_stopped", host_id)
  CloudEvent.publish(stopped_event, lattice_prefix)
end
end
15 changes: 11 additions & 4 deletions host_core/lib/host_core/vhost/virtual_host.ex
Expand Up @@ -241,7 +241,7 @@ defmodule HostCore.Vhost.VirtualHost do
to look the configuration up in the ETS cache. This function no longer makes
a GenServer call
"""
@spec config(pid()) :: nil | HostCore.Vhost.Configuration.t()
@spec config(pid() | String.t() | nil) :: nil | HostCore.Vhost.Configuration.t()
def config(pid) when is_pid(pid) do
case Registry.keys(Registry.HostRegistry, pid) do
[host_id] ->
Expand All @@ -252,7 +252,6 @@ defmodule HostCore.Vhost.VirtualHost do
end
end

@spec config(String.t()) :: nil | HostCore.Vhost.Configuration.t()
def config(host_id) when is_binary(host_id) do
case :ets.lookup(:vhost_config_table, host_id) do
[{_, config}] ->
Expand All @@ -264,12 +263,16 @@ defmodule HostCore.Vhost.VirtualHost do
end
end

# Catch-all clause: an argument that is neither a pid nor a binary host ID
# (for example `nil`) has no configuration to look up, so `nil` is returned.
def config(_other), do: nil

@doc """
Returns the labels associated with the vhost by looking it up on the
ETS cache. Does NOT make a GenServer call
"""
@spec labels(pid()) :: map()
def labels(pid) do
@spec labels(pid() | nil) :: map()
def labels(pid) when is_pid(pid) do
case config(pid) do
nil ->
%{}
Expand All @@ -279,6 +282,10 @@ defmodule HostCore.Vhost.VirtualHost do
end
end

# Catch-all clause for any argument that is not a pid (for example `nil`).
# Returns an empty map so the result always satisfies the `map()` return type
# declared in the spec, matching what the pid clause returns when no
# configuration is found.
def labels(_unknown), do: %{}

def generate_ping_reply(pid) do
GenServer.call(pid, :generate_ping_reply)
end
Expand Down
2 changes: 1 addition & 1 deletion host_core/mix.exs
@@ -1,7 +1,7 @@
defmodule HostCore.MixProject do
use Mix.Project

@app_vsn "0.62.1"
@app_vsn "0.63.0"

def project do
[
Expand Down
4 changes: 2 additions & 2 deletions wasmcloud_host/chart/Chart.yaml
Expand Up @@ -8,7 +8,7 @@ keywords:
- wasmCloud
- wasmcloud

home: https://wasmcloud.dev
home: https://wasmcloud.com
sources:
- https://github.com/wasmCloud/wasmcloud-otp
icon: https://github.com/wasmCloud/wasmcloud.com-dev/raw/main/static/images/wasmcloud-logo.png
Expand All @@ -20,4 +20,4 @@ type: application
# in one PR, then open a new PR updating this `version` and `appVersion`.
version: 0.6.11

appVersion: "0.62.1"
appVersion: "0.63.0"
30 changes: 30 additions & 0 deletions wasmcloud_host/lib/wasmcloud_host/lattice/state_monitor.ex
Expand Up @@ -548,6 +548,36 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
state
end

# No-op. There is no more information to gain from the `actors_started`,
# `actors_start_failed` or `actors_stopped` events, so the state is returned
# unchanged for all three.
defp process_event(state, %Cloudevents.Format.V_1_0.Event{type: type})
     when type in [
            "com.wasmcloud.lattice.actors_started",
            "com.wasmcloud.lattice.actors_start_failed",
            "com.wasmcloud.lattice.actors_stopped"
          ] do
  state
end

# Fallthrough event handler to prevent errors for new events
defp process_event(
state,
Expand Down
2 changes: 1 addition & 1 deletion wasmcloud_host/mix.exs
@@ -1,7 +1,7 @@
defmodule WasmcloudHost.MixProject do
use Mix.Project

@app_vsn "0.62.1"
@app_vsn "0.63.0"

def project do
[
Expand Down