Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Preliminary support for account security in nats internal server #243

Merged
merged 31 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
f90ca48
Preliminary support for account security in nats internal server
autodidaddict May 24, 2024
ed320cd
linter
autodidaddict May 24, 2024
188926a
safety checkin for rebooting :) starting to integrate the new interna…
autodidaddict May 24, 2024
67da0e8
Agents are working in new secure server
autodidaddict May 24, 2024
a28f9ab
suppressing internal NATS logs by default
autodidaddict May 24, 2024
c6bcd0b
lint
autodidaddict May 24, 2024
919ad17
adding subz wrapper
autodidaddict May 24, 2024
9d75111
shuffling stuff around to avoid a race condition
autodidaddict May 28, 2024
eb1fe26
removing superfluous query from firecracker startup
autodidaddict May 28, 2024
a808083
adding some additional logging
autodidaddict May 28, 2024
05d3756
removing cgo disable from agent build
autodidaddict May 29, 2024
e7634d6
Set internal nats in firecracker process manager init
kthomas May 30, 2024
3ee1b37
Log ping in agent
kthomas May 30, 2024
83366e2
Make agent ping timeout configurable
kthomas May 30, 2024
5a24732
Cleanup internal nats server config and init
kthomas May 30, 2024
b4bca92
Add logging
kthomas Jun 3, 2024
63db3cc
Refactor for readability
kthomas Jun 3, 2024
8ed3b97
Remove users list from internal NATS struct
kthomas Jun 3, 2024
b545cbe
Fix template handling
kthomas Jun 3, 2024
284b748
Destroy cached credentials
kthomas Jun 3, 2024
2278c3f
Cleanup code
kthomas Jun 4, 2024
68ba1c9
Add nil protection
kthomas Jun 4, 2024
f3bffa0
Add logging
kthomas Jun 4, 2024
d42029c
Add import/export for agentint.> for internal nex node connection
kthomas Jun 4, 2024
b8fd88c
Drop user/password from internal nex user
kthomas Jun 4, 2024
bd40b3e
Use agentint and hostint subjects
kthomas Jun 4, 2024
450db8e
Stop unnecessary cleanup of cached internal creds
kthomas Jun 5, 2024
c69aa08
Fix linter
kthomas Jun 5, 2024
f80fc1c
Fix test
kthomas Jun 5, 2024
a5140c2
Fix autostart
kthomas Jun 5, 2024
ad67c82
Support multiple autostart workloads
kthomas Jun 5, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,7 @@ test/panicker/panicker
pnats
.idea

_spec
_spec

/internal/node/internal-nats/pnats
/nex/pnats
106 changes: 69 additions & 37 deletions agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,21 @@ import (

"github.com/cloudevents/sdk-go/pkg/cloudevents"
"github.com/nats-io/nats.go"
"github.com/nats-io/nkeys"
"github.com/synadia-io/nex/agent/providers"
agentapi "github.com/synadia-io/nex/internal/agent-api"
"github.com/synadia-io/nex/internal/models"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)

const defaultAgentHandshakeTimeoutMillis = 500
const runloopSleepInterval = 250 * time.Millisecond
const runloopTickInterval = 2500 * time.Millisecond
const workloadExecutionSleepTimeoutMillis = 1000
// Agent-wide tuning constants. The numeric values are left untyped so they can
// be combined with time.Duration expressions at their point of use.
const (
// Timeout, in milliseconds, applied to the handshake request sent to the node.
defaultAgentHandshakeTimeoutMillis = 500
// NOTE(review): presumably the pause between run loop iterations; the use
// site is not visible here -- confirm.
runloopSleepInterval = 250 * time.Millisecond
// NOTE(review): presumably the period of the run loop ticker; the use site
// is not visible here -- confirm.
runloopTickInterval = 2500 * time.Millisecond
// NOTE(review): name suggests a millisecond value related to workload
// execution; the use site is not visible here -- confirm.
workloadExecutionSleepTimeoutMillis = 1000
// Object store key under which the workload artifact is fetched from the
// cache bucket when caching the executable.
workloadCacheFileKey = "workload"
)

// Agent facilitates communication between the nex agent running in the firecracker VM
// and the nex node by way of a configured internal NATS server. Agent instances provide
Expand Down Expand Up @@ -61,7 +65,7 @@ func NewAgent(ctx context.Context, cancelF context.CancelFunc) (*Agent, error) {
metadata, err = GetMachineMetadata()
}
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get machien metadata: %s\n", err)
fmt.Fprintf(os.Stderr, "failed to get machine metadata: %s\n", err)
return nil, fmt.Errorf("failed to get machine metadata: %s", err)
}

Expand All @@ -70,35 +74,15 @@ func NewAgent(ctx context.Context, cancelF context.CancelFunc) (*Agent, error) {
return nil, fmt.Errorf("invalid metadata: %v", metadata.Errors)
}

nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", *metadata.NodeNatsHost, *metadata.NodeNatsPort))
if err != nil {
fmt.Fprintf(os.Stderr, "failed to connect to shared NATS: %s", err)
return nil, err
}

js, err := nc.JetStream()
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get JetStream context from shared NATS: %s", err)
return nil, err
}

bucket, err := js.ObjectStore(agentapi.WorkloadCacheBucket)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get reference to shared object store: %s", err)
return nil, err
}

return &Agent{
agentLogs: make(chan *agentapi.LogEntry, 64),
eventLogs: make(chan *cloudevents.Event, 64),
// sandbox defaults to true, only way to override that is with an explicit 'false'
cancelF: cancelF,
ctx: ctx,
sandboxed: isSandboxed(),
cacheBucket: bucket,
md: metadata,
nc: nc,
started: time.Now().UTC(),
cancelF: cancelF,
ctx: ctx,
sandboxed: isSandboxed(),
md: metadata,
started: time.Now().UTC(),
}, nil
}

Expand Down Expand Up @@ -151,11 +135,11 @@ func (a *Agent) requestHandshake() error {
}
raw, _ := json.Marshal(msg)

resp, err := a.nc.Request(fmt.Sprintf("agentint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
resp, err := a.nc.Request(fmt.Sprintf("hostint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
if err != nil {
if errors.Is(err, nats.ErrNoResponders) {
time.Sleep(time.Millisecond * 50)
resp, err = a.nc.Request(fmt.Sprintf("agentint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
resp, err = a.nc.Request(fmt.Sprintf("hostint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis)
}

if err != nil {
Expand Down Expand Up @@ -191,9 +175,9 @@ func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, e
tempFile = fmt.Sprintf("%s.exe", tempFile)
}

err := a.cacheBucket.GetFile(*req.WorkloadName, tempFile)
err := a.cacheBucket.GetFile(workloadCacheFileKey, tempFile)
if err != nil {
msg := fmt.Sprintf("Failed to write workload artifact to temp dir: %s", err)
msg := fmt.Sprintf("Failed to get and write workload artifact to temp dir: %s", err)
a.LogError(msg)
return nil, errors.New(msg)
}
Expand All @@ -218,7 +202,7 @@ func (a *Agent) dispatchEvents() {
continue
}

subject := fmt.Sprintf("agentint.%s.events.%s", *a.md.VmID, entry.Type())
subject := fmt.Sprintf("hostint.%s.events.%s", *a.md.VmID, entry.Type())
err = a.nc.Publish(subject, bytes)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to publish event: %s", err.Error())
Expand All @@ -239,7 +223,7 @@ func (a *Agent) dispatchLogs() {
continue
}

subject := fmt.Sprintf("agentint.%s.logs", *a.md.VmID)
subject := fmt.Sprintf("hostint.%s.logs", *a.md.VmID)
err = a.nc.Publish(subject, bytes)
if err != nil {
continue
Expand Down Expand Up @@ -330,9 +314,17 @@ func (a *Agent) handleUndeploy(m *nats.Msg) {
}

// handlePing answers a ping message with the literal payload "OK". Any error
// returned while sending the reply is deliberately discarded.
func (a *Agent) handlePing(m *nats.Msg) {
	// a.LogDebug(fmt.Sprintf("received ping on subject: %s", m.Subject))
	ack := []byte("OK")
	_ = m.Respond(ack)
}

// Agent instances subscribe to the following `agentint.>` subjects,
// which are exported dynamically by each `<agent_id>` account on the
// configured internal NATS connection for consumption by the nex node:
//
// - agentint.<agent_id>.deploy
// - agentint.<agent_id>.undeploy
// - agentint.<agent_id>.ping
func (a *Agent) init() error {
a.installSignalHandlers()

Expand All @@ -341,7 +333,13 @@ func (a *Agent) init() error {
propagation.Baggage{},
))

err := a.requestHandshake()
err := a.initNATS()
if err != nil {
a.LogError(fmt.Sprintf("Failed to initialize NATS connection: %s", err))
return err
}

err = a.requestHandshake()
if err != nil {
a.LogError(fmt.Sprintf("Failed to handshake with node: %s", err))
return err
Expand Down Expand Up @@ -373,6 +371,40 @@ func (a *Agent) init() error {
return nil
}

// initNATS connects the agent to the internal NATS server named in the machine
// metadata, authenticating with the nkey seed carried in that metadata, and
// then resolves the shared workload cache object store.
//
// On success a.nc and a.cacheBucket are populated. On failure a diagnostic is
// written to stderr and an error is returned. If the connection succeeds but a
// later step fails, a.nc is left set so the caller's error logging can still
// be dispatched over it.
//
// The nkey seed is a secret: it is never written to a diagnostic nor embedded
// in a returned error.
func (a *Agent) initNATS() error {
	// Guard the dereference below; a missing seed should surface as an error
	// rather than a nil-pointer panic.
	if a.md.NodeNatsNkeySeed == nil {
		fmt.Fprintf(os.Stderr, "invalid nkey seed: no seed present in machine metadata\n")
		return errors.New("invalid nkey seed: no seed present in machine metadata")
	}

	url := fmt.Sprintf("nats://%s:%d", *a.md.NodeNatsHost, *a.md.NodeNatsPort)

	pair, err := nkeys.FromSeed([]byte(*a.md.NodeNatsNkeySeed))
	if err != nil {
		// Report why parsing failed, not the seed itself.
		fmt.Fprintf(os.Stderr, "invalid nkey seed: %s\n", err)
		return fmt.Errorf("invalid nkey seed: %w", err)
	}

	pk, err := pair.PublicKey()
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to derive public key from nkey seed: %s\n", err)
		return fmt.Errorf("failed to derive public key from nkey seed: %w", err)
	}

	// The signature callback signs the nonce supplied during connection
	// authentication with the key pair derived from the seed.
	a.nc, err = nats.Connect(url, nats.Nkey(pk, func(nonce []byte) ([]byte, error) {
		fmt.Fprintf(os.Stdout, "Attempting to sign NATS server nonce for internal NATS connection; public key: %s\n", pk)
		return pair.Sign(nonce)
	}))
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to connect to shared NATS: %s\n", err)
		return err
	}
	fmt.Printf("Connected to internal NATS: %s\n", url)

	js, err := a.nc.JetStream()
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to get JetStream context from shared NATS: %s\n", err)
		return err
	}

	a.cacheBucket, err = js.ObjectStore(agentapi.WorkloadCacheBucket)
	if err != nil {
		fmt.Fprintf(os.Stderr, "failed to get reference to shared object store: %s\n", err)
		return err
	}

	return nil
}

func (a *Agent) installSignalHandlers() {
signal.Reset(syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP)
resetSIGUSR()
Expand Down
11 changes: 7 additions & 4 deletions agent/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ const nexEnvSandbox = "NEX_SANDBOX"
const nexEnvWorkloadID = "NEX_WORKLOADID"
const nexEnvNodeNatsHost = "NEX_NODE_NATS_HOST"
const nexEnvNodeNatsPort = "NEX_NODE_NATS_PORT"
const nexEnvNodeNatsSeed = "NEX_NODE_NATS_NKEY_SEED"

const metadataClientTimeoutMillis = 50
const metadataPollingTimeoutMillis = 5000
Expand Down Expand Up @@ -71,6 +72,7 @@ func GetMachineMetadataFromEnv() (*agentapi.MachineMetadata, error) {
vmid := os.Getenv(nexEnvWorkloadID)
host := os.Getenv(nexEnvNodeNatsHost)
port := os.Getenv(nexEnvNodeNatsPort)
seed := os.Getenv(nexEnvNodeNatsSeed)
msg := "Metadata obtained from no-sandbox environment"
p, err := strconv.Atoi(port)
if err != nil {
Expand All @@ -79,10 +81,11 @@ func GetMachineMetadataFromEnv() (*agentapi.MachineMetadata, error) {
}

return &agentapi.MachineMetadata{
VmID: &vmid,
NodeNatsHost: &host,
NodeNatsPort: &p,
Message: &msg,
VmID: &vmid,
NodeNatsHost: &host,
NodeNatsPort: &p,
NodeNatsNkeySeed: &seed,
Message: &msg,
}, nil
}

Expand Down
5 changes: 4 additions & 1 deletion agent/providers/lib/v8.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ func (v *V8) Execute(ctx context.Context, payload []byte) ([]byte, error) {
return
}

_, _ = v.stdout.Write([]byte(fmt.Sprintf("calling js function via trigger subject: %s", subject)))
val, err = fn.Call(v8ctx.Global(), argv1, argv2)
if err != nil {
errs <- err
Expand Down Expand Up @@ -543,6 +544,8 @@ func (v *V8) newMessagingObjectTemplate(ctx context.Context) *v8.ObjectTemplate
messaging := v8.NewObjectTemplate(v.iso)

_ = messaging.Set(hostServicesMessagingPublishFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value {
_, _ = v.stdout.Write([]byte(fmt.Sprintf("attempting to publish msg via %s", v.nc.Servers()[0])))

args := info.Args()
if len(args) != 2 {
val, _ := v8.NewValue(v.iso, "subject and payload are required")
Expand Down Expand Up @@ -847,7 +850,7 @@ func InitNexExecutionProviderV8(params *agentapi.ExecutionProviderParams) (*V8,

hsclient := hostservices.NewHostServicesClient(
params.NATSConn,
time.Second*2,
time.Second*5, // FIXME-- make configurable
*params.Namespace,
*params.WorkloadName,
params.VmID,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ require (
github.com/nats-io/nats.go v1.34.1
github.com/nats-io/natscli v0.1.4
github.com/nats-io/nkeys v0.4.7
github.com/nats-io/nuid v1.0.1
github.com/onsi/ginkgo/v2 v2.17.1
github.com/onsi/gomega v1.32.0
github.com/pkg/errors v0.9.1
Expand Down Expand Up @@ -109,7 +110,6 @@ require (
github.com/muesli/cancelreader v0.2.2 // indirect
github.com/muesli/reflow v0.3.0 // indirect
github.com/muesli/termenv v0.15.2 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
Expand Down
5 changes: 3 additions & 2 deletions host-services/builtins/keyvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ func (k *KeyValueService) HandleRequest(
method string,
workloadName string,
metadata map[string]string,
request []byte) (hostservices.ServiceResult, error) {

request []byte,
) (hostservices.ServiceResult, error) {
switch method {
case kvServiceMethodGet:
return k.handleGet(workloadId, workloadName, request, metadata, namespace)
Expand Down Expand Up @@ -208,5 +208,6 @@ func (k *KeyValueService) resolveKeyValueStore(namespace, workload string) (nats
}
}

k.log.Debug("Resolved key/value store for KV host service", slog.String("name", kvStoreName), slog.String("bucket", kvStore.Bucket()))
return kvStore, nil
}
2 changes: 1 addition & 1 deletion host-services/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func NewHostServicesClient(nc *nats.Conn, timeout time.Duration, namespace, work
}

func (c *HostServicesClient) PerformRPC(ctx context.Context, service string, method string, payload []byte, metadata map[string]string) (ServiceResult, error) {
subject := fmt.Sprintf("agentint.%s.rpc.%s.%s.%s.%s",
subject := fmt.Sprintf("hostint.%s.rpc.%s.%s.%s.%s",
c.workloadId,
c.namespace,
c.workloadName,
Expand Down
25 changes: 16 additions & 9 deletions host-services/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@ import (
)

type HostServicesServer struct {
log *slog.Logger
ncInternal *nats.Conn
services map[string]HostService
tracer trace.Tracer
log *slog.Logger
nc *nats.Conn
services map[string]HostService
tracer trace.Tracer
}

func NewHostServicesServer(nc *nats.Conn, log *slog.Logger, tracer trace.Tracer) *HostServicesServer {
return &HostServicesServer{
ncInternal: nc,
log: log,
services: make(map[string]HostService),
tracer: tracer,
log: log,
nc: nc,
services: make(map[string]HostService),
tracer: tracer,
}
}

Expand All @@ -49,12 +49,19 @@ func (h *HostServicesServer) AddService(name string, svc HostService, config jso
return nil
}

// Host services server instances subscribe to the following `hostint.>` subjects,
// which are exported by the `nexnode` account on the configured internal
// NATS connection for consumption by agents:
//
// - hostint.<agent_id>.rpc.<namespace>.<workloadName>.<service>.<method>
func (h *HostServicesServer) Start() error {
_, err := h.ncInternal.Subscribe("agentint.*.rpc.*.*.*.*", h.handleRPC)
_, err := h.nc.Subscribe("hostint.*.rpc.*.*.*.*", h.handleRPC)
if err != nil {
h.log.Warn("Failed to create Host services rpc subscription", slog.String("error", err.Error()))
return err
}

h.log.Debug("Host services rpc subscription created", slog.String("address", h.nc.ConnectedAddr()))
return nil
}

Expand Down
7 changes: 5 additions & 2 deletions host-services/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,13 @@ func ServiceResultPass(code uint, message string, data []byte) ServiceResult {

type HostService interface {
Initialize(json.RawMessage) error
HandleRequest(namespace string,

HandleRequest(
namespace string,
workloadId string,
method string,
workloadName string,
metadata map[string]string,
request []byte) (ServiceResult, error)
request []byte,
) (ServiceResult, error)
}
Loading
Loading