Skip to content

Commit

Permalink
Converting lattice cache to use a durable key value store (#513)
Browse files Browse the repository at this point in the history
* Converting lattice cache to use a durable key value store

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* format

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* sadpanda

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* formatted sadpanda

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* Making migration to new cache explicit and one-time-only

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* Changing name of bucket to LATTICEDATA_xxx

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* Echo test put the same linkdef twice and didn't need to

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* FORMATTING IS MY FAVORITE

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* Fixing a merge booboo

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* Making the deprecation warning more stern

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* :sadpanda:

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* Addressing PR comments

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* Cleaning up topic generation for JS/KV

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* Control interface now allows for the setting of linkdefinitions (backwards compatible)

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* Fixing tiny bug, adding param names to a typespec

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* Delete linkdefs has always worked. nothing to see here.

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* boolean safety

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* d'oh

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* d'oh 2

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* d'oh 3

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

* d'oh 4

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>

Signed-off-by: Kevin Hoffman <autodidaddict@users.noreply.github.com>
  • Loading branch information
autodidaddict committed Dec 14, 2022
1 parent 4a089a4 commit 65a4198
Show file tree
Hide file tree
Showing 24 changed files with 820 additions and 318 deletions.
36 changes: 30 additions & 6 deletions host_core/lib/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ defmodule HostCore.Application do
use Application

@host_config_file "host_config.json"
@extra_keys [
:cluster_adhoc,
:cache_deliver_inbox,
:metadata_deliver_inbox,
:host_seed,
:enable_structured_logging,
:structured_log_level,
:host_key
]

def start(_type, _args) do
create_ets_tables()
Expand Down Expand Up @@ -120,14 +129,24 @@ defmodule HostCore.Application do
|> Map.put(:host_key, host_key)
|> Map.put(:host_seed, host_seed)

config = ensure_booleans(config)

s =
Hashids.new(
salt: "lc_deliver_inbox",
min_len: 2
)

s2 =
Hashids.new(
salt: "md_deliver_inbox",
min_len: 2
)

hid = Hashids.encode(s, Enum.random(1..4_294_967_295))
hid2 = Hashids.encode(s2, Enum.random(1..4_294_967_295))
config = Map.put(config, :cache_deliver_inbox, "_INBOX.#{hid}")
config = Map.put(config, :metadata_deliver_inbox, "INBOX.#{hid2}")

if config.js_domain != nil && String.valid?(config.js_domain) &&
String.length(config.js_domain) > 1 do
Expand Down Expand Up @@ -205,6 +224,16 @@ defmodule HostCore.Application do
config
end

defp ensure_booleans(config) do
bool_keys = [:config_service_enabled, :ctl_tls, :rpc_tls, :enable_ipv6]

Enum.reduce(bool_keys, config, fn key, config ->
old = Map.get(config, key, nil)
new = HostCore.Vhost.ConfigPlan.string_to_bool(old)
Map.put(config, key, new)
end)
end

defp write_config(config) do
write_json(config, @host_config_file)

Expand Down Expand Up @@ -233,12 +262,7 @@ defmodule HostCore.Application do

defp remove_extras(config) do
config
|> Map.delete(:cluster_adhoc)
|> Map.delete(:cache_deliver_inbox)
|> Map.delete(:host_seed)
|> Map.delete(:enable_structured_logging)
|> Map.delete(:structured_log_level)
|> Map.delete(:host_key)
|> Map.drop(@extra_keys)
end

defp ensure_contains(list, item) do
Expand Down
12 changes: 7 additions & 5 deletions host_core/lib/host_core/actors/actor_module.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ defmodule HostCore.Actors.ActorModule do
use GenServer, restart: :transient

alias HostCore.Actors.ActorRpcSupervisor
alias HostCore.Claims.Manager, as: ClaimsManager
alias HostCore.Policy.Manager, as: PolicyManager
alias HostCore.CloudEvent
alias HostCore.ControlInterface.LatticeServer
alias HostCore.Vhost.VirtualHost
Expand Down Expand Up @@ -494,11 +496,11 @@ defmodule HostCore.Actors.ActorModule do
target = token.invocation["target"]

decision =
with {:ok, _topic} <- HostCore.Policy.Manager.policy_topic(config),
with {:ok, _topic} <- PolicyManager.policy_topic(config),
{:ok, source_claims} <-
HostCore.Claims.Manager.lookup_claims(lattice_prefix, origin["public_key"]),
ClaimsManager.lookup_claims(lattice_prefix, origin["public_key"]),
{:ok, _target_claims} <-
HostCore.Claims.Manager.lookup_claims(lattice_prefix, target["public_key"]) do
ClaimsManager.lookup_claims(lattice_prefix, target["public_key"]) do
expired =
case source_claims[:exp] do
nil -> false
Expand All @@ -511,7 +513,7 @@ defmodule HostCore.Actors.ActorModule do
else
config = VirtualHost.config(host_id)

HostCore.Policy.Manager.evaluate_action(
PolicyManager.evaluate_action(
config,
origin,
target,
Expand Down Expand Up @@ -616,7 +618,7 @@ defmodule HostCore.Actors.ActorModule do

Registry.register(Registry.ActorRegistry, claims.public_key, host_id)

HostCore.Claims.Manager.put_claims(lattice_prefix, claims)
ClaimsManager.put_claims(host_id, lattice_prefix, claims)
ActorRpcSupervisor.start_or_reuse_consumer_supervisor(lattice_prefix, claims)

{:ok, agent} =
Expand Down
2 changes: 1 addition & 1 deletion host_core/lib/host_core/actors/actor_supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ defmodule HostCore.Actors.ActorSupervisor do
{:ok, old_claims} <-
HostCore.Claims.Manager.lookup_claims(lattice_prefix, new_claims.public_key),
:ok <- validate_actor_for_update(old_claims, new_claims) do
HostCore.Claims.Manager.put_claims(lattice_prefix, new_claims)
HostCore.Claims.Manager.put_claims(host_id, lattice_prefix, new_claims)
HostCore.Refmaps.Manager.put_refmap(host_id, lattice_prefix, ref, new_claims.public_key)
targets = find_actor(new_claims.public_key, host_id)

Expand Down
39 changes: 33 additions & 6 deletions host_core/lib/host_core/claims/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@ defmodule HostCore.Claims.Manager do
@moduledoc false
require Logger

import HostCore.Jetstream.MetadataCacheLoader, only: [broadcast_event: 3]

@type cached_claimsdata :: %{
call_alias: String.t(),
iss: String.t(),
name: String.t(),
caps: String.t(),
rev: String.t(),
tags: String.t(),
version: String.t(),
sub: String.t()
}

@spec lookup_claims(lattice_prefix :: String.t(), public_key :: String.t()) ::
:error | {:ok, map()}
def lookup_claims(lattice_prefix, public_key) do
Expand All @@ -11,17 +24,26 @@ defmodule HostCore.Claims.Manager do
end
end

@spec cache_claims(
lattice_prefix :: String.t(),
key :: String.t(),
claims :: cached_claimsdata()
) :: any()
def cache_claims(lattice_prefix, key, claims) do
:ets.insert(claims_table_atom(lattice_prefix), {key, claims})
end

def uncache_claims(lattice_prefix, key) do
:ets.delete(claims_table_atom(lattice_prefix), key)
end

def cache_call_alias(lattice_prefix, call_alias, public_key) do
if call_alias != nil && String.length(call_alias) > 1 do
:ets.insert_new(callalias_table_atom(lattice_prefix), {call_alias, public_key})
end
end

def put_claims(lattice_prefix, claims) do
def put_claims(host_id, lattice_prefix, claims) do
key = claims.public_key

claims = %{
Expand Down Expand Up @@ -62,7 +84,7 @@ defmodule HostCore.Claims.Manager do

cache_call_alias(lattice_prefix, claims.call_alias, claims.sub)
cache_claims(lattice_prefix, key, claims)
publish_claims(lattice_prefix, claims)
publish_claims(host_id, lattice_prefix, claims)
end

def claims_table_atom(lattice_prefix) do
Expand All @@ -83,12 +105,17 @@ defmodule HostCore.Claims.Manager do
end
end

defp publish_claims(lattice_prefix, claims) do
topic = "lc.#{lattice_prefix}.claims.#{claims.sub}"
defp publish_claims(host_id, lattice_prefix, claims) do
config = HostCore.Vhost.VirtualHost.config(host_id)

conn = HostCore.Nats.control_connection(lattice_prefix)
HostCore.Jetstream.Client.kv_put(
lattice_prefix,
config.js_domain,
"CLAIMS_#{claims.sub}",
Jason.encode!(claims)
)

HostCore.Nats.safe_pub(conn, topic, Jason.encode!(claims))
broadcast_event(:claims_added, claims, lattice_prefix)
end

def get_claims(lattice_prefix) when is_binary(lattice_prefix) do
Expand Down
14 changes: 5 additions & 9 deletions host_core/lib/host_core/control_interface/lattice_server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule HostCore.ControlInterface.LatticeServer do
use Gnat.Server

alias HostCore.CloudEvent
alias HostCore.Linkdefs.Manager, as: LinkdefsManager

import HostCore.Nats,
only: [safe_pub: 3, control_connection: 1]
Expand Down Expand Up @@ -88,11 +89,10 @@ defmodule HostCore.ControlInterface.LatticeServer do

### LINKDEFS
# These requests are targeted at one host per lattice, changes made as a result
# are emitted to the appropriate stream and cached
# are emitted to the appropriate stream and cached.
# THESE ARE NOW DEPRECATED
# Eventually the host will stop subscribing to these topics

# Put a link definition
# This will first store the link definition in memory, then publish it to the stream
# then publish it directly to the relevant provider via the RPC channel
defp handle_request({"linkdefs", "put"}, body, _reply_to, prefix) do
Tracer.with_span "Handle Linkdef Put (ctl)", kind: :server do
with {:ok, ld} <- Jason.decode(body),
Expand All @@ -114,15 +114,11 @@ defmodule HostCore.ControlInterface.LatticeServer do
end
end

# Remove a link definition
# This will first remove the link definition from memory, then publish the removal
# message to the stream, then publish the removal directly to the relevant provider via the
# RPC channel
defp handle_request({"linkdefs", "del"}, body, _reply_to, prefix) do
Tracer.with_span "Handle Linkdef Del (ctl)", kind: :server do
with {:ok, ld} <- Jason.decode(body),
true <- has_values(ld, ["actor_id", "contract_id", "link_name"]) do
HostCore.Linkdefs.Manager.del_link_definition(
HostCore.Linkdefs.Manager.del_link_definition_by_triple(
prefix,
ld["actor_id"],
ld["contract_id"],
Expand Down
88 changes: 0 additions & 88 deletions host_core/lib/host_core/jetstream/cache_loader.ex

This file was deleted.

Loading

0 comments on commit 65a4198

Please sign in to comment.