Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ BINARY_NAME := nginx-agent
PROJECT_DIR = cmd/agent
PROJECT_FILE = main.go
COLLECTOR_PATH ?= /etc/nginx-agent/opentelemetry-collector-agent.yaml
MANIFEST_DIR ?= /var/lib/nginx-agent
LIB_DIR ?= /var/lib/nginx-agent
DIRS = $(BUILD_DIR) $(TEST_BUILD_DIR) $(BUILD_DIR)/$(DOCS_DIR) $(BUILD_DIR)/$(DOCS_DIR)/$(PROTO_DIR)
$(shell mkdir -p $(DIRS))

Expand Down Expand Up @@ -195,7 +195,7 @@ run: build ## Run code

dev: ## Run agent executable
@echo "🚀 Running App"
NGINX_AGENT_COLLECTOR_CONFIG_PATH=$(COLLECTOR_PATH) NGINX_AGENT_MANIFEST_DIR=$(MANIFEST_DIR) $(GORUN) -ldflags=$(DEBUG_LDFLAGS) $(PROJECT_DIR)/$(PROJECT_FILE)
NGINX_AGENT_COLLECTOR_CONFIG_PATH=$(COLLECTOR_PATH) NGINX_AGENT_LIB_DIR=$(LIB_DIR) $(GORUN) -ldflags=$(DEBUG_LDFLAGS) $(PROJECT_DIR)/$(PROJECT_FILE)

race-condition-dev: ## Run agent executable with race condition detection
@echo "🏎️ Running app with race condition detection enabled"
Expand Down
8 changes: 4 additions & 4 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func ResolveConfig() (*Config, error) {
Watchers: resolveWatchers(),
Features: viperInstance.GetStringSlice(FeaturesKey),
Labels: resolveLabels(),
ManifestDir: viperInstance.GetString(ManifestDirPathKey),
LibDir: viperInstance.GetString(LibDirPathKey),
}

defaultCollector(collector, config)
Expand Down Expand Up @@ -380,9 +380,9 @@ func registerFlags() {
"If the default path doesn't exist, log messages are output to stdout/stderr.",
)
fs.String(
ManifestDirPathKey,
DefManifestDir,
"Specifies the path to the directory containing the manifest files",
LibDirPathKey,
DefLibDir,
"Specifies the path to the nginx-agent lib directory",
)

fs.StringSlice(AllowedDirectoriesKey,
Expand Down
2 changes: 1 addition & 1 deletion internal/config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ const (
DefCollectorExtensionsHealthTLServerNameKey = ""

// File defaults
DefManifestDir = "/var/lib/nginx-agent"
DefLibDir = "/var/lib/nginx-agent"
)

func DefaultFeatures() []string {
Expand Down
2 changes: 1 addition & 1 deletion internal/config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const (
InstanceWatcherMonitoringFrequencyKey = "watchers_instance_watcher_monitoring_frequency"
InstanceHealthWatcherMonitoringFrequencyKey = "watchers_instance_health_watcher_monitoring_frequency"
FileWatcherKey = "watchers_file_watcher"
ManifestDirPathKey = "manifest_dir"
LibDirPathKey = "lib_dir"
)

var (
Expand Down
2 changes: 1 addition & 1 deletion internal/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type (
Version string `yaml:"-"`
Path string `yaml:"-"`
UUID string `yaml:"-"`
ManifestDir string `yaml:"-"`
LibDir string `yaml:"-"`
AllowedDirectories []string `yaml:"allowed_directories" mapstructure:"allowed_directories"`
Features []string `yaml:"features" mapstructure:"features"`
}
Expand Down
142 changes: 117 additions & 25 deletions internal/file/file_manager_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"fmt"
"log/slog"
"os"
"path"
"path/filepath"
"sync"

"google.golang.org/grpc"
Expand All @@ -38,11 +40,11 @@ const (

type (
fileOperator interface {
Write(ctx context.Context, fileContent []byte, file *mpi.FileMeta) error
CreateFileDirectories(ctx context.Context, fileMeta *mpi.FileMeta, filePermission os.FileMode) error
Write(ctx context.Context, fileContent []byte, fileName, filePermissions string) error
CreateFileDirectories(ctx context.Context, fileName string) error
WriteChunkedFile(
ctx context.Context,
file *mpi.File,
fileName, filePermissions string,
header *mpi.FileDataChunkHeader,
stream grpc.ServerStreamingClient[mpi.FileDataChunk],
) error
Expand All @@ -54,20 +56,22 @@ type (
) (mpi.FileDataChunk_Content, error)
WriteManifestFile(ctx context.Context, updatedFiles map[string]*model.ManifestFile,
manifestDir, manifestPath string) (writeError error)
MoveFile(ctx context.Context, sourcePath, destPath string) error
}

fileServiceOperatorInterface interface {
File(ctx context.Context, file *mpi.File, fileActions map[string]*model.FileCache) error
File(ctx context.Context, file *mpi.File, tempFilePath, expectedHash string) error
UpdateOverview(ctx context.Context, instanceID string, filesToUpdate []*mpi.File, configPath string,
iteration int) error
ChunkedFile(ctx context.Context, file *mpi.File) error
ChunkedFile(ctx context.Context, file *mpi.File, tempFilePath, expectedHash string) error
IsConnected() bool
UpdateFile(
ctx context.Context,
instanceID string,
fileToUpdate *mpi.File,
) error
SetIsConnected(isConnected bool)
MoveFilesFromTempDirectory(ctx context.Context, fileAction *model.FileCache, tempDir string) error
UpdateClient(ctx context.Context, fileServiceClient mpi.FileServiceClient)
}

Expand Down Expand Up @@ -118,8 +122,8 @@ func NewFileManagerService(fileServiceClient mpi.FileServiceClient, agentConfig
rollbackFileContents: make(map[string][]byte),
currentFilesOnDisk: make(map[string]*mpi.File),
previousManifestFiles: make(map[string]*model.ManifestFile),
manifestFilePath: agentConfig.ManifestDir + "/manifest.json",
rollbackManifest: true,
manifestFilePath: agentConfig.LibDir + "/manifest.json",
manifestLock: manifestLock,
}
}
Expand Down Expand Up @@ -169,7 +173,12 @@ func (fms *FileManagerService) ConfigApply(ctx context.Context,
fms.rollbackFileContents = fileContent
fms.fileActions = diffFiles

fileErr := fms.executeFileActions(ctx)
tempDir, tempDirError := fms.createTempConfigDirectory(ctx)
if tempDirError != nil {
return model.Error, tempDirError
}

fileErr := fms.executeFileActions(ctx, tempDir)
if fileErr != nil {
fms.rollbackManifest = false
return model.RollbackRequired, fileErr
Expand Down Expand Up @@ -208,15 +217,16 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string)

continue
case model.Delete, model.Update:
content := fms.rollbackFileContents[fileAction.File.GetFileMeta().GetName()]
err := fms.fileOperator.Write(ctx, content, fileAction.File.GetFileMeta())
fileMeta := fileAction.File.GetFileMeta()
content := fms.rollbackFileContents[fileMeta.GetName()]
err := fms.fileOperator.Write(ctx, content, fileMeta.GetName(), fileMeta.GetPermissions())
if err != nil {
return err
}

// currentFilesOnDisk needs to be updated after rollback action is performed
fileAction.File.GetFileMeta().Hash = files.GenerateHash(content)
fms.currentFilesOnDisk[fileAction.File.GetFileMeta().GetName()] = fileAction.File
fileMeta.Hash = files.GenerateHash(content)
fms.currentFilesOnDisk[fileMeta.GetName()] = fileAction.File
case model.Unchanged:
fallthrough
default:
Expand All @@ -226,8 +236,9 @@ func (fms *FileManagerService) Rollback(ctx context.Context, instanceID string)

if fms.rollbackManifest {
slog.DebugContext(ctx, "Rolling back manifest file", "manifest_previous", fms.previousManifestFiles)
manifestFileErr := fms.fileOperator.WriteManifestFile(ctx, fms.previousManifestFiles,
fms.agentConfig.ManifestDir, fms.manifestFilePath)
manifestFileErr := fms.fileOperator.WriteManifestFile(
ctx, fms.previousManifestFiles, fms.agentConfig.LibDir, fms.manifestFilePath,
)
if manifestFileErr != nil {
return manifestFileErr
}
Expand Down Expand Up @@ -443,7 +454,7 @@ func (fms *FileManagerService) UpdateManifestFile(ctx context.Context,
updatedFiles = manifestFiles
}

return fms.fileOperator.WriteManifestFile(ctx, updatedFiles, fms.agentConfig.ManifestDir, fms.manifestFilePath)
return fms.fileOperator.WriteManifestFile(ctx, updatedFiles, fms.agentConfig.LibDir, fms.manifestFilePath)
}

func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, map[string]*mpi.File, error) {
Expand Down Expand Up @@ -474,37 +485,103 @@ func (fms *FileManagerService) manifestFile() (map[string]*model.ManifestFile, m
return manifestFiles, fileMap, nil
}

func (fms *FileManagerService) executeFileActions(ctx context.Context) error {
func (fms *FileManagerService) executeFileActions(ctx context.Context, tempDir string) (actionError error) {
// Download files to temporary location
downloadError := fms.downloadUpdatedFilesToTempLocation(ctx, tempDir)
if downloadError != nil {
return downloadError
}

// Remove temp files if there is a failure moving or deleting files
actionError = fms.moveOrDeleteFiles(ctx, tempDir, actionError)
if actionError != nil {
fms.deleteTempFiles(ctx, tempDir)
}

return actionError
}

func (fms *FileManagerService) downloadUpdatedFilesToTempLocation(
ctx context.Context, tempDir string,
) (updateError error) {
for _, fileAction := range fms.fileActions {
if fileAction.Action == model.Add || fileAction.Action == model.Update {
tempFilePath := filepath.Join(tempDir, fileAction.File.GetFileMeta().GetName())

slog.DebugContext(
ctx,
"Downloading file to temp location",
"file", tempFilePath,
)

updateErr := fms.fileUpdate(ctx, fileAction.File, tempFilePath)
if updateErr != nil {
updateError = updateErr
break
}
}
}

// Remove temp files if there is an error downloading any files
if updateError != nil {
fms.deleteTempFiles(ctx, tempDir)
}

return updateError
}

func (fms *FileManagerService) moveOrDeleteFiles(ctx context.Context, tempDir string, actionError error) error {
actionsLoop:
for _, fileAction := range fms.fileActions {
switch fileAction.Action {
case model.Delete:
slog.DebugContext(ctx, "File action, deleting file", "file", fileAction.File.GetFileMeta().GetName())
slog.DebugContext(ctx, "Deleting file", "file", fileAction.File.GetFileMeta().GetName())
if err := os.Remove(fileAction.File.GetFileMeta().GetName()); err != nil && !os.IsNotExist(err) {
return fmt.Errorf("error deleting file: %s error: %w",
actionError = fmt.Errorf("error deleting file: %s error: %w",
fileAction.File.GetFileMeta().GetName(), err)

break actionsLoop
}

continue
case model.Add, model.Update:
slog.DebugContext(ctx, "File action, add or update file", "file", fileAction.File.GetFileMeta().GetName())
updateErr := fms.fileUpdate(ctx, fileAction.File)
if updateErr != nil {
return updateErr
err := fms.fileServiceOperator.MoveFilesFromTempDirectory(ctx, fileAction, tempDir)
if err != nil {
actionError = err

break actionsLoop
}
case model.Unchanged:
slog.DebugContext(ctx, "File unchanged")
}
}

return nil
return actionError
}

func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File) error {
func (fms *FileManagerService) deleteTempFiles(ctx context.Context, tempDir string) {
for _, fileAction := range fms.fileActions {
if fileAction.Action == model.Add || fileAction.Action == model.Update {
tempFile := path.Join(tempDir, fileAction.File.GetFileMeta().GetName())
if err := os.Remove(tempFile); err != nil && !os.IsNotExist(err) {
slog.ErrorContext(
ctx, "Error deleting temp file",
"file", fileAction.File.GetFileMeta().GetName(),
"error", err,
)
}
}
}
}

func (fms *FileManagerService) fileUpdate(ctx context.Context, file *mpi.File, tempFilePath string) error {
expectedHash := fms.fileActions[file.GetFileMeta().GetName()].File.GetFileMeta().GetHash()

if file.GetFileMeta().GetSize() <= int64(fms.agentConfig.Client.Grpc.MaxFileSize) {
return fms.fileServiceOperator.File(ctx, file, fms.fileActions)
return fms.fileServiceOperator.File(ctx, file, tempFilePath, expectedHash)
}

return fms.fileServiceOperator.ChunkedFile(ctx, file)
return fms.fileServiceOperator.ChunkedFile(ctx, file, tempFilePath, expectedHash)
}

func (fms *FileManagerService) checkAllowedDirectory(checkFiles []*mpi.File) error {
Expand Down Expand Up @@ -566,6 +643,21 @@ func (fms *FileManagerService) convertToFile(manifestFile *model.ManifestFile) *
}
}

func (fms *FileManagerService) createTempConfigDirectory(ctx context.Context) (string, error) {
tempDir, tempDirError := os.MkdirTemp(fms.agentConfig.LibDir, "config")
if tempDirError != nil {
return "", fmt.Errorf("failed creating temp config directory: %w", tempDirError)
}
defer func(path string) {
err := os.RemoveAll(path)
if err != nil {
slog.ErrorContext(ctx, "error removing temp config directory", "path", path, "err", err)
}
}(tempDir)

return tempDir, nil
}

// ConvertToMapOfFiles converts a list of files to a map of file caches (file and action) with the file name as the key
func ConvertToMapOfFileCache(convertFiles []*mpi.File) map[string]*model.FileCache {
filesMap := make(map[string]*model.FileCache)
Expand Down
Loading
Loading