Skip to content

Expose UpdateMarkerFile func with transactional lock #8449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
**/html_docs
data
internal/pkg/agent/operation/tests/downloads
pkg/core/process/testsignal/testsignal
pkg/core/process/testsignal/testsignal.exe

# Files
.DS_Store
Expand Down
12 changes: 11 additions & 1 deletion internal/pkg/agent/application/upgrade/marker_access_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package upgrade

import (
"errors"
"fmt"
"os"
"path/filepath"
Expand All @@ -13,7 +14,16 @@ import (
"github.com/elastic/elastic-agent-libs/file"
)

func writeMarkerFileCommon(markerFile string, markerBytes []byte, shouldFsync bool) error {
// readMarkerFileCommon reads the raw contents of the upgrade marker file at
// markerFile. It performs no locking of its own.
//
// A missing file is not treated as an error: the function returns (nil, nil).
// Any other read failure is returned to the caller together with a nil slice,
// so that e.g. a permission or I/O error is never mistaken for a successful
// read of an empty marker.
func readMarkerFileCommon(markerFile string) (bytes []byte, err error) {
	markerFileBytes, err := os.ReadFile(markerFile)
	if errors.Is(err, os.ErrNotExist) {
		// marker doesn't exist, nothing to do
		return nil, nil
	}
	if err != nil {
		// Previously this error was dropped and partial data returned with a
		// nil error; surface it instead.
		return nil, fmt.Errorf("reading marker file %q: %w", markerFile, err)
	}
	return markerFileBytes, nil
}

func writeMarkerFile(markerFile string, markerBytes []byte, shouldFsync bool) error {
f, err := os.CreateTemp(
filepath.Dir(markerFile), fmt.Sprintf("%d-*.tmp", os.Getpid()))
if err != nil {
Expand Down
45 changes: 1 addition & 44 deletions internal/pkg/agent/application/upgrade/marker_access_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ package upgrade

import (
"errors"
"fmt"
"os"
)

Expand All @@ -19,48 +18,6 @@ func readMarkerFile(markerFile string) (bytes []byte, err error) {
// marker doesn't exist, nothing to do
return nil, nil
}
fileLock, err := newMarkerFileLocker(markerFile)
if err != nil {
return nil, fmt.Errorf("creating update marker locker for reading: %w", err)
}

err = fileLock.Lock()
if err != nil {
return nil, fmt.Errorf("locking update marker file %q for reading: %w", markerFile, err)
}

defer func(fileLock Locker) {
errUnlock := fileLock.Unlock()
if errUnlock != nil {
err = errors.Join(err, fmt.Errorf("unlocking marker file after reading: %w", errUnlock))
}
}(fileLock)
markerFileBytes, err := os.ReadFile(markerFile)
if errors.Is(err, os.ErrNotExist) {
// marker doesn't exist, nothing to do
return nil, nil
}
return markerFileBytes, nil
}

// On non-Windows platforms, writeMarkerFile simply writes the marker file.
// See marker_access_windows.go for behavior on Windows platforms.
func writeMarkerFile(markerFile string, markerBytes []byte, shouldFsync bool) (err error) {
fileLock, err := newMarkerFileLocker(markerFile)
if err != nil {
return fmt.Errorf("creating update marker locker for writing: %w", err)
}

err = fileLock.Lock()
if err != nil {
return fmt.Errorf("locking update marker file %q for writing: %w", markerFile, err)
}

defer func(fileLock Locker) {
errUnlock := fileLock.Unlock()
if errUnlock != nil {
err = errors.Join(err, fmt.Errorf("unlocking marker file after writing: %w", errUnlock))
}
}(fileLock)
return writeMarkerFileCommon(markerFile, markerBytes, shouldFsync)
return readMarkerFileCommon(markerFile)
}
54 changes: 54 additions & 0 deletions internal/pkg/agent/application/upgrade/marker_access_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,14 @@ import (
"path/filepath"
"sync"
"testing"
"time"

"github.com/fsnotify/fsnotify"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/elastic-agent/internal/pkg/agent/application/upgrade/details"
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
)

func TestReadNotExistingMarkerFile(t *testing.T) {
Expand Down Expand Up @@ -144,6 +148,56 @@ func TestWriteMarkerFileWithTruncation(t *testing.T) {
close(errCh)
}

// TestUpdateMarkerFile verifies that two concurrent UpdateMarkerFile calls,
// each touching a different field, are both reflected in the marker that ends
// up on disk, and that untouched fields are preserved.
func TestUpdateMarkerFile(t *testing.T) {
	marker := &UpdateMarker{
		Version:           "1.2.3",
		VersionedHome:     "/home",
		Hash:              "sha...hash",
		UpdatedOn:         time.Now(),
		PrevVersion:       "0.1.2",
		PrevHash:          "sha..hash",
		PrevVersionedHome: "/home/elastic",
		Acked:             false,
		Action:            &fleetapi.ActionUpgrade{ActionID: "123", ActionType: "UPGRADE"},
		Details:           details.NewDetails("1.2.3", details.StateRequested, "action-id"),
		DesiredOutcome:    OUTCOME_UPGRADE,
	}
	tmp := t.TempDir()
	markerFile := filepath.Join(tmp, "marker")
	require.NoError(t, saveMarkerToPath(marker, markerFile, true))

	// update marker
	var wg sync.WaitGroup
	wg.Add(2)

	// first concurrent update
	go func() {
		// Deferred so the WaitGroup is released even if the update panics.
		defer wg.Done()
		err := UpdateMarkerFile(markerFile, func(m *UpdateMarker) {
			m.Version = "1.2.3-up"
		})
		// assert (not require): FailNow must not be called off the test goroutine.
		assert.NoError(t, err)
	}()

	// second update
	go func() {
		defer wg.Done()
		err := UpdateMarkerFile(markerFile, func(m *UpdateMarker) {
			m.Hash = "sha...hash2"
		})
		assert.NoError(t, err)
	}()

	wg.Wait()

	// Assert
	loadedMarker, err := loadMarker(markerFile)
	// Stop here on failure: the assertions below dereference loadedMarker and
	// would otherwise panic with a nil pointer instead of reporting the cause.
	require.NoError(t, err)
	require.NotNil(t, loadedMarker)
	assert.Equal(t, "1.2.3-up", loadedMarker.Version)
	assert.Equal(t, "sha...hash2", loadedMarker.Hash)
	assert.Equal(t, marker.VersionedHome, loadedMarker.VersionedHome)
}

func watchFileNotEmpty(t *testing.T, ctx context.Context, filePath string, errCh chan error, wg *sync.WaitGroup) {
watcher, err := fsnotify.NewWatcher()
require.NoError(t, err)
Expand Down
59 changes: 3 additions & 56 deletions internal/pkg/agent/application/upgrade/marker_access_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,31 +29,11 @@ func readMarkerFile(markerFile string) ([]byte, error) {
// marker doesn't exist, nothing to do
return nil, nil
}

var markerFileBytes []byte
var err error
readFn := func() error {
fileLock, err := newMarkerFileLocker(markerFile)
if err != nil {
return fmt.Errorf("creating update marker locker for reading: %w", err)
}

err = fileLock.Lock()
if err != nil {
return fmt.Errorf("locking update marker file %q for reading: %w", markerFile, err)
}

defer func(fileLock Locker) {
errUnlock := fileLock.Unlock()
if errUnlock != nil {
err = errors.Join(err, fmt.Errorf("unlocking marker file after reading: %w", errUnlock))
}
}(fileLock)

markerFileBytes, err = os.ReadFile(markerFile)
if errors.Is(err, os.ErrNotExist) {
// marker doesn't exist, nothing to do
return nil
}

markerFileBytes, err = readMarkerFileCommon(markerFile)
return err
}

Expand All @@ -64,39 +44,6 @@ func readMarkerFile(markerFile string) ([]byte, error) {
return markerFileBytes, nil
}

// On Windows, writeMarkerFile tries to write the marker file, retrying with
// randomized exponential backoff up to markerAccessTimeout duration. This retry
// mechanism is necessary since the marker file could be accessed by multiple
// processes (the Upgrade Watcher and the main Agent process) at the same time,
// which could fail on Windows.
//
// Each attempt takes the marker file lock, writes the file and releases the
// lock. A failure at any of these steps, including the unlock, fails the
// attempt and is handed to accessMarkerFileWithRetries.
func writeMarkerFile(markerFile string, markerBytes []byte, shouldFsync bool) error {
	// err is a named result: the deferred unlock below can only influence the
	// value writeFn returns if it assigns to the result variable. With an
	// unnamed result the unlock error was silently discarded.
	writeFn := func() (err error) {
		fileLock, err := newMarkerFileLocker(markerFile)
		if err != nil {
			return fmt.Errorf("creating update marker locker for writing: %w", err)
		}

		err = fileLock.Lock()
		if err != nil {
			return fmt.Errorf("locking update marker file %q for writing: %w", markerFile, err)
		}

		defer func(fileLock Locker) {
			errUnlock := fileLock.Unlock()
			if errUnlock != nil {
				err = errors.Join(err, fmt.Errorf("unlocking marker file after writing: %w", errUnlock))
			}
		}(fileLock)
		return writeMarkerFileCommon(markerFile, markerBytes, shouldFsync)
	}

	if err := accessMarkerFileWithRetries(writeFn); err != nil {
		return fmt.Errorf("failed to write upgrade marker file [%s] despite retrying: %w", markerFile, err)
	}

	return nil
}

func accessMarkerFileWithRetries(accessFn func() error) error {
expBackoff := backoff.NewExponentialBackOff()
expBackoff.InitialInterval = markerAccessBackoffInitialInterval
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/agent/application/upgrade/marker_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (mfw *MarkerFileWatcher) Run(ctx context.Context) error {
}

func (mfw *MarkerFileWatcher) processMarker(currentVersion string, commit string) {
marker, err := loadMarker(mfw.markerFilePath)
marker, err := LoadMarkerFromFile(mfw.markerFilePath)
if err != nil {
mfw.logger.Error(err)
return
Expand Down
95 changes: 91 additions & 4 deletions internal/pkg/agent/application/upgrade/step_mark.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package upgrade

import (
"encoding/json"
goerrors "errors"
"fmt"
"os"
"path/filepath"
Expand Down Expand Up @@ -264,9 +265,25 @@ func UpdateActiveCommit(log *logger.Logger, topDirPath, hash string) error {
}

// CleanMarker removes a marker from disk.
func CleanMarker(log *logger.Logger, dataDirPath string) error {
func CleanMarker(log *logger.Logger, dataDirPath string) (err error) {
markerFile := markerFilePath(dataDirPath)
log.Infow("Removing marker file", "file.path", markerFile)

fileLock, err := newMarkerFileLocker(markerFile)
if err != nil {
return fmt.Errorf("creating update marker locker for writing: %w", err)
}

if err := fileLock.Lock(); err != nil {
return fmt.Errorf("locking update marker file %q for reading: %w", markerFile, err)
}
defer func(fileLock Locker) {
errUnlock := fileLock.Unlock()
if errUnlock != nil {
err = goerrors.Join(err, fmt.Errorf("unlocking marker file after reading: %w", errUnlock))
}
}(fileLock)

if err := os.Remove(markerFile); !os.IsNotExist(err) {
return err
}
Expand All @@ -277,7 +294,61 @@ func CleanMarker(log *logger.Logger, dataDirPath string) error {
// LoadMarker loads the update marker. If the file does not exist it returns nil
// and no error.
func LoadMarker(dataDirPath string) (*UpdateMarker, error) {
return loadMarker(markerFilePath(dataDirPath))
markerFile := markerFilePath(dataDirPath)
return LoadMarkerFromFile(markerFile)
}

// LoadMarkerFromFile loads the update marker from file. If the file does not exist it returns nil
// and no error.
//
// The marker file lock is held for the duration of the read. A failure to
// release the lock is joined into the returned error.
func LoadMarkerFromFile(markerFile string) (marker *UpdateMarker, err error) {
	fileLock, err := newMarkerFileLocker(markerFile)
	if err != nil {
		// This is the read path; the message previously said "for writing".
		return nil, fmt.Errorf("creating update marker locker for reading: %w", err)
	}

	if err = fileLock.Lock(); err != nil {
		return nil, fmt.Errorf("locking update marker file %q for reading: %w", markerFile, err)
	}
	// err is a named result so the deferred unlock can report its own failure.
	defer func(fileLock Locker) {
		errUnlock := fileLock.Unlock()
		if errUnlock != nil {
			err = goerrors.Join(err, fmt.Errorf("unlocking marker file after reading: %w", errUnlock))
		}
	}(fileLock)

	return loadMarker(markerFile)
}

// UpdateMarkerFile applies opts, in order, to the update marker stored at
// markerFile and writes the result back, all while holding the marker file
// lock so that the load-modify-save sequence is not interleaved with other
// users of the same lock.
//
// It returns an error if the lock cannot be taken or released, if the marker
// cannot be loaded or saved, or if no marker exists at markerFile (there is
// nothing to update in that case).
func UpdateMarkerFile(markerFile string, opts ...func(*UpdateMarker)) (err error) {
	fileLock, err := newMarkerFileLocker(markerFile)
	if err != nil {
		return fmt.Errorf("creating update marker locker for updating: %w", err)
	}
	err = fileLock.Lock()
	if err != nil {
		return fmt.Errorf("locking update marker file %q for updating: %w", markerFile, err)
	}
	// err is a named result so the deferred unlock can report its own failure.
	defer func(fileLock Locker) {
		errUnlock := fileLock.Unlock()
		if errUnlock != nil {
			err = goerrors.Join(err, fmt.Errorf("unlocking marker file after updating: %w", errUnlock))
		}
	}(fileLock)

	marker, err := loadMarker(markerFile)
	if err != nil {
		return fmt.Errorf("failed to read marker file during updating: %w", err)
	}
	// A missing marker file is reported as (nil, nil) by the loader; passing a
	// nil marker to the option funcs would make them dereference a nil pointer.
	if marker == nil {
		return fmt.Errorf("updating marker file %q: marker does not exist", markerFile)
	}

	for _, opt := range opts {
		opt(marker)
	}

	if err := saveMarkerToPath(marker, markerFile, true); err != nil {
		return fmt.Errorf("failed updating marker file: %w", err)
	}

	return nil
}

func loadMarker(markerFile string) (*UpdateMarker, error) {
Expand Down Expand Up @@ -313,8 +384,24 @@ func loadMarker(markerFile string) (*UpdateMarker, error) {
// SaveMarker serializes and persists the given upgrade marker to disk.
// For critical upgrade transitions, pass shouldFsync as true so the marker
// file is immediately flushed to persistent storage.
func SaveMarker(marker *UpdateMarker, shouldFsync bool) error {
return saveMarkerToPath(marker, markerFilePath(paths.Data()), shouldFsync)
// SaveMarker serializes and persists the given upgrade marker to the marker
// file under paths.Data(), holding the marker file lock while writing.
// Pass shouldFsync as true to have the file flushed to persistent storage.
// A failure to release the lock is joined into the returned error.
func SaveMarker(marker *UpdateMarker, shouldFsync bool) (err error) {
	markerFile := markerFilePath(paths.Data())
	fileLock, err := newMarkerFileLocker(markerFile)
	if err != nil {
		return fmt.Errorf("creating update marker locker for writing: %w", err)
	}

	// This is the write path; these messages previously said "reading".
	if err = fileLock.Lock(); err != nil {
		return fmt.Errorf("locking update marker file %q for writing: %w", markerFile, err)
	}
	// err is a named result so the deferred unlock can report its own failure.
	defer func(fileLock Locker) {
		errUnlock := fileLock.Unlock()
		if errUnlock != nil {
			err = goerrors.Join(err, fmt.Errorf("unlocking marker file after writing: %w", errUnlock))
		}
	}(fileLock)

	return saveMarkerToPath(marker, markerFile, shouldFsync)
}

func saveMarkerToPath(marker *UpdateMarker, markerFile string, shouldFsync bool) error {
Expand Down