diff --git a/.gitignore b/.gitignore index 2b1fe273..fb8c3722 100644 --- a/.gitignore +++ b/.gitignore @@ -36,4 +36,7 @@ test/panicker/panicker pnats .idea -_spec \ No newline at end of file +_spec + +/internal/node/internal-nats/pnats +/nex/pnats \ No newline at end of file diff --git a/agent/agent.go b/agent/agent.go index 2eb84cb1..a9d86d98 100644 --- a/agent/agent.go +++ b/agent/agent.go @@ -16,6 +16,7 @@ import ( "github.com/cloudevents/sdk-go/pkg/cloudevents" "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" "github.com/synadia-io/nex/agent/providers" agentapi "github.com/synadia-io/nex/internal/agent-api" "github.com/synadia-io/nex/internal/models" @@ -23,10 +24,13 @@ import ( "go.opentelemetry.io/otel/propagation" ) -const defaultAgentHandshakeTimeoutMillis = 500 -const runloopSleepInterval = 250 * time.Millisecond -const runloopTickInterval = 2500 * time.Millisecond -const workloadExecutionSleepTimeoutMillis = 1000 +const ( + defaultAgentHandshakeTimeoutMillis = 500 + runloopSleepInterval = 250 * time.Millisecond + runloopTickInterval = 2500 * time.Millisecond + workloadExecutionSleepTimeoutMillis = 1000 + workloadCacheFileKey = "workload" +) // Agent facilitates communication between the nex agent running in the firecracker VM // and the nex node by way of a configured internal NATS server. Agent instances provide @@ -61,7 +65,7 @@ func NewAgent(ctx context.Context, cancelF context.CancelFunc) (*Agent, error) { metadata, err = GetMachineMetadata() } if err != nil { - fmt.Fprintf(os.Stderr, "failed to get machien metadata: %s\n", err) + fmt.Fprintf(os.Stderr, "failed to get machine metadata: %s\n", err) return nil, fmt.Errorf("failed to get machine metadata: %s", err) } @@ -70,35 +74,15 @@ func NewAgent(ctx context.Context, cancelF context.CancelFunc) (*Agent, error) { return nil, fmt.Errorf("invalid metadata: %v", metadata.Errors) } - nc, err := nats.Connect(fmt.Sprintf("nats://%s:%d", *metadata.NodeNatsHost, *metadata.NodeNatsPort)) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to connect to shared NATS: %s", err) - return nil, err - } - - js, err := nc.JetStream() - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get JetStream context from shared NATS: %s", err) - return nil, err - } - - bucket, err := js.ObjectStore(agentapi.WorkloadCacheBucket) - if err != nil { - fmt.Fprintf(os.Stderr, "failed to get reference to shared object store: %s", err) - return nil, err - } - return &Agent{ agentLogs: make(chan *agentapi.LogEntry, 64), eventLogs: make(chan *cloudevents.Event, 64), // sandbox defaults to true, only way to override that is with an explicit 'false' - cancelF: cancelF, - ctx: ctx, - sandboxed: isSandboxed(), - cacheBucket: bucket, - md: metadata, - nc: nc, - started: time.Now().UTC(), + cancelF: cancelF, + ctx: ctx, + sandboxed: isSandboxed(), + md: metadata, + started: time.Now().UTC(), }, nil } @@ -151,11 +135,11 @@ func (a *Agent) requestHandshake() error { } raw, _ := json.Marshal(msg) - resp, err := a.nc.Request(fmt.Sprintf("agentint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis) + resp, err := a.nc.Request(fmt.Sprintf("hostint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis) if err != nil { if errors.Is(err, nats.ErrNoResponders) { time.Sleep(time.Millisecond * 50) - resp, err = a.nc.Request(fmt.Sprintf("agentint.%s.handshake", *a.md.VmID), raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis) + resp, err = a.nc.Request(fmt.Sprintf("hostint.%s.handshake", *a.md.VmID), 
raw, time.Millisecond*defaultAgentHandshakeTimeoutMillis) } if err != nil { @@ -191,9 +175,9 @@ func (a *Agent) cacheExecutableArtifact(req *agentapi.DeployRequest) (*string, e tempFile = fmt.Sprintf("%s.exe", tempFile) } - err := a.cacheBucket.GetFile(*req.WorkloadName, tempFile) + err := a.cacheBucket.GetFile(workloadCacheFileKey, tempFile) if err != nil { - msg := fmt.Sprintf("Failed to write workload artifact to temp dir: %s", err) + msg := fmt.Sprintf("Failed to get and write workload artifact to temp dir: %s", err) a.LogError(msg) return nil, errors.New(msg) } @@ -218,7 +202,7 @@ func (a *Agent) dispatchEvents() { continue } - subject := fmt.Sprintf("agentint.%s.events.%s", *a.md.VmID, entry.Type()) + subject := fmt.Sprintf("hostint.%s.events.%s", *a.md.VmID, entry.Type()) err = a.nc.Publish(subject, bytes) if err != nil { fmt.Fprintf(os.Stderr, "failed to publish event: %s", err.Error()) @@ -239,7 +223,7 @@ func (a *Agent) dispatchLogs() { continue } - subject := fmt.Sprintf("agentint.%s.logs", *a.md.VmID) + subject := fmt.Sprintf("hostint.%s.logs", *a.md.VmID) err = a.nc.Publish(subject, bytes) if err != nil { continue } @@ -330,9 +314,17 @@ func (a *Agent) handleUndeploy(m *nats.Msg) { } func (a *Agent) handlePing(m *nats.Msg) { + // a.LogDebug(fmt.Sprintf("received ping on subject: %s", m.Subject)) _ = m.Respond([]byte("OK")) } +// Agent instances subscribe to the following `agentint.>` subjects, +// which are exported dynamically by each `<workload_id>` account on the +// configured internal NATS connection for consumption by the nex node: +// +// - agentint.<workload_id>.deploy +// - agentint.<workload_id>.undeploy +// - agentint.<workload_id>.ping func (a *Agent) init() error { a.installSignalHandlers() @@ -341,7 +333,13 @@ func (a *Agent) init() error { propagation.Baggage{}, )) - err := a.requestHandshake() + err := a.initNATS() + if err != nil { + a.LogError(fmt.Sprintf("Failed to initialize NATS connection: %s", err)) + return err + } + + err = a.requestHandshake() if err != nil { a.LogError(fmt.Sprintf("Failed to handshake with node: %s", err)) return err @@ -373,6 +371,40 @@ func (a *Agent) init() error { return nil } +func (a *Agent) initNATS() error { + url := fmt.Sprintf("nats://%s:%d", *a.md.NodeNatsHost, *a.md.NodeNatsPort) + pair, err := nkeys.FromSeed([]byte(*a.md.NodeNatsNkeySeed)) + if err != nil { + fmt.Fprintf(os.Stderr, "invalid nkey seed: %v\n", *a.md.NodeNatsNkeySeed) + return fmt.Errorf("invalid nkey seed: %v", *a.md.NodeNatsNkeySeed) + } + + pk, _ := pair.PublicKey() + a.nc, err = nats.Connect(url, nats.Nkey(pk, func(b []byte) ([]byte, error) { + fmt.Fprintf(os.Stdout, "Attempting to sign NATS server nonce for internal NATS connection; public key: %s\n", pk) + return pair.Sign(b) + })) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to connect to shared NATS: %s", err) + return err + } + fmt.Printf("Connected to internal NATS: %s\n", url) + + js, err := a.nc.JetStream() + if err != nil { + fmt.Fprintf(os.Stderr, "failed to get JetStream context from shared NATS: %s", err) + return err + } + + a.cacheBucket, err = js.ObjectStore(agentapi.WorkloadCacheBucket) + if err != nil { + fmt.Fprintf(os.Stderr, "failed to get reference to shared object store: %s", err) + return err + } + + return nil +} + func (a *Agent) installSignalHandlers() { signal.Reset(syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP) resetSIGUSR() diff --git a/agent/metadata.go b/agent/metadata.go index a7aceee6..d3ee6304 100644 --- a/agent/metadata.go +++ b/agent/metadata.go @@ -21,6 +21,7 @@ const nexEnvSandbox = "NEX_SANDBOX" const
nexEnvWorkloadID = "NEX_WORKLOADID" const nexEnvNodeNatsHost = "NEX_NODE_NATS_HOST" const nexEnvNodeNatsPort = "NEX_NODE_NATS_PORT" +const nexEnvNodeNatsSeed = "NEX_NODE_NATS_NKEY_SEED" const metadataClientTimeoutMillis = 50 const metadataPollingTimeoutMillis = 5000 @@ -71,6 +72,7 @@ func GetMachineMetadataFromEnv() (*agentapi.MachineMetadata, error) { vmid := os.Getenv(nexEnvWorkloadID) host := os.Getenv(nexEnvNodeNatsHost) port := os.Getenv(nexEnvNodeNatsPort) + seed := os.Getenv(nexEnvNodeNatsSeed) msg := "Metadata obtained from no-sandbox environment" p, err := strconv.Atoi(port) if err != nil { @@ -79,10 +81,11 @@ func GetMachineMetadataFromEnv() (*agentapi.MachineMetadata, error) { } return &agentapi.MachineMetadata{ - VmID: &vmid, - NodeNatsHost: &host, - NodeNatsPort: &p, - Message: &msg, + VmID: &vmid, + NodeNatsHost: &host, + NodeNatsPort: &p, + NodeNatsNkeySeed: &seed, + Message: &msg, }, nil } diff --git a/agent/providers/lib/v8.go b/agent/providers/lib/v8.go index 3c2e272b..f247cb1b 100644 --- a/agent/providers/lib/v8.go +++ b/agent/providers/lib/v8.go @@ -183,6 +183,7 @@ func (v *V8) Execute(ctx context.Context, payload []byte) ([]byte, error) { return } + _, _ = v.stdout.Write([]byte(fmt.Sprintf("calling js function via trigger subject: %s", subject))) val, err = fn.Call(v8ctx.Global(), argv1, argv2) if err != nil { errs <- err @@ -543,6 +544,8 @@ func (v *V8) newMessagingObjectTemplate(ctx context.Context) *v8.ObjectTemplate messaging := v8.NewObjectTemplate(v.iso) _ = messaging.Set(hostServicesMessagingPublishFunctionName, v8.NewFunctionTemplate(v.iso, func(info *v8.FunctionCallbackInfo) *v8.Value { + _, _ = v.stdout.Write([]byte(fmt.Sprintf("attempting to publish msg via %s", v.nc.Servers()[0]))) + args := info.Args() if len(args) != 2 { val, _ := v8.NewValue(v.iso, "subject and payload are required") @@ -847,7 +850,7 @@ func InitNexExecutionProviderV8(params *agentapi.ExecutionProviderParams) (*V8, hsclient := hostservices.NewHostServicesClient( params.NATSConn, - time.Second*2, + time.Second*5, // FIXME-- make configurable *params.Namespace, *params.WorkloadName, params.VmID, diff --git a/go.mod b/go.mod index 541e7d02..03c8153f 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/nats-io/nats.go v1.34.1 github.com/nats-io/natscli v0.1.4 github.com/nats-io/nkeys v0.4.7 + github.com/nats-io/nuid v1.0.1 github.com/onsi/ginkgo/v2 v2.17.1 github.com/onsi/gomega v1.32.0 github.com/pkg/errors v0.9.1 @@ -109,7 +110,6 @@ require ( github.com/muesli/cancelreader v0.2.2 // indirect github.com/muesli/reflow v0.3.0 // indirect github.com/muesli/termenv v0.15.2 // indirect - github.com/nats-io/nuid v1.0.1 // indirect github.com/oklog/ulid v1.3.1 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect diff --git a/host-services/builtins/keyvalue.go b/host-services/builtins/keyvalue.go index 39316f94..49b6fb79 100644 --- a/host-services/builtins/keyvalue.go +++ b/host-services/builtins/keyvalue.go @@ -60,8 +60,8 @@ func (k *KeyValueService) HandleRequest( method string, workloadName string, metadata map[string]string, - request []byte) (hostservices.ServiceResult, error) { - + request []byte, +) (hostservices.ServiceResult, error) { switch method { case kvServiceMethodGet: return k.handleGet(workloadId, workloadName, request, metadata, namespace) @@ -208,5 +208,6 @@ func (k *KeyValueService) resolveKeyValueStore(namespace, workload string) (nats } } + k.log.Debug("Resolved key/value store for KV host 
service", slog.String("name", kvStoreName), slog.String("bucket", kvStore.Bucket())) return kvStore, nil } diff --git a/host-services/client.go b/host-services/client.go index d88d3bc5..1f3a6c06 100644 --- a/host-services/client.go +++ b/host-services/client.go @@ -30,7 +30,7 @@ func NewHostServicesClient(nc *nats.Conn, timeout time.Duration, namespace, work } func (c *HostServicesClient) PerformRPC(ctx context.Context, service string, method string, payload []byte, metadata map[string]string) (ServiceResult, error) { - subject := fmt.Sprintf("agentint.%s.rpc.%s.%s.%s.%s", + subject := fmt.Sprintf("hostint.%s.rpc.%s.%s.%s.%s", c.workloadId, c.namespace, c.workloadName, diff --git a/host-services/server.go b/host-services/server.go index a4666358..a9c03b26 100644 --- a/host-services/server.go +++ b/host-services/server.go @@ -15,18 +15,18 @@ import ( ) type HostServicesServer struct { - log *slog.Logger - ncInternal *nats.Conn - services map[string]HostService - tracer trace.Tracer + log *slog.Logger + nc *nats.Conn + services map[string]HostService + tracer trace.Tracer } func NewHostServicesServer(nc *nats.Conn, log *slog.Logger, tracer trace.Tracer) *HostServicesServer { return &HostServicesServer{ - ncInternal: nc, - log: log, - services: make(map[string]HostService), - tracer: tracer, + log: log, + nc: nc, + services: make(map[string]HostService), + tracer: tracer, } } @@ -49,12 +49,19 @@ func (h *HostServicesServer) AddService(name string, svc HostService, config jso return nil } +// Host services server instances subscribe to the following `hostint.>` subjects, +// which are exported by the `nexnode` account on the configured internal +// NATS connection for consumption by agents: +// +// - hostint..rpc.... func (h *HostServicesServer) Start() error { - _, err := h.ncInternal.Subscribe("agentint.*.rpc.*.*.*.*", h.handleRPC) + _, err := h.nc.Subscribe("hostint.*.rpc.*.*.*.*", h.handleRPC) if err != nil { + h.log.Warn("Failed to create Host services rpc subscription", slog.String("error", err.Error())) return err } + h.log.Debug("Host services rpc subscription created", slog.String("address", h.nc.ConnectedAddr())) return nil } diff --git a/host-services/service.go b/host-services/service.go index ba717fe1..9fedb4c7 100644 --- a/host-services/service.go +++ b/host-services/service.go @@ -43,10 +43,13 @@ func ServiceResultPass(code uint, message string, data []byte) ServiceResult { type HostService interface { Initialize(json.RawMessage) error - HandleRequest(namespace string, + + HandleRequest( + namespace string, workloadId string, method string, workloadName string, metadata map[string]string, - request []byte) (ServiceResult, error) + request []byte, + ) (ServiceResult, error) } diff --git a/internal/agent-api/client.go b/internal/agent-api/client.go index 28be3bd8..0a22efb4 100644 --- a/internal/agent-api/client.go +++ b/internal/agent-api/client.go @@ -42,6 +42,7 @@ type AgentClient struct { agentID string handshakeTimeout time.Duration handshakeReceived *atomic.Bool + pingTimeout time.Duration stopping uint32 handshakeTimedOut HandshakeCallback @@ -59,23 +60,24 @@ type AgentClient struct { func NewAgentClient( nc *nats.Conn, log *slog.Logger, - handshakeTimeout time.Duration, + handshakeTimeout, pingTimeout time.Duration, onTimedOut HandshakeCallback, onSuccess HandshakeCallback, + onContactLost ContactLostCallback, onEvent EventCallback, onLog LogCallback, - onContactLost ContactLostCallback, ) *AgentClient { return &AgentClient{ + contactLost: onContactLost, eventReceived: 
onEvent, handshakeReceived: &atomic.Bool{}, handshakeTimeout: handshakeTimeout, handshakeTimedOut: onTimedOut, handshakeSucceeded: onSuccess, - contactLost: onContactLost, log: log, logReceived: onLog, nc: nc, + pingTimeout: pingTimeout, subz: make([]*nats.Subscription, 0), } } @@ -85,6 +87,13 @@ func (a *AgentClient) ID() string { return a.agentID } +// Agent client instances subscribe to the following `hostint.>` subjects, +// which are exported by the `nexhost` account on the configured internal +// NATS connection for consumption by agents: +// +// - hostint.<agent_id>.handshake +// - hostint.<agent_id>.events +// - hostint.<agent_id>.logs func (a *AgentClient) Start(agentID string) error { a.log.Info("Agent client starting", slog.String("agent_id", agentID)) a.agentID = agentID @@ -92,19 +101,19 @@ var sub *nats.Subscription var err error - sub, err = a.nc.Subscribe(fmt.Sprintf("agentint.%s.handshake", agentID), a.handleHandshake) + sub, err = a.nc.Subscribe(fmt.Sprintf("hostint.%s.handshake", agentID), a.handleHandshake) if err != nil { return err } a.subz = append(a.subz, sub) - sub, err = a.nc.Subscribe(fmt.Sprintf("agentint.%s.events.*", agentID), a.handleAgentEvent) + sub, err = a.nc.Subscribe(fmt.Sprintf("hostint.%s.events.*", agentID), a.handleAgentEvent) if err != nil { return err } a.subz = append(a.subz, sub) - sub, err = a.nc.Subscribe(fmt.Sprintf("agentint.%s.logs", agentID), a.handleAgentLog) + sub, err = a.nc.Subscribe(fmt.Sprintf("hostint.%s.logs", agentID), a.handleAgentLog) if err != nil { return err } @@ -142,6 +151,7 @@ func (a *AgentClient) DeployWorkload(request *DeployRequest) (*DeployResponse, e a.log.Error("Failed to deserialize deployment response", slog.Any("error", err)) return nil, err } + a.workloadStartedAt = time.Now().UTC() return &deployResponse, nil } @@ -181,6 +191,8 @@ func (a *AgentClient) Stop() error { } func (a *AgentClient) Undeploy() error { + _ = a.Stop() + subject := fmt.Sprintf("agentint.%s.undeploy", a.agentID) a.log.Debug("sending undeploy request to agent via internal NATS connection", @@ -193,15 +205,17 @@ a.log.Warn("request to undeploy workload via internal NATS connection failed", slog.String("agent_id", a.agentID), slog.String("error", err.Error())) return err } + return nil } func (a *AgentClient) Ping() error { subject := fmt.Sprintf("agentint.%s.ping", a.agentID) + // a.log.Debug("pinging agent", slog.String("subject", subject)) - _, err := a.nc.Request(subject, []byte{}, 750*time.Millisecond) + _, err := a.nc.Request(subject, []byte{}, a.pingTimeout) if err != nil { - a.log.Warn("Agent failed to respond to ping", slog.Any("error", err)) + a.log.Warn("agent failed to respond to ping", slog.Any("error", err)) return err } @@ -279,8 +293,10 @@ func (a *AgentClient) handleHandshake(msg *nats.Msg) { } func (a *AgentClient) monitorAgent() { - ticker := time.NewTicker(15 * time.Second) - for { + ticker := time.NewTicker(5000 * time.Millisecond) // FIXME-- make configurable + defer ticker.Stop() + + for !a.shuttingDown() { <-ticker.C err := a.Ping() if err != nil { @@ -289,14 +305,10 @@ } break } - if a.stopping > 0 { - break - } } } func (a *AgentClient) handleAgentEvent(msg *nats.Msg) { - // agentint.{agentID}.events.{type} tokens := strings.Split(msg.Subject, ".") agentID := tokens[1] diff --git a/internal/agent-api/types.go b/internal/agent-api/types.go index b03610be..a4ff81e3 100644 --- a/internal/agent-api/types.go +++
b/internal/agent-api/types.go @@ -38,7 +38,7 @@ type ExecutionProviderParams struct { TmpFilename *string `json:"-"` VmID string `json:"-"` - // NATS connection which be injected into the execution provider + // NATS connections which will be injected into the execution provider NATSConn *nats.Conn `json:"-"` } @@ -180,10 +180,11 @@ type HostServicesMessagingResponse struct { } type MachineMetadata struct { - VmID *string `json:"vmid"` - NodeNatsHost *string `json:"node_nats_host"` - NodeNatsPort *int `json:"node_nats_port"` - Message *string `json:"message"` + VmID *string `json:"vmid"` + NodeNatsHost *string `json:"node_nats_host"` + NodeNatsPort *int `json:"node_nats_port"` + NodeNatsNkeySeed *string `json:"node_nats_nkey"` + Message *string `json:"message"` Errors []error `json:"errors,omitempty"` } diff --git a/internal/models/config.go b/internal/models/config.go index d994fa33..dc74b0ad 100644 --- a/internal/models/config.go +++ b/internal/models/config.go @@ -21,6 +21,7 @@ const ( DefaultNodeVcpuCount = 1 DefaultOtelExporterUrl = "127.0.0.1:14532" DefaultAgentHandshakeTimeoutMillisecond = 5000 + DefaultAgentPingTimeoutMillisecond = 750 ) var ( @@ -36,6 +37,7 @@ // as the virtual machines it produces type NodeConfiguration struct { AgentHandshakeTimeoutMillisecond int `json:"agent_handshake_timeout_ms,omitempty"` + AgentPingTimeoutMillisecond int `json:"agent_ping_timeout_ms,omitempty"` BinPath []string `json:"bin_path"` CNI CNIDefinition `json:"cni"` DefaultResourceDir string `json:"default_resource_dir"` @@ -144,6 +146,7 @@ func DefaultNodeConfiguration() NodeConfiguration { config := NodeConfiguration{ AgentHandshakeTimeoutMillisecond: DefaultAgentHandshakeTimeoutMillisecond, + AgentPingTimeoutMillisecond: DefaultAgentPingTimeoutMillisecond, BinPath: DefaultBinPath, // CAUTION: This needs to be the IP of the node server's internal NATS --as visible to the agent. // This is not necessarily the address on which the internal NATS server is actually listening inside the node.
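Taken together, the changes above give each agent its own internal NATS identity: the node mints a per-workload user nkey (via the internal-nats server introduced later in this diff), hands the seed to the agent through machine metadata or the NEX_NODE_NATS_NKEY_SEED environment variable, and the agent authenticates by signing the server-issued nonce with that key. Below is a minimal standalone sketch of that client-side handshake; it assumes only the environment variables and the nats.go/nkeys calls that appear in this changeset, and the package main scaffolding is purely illustrative.

package main

import (
	"fmt"
	"log"
	"os"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nkeys"
)

func main() {
	// Connection details the node hands to the agent (see agent/metadata.go
	// and the process managers in this diff).
	url := fmt.Sprintf("nats://%s:%s",
		os.Getenv("NEX_NODE_NATS_HOST"),
		os.Getenv("NEX_NODE_NATS_PORT"))

	// Per-workload user seed provisioned by the node for this agent.
	pair, err := nkeys.FromSeed([]byte(os.Getenv("NEX_NODE_NATS_NKEY_SEED")))
	if err != nil {
		log.Fatalf("invalid nkey seed: %v", err)
	}
	pub, _ := pair.PublicKey()

	// nats.Nkey presents the public key in CONNECT and proves possession of
	// the private key by signing the server-issued nonce.
	nc, err := nats.Connect(url, nats.Nkey(pub, pair.Sign))
	if err != nil {
		log.Fatalf("failed to connect to internal NATS: %v", err)
	}
	defer nc.Close()

	fmt.Println("authenticated to internal NATS as", pub)
}

Because the generated server config places each such user in its own account, an authenticated workload can only reach its own agentint.<workload_id>.> subjects and the hostint.> services imported from the nexhost account.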
diff --git a/internal/node/controlapi.go b/internal/node/controlapi.go index 3e46e79a..924ab11c 100644 --- a/internal/node/controlapi.go +++ b/internal/node/controlapi.go @@ -29,6 +29,7 @@ type ApiListener struct { subz []*nats.Subscription } +// FIXME-- stop passing node here func NewApiListener(log *slog.Logger, mgr *WorkloadManager, node *Node) *ApiListener { config := node.config @@ -271,9 +272,19 @@ func (api *ApiListener) handleDeploy(m *nats.Msg) { err := fmt.Errorf("invalid workload issuer: %s", request.DecodedClaims.Issuer) api.log.Error("Workload validation failed", slog.Any("err", err)) respondFail(controlapi.RunResponseType, m, fmt.Sprintf("%s", err)) + return + } + + agentClient, err := api.mgr.SelectRandomAgent() + if err != nil { + api.log.Error("Failed to get agent client from pool", slog.Any("err", err)) + respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Failed to get agent client from pool: %s", err)) + return } - numBytes, workloadHash, err := api.mgr.CacheWorkload(&request) + workloadID := agentClient.ID() + + numBytes, workloadHash, err := api.mgr.CacheWorkload(workloadID, &request) if err != nil { api.log.Error("Failed to cache workload bytes", slog.Any("err", err)) respondFail(controlapi.RunResponseType, m, fmt.Sprintf("Failed to cache workload bytes: %s", err)) @@ -311,7 +322,7 @@ func (api *ApiListener) handleDeploy(m *nats.Msg) { slog.String("type", string(request.WorkloadType)), ) - workloadID, err := api.mgr.DeployWorkload(deployRequest) + err = api.mgr.DeployWorkload(agentClient, deployRequest) if err != nil { api.log.Error("Failed to deploy workload", slog.String("error", err.Error()), @@ -320,22 +331,22 @@ return } - if _, ok := api.mgr.handshakes[*workloadID]; !ok { + if _, ok := api.mgr.handshakes[workloadID]; !ok { api.log.Error("Attempted to deploy workload into bad process (no handshake)", - slog.String("workload_id", *workloadID), + slog.String("workload_id", workloadID), ) respondFail(controlapi.RunResponseType, m, "Could not deploy workload, agent pool did not initialize properly") return } workloadName := request.DecodedClaims.Subject - api.log.Info("Workload deployed", slog.String("workload", workloadName), slog.String("workload_id", *workloadID)) + api.log.Info("Workload deployed", slog.String("workload", workloadName), slog.String("workload_id", workloadID)) res := controlapi.NewEnvelope(controlapi.RunResponseType, controlapi.RunResponse{ Started: true, Name: workloadName, Issuer: request.DecodedClaims.Issuer, - ID: *workloadID, // FIXME-- rename to match + ID: workloadID, // FIXME-- rename to match }, nil) raw, err := json.Marshal(res) diff --git a/internal/node/internal-nats/config_template.go b/internal/node/internal-nats/config_template.go new file mode 100644 index 00000000..b45adc32 --- /dev/null +++ b/internal/node/internal-nats/config_template.go @@ -0,0 +1,94 @@ +package internalnats + +import ( + "bytes" + "log/slog" + "text/template" +) + +type internalServerData struct { + Credentials map[string]*credentials + NexHostUserPublic string + NexHostUserSeed string +} + +type credentials struct { + ID string + NkeySeed string + NkeyPublic string +} + +/* + * In the below template, the nexhost (the account used by the host) will + * be able to import the following: + * *.agentevt.> - agent events streamed from workloads + * agentint.> - where the first token after agentint is the account ID from the workload + */ + +const ( + configTemplate = ` +jetstream: true + +accounts: {
nexhost: { + jetstream: true + users: [ + {nkey: "{{ .NexHostUserPublic }}"} + ] + exports: [ + { + service: hostint.> + } + ], + imports: [ + {{ range .Credentials }} + { + service: {subject: agentint.{{ .ID }}.>, account: {{ .ID }}} + }, + { + stream: {subject: agentevt.>, account: {{ .ID }}}, prefix: {{ .ID }} + }, + {{ end }} + ] + }, + {{ range .Credentials }} + {{ .ID }}: { + jetstream: true + users: [ + {nkey: "{{ .NkeyPublic }}"} + ] + exports: [ + { + service: agentint.{{ .ID }}.>, accounts: [nexhost] + } + { + stream: agentevt.>, accounts: [nexhost] + } + ] + imports: [ + { + service: {account: nexhost, subject: hostint.{{ .ID }}.>} + } + ] + + }, + {{ end }} +} +no_sys_acc: true +debug: false +trace: false +` +) + +func GenerateTemplate(log *slog.Logger, config internalServerData) ([]byte, error) { + var wr bytes.Buffer + + t := template.Must(template.New("natsconfig").Parse(configTemplate)) + err := t.Execute(&wr, config) + if err != nil { + return nil, err + } + + log.Debug("generated NATS config", slog.String("config", wr.String())) + return wr.Bytes(), nil +} diff --git a/internal/node/internal-nats/config_template_test.go b/internal/node/internal-nats/config_template_test.go new file mode 100644 index 00000000..8bf51aae --- /dev/null +++ b/internal/node/internal-nats/config_template_test.go @@ -0,0 +1,135 @@ +package internalnats + +import ( + "fmt" + "log/slog" + "os" + "slices" + "strings" + "sync" + "testing" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" + "github.com/nats-io/nuid" +) + +func TestTemplateGenerator(t *testing.T) { + + data := internalServerData{ + Credentials: map[string]*credentials{}, + } + + hostUser, _ := nkeys.CreateUser() + hostPub, _ := hostUser.PublicKey() + hostSeed, _ := hostUser.Seed() + + data.NexHostUserPublic = hostPub + data.NexHostUserSeed = string(hostSeed) + + for i := 0; i < 10; i++ { + userSeed, _ := nkeys.CreateUser() + seed, _ := userSeed.Seed() + seedPub, _ := userSeed.PublicKey() + + id := nuid.Next() + + data.Credentials[id] = &credentials{ + NkeySeed: string(seed), + NkeyPublic: seedPub, + ID: id, + } + } + + bytes, err := GenerateTemplate(slog.Default(), data) + if err != nil { + t.Fatalf("failed to render template: %s", err) + } + + fmt.Printf("----\n%s\n----\n", string(bytes)) + + f, err := os.CreateTemp("", "fooconf") + if err != nil { + t.Fatalf("Failed to create temp file: %s", err) + } + defer os.Remove(f.Name()) // clean up + if _, err := f.Write(bytes); err != nil { + t.Fatalf("%s", err) + } + + opts := &server.Options{ + JetStream: true, + StoreDir: "pnats", + Port: -1, + } + err = opts.ProcessConfigFile(f.Name()) + if err != nil { + t.Fatalf("failed to process configuration file: %s", err) + } + s, err := server.NewServer(opts) + if err != nil { + server.PrintAndDie("nats-server: " + err.Error()) + } + s.ConfigureLogger() + if err := server.Run(s); err != nil { + server.PrintAndDie(err.Error()) + } + ncHost, err := nats.Connect(s.ClientURL(), nats.Nkey(data.NexHostUserPublic, func(b []byte) ([]byte, error) { + return hostUser.Sign(b) + })) + if err != nil { + t.Fatal(err) + } + fmt.Printf("%+v\n", ncHost.Servers()) + + var eventWg sync.WaitGroup + eventWg.Add(1) + + var id string + for k := range data.Credentials { + id = k + break + } + + _, _ = ncHost.Subscribe("*.agentevt.>", func(msg *nats.Msg) { + tokens := strings.Split(msg.Subject, ".") + if tokens[0] == data.Credentials[id].ID && tokens[2] == "my_event" { + eventWg.Done() + } + }) + + _, _ = 
ncHost.Subscribe("hostint.>", func(msg *nats.Msg) { + tokens := strings.Split(msg.Subject, ".") + fmt.Printf("-- Replying to %s\n", msg.Subject) + if tokens[1] == data.Credentials[id].ID { + _ = msg.Respond([]byte{42, 42, 42}) + } + }) + + ncUser1, err := nats.Connect(s.ClientURL(), nats.Nkey(data.Credentials[id].NkeyPublic, func(b []byte) ([]byte, error) { + priv, _ := nkeys.FromSeed([]byte(data.Credentials[id].NkeySeed)) + return priv.Sign(b) + })) + if err != nil { + t.Fatalf("Failed to connect as user: %s", err) + } + + // host account should be able to see all of these + _ = ncUser1.Publish("agentevt.my_event", []byte{1, 2, 3}) + eventWg.Wait() + + res, err := ncUser1.Request(fmt.Sprintf("hostint.%s.service", id), []byte{1, 1, 1}, 1*time.Second) + if err != nil { + t.Fatal(err) + } + if !slices.Equal(res.Data, []byte{42, 42, 42}) { + t.Fatalf("Failed to get reply from exported service: %+v", res.Data) + } + + _, err = nats.Connect(s.ClientURL()) + if err == nil { + t.Fatal("Was supposed to fail to connect with unauthorized user but didn't") + } +} diff --git a/internal/node/internal-nats/internal_nats_server.go b/internal/node/internal-nats/internal_nats_server.go new file mode 100644 index 00000000..067465db --- /dev/null +++ b/internal/node/internal-nats/internal_nats_server.go @@ -0,0 +1,324 @@ +package internalnats + +import ( + "context" + "errors" + "log/slog" + "net/url" + "os" + "path" + "strconv" + "time" + + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/nats-io/nkeys" +) + +const ( + defaultInternalNatsConfigFile = "internalconf" + defaultInternalNatsStoreDir = "pnats" + workloadCacheBucketName = "NEXCACHE" + workloadCacheFileKey = "workload" +) + +type InternalNatsServer struct { + ncInternal *nats.Conn + log *slog.Logger + lastOpts *server.Options + server *server.Server + serverConfigData internalServerData +} + +func NewInternalNatsServer(log *slog.Logger) (*InternalNatsServer, error) { + opts := &server.Options{ + JetStream: true, + StoreDir: path.Join(os.TempDir(), defaultInternalNatsStoreDir), + Port: -1, + // Debug: true, + // Trace: true, + // NoLog: true, + } + + data := internalServerData{ + Credentials: map[string]*credentials{}, + } + + hostUser, _ := nkeys.CreateUser() + hostPub, _ := hostUser.PublicKey() + hostSeed, _ := hostUser.Seed() + + data.NexHostUserPublic = hostPub + data.NexHostUserSeed = string(hostSeed) + + opts, err := updateNatsOptions(opts, log, data) + if err != nil { + return nil, err + } + + s, err := server.NewServer(opts) + if err != nil { + server.PrintAndDie("nats-server: " + err.Error()) + return nil, err + } + + // uncomment this if you want internal NATS logs emitted + // s.ConfigureLogger() + + if err := server.Run(s); err != nil { + server.PrintAndDie(err.Error()) + return nil, err + } + + // This connection uses the `nexhost` account, specifically provisioned for the node + ncInternal, err := nats.Connect(s.ClientURL(), nats.Nkey(data.NexHostUserPublic, func(b []byte) ([]byte, error) { + log.Debug("Attempting to sign NATS server nonce for internal host connection", slog.String("public_key", data.NexHostUserPublic)) + return hostUser.Sign(b) + })) + if err != nil { + return nil, err + } + + opts.Port = getPort(s.ClientURL()) + + internalServer := InternalNatsServer{ + ncInternal: ncInternal, + serverConfigData: data, + log: log, + lastOpts: opts, + server: s, + } + + return &internalServer, nil +} + +func (s *InternalNatsServer) Port() int { + return 
s.lastOpts.Port +} + +func (s *InternalNatsServer) Subsz(opts *server.SubszOptions) (*server.Subsz, error) { + return s.server.Subsz(opts) +} + +// Returns a user keypair that can be used to log into the internal server +func (s *InternalNatsServer) CreateCredentials(id string) (nkeys.KeyPair, error) { + kp, err := nkeys.CreateUser() + if err != nil { + s.log.Error("Failed to create nkey user", slog.Any("error", err)) + return nil, err + } + + pk, _ := kp.PublicKey() + seed, _ := kp.Seed() + + creds := &credentials{ + NkeySeed: string(seed), + NkeyPublic: pk, + ID: id, + } + s.serverConfigData.Credentials[id] = creds + + opts := &server.Options{ + ConfigFile: s.lastOpts.ConfigFile, + JetStream: true, + Port: s.lastOpts.Port, + StoreDir: s.lastOpts.StoreDir, + } + + updated, err := updateNatsOptions(opts, s.log, s.serverConfigData) + if err != nil { + s.log.Error("Failed to update NATS options in internal server", slog.Any("error", err)) + return nil, err + } + + err = s.server.ReloadOptions(updated) + if err != nil { + s.log.Error("Failed to reload NATS internal server options", slog.Any("error", err)) + return nil, err + } + + nc, err := s.ConnectionWithCredentials(creds) + if err != nil { + s.log.Error("Failed to obtain connection for given credentials", slog.Any("error", err)) + return nil, err + } + + _, err = ensureWorkloadObjectStore(nc) + if err != nil { + s.log.Error("Failed to create or locate object store in internal NATS server", + slog.Any("error", err), + ) + return nil, err + } + + return kp, nil +} + +// Destroy previously-created credentials +func (s *InternalNatsServer) DestroyCredentials(id string) error { + delete(s.serverConfigData.Credentials, id) + + updated, err := updateNatsOptions(&server.Options{ + ConfigFile: s.lastOpts.ConfigFile, + JetStream: true, + Port: s.lastOpts.Port, + StoreDir: s.lastOpts.StoreDir, + }, s.log, s.serverConfigData) + if err != nil { + s.log.Error("Failed to update NATS options in internal server", slog.Any("error", err)) + return err + } + + err = s.server.ReloadOptions(updated) + if err != nil { + s.log.Error("Failed to reload NATS internal server options", slog.Any("error", err)) + return err + } + + return nil +} + +func (s *InternalNatsServer) ClientURL() string { + return s.ncInternal.ConnectedUrl() +} + +func (s *InternalNatsServer) Connection() *nats.Conn { + return s.ncInternal +} + +func (s *InternalNatsServer) Shutdown() { + s.server.Shutdown() + s.server.WaitForShutdown() +} + +func (s *InternalNatsServer) StoreFileForID(id string, bytes []byte) error { + ctx, cancelF := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelF() + + creds, err := s.FindCredentials(id) + if err != nil { + return err + } + + nc, err := s.ConnectionWithCredentials(creds) + if err != nil { + return err + } + + bucket, err := ensureWorkloadObjectStore(nc) + if err != nil { + return err + } + + _, err = bucket.PutBytes(ctx, workloadCacheFileKey, bytes) + return err +} + +func (s *InternalNatsServer) ConnectionWithID(id string) (*nats.Conn, error) { + creds, err := s.FindCredentials(id) + if err != nil { + return nil, err + } + + return s.ConnectionWithCredentials(creds) +} + +func (s *InternalNatsServer) ConnectionWithCredentials(creds *credentials) (*nats.Conn, error) { + pair, err := nkeys.FromSeed([]byte(creds.NkeySeed)) + if err != nil { + return nil, err + } + + nc, err := nats.Connect(s.server.ClientURL(), nats.Nkey(creds.NkeyPublic, func(b []byte) ([]byte, error) { + s.log.Debug("Attempting to sign NATS server nonce for internal 
connection", slog.String("public_key", creds.NkeyPublic)) + return pair.Sign(b) + })) + if err != nil { + s.log.Warn("Failed to sign NATS server nonce for internal connection", slog.String("public_key", creds.NkeyPublic)) + return nil, err + } + + return nc, nil +} + +func (s *InternalNatsServer) FindCredentials(id string) (*credentials, error) { + if creds, ok := s.serverConfigData.Credentials[id]; ok { + return creds, nil + } + + return nil, errors.New("No such workload") +} + +func ensureWorkloadObjectStore(nc *nats.Conn) (jetstream.ObjectStore, error) { + var err error + ctx, cancelF := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelF() + + js, err := jetstream.New(nc) + if err != nil { + return nil, err + } + var bucket jetstream.ObjectStore + + bucket, err = js.ObjectStore(ctx, workloadCacheBucketName) + if err != nil { + if errors.Is(err, jetstream.ErrBucketNotFound) { + bucket, err = js.CreateObjectStore(ctx, jetstream.ObjectStoreConfig{ + Bucket: workloadCacheBucketName, + Description: "Cache for workload images to be executed by agent", + Storage: jetstream.MemoryStorage, + }) + if err != nil { + return nil, err + } + } else { + return nil, err + } + } + return bucket, nil +} + +func updateNatsOptions(opts *server.Options, log *slog.Logger, data internalServerData) (*server.Options, error) { + bytes, err := GenerateTemplate(log, data) + if err != nil { + log.Error("Failed to generate internal nats server config file", slog.Any("error", err)) + return nil, err + } + + var f *os.File + if len(opts.ConfigFile) == 0 { + f, err = os.CreateTemp(os.TempDir(), defaultInternalNatsConfigFile) + } else { + f, err = os.Create(opts.ConfigFile) + } + if err != nil { + return nil, err + } + defer os.Remove(f.Name()) // clean up + + if _, err := f.Write(bytes); err != nil { + log.Error("Failed to write internal nats server config file", slog.Any("error", err)) + return nil, err + } + + err = opts.ProcessConfigFile(f.Name()) + if err != nil { + log.Error("Failed to process configuration file", slog.Any("error", err)) + return nil, err + } + + return opts, nil +} + +func getPort(clientUrl string) int { + u, err := url.Parse(clientUrl) + if err != nil { + return -1 + } + res, err := strconv.Atoi(u.Port()) + if err != nil { + return -1 + } + return res +} diff --git a/internal/node/internal-nats/internal_nats_server_test.go b/internal/node/internal-nats/internal_nats_server_test.go new file mode 100644 index 00000000..81ec1918 --- /dev/null +++ b/internal/node/internal-nats/internal_nats_server_test.go @@ -0,0 +1,107 @@ +package internalnats + +import ( + "context" + "fmt" + "log/slog" + "net/url" + "slices" + "testing" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" + "github.com/nats-io/nuid" +) + +func TestInternalNatsServer(t *testing.T) { + + server, err := NewInternalNatsServer(slog.Default()) + if err != nil { + t.Fatalf("Failed to create internal nats server: %s", err) + } + serverUrl := server.Connection().Servers()[0] + sUrl, err := url.Parse(serverUrl) + if err != nil { + t.Fatalf("Should have a valid URL from the server but didn't: %s", err) + } + p := server.Port() + if fmt.Sprintf("%d", p) != sUrl.Port() { + t.Fatalf("Port number from options doesn't match what's running: %d %s", p, sUrl.Port()) + } + + workloadId := nuid.Next() + + keypair, err := server.CreateCredentials(workloadId) + if err != nil { + t.Fatalf("Should have been able to add a workload user but couldn't: %s", err) + } + pk, _ := keypair.PublicKey() + seed, _ := 
keypair.Seed() + fmt.Printf("New workload user: %s %s\n", pk, string(seed)) + + // log in as the new workload + _, err = nats.Connect(server.Connection().Servers()[0], nats.Nkey(pk, func(b []byte) ([]byte, error) { + return keypair.Sign(b) + })) + + if err != nil { + t.Fatalf("Couldn't connect to the internal server as a workload: %s", err) + } + + workloadId2 := nuid.Next() + keypair2, err := server.CreateCredentials(workloadId2) + if err != nil { + t.Fatalf("Should have been able to add a workload user but couldn't: %s", err) + } + pk2, _ := keypair2.PublicKey() + seed2, _ := keypair2.Seed() + fmt.Printf("New workload user: %s %s\n", pk2, string(seed2)) + + // log in as the new workload + _, err = nats.Connect(server.Connection().Servers()[0], nats.Nkey(pk2, func(b []byte) ([]byte, error) { + return keypair2.Sign(b) + })) + + if err != nil { + t.Fatalf("Couldn't connect to the internal server as a workload: %s", err) + } +} + +// With the new security system, all agents will simply pull their workload binary from +// the NEXCACHE bucket in their account, with the key of 'workload' +func TestInternalNatsServerFileCache(t *testing.T) { + ctx := context.Background() + server, err := NewInternalNatsServer(slog.Default()) + if err != nil { + t.Fatalf("Failed to create internal nats server: %s", err) + } + fmt.Printf("Internal server on %s\n", server.Connection().Servers()[0]) + + workloadId := nuid.Next() + + keypair, err := server.CreateCredentials(workloadId) + if err != nil { + t.Fatalf("Should have been able to add a workload user but couldn't: %s", err) + } + pk, _ := keypair.PublicKey() + seed, _ := keypair.Seed() + fmt.Printf("New workload user: %s %s\n", pk, string(seed)) + + err = server.StoreFileForID(workloadId, []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}) + if err != nil { + t.Fatalf("Should have gotten no error but didn't: %s", err) + } + + ud, _ := server.FindCredentials(workloadId) + userCn, _ := server.ConnectionWithCredentials(ud) + js, _ := jetstream.New(userCn) + bucket, _ := js.ObjectStore(ctx, workloadCacheBucketName) + workload, err := bucket.GetBytes(ctx, workloadCacheFileKey) + if err != nil { + t.Fatalf("Should have queried the workload bytes, but got error instead: %s", err) + } + + if !slices.Equal(workload, []byte{1, 2, 3, 4, 5, 6, 7, 8, 9}) { + t.Fatalf("File bytes did not round trip properly, got %+v", workload) + } +} diff --git a/internal/node/node.go b/internal/node/node.go index b783f705..f7641692 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -5,10 +5,8 @@ import ( "errors" "fmt" "log/slog" - "net/url" "os" "os/signal" - "path" "path/filepath" "runtime" "strconv" @@ -31,7 +29,6 @@ import ( const ( systemNamespace = "system" - defaultInternalNatsStoreDir = "pnats" heartbeatInterval = 30 * time.Second publicNATSServerStartTimeout = 50 * time.Millisecond runloopSleepInterval = 100 * time.Millisecond @@ -66,10 +63,6 @@ type Node struct { natspub *server.Server nc *nats.Conn - natsint *server.Server - ncint *nats.Conn - ncHostServices *nats.Conn - startedAt time.Time telemetry *observability.Telemetry @@ -112,7 +105,6 @@ func NewNode( node.nexus = nodeOpts.NexusName node.capabilities = *models.GetNodeCapabilities(node.config.Tags) - return node, nil } @@ -271,29 +263,18 @@ func (n *Node) init() error { n.log.Info("Established node NATS connection", slog.String("servers", n.opts.Servers)) } - _err = n.startHostServicesConnection(n.nc) - if _err != nil { - n.log.Error("Failed to start host services connection", slog.Any("error", _err)) - err = 
errors.Join(err, fmt.Errorf("failed to start host services NATS connection: %s", _err)) - } else { - n.log.Info("Established host services NATS connection", slog.String("server", n.ncHostServices.Servers()[0])) - } - - // start internal NATS server - _err = n.startInternalNATS() - if _err != nil { - n.log.Error("Failed to start internal NATS server", slog.Any("err", _err)) - err = errors.Join(err, fmt.Errorf("failed to start internal NATS server: %s", _err)) - } else { - n.log.Info("Internal NATS server started", slog.String("client_url", n.natsint.ClientURL())) - } - - n.manager, _err = NewWorkloadManager(n.ctx, n.cancelF, - n.keypair, n.publicKey, - n.nc, n.ncint, n.ncHostServices, - n.config, n.log, n.telemetry) + n.manager, _err = NewWorkloadManager( + n.ctx, + n.cancelF, + n.keypair, + n.publicKey, + n.nc, + n.config, + n.log, + n.telemetry, + ) if _err != nil { - n.log.Error("Failed to initialize machine manager", slog.Any("err", _err)) + n.log.Error("Failed to initialize workload manager", slog.Any("err", _err)) err = errors.Join(err, _err) } @@ -319,92 +300,6 @@ func (n *Node) init() error { return err } -func (n *Node) startHostServicesConnection(defaultConnection *nats.Conn) error { - if n.config.HostServicesConfiguration != nil { - natsOpts := []nats.Option{ - nats.Name("nex-hostservices"), - } - if len(n.config.HostServicesConfiguration.NatsUserJwt) > 0 { - natsOpts = append(natsOpts, - nats.UserJWTAndSeed( - n.config.HostServicesConfiguration.NatsUserJwt, - n.config.HostServicesConfiguration.NatsUserSeed, - ), - ) - } - - if len(n.config.HostServicesConfiguration.NatsUrl) == 0 { - n.config.HostServicesConfiguration.NatsUrl = defaultConnection.Servers()[0] - n.ncHostServices = n.nc - } else { - nc, err := nats.Connect(n.config.HostServicesConfiguration.NatsUrl, natsOpts...) 
- if err != nil { - return err - } - n.ncHostServices = nc - } - } else { - n.ncHostServices = n.nc - } - return nil -} - -func (n *Node) startInternalNATS() error { - var err error - - n.natsint, err = server.NewServer(&server.Options{ - Host: "0.0.0.0", - Port: -1, - JetStream: true, - NoLog: true, - StoreDir: path.Join(os.TempDir(), defaultInternalNatsStoreDir), - }) - if err != nil { - return err - } - n.natsint.Start() - - clientUrl, err := url.Parse(n.natsint.ClientURL()) - if err != nil { - return fmt.Errorf("failed to parse internal NATS client URL: %s", err) - } - - p, err := strconv.Atoi(clientUrl.Port()) - if err != nil { - return fmt.Errorf("failed to parse internal NATS client URL: %s", err) - } - n.config.InternalNodePort = &p - - n.ncint, err = nats.Connect("", nats.InProcessServer(n.natsint)) - if err != nil { - n.log.Error("Failed to connect to internal nats", slog.Any("err", err), slog.Any("internal_url", clientUrl), slog.Bool("with_jetstream", n.natsint.JetStreamEnabled())) - return fmt.Errorf("failed to connect to internal nats: %s", err) - } - - rtt, err := n.ncint.RTT() - if err != nil { - n.log.Warn("Failed get internal nats RTT", slog.Any("err", err), slog.Any("internal_url", clientUrl)) - } else { - n.log.Debug("Internal NATS RTT", slog.String("rtt", rtt.String()), slog.Bool("with_jetstream", n.natsint.JetStreamEnabled())) - } - - jsCtx, err := n.ncint.JetStream() - if err != nil { - return fmt.Errorf("failed to establish jetstream connection to internal nats: %s", err) - } - - _, err = jsCtx.CreateObjectStore(&nats.ObjectStoreConfig{ - Bucket: WorkloadCacheBucketName, - Description: "Object store cache for nex-node workloads", - Storage: nats.MemoryStorage, - }) - if err != nil { - return fmt.Errorf("failed to create internal object store: %s", err) - } - - return nil -} - func (n *Node) startPublicNATS() error { if n.config.PublicNATSServer == nil { // no-op @@ -429,9 +324,18 @@ func (n *Node) startPublicNATS() error { } func (n *Node) handleAutostarts() { - time.Sleep(2 * time.Second) // delay a bit before attempting autostarts - for _, autostart := range n.config.AutostartConfiguration.Workloads { + var agentClient *agentapi.AgentClient + var err error + + for agentClient == nil { + agentClient, err = n.manager.SelectRandomAgent() + if err != nil { + n.log.Warn("Failed to resolve agent for autostart", slog.String("error", err.Error())) + time.Sleep(25 * time.Millisecond) + } + } + request, err := controlapi.NewDeployRequest( controlapi.Argv(autostart.Argv), controlapi.Location(autostart.Location), @@ -452,6 +356,7 @@ func (n *Node) handleAutostarts() { ) continue } + _, err = request.Validate() if err != nil { n.log.Error("Failed to validate autostart deployment request", @@ -459,6 +364,7 @@ func (n *Node) handleAutostarts() { ) continue } + agentDeployRequest := &agentapi.DeployRequest{ Argv: request.Argv, DecodedClaims: request.DecodedClaims, @@ -479,7 +385,7 @@ func (n *Node) handleAutostarts() { WorkloadJwt: request.WorkloadJwt, } - numBytes, workloadHash, err := n.api.mgr.CacheWorkload(request) + numBytes, workloadHash, err := n.api.mgr.CacheWorkload(agentClient.ID(), request) if err != nil { n.api.log.Error("Failed to cache auto-start workload bytes", slog.Any("err", err), @@ -492,7 +398,7 @@ func (n *Node) handleAutostarts() { agentDeployRequest.TotalBytes = int64(numBytes) agentDeployRequest.Hash = *workloadHash - workloadId, err := n.api.mgr.DeployWorkload(agentDeployRequest) + err = n.api.mgr.DeployWorkload(agentClient, agentDeployRequest) if err != 
nil { n.log.Error("Failed to deploy autostart workload", slog.Any("error", err), @@ -504,7 +410,7 @@ func (n *Node) handleAutostarts() { n.log.Info("Autostart workload started", slog.String("name", autostart.Name), slog.String("namespace", autostart.Namespace), - slog.String("workload_id", *workloadId), + slog.String("workload_id", agentClient.ID()), ) } } @@ -626,16 +532,16 @@ func (n *Node) validateConfig() error { func (n *Node) shutdown() { if atomic.AddUint32(&n.closing, 1) == 1 { n.log.Debug("shutting down") - _ = n.api.Drain() - _ = n.manager.Stop() + if n.api != nil { + _ = n.api.Drain() + } - if !n.startedAt.IsZero() { - _ = n.publishNodeStopped() + if n.manager != nil { + _ = n.manager.Stop() } - _ = n.ncint.Drain() - for !n.ncint.IsClosed() { - time.Sleep(time.Millisecond * 25) + if !n.startedAt.IsZero() { + _ = n.publishNodeStopped() } _ = n.nc.Drain() @@ -643,10 +549,6 @@ func (n *Node) shutdown() { time.Sleep(time.Millisecond * 25) } - n.natsint.Shutdown() - n.natsint.WaitForShutdown() - _ = os.Remove(path.Join(os.TempDir(), defaultInternalNatsStoreDir)) - if n.natspub != nil { n.natspub.Shutdown() n.natspub.WaitForShutdown() diff --git a/internal/node/nodeproxy.go b/internal/node/nodeproxy.go index 113071ba..6b04f0d5 100644 --- a/internal/node/nodeproxy.go +++ b/internal/node/nodeproxy.go @@ -3,9 +3,9 @@ package nexnode import ( "log/slog" - "github.com/nats-io/nats-server/v2/server" "github.com/nats-io/nats.go" "github.com/synadia-io/nex/internal/models" + internalnats "github.com/synadia-io/nex/internal/node/internal-nats" "github.com/synadia-io/nex/internal/node/observability" "github.com/synadia-io/nex/internal/node/processmanager" ) @@ -38,14 +38,6 @@ func (n *NodeProxy) NodeConfiguration() *models.NodeConfiguration { return n.n.config } -func (n *NodeProxy) InternalNATS() *server.Server { - return n.n.natsint -} - -func (n *NodeProxy) InternalNATSConn() *nats.Conn { - return n.n.ncint -} - func (n *NodeProxy) Telemetry() *observability.Telemetry { return n.n.telemetry } @@ -66,8 +58,12 @@ func (m *WorkloadManagerProxy) NodeConfiguration() *models.NodeConfiguration { return m.m.config } +func (m *WorkloadManagerProxy) InternalNATS() *internalnats.InternalNatsServer { + return m.m.natsint +} + func (m *WorkloadManagerProxy) InternalNATSConn() *nats.Conn { - return m.m.ncInternal + return m.m.ncint } func (m *WorkloadManagerProxy) Telemetry() *observability.Telemetry { diff --git a/internal/node/processmanager/firecracker_procman.go b/internal/node/processmanager/firecracker_procman.go index 3d991096..697c1fcf 100644 --- a/internal/node/processmanager/firecracker_procman.go +++ b/internal/node/processmanager/firecracker_procman.go @@ -16,6 +16,7 @@ import ( agentapi "github.com/synadia-io/nex/internal/agent-api" "github.com/synadia-io/nex/internal/models" + internalnats "github.com/synadia-io/nex/internal/node/internal-nats" "github.com/synadia-io/nex/internal/node/observability" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" @@ -32,21 +33,26 @@ type FirecrackerProcessManager struct { allVMs map[string]*runningFirecracker warmVMs chan *runningFirecracker + intNats *internalnats.InternalNatsServer + delegate ProcessDelegate deployRequests map[string]*agentapi.DeployRequest } func NewFirecrackerProcessManager( + intnats *internalnats.InternalNatsServer, log *slog.Logger, config *models.NodeConfiguration, telemetry *observability.Telemetry, ctx context.Context, ) (*FirecrackerProcessManager, error) { + return &FirecrackerProcessManager{ - config: 
config, - t: telemetry, - log: log, - ctx: ctx, + config: config, + intNats: intnats, + t: telemetry, + log: log, + ctx: ctx, allVMs: make(map[string]*runningFirecracker), warmVMs: make(chan *runningFirecracker, config.MachinePoolSize), @@ -129,7 +135,7 @@ func (f *FirecrackerProcessManager) Start(delegate ProcessDelegate) error { defer func() { if r := recover(); r != nil { - f.log.Debug(fmt.Sprintf("recovered: %s", r)) + f.log.Debug(fmt.Sprintf("firecracker process manager recovered from failure: %s", r)) } }() @@ -156,7 +162,14 @@ func (f *FirecrackerProcessManager) Start(delegate ProcessDelegate) error { continue } - err = f.setMetadata(vm) + workloadKey, err := f.intNats.CreateCredentials(vm.vmmID) + if err != nil { + f.log.Error("Failed to create workload user", slog.Any("err", err)) + continue + } + workloadSeed, _ := workloadKey.Seed() + + err = f.setMetadata(vm, string(workloadSeed)) if err != nil { f.log.Warn("Failed to set metadata on VM for warming pool.", slog.Any("err", err)) continue @@ -260,12 +273,13 @@ func (f *FirecrackerProcessManager) cleanSockets() { } } -func (f *FirecrackerProcessManager) setMetadata(vm *runningFirecracker) error { +func (f *FirecrackerProcessManager) setMetadata(vm *runningFirecracker, workloadSeed string) error { return vm.setMetadata(&agentapi.MachineMetadata{ - Message: models.StringOrNil("Host-supplied metadata"), - NodeNatsHost: vm.config.InternalNodeHost, - NodeNatsPort: vm.config.InternalNodePort, - VmID: &vm.vmmID, + Message: models.StringOrNil("Host-supplied metadata"), + NodeNatsHost: vm.config.InternalNodeHost, + NodeNatsPort: vm.config.InternalNodePort, + NodeNatsNkeySeed: &workloadSeed, + VmID: &vm.vmmID, }) } diff --git a/internal/node/processmanager/process_mgr_linux.go b/internal/node/processmanager/process_mgr_linux.go index 8a80b84e..9838757a 100644 --- a/internal/node/processmanager/process_mgr_linux.go +++ b/internal/node/processmanager/process_mgr_linux.go @@ -7,21 +7,24 @@ import ( "log/slog" "github.com/synadia-io/nex/internal/models" + internalnats "github.com/synadia-io/nex/internal/node/internal-nats" "github.com/synadia-io/nex/internal/node/observability" ) // Initialize an appropriate agent process manager instance based on the sandbox config value func NewProcessManager( + intNats *internalnats.InternalNatsServer, log *slog.Logger, config *models.NodeConfiguration, telemetry *observability.Telemetry, ctx context.Context, ) (ProcessManager, error) { + if config.NoSandbox { log.Warn("⚠️ Sandboxing has been disabled! 
Workloads are spawned directly by agents") log.Warn("⚠️ Do not run untrusted workloads in this mode!") - return NewSpawningProcessManager(log, config, telemetry, ctx) + return NewSpawningProcessManager(intNats, log, config, telemetry, ctx) } - return NewFirecrackerProcessManager(log, config, telemetry, ctx) + return NewFirecrackerProcessManager(intNats, log, config, telemetry, ctx) } diff --git a/internal/node/processmanager/process_mgr_windows.go b/internal/node/processmanager/process_mgr_windows.go index 3619cb8b..08f543a4 100644 --- a/internal/node/processmanager/process_mgr_windows.go +++ b/internal/node/processmanager/process_mgr_windows.go @@ -7,15 +7,17 @@ import ( "log/slog" "github.com/synadia-io/nex/internal/models" + internalnats "github.com/synadia-io/nex/internal/node/internal-nats" "github.com/synadia-io/nex/internal/node/observability" ) // Initialize an appropriate agent process manager instance based on the sandbox config value func NewProcessManager( + intNats *internalnats.InternalNatsServer, log *slog.Logger, config *models.NodeConfiguration, telemetry *observability.Telemetry, ctx context.Context, ) (ProcessManager, error) { - return NewSpawningProcessManager(log, config, telemetry, ctx) + return NewSpawningProcessManager(intNats, log, config, telemetry, ctx) } diff --git a/internal/node/processmanager/spawn_procman.go b/internal/node/processmanager/spawn_procman.go index f83cfde0..a3718ebd 100644 --- a/internal/node/processmanager/spawn_procman.go +++ b/internal/node/processmanager/spawn_procman.go @@ -14,6 +14,7 @@ import ( "github.com/rs/xid" agentapi "github.com/synadia-io/nex/internal/agent-api" "github.com/synadia-io/nex/internal/models" + internalnats "github.com/synadia-io/nex/internal/node/internal-nats" "github.com/synadia-io/nex/internal/node/observability" ) @@ -32,6 +33,7 @@ type SpawningProcessManager struct { liveProcs map[string]*spawnedProcess warmProcs chan *spawnedProcess + intNats *internalnats.InternalNatsServer delegate ProcessDelegate deployRequests map[string]*agentapi.DeployRequest @@ -54,16 +56,18 @@ type spawnedProcess struct { } func NewSpawningProcessManager( + intNats *internalnats.InternalNatsServer, log *slog.Logger, config *models.NodeConfiguration, telemetry *observability.Telemetry, ctx context.Context, ) (*SpawningProcessManager, error) { return &SpawningProcessManager{ - config: config, - t: telemetry, - log: log, - ctx: ctx, + config: config, + t: telemetry, + log: log, + ctx: ctx, + intNats: intNats, stopMutexes: make(map[string]*sync.Mutex), @@ -223,6 +227,12 @@ func (s *SpawningProcessManager) spawn() (*spawnedProcess, error) { id := xid.New() workloadID := id.String() + kp, err := s.intNats.CreateCredentials(workloadID) + if err != nil { + return nil, err + } + seed, _ := kp.Seed() + cmd := exec.Command(nexAgentBinary) cmd.Env = append(os.Environ(), "NEX_SANDBOX=false", @@ -230,6 +240,7 @@ func (s *SpawningProcessManager) spawn() (*spawnedProcess, error) { // can't use the CNI host because we don't use it in no-sandbox mode "NEX_NODE_NATS_HOST=0.0.0.0", fmt.Sprintf("NEX_NODE_NATS_PORT=%d", *s.config.InternalNodePort), + fmt.Sprintf("NEX_NODE_NATS_NKEY_SEED=%s", seed), ) cmd.Stderr = &procLogEmitter{workloadID: workloadID, log: s.log.WithGroup(workloadID), stderr: true} @@ -245,7 +256,7 @@ func (s *SpawningProcessManager) spawn() (*spawnedProcess, error) { Exit: make(chan int), } - err := cmd.Start() + err = cmd.Start() if err != nil { s.log.Warn("Agent command failed to start", slog.Any("error", err)) return nil, err diff --git 
a/internal/node/services.go b/internal/node/services.go
index 117f9a6b..ccdf4d38 100644
--- a/internal/node/services.go
+++ b/internal/node/services.go
@@ -8,6 +8,7 @@ import (
 	hs "github.com/synadia-io/nex/host-services"
 	"github.com/synadia-io/nex/host-services/builtins"
 	"github.com/synadia-io/nex/internal/models"
+	"go.opentelemetry.io/otel/trace"
 )
 
 const hostServiceHTTP = "http"
@@ -19,39 +20,35 @@ const hostServiceObjectStore = "objectstore"
 // exposed to workloads by way of the agent which makes RPC calls
 // via the internal NATS connection
 type HostServices struct {
+	config         *models.HostServicesConfig
 	log            *slog.Logger
-	mgr            *WorkloadManager
 	ncHostServices *nats.Conn
 	ncint          *nats.Conn
-
-	hsServer *hs.HostServicesServer
-	config   *models.HostServicesConfig
+	server         *hs.HostServicesServer
 }
 
 func NewHostServices(
-	mgr *WorkloadManager,
 	ncint *nats.Conn,
 	ncHostServices *nats.Conn,
 	config *models.HostServicesConfig,
 	log *slog.Logger,
+	tracer trace.Tracer,
 ) *HostServices {
 	return &HostServices{
+		config:         config,
 		log:            log,
-		mgr:            mgr,
 		ncHostServices: ncHostServices,
-		config:         config,
 		ncint:          ncint,
 		// ‼️ It cannot be overstated how important it is that the host services server
 		// be given the -internal- NATS connection and -not- the external/control one
 		//
 		// Sincerely,
 		// Someone who lost a day of troubleshooting
-		hsServer: hs.NewHostServicesServer(ncint, log, mgr.t.Tracer),
+		server: hs.NewHostServicesServer(ncint, log, tracer),
 	}
 }
 
 func (h *HostServices) init() error {
-
 	if httpConfig, ok := h.config.Services[hostServiceHTTP]; ok {
 		if httpConfig.Enabled {
 			http, err := builtins.NewHTTPService(h.ncHostServices, h.log)
@@ -61,12 +58,14 @@ func (h *HostServices) init() error {
 			} else {
 				h.log.Debug("initialized http host service")
 			}
-			err = h.hsServer.AddService(hostServiceHTTP, http, httpConfig.Configuration)
+
+			err = h.server.AddService(hostServiceHTTP, http, httpConfig.Configuration)
 			if err != nil {
 				return err
 			}
 		}
 	}
+
 	if kvConfig, ok := h.config.Services[hostServiceKeyValue]; ok {
 		if kvConfig.Enabled {
 			kv, err := builtins.NewKeyValueService(h.ncHostServices, h.log)
@@ -76,12 +75,14 @@ func (h *HostServices) init() error {
 			} else {
 				h.log.Debug("initialized key/value host service")
 			}
-			err = h.hsServer.AddService(hostServiceKeyValue, kv, kvConfig.Configuration)
+
+			err = h.server.AddService(hostServiceKeyValue, kv, kvConfig.Configuration)
 			if err != nil {
 				return err
 			}
 		}
 	}
+
 	if messagingConfig, ok := h.config.Services[hostServiceMessaging]; ok {
 		if messagingConfig.Enabled {
 			messaging, err := builtins.NewMessagingService(h.ncHostServices, h.log)
@@ -91,12 +92,14 @@ func (h *HostServices) init() error {
 			} else {
 				h.log.Debug("initialized messaging host service")
 			}
-			err = h.hsServer.AddService(hostServiceMessaging, messaging, messagingConfig.Configuration)
+
+			err = h.server.AddService(hostServiceMessaging, messaging, messagingConfig.Configuration)
 			if err != nil {
 				return err
 			}
 		}
 	}
+
 	if objectConfig, ok := h.config.Services[hostServiceObjectStore]; ok {
 		if objectConfig.Enabled {
 			object, err := builtins.NewObjectStoreService(h.ncHostServices, h.log)
@@ -106,14 +109,14 @@ func (h *HostServices) init() error {
 			} else {
 				h.log.Debug("initialized object store host service")
 			}
-			err = h.hsServer.AddService(hostServiceObjectStore, object, objectConfig.Configuration)
+
+			err = h.server.AddService(hostServiceObjectStore, object, objectConfig.Configuration)
 			if err != nil {
 				return err
 			}
 		}
 	}
 
-	h.log.Info("Host services configured", slog.Any("services", h.hsServer.Services()))
-
-	return h.hsServer.Start()
+	h.log.Info("Host services configured", slog.Any("services", h.server.Services()))
+	return h.server.Start()
 }
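Review note: the four registration blocks in `init()` above are structurally identical (check enabled, construct builtin, `AddService`). If more builtins are added, a closure map could collapse them. This is a sketch only, not part of the patch; it assumes the constructors and `AddService` calls exactly as shown above, and the `register` map is invented for illustration:

```go
// Sketch of a consolidated init() -- hypothetical, not part of this patch.
func (h *HostServices) init() error {
	register := map[string]func() error{
		hostServiceHTTP: func() error {
			svc, err := builtins.NewHTTPService(h.ncHostServices, h.log)
			if err != nil {
				return err
			}
			return h.server.AddService(hostServiceHTTP, svc, h.config.Services[hostServiceHTTP].Configuration)
		},
		// ...analogous closures for hostServiceKeyValue, hostServiceMessaging,
		// and hostServiceObjectStore
	}

	for name, registerFn := range register {
		if cfg, ok := h.config.Services[name]; ok && cfg.Enabled {
			if err := registerFn(); err != nil {
				return err
			}
		}
	}

	h.log.Info("Host services configured", slog.Any("services", h.server.Services()))
	return h.server.Start()
}
```

Note that Go map iteration order is randomized, so if registration order ever matters, the map would need to become a slice.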
diff --git a/internal/node/workload_mgr.go b/internal/node/workload_mgr.go
index a62a5f62..71e9fee0 100644
--- a/internal/node/workload_mgr.go
+++ b/internal/node/workload_mgr.go
@@ -7,6 +7,8 @@ import (
 	"errors"
 	"fmt"
 	"log/slog"
+	"os"
+	"path"
 	"strconv"
 	"strings"
 	"sync"
@@ -18,6 +20,7 @@ import (
 	controlapi "github.com/synadia-io/nex/control-api"
 	agentapi "github.com/synadia-io/nex/internal/agent-api"
 	"github.com/synadia-io/nex/internal/models"
+	internalnats "github.com/synadia-io/nex/internal/node/internal-nats"
 	"github.com/synadia-io/nex/internal/node/observability"
 	"github.com/synadia-io/nex/internal/node/processmanager"
@@ -28,6 +31,8 @@ import (
 )
 
 const (
+	defaultInternalNatsStoreDir = "pnats"
+
 	EventSubjectPrefix      = "$NEX.events"
 	LogSubjectPrefix        = "$NEX.logs"
 	WorkloadCacheBucketName = "NEXCACHE"
@@ -38,15 +43,18 @@ const (
 // those processes. The workload manager does not know how the agent processes are created, only how to communicate
 // with them via the internal NATS server
 type WorkloadManager struct {
-	closing    uint32
-	config     *models.NodeConfiguration
-	kp         nkeys.KeyPair
-	log        *slog.Logger
-	nc         *nats.Conn
-	ncInternal *nats.Conn
-	cancel     context.CancelFunc
-	ctx        context.Context
-	t          *observability.Telemetry
+	closing uint32
+	config  *models.NodeConfiguration
+	kp      nkeys.KeyPair
+	log     *slog.Logger
+	cancel  context.CancelFunc
+	ctx     context.Context
+	t       *observability.Telemetry
+
+	nc             *nats.Conn
+	natsint        *internalnats.InternalNatsServer
+	ncint          *nats.Conn
+	ncHostServices *nats.Conn
 
 	procMan processmanager.ProcessManager
@@ -58,7 +66,8 @@ type WorkloadManager struct {
 	pendingAgents map[string]*agentapi.AgentClient
 
 	handshakes       map[string]string
-	handshakeTimeout time.Duration // TODO: make configurable...
+	handshakeTimeout time.Duration
+	pingTimeout      time.Duration
 
 	hostServices *HostServices
@@ -68,8 +77,7 @@ type WorkloadManager struct {
 	// Subscriptions created on behalf of functions that cannot subscribe internally
 	subz map[string][]*nats.Subscription
 
-	natsStoreDir string
-	publicKey    string
+	publicKey string
 }
 
 // Initialize a new workload manager instance to manage and communicate with agents
@@ -78,7 +86,7 @@ func NewWorkloadManager(
 	cancel context.CancelFunc,
 	nodeKeypair nkeys.KeyPair,
 	publicKey string,
-	nc, ncint, ncHostServices *nats.Conn,
+	nc *nats.Conn,
 	config *models.NodeConfiguration,
 	log *slog.Logger,
 	telemetry *observability.Telemetry,
@@ -96,10 +104,9 @@ func NewWorkloadManager(
 		handshakeTimeout: time.Duration(config.AgentHandshakeTimeoutMillisecond) * time.Millisecond,
 		kp:               nodeKeypair,
 		log:              log,
-		natsStoreDir:     defaultInternalNatsStoreDir,
 		nc:               nc,
-		ncInternal:       ncint,
 		poolMutex:        &sync.Mutex{},
+		pingTimeout:      time.Duration(config.AgentPingTimeoutMillisecond) * time.Millisecond,
 		publicKey:        publicKey,
 		t:                telemetry,
@@ -112,16 +119,33 @@
 
 	var err error
 
-	w.procMan, err = processmanager.NewProcessManager(w.log, w.config, w.t, w.ctx)
+	// start internal NATS server
+	err = w.startInternalNATS()
 	if err != nil {
-		w.log.Error("Failed to initialize agent process manager", slog.Any("error", err))
+		w.log.Error("Failed to start internal NATS server", slog.Any("err", err))
+		return nil, err
+	} else {
+		w.log.Info("Internal NATS server started", slog.String("client_url", w.natsint.ClientURL()))
+	}
+
+	err = w.startHostServicesNATSConnection()
+	if err != nil {
+		w.log.Error("Failed to start host services NATS connection", slog.Any("error", err))
 		return nil, err
+	} else {
+		w.log.Info("Established host services NATS connection", slog.String("server", w.ncHostServices.Servers()[0]))
 	}
 
-	w.hostServices = NewHostServices(w, ncint, ncHostServices, config.HostServicesConfiguration, w.log)
+	w.hostServices = NewHostServices(w.ncint, w.ncHostServices, config.HostServicesConfiguration, w.log, w.t.Tracer)
 	err = w.hostServices.init()
 	if err != nil {
-		w.log.Warn("Failed to initialize host services.", slog.Any("err", err))
+		w.log.Warn("Failed to initialize host services", slog.Any("err", err))
+		return nil, err
+	}
+
+	w.procMan, err = processmanager.NewProcessManager(w.natsint, w.log, w.config, w.t, w.ctx)
+	if err != nil {
+		w.log.Error("Failed to initialize agent process manager", slog.Any("error", err))
 		return nil, err
 	}
@@ -139,11 +163,11 @@ func (w *WorkloadManager) Start() {
 	}
 }
 
-func (m *WorkloadManager) CacheWorkload(request *controlapi.DeployRequest) (uint64, *string, error) {
+func (m *WorkloadManager) CacheWorkload(workloadID string, request *controlapi.DeployRequest) (uint64, *string, error) {
 	bucket := request.Location.Host
 	key := strings.Trim(request.Location.Path, "/")
 
-	m.log.Info("Attempting object store download", slog.String("bucket", bucket), slog.String("key", key), slog.String("url", m.nc.Opts.Url))
+	m.log.Info("Attempting object store download", slog.String("bucket", bucket), slog.String("key", key))
 
 	opts := []nats.JSOpt{}
 	if request.JsDomain != nil {
@@ -173,50 +197,35 @@ func (m *WorkloadManager) CacheWorkload(request *controlapi.DeployRequest) (uint
 		return 0, nil, err
 	}
 
-	jsInternal, err := m.ncInternal.JetStream()
+	err = m.natsint.StoreFileForID(workloadID, workload)
 	if err != nil {
-		m.log.Error("Failed to acquire JetStream context for internal object store.", slog.Any("err", err))
-		panic(err)
-	}
-
-	cache, err := jsInternal.ObjectStore(agentapi.WorkloadCacheBucket)
-	if err != nil {
-		m.log.Error("Failed to get object store reference for internal cache.", slog.Any("err", err))
-		panic(err)
-	}
-
-	obj, err := cache.PutBytes(request.DecodedClaims.Subject, workload)
-	if err != nil {
-		m.log.Error("Failed to write workload to internal cache.", slog.Any("err", err))
-		panic(err)
+		m.log.Error("Failed to store bytes from source object store in cache", slog.Any("err", err), slog.String("key", key))
+		return 0, nil, err
 	}
 
 	workloadHash := sha256.New()
 	workloadHash.Write(workload)
 	workloadHashString := hex.EncodeToString(workloadHash.Sum(nil))
 
-	m.log.Info("Successfully stored workload in internal object store", slog.String("name", request.DecodedClaims.Subject), slog.Int64("bytes", int64(obj.Size)))
-	return obj.Size, &workloadHashString, nil
+	m.log.Info("Successfully stored workload in internal object store",
+		slog.String("name", request.DecodedClaims.Subject),
+		slog.Int("bytes", len(workload)))
+
+	return uint64(len(workload)), &workloadHashString, nil
 }
 
 // Deploy a workload as specified by the given deploy request to an available
 // agent in the configured pool
-func (w *WorkloadManager) DeployWorkload(request *agentapi.DeployRequest) (*string, error) {
+func (w *WorkloadManager) DeployWorkload(agentClient *agentapi.AgentClient, request *agentapi.DeployRequest) error {
 	w.poolMutex.Lock()
 	defer w.poolMutex.Unlock()
 
-	agentClient, err := w.selectRandomAgent()
-	if err != nil {
-		return nil, fmt.Errorf("failed to deploy workload: %s", err)
-	}
-
 	workloadID := agentClient.ID()
-	err = w.procMan.PrepareWorkload(workloadID, request)
+	err := w.procMan.PrepareWorkload(workloadID, request)
 	if err != nil {
-		return nil, fmt.Errorf("failed to prepare agent process for workload deployment: %s", err)
+		return fmt.Errorf("failed to prepare agent process for workload deployment: %s", err)
 	}
 
-	status := w.ncInternal.Status()
+	status := w.ncint.Status()
 
 	w.log.Debug("Workload manager deploying workload",
 		slog.String("workload_id", workloadID),
@@ -224,7 +233,7 @@
 
 	deployResponse, err := agentClient.DeployWorkload(request)
 	if err != nil {
-		return nil, fmt.Errorf("failed to submit request for workload deployment: %s", err)
+		return fmt.Errorf("failed to submit request for workload deployment: %s", err)
 	}
 
 	if deployResponse.Accepted {
@@ -243,7 +252,7 @@
 					slog.Any("err", err),
 				)
 				_ = w.StopWorkload(workloadID, true)
-				return nil, err
+				return err
 			}
 
 			w.log.Info("Created trigger subject subscription for deployed workload",
@@ -257,7 +266,7 @@
 		}
 	} else {
 		_ = w.StopWorkload(workloadID, false)
-		return nil, fmt.Errorf("workload rejected by agent: %s", *deployResponse.Message)
+		return fmt.Errorf("workload rejected by agent: %s", *deployResponse.Message)
 	}
 
 	w.t.WorkloadCounter.Add(w.ctx, 1, metric.WithAttributes(attribute.String("workload_type", string(request.WorkloadType))))
@@ -265,7 +274,7 @@
 	w.t.DeployedByteCounter.Add(w.ctx, request.TotalBytes)
 	w.t.DeployedByteCounter.Add(w.ctx, request.TotalBytes, metric.WithAttributes(attribute.String("namespace", *request.Namespace)))
 
-	return &workloadID, nil
+	return nil
 }
 
 // Locates a given workload by its workload ID and returns the deployment request associated with it
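For reference, the digest computed in `CacheWorkload` above is a plain SHA-256 over the downloaded artifact bytes, hex-encoded. A minimal runnable equivalent (the function name is illustrative, not part of the patch):

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// hashWorkload mirrors the digest logic in CacheWorkload: sha256.Sum256
// is equivalent to the New/Write/Sum(nil) sequence used in the patch.
func hashWorkload(workload []byte) string {
	sum := sha256.Sum256(workload)
	return hex.EncodeToString(sum[:])
}

func main() {
	fmt.Println(hashWorkload([]byte("example artifact bytes")))
}
```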
@@ -348,6 +357,14 @@ func (w *WorkloadManager) Stop() error {
 			w.log.Error("failed to stop agent process manager", slog.Any("error", err))
 			return err
 		}
+
+		_ = w.ncint.Drain()
+		for !w.ncint.IsClosed() {
+			time.Sleep(time.Millisecond * 25)
+		}
+
+		w.natsint.Shutdown()
+		_ = os.RemoveAll(path.Join(os.TempDir(), defaultInternalNatsStoreDir))
 	}
 
 	return nil
@@ -403,6 +420,9 @@ func (w *WorkloadManager) StopWorkload(id string, undeploy bool) error {
 		if err != nil {
 			w.log.Warn("request to undeploy workload via internal NATS connection failed", slog.String("workload_id", id), slog.String("error", err.Error()))
 		}
+
+		// FIXME-- this should probably just live in workload manager
+		_ = w.natsint.DestroyCredentials(id)
 	}
 
 	err = w.procMan.StopProcess(id)
@@ -421,18 +441,25 @@ func (w *WorkloadManager) OnProcessStarted(id string) {
 	w.poolMutex.Lock()
 	defer w.poolMutex.Unlock()
 
+	clientConn, err := w.natsint.ConnectionWithID(id)
+	if err != nil {
+		w.log.Error("Failed to start agent client", slog.Any("err", err))
+		return
+	}
+
 	agentClient := agentapi.NewAgentClient(
-		w.ncInternal,
+		clientConn,
 		w.log,
 		w.handshakeTimeout,
+		w.pingTimeout,
 		w.agentHandshakeTimedOut,
 		w.agentHandshakeSucceeded,
+		w.agentContactLost,
 		w.agentEvent,
 		w.agentLog,
-		w.agentContactLost,
 	)
 
-	err := agentClient.Start(id)
+	err = agentClient.Start(id)
 	if err != nil {
 		w.log.Error("Failed to start agent client", slog.Any("err", err))
 		return
@@ -503,7 +530,7 @@ func (w *WorkloadManager) generateTriggerHandler(workloadID string, tsub string,
 			w.t.FunctionFailedTriggers.Add(w.ctx, 1)
 			w.t.FunctionFailedTriggers.Add(w.ctx, 1, metric.WithAttributes(attribute.String("namespace", *request.Namespace)))
 			w.t.FunctionFailedTriggers.Add(w.ctx, 1, metric.WithAttributes(attribute.String("workload_name", *request.WorkloadName)))
-			_ = w.publishFunctionExecFailed(workloadID, *request.WorkloadName, tsub, err)
+			_ = w.publishFunctionExecFailed(workloadID, *request.WorkloadName, *request.Namespace, tsub, err)
 		} else if resp != nil {
 			parentSpan.SetStatus(codes.Ok, "Trigger succeeded")
 			runtimeNs := resp.Header.Get(agentapi.NexRuntimeNs)
@@ -546,8 +573,54 @@ func (w *WorkloadManager) generateTriggerHandler(workloadID string, tsub string,
 	}
 }
 
+func (w *WorkloadManager) startInternalNATS() error {
+	var err error
+	w.natsint, err = internalnats.NewInternalNatsServer(w.log)
+	if err != nil {
+		return err
+	}
+
+	p := w.natsint.Port()
+	w.config.InternalNodePort = &p
+	w.ncint = w.natsint.Connection()
+
+	return nil
+}
+
+func (w *WorkloadManager) startHostServicesNATSConnection() error {
+	if w.config.HostServicesConfiguration != nil {
+		if w.config.HostServicesConfiguration.NatsUrl == "" {
+			w.config.HostServicesConfiguration.NatsUrl = w.nc.Servers()[0]
+			w.ncHostServices = w.nc
+		} else {
+			natsOpts := []nats.Option{
+				nats.Name("nex-hostservices"),
+			}
+
+			if w.config.HostServicesConfiguration.NatsUserJwt != "" {
+				natsOpts = append(natsOpts,
+					nats.UserJWTAndSeed(
+						w.config.HostServicesConfiguration.NatsUserJwt,
+						w.config.HostServicesConfiguration.NatsUserSeed,
+					),
+				)
+			}
+
+			var err error
+			w.ncHostServices, err = nats.Connect(w.config.HostServicesConfiguration.NatsUrl, natsOpts...)
+			if err != nil {
+				return err
+			}
+		}
+	} else {
+		w.ncHostServices = w.nc
+	}
+
+	w.log.Debug("Configured NATS connection for host services", slog.String("url", w.ncHostServices.Servers()[0]))
+	return nil
+}
+
 // Picks a pending agent from the pool that will receive the next deployment
-func (w *WorkloadManager) selectRandomAgent() (*agentapi.AgentClient, error) {
+func (w *WorkloadManager) SelectRandomAgent() (*agentapi.AgentClient, error) {
 	if len(w.pendingAgents) == 0 {
 		return nil, errors.New("no available agent client in pool")
 	}
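The `InternalNatsServer` consumed above exposes `NewInternalNatsServer`, `Port`, `Connection`, and `ClientURL`. It presumably wraps an embedded, loopback-only nats-server bound to an ephemeral port; a standalone sketch of that pattern against the public nats-server API follows (the per-workload account and credential provisioning behind `ConnectionWithID` and `DestroyCredentials` is omitted here):

```go
package main

import (
	"fmt"
	"time"

	"github.com/nats-io/nats-server/v2/server"
	"github.com/nats-io/nats.go"
)

func main() {
	opts := &server.Options{
		Host:      "127.0.0.1",
		Port:      -1, // -1 requests an ephemeral port from the server
		JetStream: true,
	}

	// Start the embedded server and wait until it accepts connections.
	s, err := server.NewServer(opts)
	if err != nil {
		panic(err)
	}
	go s.Start()
	if !s.ReadyForConnections(5 * time.Second) {
		panic("internal NATS server failed to start")
	}

	// Connect a client over the loopback URL the server chose.
	nc, err := nats.Connect(s.ClientURL())
	if err != nil {
		panic(err)
	}
	defer nc.Close()

	fmt.Println("connected to", s.ClientURL())
	s.Shutdown()
}
```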
diff --git a/internal/node/workload_mgr_events.go b/internal/node/workload_mgr_events.go
index a815a6c1..b8c7c591 100644
--- a/internal/node/workload_mgr_events.go
+++ b/internal/node/workload_mgr_events.go
@@ -110,21 +110,15 @@ func (w *WorkloadManager) agentLog(workloadId string, entry agentapi.LogEntry) {
 	_ = w.nc.Publish(subject, bytes)
 }
 
-func (w *WorkloadManager) publishFunctionExecFailed(workloadId string, workload string, tsub string, origErr error) error {
-	deployRequest, err := w.procMan.Lookup(workloadId)
-	if err != nil {
-		w.log.Warn("Tried to publish function exec failed event for non-existent workload", slog.String("workload_id", workloadId))
-		return nil
-	}
-
+func (w *WorkloadManager) publishFunctionExecFailed(workloadId string, workloadName string, namespace string, tsub string, origErr error) error {
 	functionExecFailed := struct {
 		Name      string `json:"workload_name"`
 		Subject   string `json:"trigger_subject"`
 		Namespace string `json:"namespace"`
 		Error     string `json:"error"`
 	}{
-		Name:      workload,
-		Namespace: *deployRequest.Namespace,
+		Name:      workloadName,
+		Namespace: namespace,
 		Subject:   tsub,
 		Error:     origErr.Error(),
 	}
@@ -137,7 +131,7 @@
 	cloudevent.SetDataContentType(cloudevents.ApplicationJSON)
 	_ = cloudevent.SetData(functionExecFailed)
 
-	err = PublishCloudEvent(w.nc, *deployRequest.Namespace, cloudevent, w.log)
+	err := PublishCloudEvent(w.nc, namespace, cloudevent, w.log)
 	if err != nil {
 		return err
 	}
@@ -149,7 +143,7 @@
 	}
 
 	logBytes, _ := json.Marshal(emitLog)
-	subject := fmt.Sprintf("%s.%s.%s.%s.%s", LogSubjectPrefix, *deployRequest.Namespace, w.publicKey, *deployRequest.WorkloadName, workloadId)
+	subject := fmt.Sprintf("%s.%s.%s.%s.%s", LogSubjectPrefix, namespace, w.publicKey, workloadName, workloadId)
 	err = w.nc.Publish(subject, logBytes)
 	if err != nil {
 		w.log.Error("Failed to publish function exec failed log", slog.Any("err", err))
diff --git a/spec/node_linux_test.go b/spec/node_linux_test.go
index 6c7644e3..d3710572 100644
--- a/spec/node_linux_test.go
+++ b/spec/node_linux_test.go
@@ -320,14 +320,6 @@ var _ = Describe("nex node", func() {
 			Expect(nodeProxy.NodeConfiguration()).ToNot(BeNil()) // FIXME-- assert that it is === to the current nex node config JSON
 		})
 
-		It("should initialize an internal NATS server for private communication between running VMs and the host", func(ctx SpecContext) {
-			Expect(nodeProxy.InternalNATS()).ToNot(BeNil())
-		})
-
-		It("should initialize a connection to the internal NATS server", func(ctx SpecContext) {
-			Expect(nodeProxy.InternalNATSConn()).ToNot(BeNil())
-		})
-
 		It("should initialize a machine manager to manage firecracker VMs and communicate with running agents", func(ctx SpecContext) {
 			Expect(nodeProxy.WorkloadManager()).ToNot(BeNil())
 		})
@@ -434,8 +426,12 @@ var _ = Describe("nex node", func() {
 			Expect(managerProxy.NodeConfiguration()).To(Equal(nodeProxy.NodeConfiguration())) // FIXME-- assert that it is === to the current nex node config JSON
 		})
 
-		It("should receive a reference to the internal NATS server connection", func(ctx SpecContext) {
-			Expect(managerProxy.InternalNATSConn()).To(Equal(nodeProxy.InternalNATSConn()))
+		It("should initialize an internal NATS server for private communication between running VMs and the host", func(ctx SpecContext) {
+			Expect(managerProxy.InternalNATS()).ToNot(BeNil())
+		})
+
+		It("should initialize a connection to the internal NATS server", func(ctx SpecContext) {
+			Expect(managerProxy.InternalNATSConn()).ToNot(BeNil())
 		})
 
 		It("should receive a reference to the telemetry instance", func(ctx SpecContext) {
@@ -447,7 +443,7 @@ var _ = Describe("nex node", func() {
 		It("should complete an agent handshake for each VM in the configured pool size", func(ctx SpecContext) {
 			workloads, _ := nodeProxy.WorkloadManager().RunningWorkloads()
 			for _, workload := range workloads {
-				subsz, _ := nodeProxy.InternalNATS().Subsz(&server.SubszOptions{
+				subsz, _ := managerProxy.InternalNATS().Subsz(&server.SubszOptions{
 					Subscriptions: true,
 					Test:          fmt.Sprintf("agentint.%s.handshake", workload.Id),
 				})
diff --git a/spec/node_windows_test.go b/spec/node_windows_test.go
index 40c23d7c..4be1aacc 100644
--- a/spec/node_windows_test.go
+++ b/spec/node_windows_test.go
@@ -267,14 +267,6 @@ var _ = Describe("nex node", func() {
 			Expect(nodeProxy.NodeConfiguration()).ToNot(BeNil()) // FIXME-- assert that it is === to the current nex node config JSON
 		})
 
-		It("should initialize an internal NATS server for private communication between running VMs and the host", func(ctx SpecContext) {
-			Expect(nodeProxy.InternalNATS()).ToNot(BeNil())
-		})
-
-		It("should initialize a connection to the internal NATS server", func(ctx SpecContext) {
-			Expect(nodeProxy.InternalNATSConn()).ToNot(BeNil())
-		})
-
 		It("should initialize a machine manager to manage firecracker VMs and communicate with running agents", func(ctx SpecContext) {
 			Expect(nodeProxy.WorkloadManager()).ToNot(BeNil())
 		})
@@ -361,8 +353,12 @@ var _ = Describe("nex node", func() {
 			Expect(managerProxy.NodeConfiguration()).To(Equal(nodeProxy.NodeConfiguration())) // FIXME-- assert that it is === to the current nex node config JSON
 		})
 
-		It("should receive a reference to the internal NATS server connection", func(ctx SpecContext) {
-			Expect(managerProxy.InternalNATSConn()).To(Equal(nodeProxy.InternalNATSConn()))
+		It("should initialize an internal NATS server for private communication between running VMs and the host", func(ctx SpecContext) {
+			Expect(managerProxy.InternalNATS()).ToNot(BeNil())
+		})
+
+		It("should initialize a connection to the internal NATS server", func(ctx SpecContext) {
+			Expect(managerProxy.InternalNATSConn()).ToNot(BeNil())
 		})
 
 		It("should receive a reference to the telemetry instance", func(ctx SpecContext) {
@@ -374,7 +370,7 @@ var _ = Describe("nex node", func() {
 		It("should complete an agent handshake for each VM in the configured pool size", func(ctx SpecContext) {
 			workloads, _ := nodeProxy.WorkloadManager().RunningWorkloads()
 			for _, workload := range workloads {
-				subsz, _ := nodeProxy.InternalNATS().Subsz(&server.SubszOptions{
+				subsz, _ := managerProxy.InternalNATS().Subsz(&server.SubszOptions{
 					Subscriptions: true,
 					Test:          fmt.Sprintf("agentint.%s.handshake", workload.Id),
 				})
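The `Stop()` change earlier in this diff drains the internal client connection before shutting the embedded server down, so in-flight agent traffic is flushed rather than dropped. A standalone sketch of the same teardown pattern against the public nats.go and nats-server APIs (the 25ms poll interval mirrors the patch; the function name and store-dir argument are illustrative):

```go
package main

import (
	"os"
	"path"
	"time"

	"github.com/nats-io/nats-server/v2/server"
	"github.com/nats-io/nats.go"
)

// shutdownInternalNATS drains a client connection, waits for it to close,
// then stops the embedded server and removes its on-disk JetStream store.
func shutdownInternalNATS(nc *nats.Conn, srv *server.Server, storeDir string) {
	_ = nc.Drain() // flush pending publishes and subscriptions before closing

	for !nc.IsClosed() {
		time.Sleep(25 * time.Millisecond)
	}

	srv.Shutdown()
	srv.WaitForShutdown()

	// RemoveAll is used because the store directory is non-empty when
	// JetStream has persisted anything.
	_ = os.RemoveAll(path.Join(os.TempDir(), storeDir))
}
```

Draining first matters: shutting the server down with the connection still open would abort any queued undeploy acknowledgements from agents.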