Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.9] Automated cherry pick of #57422: Rework method of updating atomic-updated data volumes #58458

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 23 additions & 1 deletion pkg/volume/configmap/configmap_test.go
Expand Up @@ -466,13 +466,35 @@ func TestPluginOptional(t *testing.T) {
}
}

datadirSymlink := path.Join(volumePath, "..data")
datadir, err := os.Readlink(datadirSymlink)
if err != nil && os.IsNotExist(err) {
t.Fatalf("couldn't find volume path's data dir, %s", datadirSymlink)
} else if err != nil {
t.Fatalf("couldn't read symlink, %s", datadirSymlink)
}
datadirPath := path.Join(volumePath, datadir)

infos, err := ioutil.ReadDir(volumePath)
if err != nil {
t.Fatalf("couldn't find volume path, %s", volumePath)
}
if len(infos) != 0 {
t.Errorf("empty directory, %s, not found", volumePath)
for _, fi := range infos {
if fi.Name() != "..data" && fi.Name() != datadir {
t.Errorf("empty data directory, %s, is not empty. Contains: %s", datadirSymlink, fi.Name())
}
}
}

infos, err = ioutil.ReadDir(datadirPath)
if err != nil {
t.Fatalf("couldn't find volume data path, %s", datadirPath)
}
if len(infos) != 0 {
t.Errorf("empty data directory, %s, is not empty. Contains: %s", datadirSymlink, infos[0].Name())
}

doTestCleanAndTeardown(plugin, testPodUID, testVolumeName, volumePath, t)
}

Expand Down
23 changes: 22 additions & 1 deletion pkg/volume/projected/projected_test.go
Expand Up @@ -905,12 +905,33 @@ func TestPluginOptional(t *testing.T) {
}
}

datadirSymlink := path.Join(volumePath, "..data")
datadir, err := os.Readlink(datadirSymlink)
if err != nil && os.IsNotExist(err) {
t.Fatalf("couldn't find volume path's data dir, %s", datadirSymlink)
} else if err != nil {
t.Fatalf("couldn't read symlink, %s", datadirSymlink)
}
datadirPath := path.Join(volumePath, datadir)

infos, err := ioutil.ReadDir(volumePath)
if err != nil {
t.Fatalf("couldn't find volume path, %s", volumePath)
}
if len(infos) != 0 {
t.Errorf("empty directory, %s, not found", volumePath)
for _, fi := range infos {
if fi.Name() != "..data" && fi.Name() != datadir {
t.Errorf("empty data volume directory, %s, is not empty. Contains: %s", datadirSymlink, fi.Name())
}
}
}

infos, err = ioutil.ReadDir(datadirPath)
if err != nil {
t.Fatalf("couldn't find volume data path, %s", datadirPath)
}
if len(infos) != 0 {
t.Errorf("empty data directory, %s, is not empty. Contains: %s", datadirSymlink, infos[0].Name())
}

defer doTestCleanAndTeardown(plugin, testPodUID, testVolumeName, volumePath, t)
Expand Down
23 changes: 22 additions & 1 deletion pkg/volume/secret/secret_test.go
Expand Up @@ -477,12 +477,33 @@ func TestPluginOptional(t *testing.T) {
}
}

datadirSymlink := path.Join(volumePath, "..data")
datadir, err := os.Readlink(datadirSymlink)
if err != nil && os.IsNotExist(err) {
t.Fatalf("couldn't find volume path's data dir, %s", datadirSymlink)
} else if err != nil {
t.Fatalf("couldn't read symlink, %s", datadirSymlink)
}
datadirPath := path.Join(volumePath, datadir)

infos, err := ioutil.ReadDir(volumePath)
if err != nil {
t.Fatalf("couldn't find volume path, %s", volumePath)
}
if len(infos) != 0 {
t.Errorf("empty directory, %s, not found", volumePath)
for _, fi := range infos {
if fi.Name() != "..data" && fi.Name() != datadir {
t.Errorf("empty data volume directory, %s, is not empty. Contains: %s", datadirSymlink, fi.Name())
}
}
}

infos, err = ioutil.ReadDir(datadirPath)
if err != nil {
t.Fatalf("couldn't find volume data path, %s", datadirPath)
}
if len(infos) != 0 {
t.Errorf("empty data directory, %s, is not empty. Contains: %s", datadirSymlink, infos[0].Name())
}

defer doTestCleanAndTeardown(plugin, testPodUID, testVolumeName, volumePath, t)
Expand Down
158 changes: 77 additions & 81 deletions pkg/volume/util/atomic_writer.go
Expand Up @@ -88,14 +88,15 @@ const (
// The Write algorithm is:
//
// 1. The payload is validated; if the payload is invalid, the function returns
// 2. The user-visible portion of the volume is walked to determine whether any
// 2.  The current timestamped directory is detected by reading the data directory
// symlink
// 3. The old version of the volume is walked to determine whether any
// portion of the payload was deleted and is still present on disk.
// If the payload is already present on disk and there are no deleted files,
// the function returns
// 3. A check is made to determine whether data present in the payload has changed
// 4.  A new timestamped dir is created
// 5. The payload is written to the new timestamped directory
// 6.  Symlinks and directory for new user-visible files are created (if needed).
// 4. The data in the current timestamped directory is compared to the projected
// data to determine if an update is required.
// 5.  A new timestamped dir is created
// 6. The payload is written to the new timestamped directory
// 7.  Symlinks and directory for new user-visible files are created (if needed).
//
// For example, consider the files:
// <target-dir>/podName
Expand All @@ -104,16 +105,12 @@ const (
//
// The user visible files are symbolic links into the internal data directory:
// <target-dir>/podName -> ..data/podName
// <target-dir>/usr/labels -> ../..data/usr/labels
// <target-dir>/k8s/annotations -> ../..data/k8s/annotations
//
// Relative links are created into the data directory for files in subdirectories.
// <target-dir>/usr -> ..data/usr
// <target-dir>/k8s -> ..data/k8s
//
// The data directory itself is a link to a timestamped directory with
// the real data:
// <target-dir>/..data -> ..2016_02_01_15_04_05.12345678/
// 7.  The current timestamped directory is detected by reading the data directory
// symlink
// 8.  A symlink to the new timestamped directory ..data_tmp is created that will
// become the new data directory
// 9.  The new data directory symlink is renamed to the data directory; rename is atomic
Expand All @@ -128,53 +125,63 @@ func (w *AtomicWriter) Write(payload map[string]FileProjection) error {
}

// (2)
pathsToRemove, err := w.pathsToRemove(cleanPayload)
dataDirPath := path.Join(w.targetDir, dataDirName)
oldTsDir, err := os.Readlink(dataDirPath)
if err != nil {
glog.Errorf("%s: error determining user-visible files to remove: %v", w.logContext, err)
return err
if !os.IsNotExist(err) {
glog.Errorf("%s: error reading link for data directory: %v", w.logContext, err)
return err
}
// although Readlink() returns "" on err, don't be fragile by relying on it (since it's not specified in docs)
// empty oldTsDir indicates that it didn't exist
oldTsDir = ""
}
oldTsPath := path.Join(w.targetDir, oldTsDir)

// (3)
if should, err := w.shouldWritePayload(cleanPayload); err != nil {
glog.Errorf("%s: error determining whether payload should be written to disk: %v", w.logContext, err)
return err
} else if !should && len(pathsToRemove) == 0 {
glog.V(4).Infof("%s: no update required for target directory %v", w.logContext, w.targetDir)
return nil
} else {
glog.V(4).Infof("%s: write required for target directory %v", w.logContext, w.targetDir)
var pathsToRemove sets.String
// if there was no old version, there's nothing to remove
if len(oldTsDir) != 0 {
// (3)
pathsToRemove, err = w.pathsToRemove(cleanPayload, oldTsPath)
if err != nil {
glog.Errorf("%s: error determining user-visible files to remove: %v", w.logContext, err)
return err
}

// (4)
if should, err := shouldWritePayload(cleanPayload, oldTsPath); err != nil {
glog.Errorf("%s: error determining whether payload should be written to disk: %v", w.logContext, err)
return err
} else if !should && len(pathsToRemove) == 0 {
glog.V(4).Infof("%s: no update required for target directory %v", w.logContext, w.targetDir)
return nil
} else {
glog.V(4).Infof("%s: write required for target directory %v", w.logContext, w.targetDir)
}
}

// (4)
// (5)
tsDir, err := w.newTimestampDir()
if err != nil {
glog.V(4).Infof("%s: error creating new ts data directory: %v", w.logContext, err)
return err
}
tsDirName := filepath.Base(tsDir)

// (5)
// (6)
if err = w.writePayloadToDir(cleanPayload, tsDir); err != nil {
glog.Errorf("%s: error writing payload to ts data directory %s: %v", w.logContext, tsDir, err)
return err
} else {
glog.V(4).Infof("%s: performed write of new data to ts data directory: %s", w.logContext, tsDir)
}

// (6)
// (7)
if err = w.createUserVisibleFiles(cleanPayload); err != nil {
glog.Errorf("%s: error creating visible symlinks in %s: %v", w.logContext, w.targetDir, err)
return err
}

// (7)
_, tsDirName := filepath.Split(tsDir)
dataDirPath := path.Join(w.targetDir, dataDirName)
oldTsDir, err := os.Readlink(dataDirPath)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("%s: error reading link for data directory: %v", w.logContext, err)
return err
}

// (8)
newDataDirPath := path.Join(w.targetDir, newDataDirName)
if err = os.Symlink(tsDirName, newDataDirPath); err != nil {
Expand Down Expand Up @@ -206,7 +213,7 @@ func (w *AtomicWriter) Write(payload map[string]FileProjection) error {

// (11)
if len(oldTsDir) > 0 {
if err = os.RemoveAll(path.Join(w.targetDir, oldTsDir)); err != nil {
if err = os.RemoveAll(oldTsPath); err != nil {
glog.Errorf("%s: error removing old data directory %s: %v", w.logContext, oldTsDir, err)
return err
}
Expand Down Expand Up @@ -270,9 +277,9 @@ func validatePath(targetPath string) error {
}

// shouldWritePayload returns whether the payload should be written to disk.
func (w *AtomicWriter) shouldWritePayload(payload map[string]FileProjection) (bool, error) {
func shouldWritePayload(payload map[string]FileProjection, oldTsDir string) (bool, error) {
for userVisiblePath, fileProjection := range payload {
shouldWrite, err := w.shouldWriteFile(path.Join(w.targetDir, userVisiblePath), fileProjection.Data)
shouldWrite, err := shouldWriteFile(path.Join(oldTsDir, userVisiblePath), fileProjection.Data)
if err != nil {
return false, err
}
Expand All @@ -286,7 +293,7 @@ func (w *AtomicWriter) shouldWritePayload(payload map[string]FileProjection) (bo
}

// shouldWriteFile returns whether a new version of a file should be written to disk.
func (w *AtomicWriter) shouldWriteFile(path string, content []byte) (bool, error) {
func shouldWriteFile(path string, content []byte) (bool, error) {
_, err := os.Lstat(path)
if os.IsNotExist(err) {
return true, nil
Expand All @@ -300,27 +307,23 @@ func (w *AtomicWriter) shouldWriteFile(path string, content []byte) (bool, error
return (bytes.Compare(content, contentOnFs) != 0), nil
}

// pathsToRemove walks the user-visible portion of the target directory and
// pathsToRemove walks the current version of the data directory and
// determines which paths should be removed (if any) after the payload is
// written to the target directory.
func (w *AtomicWriter) pathsToRemove(payload map[string]FileProjection) (sets.String, error) {
func (w *AtomicWriter) pathsToRemove(payload map[string]FileProjection, oldTsDir string) (sets.String, error) {
paths := sets.NewString()
visitor := func(path string, info os.FileInfo, err error) error {
if path == w.targetDir {
return nil
}

relativePath := strings.TrimPrefix(path, w.targetDir)
relativePath := strings.TrimPrefix(path, oldTsDir)
relativePath = strings.TrimPrefix(relativePath, string(os.PathSeparator))
if strings.HasPrefix(relativePath, "..") {
if relativePath == "" {
return nil
}

paths.Insert(relativePath)
return nil
}

err := filepath.Walk(w.targetDir, visitor)
err := filepath.Walk(oldTsDir, visitor)
if os.IsNotExist(err) {
return nil, nil
} else if err != nil {
Expand Down Expand Up @@ -348,7 +351,7 @@ func (w *AtomicWriter) pathsToRemove(payload map[string]FileProjection) (sets.St

// newTimestampDir creates a new timestamp directory
func (w *AtomicWriter) newTimestampDir() (string, error) {
tsDir, err := ioutil.TempDir(w.targetDir, fmt.Sprintf("..%s.", time.Now().Format("1981_02_01_15_04_05")))
tsDir, err := ioutil.TempDir(w.targetDir, time.Now().UTC().Format("..2006_01_02_15_04_05."))
if err != nil {
glog.Errorf("%s: unable to create new temp directory: %v", w.logContext, err)
return "", err
Expand Down Expand Up @@ -405,34 +408,22 @@ func (w *AtomicWriter) writePayloadToDir(payload map[string]FileProjection, dir
//
// Viz:
// For files: "bar", "foo/bar", "baz/bar", "foo/baz/blah"
// the following symlinks and subdirectories are created:
// bar -> ..data/bar
// foo/bar -> ../..data/foo/bar
// baz/bar -> ../..data/baz/bar
// foo/baz/blah -> ../../..data/foo/baz/blah
// the following symlinks are created:
// bar -> ..data/bar
// foo -> ..data/foo
// baz -> ..data/baz
func (w *AtomicWriter) createUserVisibleFiles(payload map[string]FileProjection) error {
for userVisiblePath := range payload {
dir, _ := filepath.Split(userVisiblePath)
subDirs := 0
if len(dir) > 0 {
// If dir is not empty, the projection path contains at least one
// subdirectory (example: userVisiblePath := "foo/bar").
// Since filepath.Split leaves a trailing path separator, in this
// example, dir = "foo/". In order to calculate the number of
// subdirectories, we must subtract 1 from the number returned by split.
subDirs = len(strings.Split(dir, string(os.PathSeparator))) - 1
err := os.MkdirAll(path.Join(w.targetDir, dir), os.ModePerm)
if err != nil {
return err
}
slashpos := strings.Index(userVisiblePath, string(os.PathSeparator))
if slashpos == -1 {
slashpos = len(userVisiblePath)
}
_, err := os.Readlink(path.Join(w.targetDir, userVisiblePath))
linkname := userVisiblePath[:slashpos]
_, err := os.Readlink(path.Join(w.targetDir, linkname))
if err != nil && os.IsNotExist(err) {
// The link into the data directory for this path doesn't exist; create it,
// respecting the number of subdirectories necessary to link
// correctly back into the data directory.
visibleFile := path.Join(w.targetDir, userVisiblePath)
dataDirFile := path.Join(strings.Repeat("../", subDirs), dataDirName, userVisiblePath)
// The link into the data directory for this path doesn't exist; create it
visibleFile := path.Join(w.targetDir, linkname)
dataDirFile := path.Join(dataDirName, linkname)

err = os.Symlink(dataDirFile, visibleFile)
if err != nil {
Expand All @@ -446,13 +437,18 @@ func (w *AtomicWriter) createUserVisibleFiles(payload map[string]FileProjection)
// removeUserVisiblePaths removes the set of paths from the user-visible
// portion of the writer's target directory.
func (w *AtomicWriter) removeUserVisiblePaths(paths sets.String) error {
orderedPaths := paths.List()
for ii := len(orderedPaths) - 1; ii >= 0; ii-- {
if err := os.Remove(path.Join(w.targetDir, orderedPaths[ii])); err != nil {
glog.Errorf("%s: error pruning old user-visible path %s: %v", w.logContext, orderedPaths[ii], err)
return err
ps := string(os.PathSeparator)
var lasterr error
for p := range paths {
// only remove symlinks from the volume root directory (i.e. items that don't contain '/')
if strings.Contains(p, ps) {
continue
}
if err := os.Remove(path.Join(w.targetDir, p)); err != nil {
glog.Errorf("%s: error pruning old user-visible path %s: %v", w.logContext, p, err)
lasterr = err
}
}

return nil
return lasterr
}