Skip to content

Commit

Permalink
[cmd/opampsupervisor]: Persist the instance ID between restarts (#32618)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
* Adds the ability to persist the instance ID on disk and load it on
startup.

**Link to tracking Issue:** Closes #21073

**Testing:**
* Add unit tests for persistence functionality
* Add e2e tests for persistence (testing that initial id is persisted,
and that server specified ID is persisted)

**Documentation:** N/A
  • Loading branch information
BinaryFissionGames committed May 16, 2024
1 parent 4b9a237 commit 125ff49
Show file tree
Hide file tree
Showing 12 changed files with 432 additions and 62 deletions.
13 changes: 13 additions & 0 deletions .chloggen/feat_opamp-supervisor-persist-instance-id.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: opampsupervisor

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Allows the supervisor to persist its instance ID between restarts.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [21073]
11 changes: 11 additions & 0 deletions cmd/opampsupervisor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ The supervisor is currently undergoing heavy development and is not ready for an

4. The supervisor should connect to the OpAMP server and start a Collector instance.

## Persistent data storage
The supervisor persists some data to disk in order to mantain state between restarts. The directory where this data is stored may be specified via the supervisor configuration:
```yaml
storage:
directory: "/path/to/storage/dir"
```

By default, the supervisor will use `/var/lib/otelcol/supervisor` on posix systems, and `%ProgramData%/Otelcol/Supervisor` on Windows.

This directory will be created on supervisor startup if it does not exist.

## Status

The OpenTelemetry OpAMP Supervisor is intended to be the reference
Expand Down
158 changes: 155 additions & 3 deletions cmd/opampsupervisor/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,10 @@ func getSupervisorConfig(t *testing.T, configType string, extraConfigData map[st
extension = ".exe"
}
configData := map[string]string{
"goos": runtime.GOOS,
"goarch": runtime.GOARCH,
"extension": extension,
"goos": runtime.GOOS,
"goarch": runtime.GOARCH,
"extension": extension,
"storage_dir": t.TempDir(),
}

for key, val := range extraConfigData {
Expand Down Expand Up @@ -781,3 +782,154 @@ func TestSupervisorRestartsWithLastReceivedConfig(t *testing.T) {
}, 10*time.Second, 500*time.Millisecond, "Collector was not started with the last received remote config")

}

func TestSupervisorPersistsInstanceID(t *testing.T) {
// Tests shutting down and starting up a new supervisor will
// persist and re-use the same instance ID.
storageDir := t.TempDir()

agentIDChan := make(chan string, 1)
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {

select {
case agentIDChan <- message.InstanceUid:
default:
}

return &protobufs.ServerToAgent{}
},
})

s := newSupervisor(t, "basic", map[string]string{
"url": server.addr,
"storage_dir": storageDir,
})

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")

var firstAgentID string
select {
case firstAgentID = <-agentIDChan:
case <-time.After(1 * time.Second):
t.Fatalf("failed to get first agent ID")
}

t.Logf("Got agent ID %s, shutting down supervisor", firstAgentID)

s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, false)

t.Logf("Supervisor disconnected")

// Drain agent ID channel so we get a fresh ID from the new supervisor
select {
case <-agentIDChan:
default:
}

s = newSupervisor(t, "basic", map[string]string{
"url": server.addr,
"storage_dir": storageDir,
})
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")

var secondAgentID string
select {
case secondAgentID = <-agentIDChan:
case <-time.After(1 * time.Second):
t.Fatalf("failed to get second agent ID")
}

require.Equal(t, firstAgentID, secondAgentID)
}

func TestSupervisorPersistsNewInstanceID(t *testing.T) {
// Tests that an agent ID that is given from the server to the agent in an AgentIdentification message
// is properly persisted.
storageDir := t.TempDir()

newID := "01HW3GS9NWD840C5C2BZS3KYPW"

agentIDChan := make(chan string, 1)
server := newOpAMPServer(
t,
defaultConnectingHandler,
server.ConnectionCallbacksStruct{
OnMessageFunc: func(_ context.Context, _ types.Connection, message *protobufs.AgentToServer) *protobufs.ServerToAgent {

select {
case agentIDChan <- message.InstanceUid:
default:
}

if message.InstanceUid != newID {
return &protobufs.ServerToAgent{
InstanceUid: message.InstanceUid,
AgentIdentification: &protobufs.AgentIdentification{
NewInstanceUid: newID,
},
}
}

return &protobufs.ServerToAgent{}
},
})

s := newSupervisor(t, "basic", map[string]string{
"url": server.addr,
"storage_dir": storageDir,
})

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")

for id := range agentIDChan {
if id == newID {
t.Logf("Agent ID was changed to new ID")
break
}
}

s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, false)

t.Logf("Supervisor disconnected")

// Drain agent ID channel so we get a fresh ID from the new supervisor
select {
case <-agentIDChan:
default:
}

s = newSupervisor(t, "basic", map[string]string{
"url": server.addr,
"storage_dir": storageDir,
})
defer s.Shutdown()

waitForSupervisorConnection(server.supervisorConnected, true)

t.Logf("Supervisor connected")

var newRecievedAgentID string
select {
case newRecievedAgentID = <-agentIDChan:
case <-time.After(1 * time.Second):
t.Fatalf("failed to get second agent ID")
}

require.Equal(t, newID, newRecievedAgentID)
}
2 changes: 1 addition & 1 deletion cmd/opampsupervisor/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
google.golang.org/protobuf v1.34.1
gopkg.in/yaml.v3 v3.0.1
)

require (
Expand All @@ -32,5 +33,4 @@ require (
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/sys v0.18.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
28 changes: 27 additions & 1 deletion cmd/opampsupervisor/supervisor/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ package config

import (
"net/http"
"os"
"path/filepath"
"runtime"

"go.opentelemetry.io/collector/config/configtls"
)
Expand All @@ -14,14 +17,37 @@ type Supervisor struct {
Server *OpAMPServer
Agent *Agent
Capabilities *Capabilities `mapstructure:"capabilities"`
Storage *Storage `mapstructure:"storage"`
Storage Storage `mapstructure:"storage"`
}

type Storage struct {
// Directory is the directory where the Supervisor will store its data.
Directory string `mapstructure:"directory"`
}

// DirectoryOrDefault returns the configured storage directory if it was configured,
// otherwise it returns the system default.
func (s Storage) DirectoryOrDefault() string {
if s.Directory == "" {
switch runtime.GOOS {
case "windows":
// Windows default is "%ProgramData%\Otelcol\Supervisor"
// If the ProgramData environment variable is not set,
// it falls back to C:\ProgramData
programDataDir := os.Getenv("ProgramData")
if programDataDir == "" {
programDataDir = `C:\ProgramData`
}
return filepath.Join(programDataDir, "Otelcol", "Supervisor")
default:
// Default for non-windows systems
return "/var/lib/otelcol/supervisor"
}
}

return s.Directory
}

// Capabilities is the set of capabilities that the Supervisor supports.
type Capabilities struct {
AcceptsRemoteConfig *bool `mapstructure:"accepts_remote_config"`
Expand Down
92 changes: 92 additions & 0 deletions cmd/opampsupervisor/supervisor/persistence.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package supervisor

import (
"crypto/rand"
"errors"
"os"
"time"

"github.com/oklog/ulid/v2"
"gopkg.in/yaml.v3"
)

// persistentState represents persistent state for the supervisor
type persistentState struct {
InstanceID ulid.ULID `yaml:"instance_id"`

// Path to the config file that the state should be saved to.
// This is not marshaled.
configPath string `yaml:"-"`
}

func (p *persistentState) SetInstanceID(id ulid.ULID) error {
p.InstanceID = id
return p.writeState()
}

func (p *persistentState) writeState() error {
by, err := yaml.Marshal(p)
if err != nil {
return err
}

return os.WriteFile(p.configPath, by, 0600)
}

// loadOrCreatePersistentState attempts to load the persistent state from disk. If it doesn't
// exist, a new persistent state file is created.
func loadOrCreatePersistentState(file string) (*persistentState, error) {
state, err := loadPersistentState(file)
switch {
case errors.Is(err, os.ErrNotExist):
return createNewPersistentState(file)
case err != nil:
return nil, err
default:
return state, nil
}
}

func loadPersistentState(file string) (*persistentState, error) {
var state *persistentState

by, err := os.ReadFile(file)
if err != nil {
return nil, err
}

if err := yaml.Unmarshal(by, &state); err != nil {
return nil, err
}

state.configPath = file

return state, nil
}

func createNewPersistentState(file string) (*persistentState, error) {
id, err := generateNewULID()
if err != nil {
return nil, err
}

p := &persistentState{
InstanceID: id,
configPath: file,
}

return p, p.writeState()
}

func generateNewULID() (ulid.ULID, error) {
entropy := ulid.Monotonic(rand.Reader, 0)
id, err := ulid.New(ulid.Timestamp(time.Now()), entropy)
if err != nil {
return ulid.ULID{}, err
}

return id, nil
}
Loading

0 comments on commit 125ff49

Please sign in to comment.