Skip to content

Expose UpdateMarkerFile func with transactional lock #8449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
**/html_docs
data
internal/pkg/agent/operation/tests/downloads
pkg/core/process/testsignal/testsignal
pkg/core/process/testsignal/testsignal.exe

# Files
.DS_Store
Expand Down
20 changes: 20 additions & 0 deletions internal/pkg/agent/application/filelock/noop_locker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package filelock

import "context"

// noopLocker is a locker implementation that never actually locks anything.
// Every operation succeeds immediately, which makes it suitable for callers
// that need to pass a locker but do not want any locking to take place.
type noopLocker struct{}

// NewNoopLocker returns a new no-op locker.
func NewNoopLocker() *noopLocker {
	return new(noopLocker)
}

// Lock does nothing and always reports success.
func (*noopLocker) Lock() error {
	return nil
}

// LockContext does nothing and always reports success; the context is ignored.
func (*noopLocker) LockContext(_ context.Context) error {
	return nil
}

// Unlock does nothing and always reports success.
func (*noopLocker) Unlock() error {
	return nil
}

// Locked always reports false, regardless of any previous Lock call.
func (*noopLocker) Locked() bool {
	return false
}
10 changes: 10 additions & 0 deletions internal/pkg/agent/application/upgrade/marker_access_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package upgrade

import (
"errors"
"fmt"
"os"
"path/filepath"
Expand All @@ -13,6 +14,15 @@ import (
"github.com/elastic/elastic-agent-libs/file"
)

// readMarkerFileCommon reads the raw contents of the marker file located at
// markerFile. It performs no locking itself.
//
// A missing marker file is not treated as an error: in that case it returns
// (nil, nil). Any other read failure (for example a permission or sharing
// error) is returned to the caller together with a nil byte slice, so that
// callers can report it or retry instead of silently seeing an empty marker.
func readMarkerFileCommon(markerFile string) (bytes []byte, err error) {
	markerFileBytes, err := os.ReadFile(markerFile)
	if err != nil {
		if errors.Is(err, os.ErrNotExist) {
			// marker doesn't exist, nothing to do
			return nil, nil
		}
		// Do not swallow real I/O errors: the caller must be able to
		// distinguish "no marker" from "marker could not be read".
		return nil, err
	}
	return markerFileBytes, nil
}

func writeMarkerFileCommon(markerFile string, markerBytes []byte, shouldFsync bool) error {
f, err := os.CreateTemp(
filepath.Dir(markerFile), fmt.Sprintf("%d-*.tmp", os.Getpid()))
Expand Down
23 changes: 4 additions & 19 deletions internal/pkg/agent/application/upgrade/marker_access_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,11 @@ package upgrade
import (
"errors"
"fmt"
"os"
)

// On non-Windows platforms, readMarkerFile simply reads the marker file.
// See marker_access_windows.go for behavior on Windows platforms.
func readMarkerFile(markerFile string) (bytes []byte, err error) {
fileLock, err := newMarkerFileLocker(markerFile)
if err != nil {
return nil, fmt.Errorf("creating update marker locker for reading: %w", err)
}

func readMarkerFile(markerFile string, fileLock Locker) (bytes []byte, err error) {
err = fileLock.Lock()
if err != nil {
return nil, fmt.Errorf("locking update marker file %q for reading: %w", markerFile, err)
Expand All @@ -31,22 +25,13 @@ func readMarkerFile(markerFile string) (bytes []byte, err error) {
err = errors.Join(err, fmt.Errorf("unlocking marker file after reading: %w", errUnlock))
}
}(fileLock)
markerFileBytes, err := os.ReadFile(markerFile)
if errors.Is(err, os.ErrNotExist) {
// marker doesn't exist, nothing to do
return nil, nil
}
return markerFileBytes, nil

return readMarkerFileCommon(markerFile)
}

// On non-Windows platforms, writeMarkerFile simply writes the marker file.
// See marker_access_windows.go for behavior on Windows platforms.
func writeMarkerFile(markerFile string, markerBytes []byte, shouldFsync bool) (err error) {
fileLock, err := newMarkerFileLocker(markerFile)
if err != nil {
return fmt.Errorf("creating update marker locker for writing: %w", err)
}

func writeMarkerFile(markerFile string, markerBytes []byte, shouldFsync bool, fileLock Locker) (err error) {
err = fileLock.Lock()
if err != nil {
return fmt.Errorf("locking update marker file %q for writing: %w", markerFile, err)
Expand Down
61 changes: 58 additions & 3 deletions internal/pkg/agent/application/upgrade/marker_access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,22 @@ import (
"path/filepath"
"sync"
"testing"
"time"

"github.com/fsnotify/fsnotify"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
)

func TestWriteMarkerFile(t *testing.T) {
tmpDir := t.TempDir()
markerFile := filepath.Join(tmpDir, markerFilename)

markerBytes := []byte("foo bar")
err := writeMarkerFile(markerFile, markerBytes, true)
err := writeMarkerFile(markerFile, markerBytes, true, noopLocker)
require.NoError(t, err)

data, err := os.ReadFile(markerFile)
Expand Down Expand Up @@ -62,15 +67,15 @@ func TestWriteMarkerFileWithTruncation(t *testing.T) {
}()

// Write a long marker file
err := writeMarkerFile(testMarkerFile, randomBytes(40), true)
err := writeMarkerFile(testMarkerFile, randomBytes(40), true, noopLocker)
require.NoError(t, err, "could not write long marker file")

// Get length of file
fileInfo, err := os.Stat(testMarkerFile)
require.NoError(t, err)
originalSize := fileInfo.Size()

err = writeMarkerFile(testMarkerFile, randomBytes(25), true)
err = writeMarkerFile(testMarkerFile, randomBytes(25), true, noopLocker)
require.NoError(t, err)

// Get length of file
Expand All @@ -95,6 +100,56 @@ func TestWriteMarkerFileWithTruncation(t *testing.T) {
close(errCh)
}

// TestUpdateMarkerFile checks that two concurrent UpdateMarkerFile calls on
// the same marker file do not overwrite each other: after both finish, the
// marker on disk must carry both modifications and keep untouched fields.
func TestUpdateMarkerFile(t *testing.T) {
	marker := &UpdateMarker{
		Version:           "1.2.3",
		VersionedHome:     "/home",
		Hash:              "sha...hash",
		UpdatedOn:         time.Now(),
		PrevVersion:       "0.1.2",
		PrevHash:          "sha..hash",
		PrevVersionedHome: "/home/elastic",
		Acked:             false,
		Action:            &fleetapi.ActionUpgrade{ActionID: "123", ActionType: "UPGRADE"},
		Details:           details.NewDetails("1.2.3", details.StateRequested, "action-id"),
		DesiredOutcome:    OUTCOME_UPGRADE,
	}
	tmp := t.TempDir()
	markerFile := filepath.Join(tmp, "marker")
	require.NoError(t, saveMarkerToPath(marker, markerFile, true, noopLocker))

	// Run both updates concurrently.
	var wg sync.WaitGroup
	wg.Add(2)

	// first concurrent update
	go func() {
		// Deferred so the WaitGroup is released even if the update panics.
		defer wg.Done()
		err := UpdateMarkerFile(markerFile, func(m *UpdateMarker) {
			m.Version = "1.2.3-up"
		})
		// assert (not require): FailNow must not be called outside the test goroutine.
		assert.NoError(t, err)
	}()

	// second concurrent update
	go func() {
		defer wg.Done()
		err := UpdateMarkerFile(markerFile, func(m *UpdateMarker) {
			m.Hash = "sha...hash2"
		})
		assert.NoError(t, err)
	}()

	wg.Wait()

	// Assert. Use require here so a load failure or a nil marker stops the
	// test cleanly instead of panicking on the dereferences below.
	loadedMarker, err := loadMarker(markerFile, noopLocker)
	require.NoError(t, err)
	require.NotNil(t, loadedMarker)
	assert.Equal(t, "1.2.3-up", loadedMarker.Version)
	assert.Equal(t, "sha...hash2", loadedMarker.Hash)
	assert.Equal(t, marker.VersionedHome, loadedMarker.VersionedHome)
}

func watchFileNotEmpty(t *testing.T, ctx context.Context, filePath string, errCh chan error, wg *sync.WaitGroup) {
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
Expand Down
26 changes: 5 additions & 21 deletions internal/pkg/agent/application/upgrade/marker_access_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/cenkalti/backoff/v4"
Expand All @@ -24,15 +23,10 @@ const minMarkerAccessRetries = 5
// mechanism is necessary since the marker file could be accessed by multiple
// processes (the Upgrade Watcher and the main Agent process) at the same time,
// which could fail on Windows.
func readMarkerFile(markerFile string) ([]byte, error) {
func readMarkerFile(markerFile string, fileLock Locker) ([]byte, error) {
var markerFileBytes []byte
readFn := func() error {
fileLock, err := newMarkerFileLocker(markerFile)
if err != nil {
return fmt.Errorf("creating update marker locker for reading: %w", err)
}

err = fileLock.Lock()
err := fileLock.Lock()
if err != nil {
return fmt.Errorf("locking update marker file %q for reading: %w", markerFile, err)
}
Expand All @@ -44,12 +38,7 @@ func readMarkerFile(markerFile string) ([]byte, error) {
}
}(fileLock)

markerFileBytes, err = os.ReadFile(markerFile)
if errors.Is(err, os.ErrNotExist) {
// marker doesn't exist, nothing to do
return nil
}

markerFileBytes, err = readMarkerFileCommon(markerFile)
return err
}

Expand All @@ -65,14 +54,9 @@ func readMarkerFile(markerFile string) ([]byte, error) {
// mechanism is necessary since the marker file could be accessed by multiple
// processes (the Upgrade Watcher and the main Agent process) at the same time,
// which could fail on Windows.
func writeMarkerFile(markerFile string, markerBytes []byte, shouldFsync bool) error {
func writeMarkerFile(markerFile string, markerBytes []byte, shouldFsync bool, fileLock Locker) error {
writeFn := func() error {
fileLock, err := newMarkerFileLocker(markerFile)
if err != nil {
return fmt.Errorf("creating update marker locker for writing: %w", err)
}

err = fileLock.Lock()
err := fileLock.Lock()
if err != nil {
return fmt.Errorf("locking update marker file %q for writing: %w", markerFile, err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/upgrade/marker_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (mfw *MarkerFileWatcher) Run(ctx context.Context) error {
}

func (mfw *MarkerFileWatcher) processMarker(currentVersion string, commit string) {
marker, err := loadMarker(mfw.markerFilePath)
marker, err := LoadMarkerFromFile(mfw.markerFilePath)
if err != nil {
mfw.logger.Error(err)
return
Expand Down
64 changes: 58 additions & 6 deletions internal/pkg/agent/application/upgrade/step_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package upgrade

import (
"encoding/json"
goerrors "errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -277,11 +278,56 @@ func CleanMarker(log *logger.Logger, dataDirPath string) error {
// LoadMarker loads the update marker. If the file does not exist it returns nil
// and no error.
func LoadMarker(dataDirPath string) (*UpdateMarker, error) {
return loadMarker(markerFilePath(dataDirPath))
markerFile := markerFilePath(dataDirPath)
return LoadMarkerFromFile(markerFile)
}

// LoadMarkerFromFile loads the update marker from markerFile. If the file does
// not exist it returns nil and no error.
//
// A file locker is created for markerFile and handed to loadMarker, which is
// responsible for using it while reading.
func LoadMarkerFromFile(markerFile string) (*UpdateMarker, error) {
	fileLock, err := newMarkerFileLocker(markerFile)
	if err != nil {
		// This is the read path, so the message must say "reading".
		return nil, fmt.Errorf("creating update marker locker for reading: %w", err)
	}

	return loadMarker(markerFile, fileLock)
}

// UpdateMarkerFile performs a read-modify-write of the update marker stored
// at markerFile while holding the marker file lock for the whole operation.
//
// The marker is loaded, every function in opts is applied to it in order, and
// the result is written back with fsync enabled. Because the lock is already
// held here, the inner load and save are given a no-op locker so they do not
// try to take the lock again.
//
// It returns an error if the locker cannot be created or acquired, if the
// marker cannot be loaded, if no marker is found (there is nothing to
// update), or if writing the marker fails. A failure to release the lock is
// joined onto the returned error.
func UpdateMarkerFile(markerFile string, opts ...func(*UpdateMarker)) (err error) {
	fileLock, err := newMarkerFileLocker(markerFile)
	if err != nil {
		return fmt.Errorf("creating update marker locker for updating: %w", err)
	}
	err = fileLock.Lock()
	if err != nil {
		return fmt.Errorf("locking update marker file %q for updating: %w", markerFile, err)
	}
	// err is a named result so that an unlock failure recorded here is
	// actually returned to the caller.
	defer func(fileLock Locker) {
		errUnlock := fileLock.Unlock()
		if errUnlock != nil {
			err = goerrors.Join(err, fmt.Errorf("unlocking marker file after updating: %w", errUnlock))
		}
	}(fileLock)

	noopLock := filelock.NewNoopLocker()
	marker, err := loadMarker(markerFile, noopLock)
	if err != nil {
		return fmt.Errorf("failed to read marker file during updating: %w", err)
	}
	if marker == nil {
		// Loading reports a missing marker as (nil, nil); applying opts or
		// saving would dereference the nil marker.
		return fmt.Errorf("updating marker file %q: no marker found", markerFile)
	}

	for _, opt := range opts {
		opt(marker)
	}

	if err = saveMarkerToPath(marker, markerFile, true, noopLock); err != nil {
		return fmt.Errorf("failed updating marker file: %w", err)
	}

	return nil
}

func loadMarker(markerFile string) (*UpdateMarker, error) {
markerBytes, err := readMarkerFile(markerFile)
func loadMarker(markerFile string, fileLock Locker) (*UpdateMarker, error) {
markerBytes, err := readMarkerFile(markerFile, fileLock)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -314,10 +360,16 @@ func loadMarker(markerFile string) (*UpdateMarker, error) {
// For critical upgrade transitions, pass shouldFsync as true so the marker
// file is immediately flushed to persistent storage.
func SaveMarker(marker *UpdateMarker, shouldFsync bool) error {
return saveMarkerToPath(marker, markerFilePath(paths.Data()), shouldFsync)
markerFile := markerFilePath(paths.Data())
fileLock, err := newMarkerFileLocker(markerFile)
if err != nil {
return fmt.Errorf("creating update marker locker for writing: %w", err)
}

return saveMarkerToPath(marker, markerFile, shouldFsync, fileLock)
}

func saveMarkerToPath(marker *UpdateMarker, markerFile string, shouldFsync bool) error {
func saveMarkerToPath(marker *UpdateMarker, markerFile string, shouldFsync bool, fileLock Locker) error {
makerSerializer := &updateMarkerSerializer{
Version: marker.Version,
Hash: marker.Hash,
Expand All @@ -336,7 +388,7 @@ func saveMarkerToPath(marker *UpdateMarker, markerFile string, shouldFsync bool)
return err
}

return writeMarkerFile(markerFile, markerBytes, shouldFsync)
return writeMarkerFile(markerFile, markerBytes, shouldFsync, fileLock)
}

func markerFilePath(dataDirPath string) string {
Expand Down
Loading