Skip to content

Commit

Permalink
Added actors started and actors stopped events (#617)
Browse files Browse the repository at this point in the history
* added actors started and actors stopped events

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

* refactored with statement

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

* allow new events to passthrough dashboard

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

* fixed start_actor references in tests

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

* fixed more references in tests

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

* ONE MORE TEST

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

* addressed dialyzer, verified hotwatch

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

* better handled lattice prefix

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

* removed one more prefix, handle config

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>

---------

Signed-off-by: Brooks Townsend <brooks@cosmonic.com>
  • Loading branch information
brooksmtownsend committed May 16, 2023
1 parent 28b0fb4 commit 62de47f
Show file tree
Hide file tree
Showing 7 changed files with 221 additions and 23 deletions.
4 changes: 0 additions & 4 deletions host_core/lib/host_core/actors/actor_module.ex
Expand Up @@ -779,10 +779,6 @@ defmodule HostCore.Actors.ActorModule do
}
|> CloudEvent.new("actor_started", host_id)
|> CloudEvent.publish(lattice_prefix)

# topic = "#{@event_prefix}.#{lattice_prefix}"

# HostCore.Nats.safe_pub(HostCore.Nats.control_connection(lattice_prefix), topic, msg)
end

def publish_actor_updated(prefix, host_id, actor_pk, revision, instance_id) do
Expand Down
187 changes: 176 additions & 11 deletions host_core/lib/host_core/actors/actor_supervisor.ex
Expand Up @@ -11,6 +11,7 @@ defmodule HostCore.Actors.ActorSupervisor do

alias HostCore.Actors.ActorModule
alias HostCore.Actors.ActorRpcSupervisor
alias HostCore.CloudEvent
alias HostCore.Vhost.VirtualHost
alias HostCore.WasmCloud.Native

Expand Down Expand Up @@ -41,35 +42,91 @@ defmodule HostCore.Actors.ActorSupervisor do
Logger.debug("Start actor request received for #{oci}", oci_ref: oci)

source = HostCore.Policy.Manager.default_source()
claims_result = get_claims(bytes, oci)

# If we can't lookup the host ID, we can't start the actor. Shouldn't
# reach this point but just in case.
host_pid =
case VirtualHost.lookup(host_id) do
{:ok, {pid, _config}} -> pid
:error -> nil
end

config = VirtualHost.config(host_pid)
labels = VirtualHost.labels(host_pid)

with {:ok, {pid, _}} <- VirtualHost.lookup(host_id),
config <- VirtualHost.config(pid),
labels <- VirtualHost.labels(pid),
{:ok, claims} <- get_claims(bytes, oci),
with true <- is_pid(host_pid),
# Reversed boolean here so we can catch the error in one else block
true <- !is_nil(config),
{:ok, claims} <- claims_result,
target <- %{
publicKey: claims.public_key,
issuer: claims.issuer,
contractId: nil,
linkName: nil
},
# Validate policy
%{permitted: true} <-
HostCore.Policy.Manager.evaluate_action(config, labels, source, target, @start_actor),
# Ensure no other OCI reference is running for this actor ID
{:ok} <- check_other_oci_already_running(oci, claims.public_key, host_id),
# Start actors
pids <- start_actor_instances(claims, bytes, oci, annotations, host_id, count) do
Tracer.add_event("Actor(s) Started", [])
Tracer.set_status(:ok, "")

publish_actors_started(
claims,
oci,
annotations,
pids |> length(),
host_id,
config.lattice_prefix
)

{:ok, pids}
else
%{permitted: false, message: message, requestId: request_id} ->
Tracer.set_status(:error, "Policy denied starting actor, request: #{request_id}")
{:error, "Starting actor denied: #{message}"}
# Could not lookup host or config by ID
false ->
error = "Host not found"
Tracer.set_status(:error, error)

:error ->
Tracer.set_status(:error, "Host not found")
{:error, "Failed to find host #{host_id}"}

# Policy server denied starting actor
%{permitted: false, message: message, requestId: request_id} ->
error = "Policy denied starting actor, request: #{request_id}"
Tracer.set_status(:error, error)

public_key = public_key_from_claims_result(claims_result)

publish_actors_start_failed(
public_key,
oci,
annotations,
host_id,
config.lattice_prefix,
error
)

{:error, "Starting actor denied: #{message}"}

# Error extracting claims or starting actor after passing validation
{:error, err} ->
Tracer.set_status(:error, "#{inspect(err)}")
error = "#{inspect(err)}"
Tracer.set_status(:error, error)

public_key = public_key_from_claims_result(claims_result)

publish_actors_start_failed(
public_key,
oci,
annotations,
host_id,
config.lattice_prefix,
error
)

{:error, err}
end
end
Expand All @@ -90,6 +147,16 @@ defmodule HostCore.Actors.ActorSupervisor do
end
end

@spec public_key_from_claims_result(
claims :: {:ok, claims :: map()} | {:error, error :: binary()}
) :: binary()
defp public_key_from_claims_result(claims) do
case claims do
{:ok, claims} -> claims.public_key
{:error, _error} -> "N/A"
end
end

# Returns whether the given actor's public key has at least one
# OCI reference running _other_ than the candidate supplied.
defp check_other_oci_already_running(oci, pk, host_id) do
Expand All @@ -106,6 +173,14 @@ defmodule HostCore.Actors.ActorSupervisor do
end
end

@spec start_actor_instances(
claims :: map(),
bytes :: binary(),
oci :: binary(),
annotations :: map(),
host_id :: binary(),
count :: non_neg_integer()
) :: list()
defp start_actor_instances(claims, bytes, oci, annotations, host_id, count) do
# Start `count` instances of this actor
opts = %{
Expand Down Expand Up @@ -429,11 +504,19 @@ defmodule HostCore.Actors.ActorSupervisor do
end

# Terminate `count` instances of an actor
@spec terminate_actor(
host_id :: binary(),
public_key :: binary(),
count :: non_neg_integer(),
annotations :: map()
) :: :ok
def terminate_actor(host_id, public_key, count, annotations) when count > 0 do
remaining = halt_required_actors(host_id, public_key, annotations, count)

lattice_prefix = VirtualHost.get_lattice_for_host(host_id)
publish_actors_stopped(host_id, public_key, lattice_prefix, count, remaining, annotations)

if remaining <= 0 do
lattice_prefix = VirtualHost.get_lattice_for_host(host_id)
ActorRpcSupervisor.stop_rpc_subscriber(lattice_prefix, public_key)
end

Expand Down Expand Up @@ -470,4 +553,86 @@ defmodule HostCore.Actors.ActorSupervisor do
end

defp get_annotations(pid), do: ActorModule.annotations(pid)

@spec publish_actors_started(
claims :: %{
:call_alias => any,
:caps => any,
:expires_human => any,
:issuer => any,
:name => any,
:not_before_human => any,
:public_key => any,
:revision => any,
:tags => any,
:version => any
},
oci :: String.t(),
annotations :: map(),
count :: non_neg_integer(),
host_id :: String.t(),
lattice_prefix :: String.t()
) :: :ok
def publish_actors_started(claims, oci, annotations, count, host_id, lattice_prefix) do
%{
public_key: claims.public_key,
image_ref: oci,
annotations: annotations,
host_id: host_id,
claims: %{
call_alias: claims.call_alias,
caps: claims.caps,
issuer: claims.issuer,
tags: claims.tags,
name: claims.name,
version: claims.version,
revision: claims.revision,
not_before_human: claims.not_before_human,
expires_human: claims.expires_human
},
count: count
}
|> CloudEvent.new("actors_started", host_id)
|> CloudEvent.publish(lattice_prefix)
end

@spec publish_actors_start_failed(
public_key :: String.t(),
oci :: String.t(),
annotations :: map(),
host_id :: String.t(),
lattice_prefix :: String.t(),
error :: String.t()
) :: :ok
def publish_actors_start_failed(public_key, oci, annotations, host_id, lattice_prefix, error) do
%{
public_key: public_key,
image_ref: oci,
annotations: annotations,
host_id: host_id,
error: error
}
|> CloudEvent.new("actors_start_failed", host_id)
|> CloudEvent.publish(lattice_prefix)
end

@spec publish_actors_stopped(
host_id :: String.t(),
public_key :: String.t(),
lattice_prefix :: String.t(),
count :: non_neg_integer(),
remaining :: non_neg_integer(),
annotations :: map()
) :: :ok
def publish_actors_stopped(host_id, public_key, lattice_prefix, count, remaining, annotations) do
%{
host_id: host_id,
public_key: public_key,
count: count,
remaining: remaining,
annotations: annotations
}
|> CloudEvent.new("actors_stopped", host_id)
|> CloudEvent.publish(lattice_prefix)
end
end
15 changes: 11 additions & 4 deletions host_core/lib/host_core/vhost/virtual_host.ex
Expand Up @@ -241,7 +241,7 @@ defmodule HostCore.Vhost.VirtualHost do
to look the configuration up in the ETS cache. This function no longer makes
a GenServer call
"""
@spec config(pid()) :: nil | HostCore.Vhost.Configuration.t()
@spec config(pid() | String.t() | nil) :: nil | HostCore.Vhost.Configuration.t()
def config(pid) when is_pid(pid) do
case Registry.keys(Registry.HostRegistry, pid) do
[host_id] ->
Expand All @@ -252,7 +252,6 @@ defmodule HostCore.Vhost.VirtualHost do
end
end

@spec config(String.t()) :: nil | HostCore.Vhost.Configuration.t()
def config(host_id) when is_binary(host_id) do
case :ets.lookup(:vhost_config_table, host_id) do
[{_, config}] ->
Expand All @@ -264,12 +263,16 @@ defmodule HostCore.Vhost.VirtualHost do
end
end

def config(_unknown) do
nil
end

@doc """
Returns the labels associated with the vhost by looking it up on the
ETS cache. Does NOT make a GenServer call
"""
@spec labels(pid()) :: map()
def labels(pid) do
@spec labels(pid() | nil) :: map()
def labels(pid) when is_pid(pid) do
case config(pid) do
nil ->
%{}
Expand All @@ -279,6 +282,10 @@ defmodule HostCore.Vhost.VirtualHost do
end
end

def labels(_unknown) do
nil
end

def generate_ping_reply(pid) do
GenServer.call(pid, :generate_ping_reply)
end
Expand Down
2 changes: 1 addition & 1 deletion host_core/mix.exs
@@ -1,7 +1,7 @@
defmodule HostCore.MixProject do
use Mix.Project

@app_vsn "0.62.1"
@app_vsn "0.63.0"

def project do
[
Expand Down
4 changes: 2 additions & 2 deletions wasmcloud_host/chart/Chart.yaml
Expand Up @@ -8,7 +8,7 @@ keywords:
- wasmCloud
- wasmcloud

home: https://wasmcloud.dev
home: https://wasmcloud.com
sources:
- https://github.com/wasmCloud/wasmcloud-otp
icon: https://github.com/wasmCloud/wasmcloud.com-dev/raw/main/static/images/wasmcloud-logo.png
Expand All @@ -20,4 +20,4 @@ type: application
# in one PR, then open a new PR updating this `version` and `appVersion`.
version: 0.6.11

appVersion: "0.62.1"
appVersion: "0.63.0"
30 changes: 30 additions & 0 deletions wasmcloud_host/lib/wasmcloud_host/lattice/state_monitor.ex
Expand Up @@ -548,6 +548,36 @@ defmodule WasmcloudHost.Lattice.StateMonitor do
state
end

# No-op. No more information to gain from actors_started
defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
type: "com.wasmcloud.lattice.actors_started"
}
) do
state
end

# No-op. No more information to gain from actors_start_failed
defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
type: "com.wasmcloud.lattice.actors_start_failed"
}
) do
state
end

# No-op. No more information to gain from actors_stopped
defp process_event(
state,
%Cloudevents.Format.V_1_0.Event{
type: "com.wasmcloud.lattice.actors_stopped"
}
) do
state
end

# Fallthrough event handler to prevent errors for new events
defp process_event(
state,
Expand Down
2 changes: 1 addition & 1 deletion wasmcloud_host/mix.exs
@@ -1,7 +1,7 @@
defmodule WasmcloudHost.MixProject do
use Mix.Project

@app_vsn "0.62.1"
@app_vsn "0.63.0"

def project do
[
Expand Down

0 comments on commit 62de47f

Please sign in to comment.