From 3622ee3c141e581b9307ffdc00424b7eb1f14746 Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 6 Jun 2021 18:05:28 -0700 Subject: [PATCH 1/5] Move asset copying logic to pkg --- cmd/kops/get_assets.go | 66 +------------------- upup/pkg/fi/assettasks/BUILD.bazel | 3 + upup/pkg/fi/assettasks/copy.go | 99 ++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 63 deletions(-) create mode 100644 upup/pkg/fi/assettasks/copy.go diff --git a/cmd/kops/get_assets.go b/cmd/kops/get_assets.go index 671ed0ddf9801..d708b5e2fe132 100644 --- a/cmd/kops/get_assets.go +++ b/cmd/kops/get_assets.go @@ -1,5 +1,5 @@ /* -Copyright 2020 The Kubernetes Authors. +Copyright 2021 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -22,7 +22,6 @@ import ( "fmt" "io" - "k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi/assettasks" "k8s.io/kubectl/pkg/util/i18n" "k8s.io/kubectl/pkg/util/templates" @@ -58,9 +57,6 @@ type AssetResult struct { Files []*File `json:"files,omitempty"` } -type copyAssetsTarget struct { -} - func NewCmdGetAssets(f *util.Factory, out io.Writer, getOptions *GetOptions) *cobra.Command { options := GetAssetsOptions{ GetOptions: getOptions, @@ -120,7 +116,6 @@ func RunGetAssets(ctx context.Context, f *util.Factory, out io.Writer, options * Images: make([]*Image, 0, len(updateClusterResults.ImageAssets)), Files: make([]*File, 0, len(updateClusterResults.FileAssets)), } - tasks := map[string]fi.Task{} seen := map[string]bool{} for _, imageAsset := range updateClusterResults.ImageAssets { @@ -132,24 +127,6 @@ func RunGetAssets(ctx context.Context, f *util.Factory, out io.Writer, options * result.Images = append(result.Images, &image) seen[image.Canonical] = true } - - if options.Copy && imageAsset.DownloadLocation != imageAsset.CanonicalLocation { - ctx := &fi.ModelBuilderContext{ - Tasks: tasks, - } - - copyImageTask := &assettasks.CopyImage{ - Name: fi.String(imageAsset.DownloadLocation), - SourceImage: fi.String(imageAsset.CanonicalLocation), - TargetImage: fi.String(imageAsset.DownloadLocation), - Lifecycle: fi.LifecycleSync, - } - - if err := ctx.EnsureTask(copyImageTask); err != nil { - return fmt.Errorf("error adding image-copy task: %v", err) - } - tasks = ctx.Tasks - } } seen = map[string]bool{} @@ -163,41 +140,12 @@ func RunGetAssets(ctx context.Context, f *util.Factory, out io.Writer, options * result.Files = append(result.Files, &file) seen[file.Canonical] = true } - - // test if the asset needs to be copied - if options.Copy && fileAsset.DownloadURL.String() != fileAsset.CanonicalURL.String() { - ctx := &fi.ModelBuilderContext{ - Tasks: tasks, - } - - copyFileTask := &assettasks.CopyFile{ - Name: fi.String(fileAsset.CanonicalURL.String()), - TargetFile: fi.String(fileAsset.DownloadURL.String()), - SourceFile: fi.String(fileAsset.CanonicalURL.String()), - SHA: fi.String(fileAsset.SHAValue), - Lifecycle: fi.LifecycleSync, - } - - if err := ctx.EnsureTask(copyFileTask); err != nil { - return fmt.Errorf("error adding file-copy task: %v", err) - } - tasks = ctx.Tasks - } } if options.Copy { - var options fi.RunTasksOptions - options.InitDefaults() - - context, err := fi.NewContext(©AssetsTarget{}, updateClusterResults.Cluster, nil, nil, nil, nil, true, tasks) - if err != nil { - return fmt.Errorf("error building context: %v", err) - } - defer context.Close() - - err = context.RunTasks(options) + err := assettasks.Copy(updateClusterResults.ImageAssets, updateClusterResults.FileAssets, updateClusterResults.Cluster) if err != nil { - return fmt.Errorf("error running tasks: %v", err) + return err } } @@ -260,11 +208,3 @@ func fileOutputTable(files []*File, out io.Writer) error { columns := []string{"CANONICAL", "DOWNLOAD", "SHA"} return t.Render(files, out, columns...) } - -func (c copyAssetsTarget) Finish(taskMap map[string]fi.Task) error { - return nil -} - -func (c copyAssetsTarget) ProcessDeletions() bool { - return false -} diff --git a/upup/pkg/fi/assettasks/BUILD.bazel b/upup/pkg/fi/assettasks/BUILD.bazel index 60e98fa88f52c..bc3ef23fc73d2 100644 --- a/upup/pkg/fi/assettasks/BUILD.bazel +++ b/upup/pkg/fi/assettasks/BUILD.bazel @@ -3,6 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", srcs = [ + "copy.go", "copyfile.go", "copyfile_fitask.go", "copyimage.go", @@ -14,6 +15,8 @@ go_library( visibility = ["//visibility:public"], deps = [ "//pkg/acls:go_default_library", + "//pkg/apis/kops:go_default_library", + "//pkg/assets:go_default_library", "//upup/pkg/fi:go_default_library", "//util/pkg/hashing:go_default_library", "//util/pkg/vfs:go_default_library", diff --git a/upup/pkg/fi/assettasks/copy.go b/upup/pkg/fi/assettasks/copy.go new file mode 100644 index 0000000000000..b6b9fe84ecf05 --- /dev/null +++ b/upup/pkg/fi/assettasks/copy.go @@ -0,0 +1,99 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package assettasks + +import ( + "fmt" + + "k8s.io/kops/pkg/apis/kops" + "k8s.io/kops/pkg/assets" + "k8s.io/kops/upup/pkg/fi" +) + +type copyAssetsTarget struct { +} + +func (c copyAssetsTarget) Finish(taskMap map[string]fi.Task) error { + return nil +} + +func (c copyAssetsTarget) ProcessDeletions() bool { + return false +} + +func Copy(imageAssets []*assets.ImageAsset, fileAssets []*assets.FileAsset, cluster *kops.Cluster) error { + tasks := map[string]fi.Task{} + + for _, imageAsset := range imageAssets { + if imageAsset.DownloadLocation != imageAsset.CanonicalLocation { + ctx := &fi.ModelBuilderContext{ + Tasks: tasks, + } + + copyImageTask := &CopyImage{ + Name: fi.String(imageAsset.DownloadLocation), + SourceImage: fi.String(imageAsset.CanonicalLocation), + TargetImage: fi.String(imageAsset.DownloadLocation), + Lifecycle: fi.LifecycleSync, + } + + if err := ctx.EnsureTask(copyImageTask); err != nil { + return fmt.Errorf("error adding image-copy task: %v", err) + } + tasks = ctx.Tasks + } + } + + for _, fileAsset := range fileAssets { + + // test if the asset needs to be copied + if fileAsset.DownloadURL.String() != fileAsset.CanonicalURL.String() { + ctx := &fi.ModelBuilderContext{ + Tasks: tasks, + } + + copyFileTask := &CopyFile{ + Name: fi.String(fileAsset.CanonicalURL.String()), + TargetFile: fi.String(fileAsset.DownloadURL.String()), + SourceFile: fi.String(fileAsset.CanonicalURL.String()), + SHA: fi.String(fileAsset.SHAValue), + Lifecycle: fi.LifecycleSync, + } + + if err := ctx.EnsureTask(copyFileTask); err != nil { + return fmt.Errorf("error adding file-copy task: %v", err) + } + tasks = ctx.Tasks + } + } + + var options fi.RunTasksOptions + options.InitDefaults() + + context, err := fi.NewContext(©AssetsTarget{}, cluster, nil, nil, nil, nil, true, tasks) + if err != nil { + return fmt.Errorf("error building context: %v", err) + } + defer context.Close() + + err = context.RunTasks(options) + if err != nil { + return fmt.Errorf("error running tasks: %v", err) + } + + return nil +} From 3dff315e2763c90c05187e470d58066f97fb8253 Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 6 Jun 2021 21:08:33 -0700 Subject: [PATCH 2/5] Limit concurrency of asset copy tasks --- upup/pkg/fi/assettasks/copy.go | 42 +++++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/upup/pkg/fi/assettasks/copy.go b/upup/pkg/fi/assettasks/copy.go index b6b9fe84ecf05..545abe7c127f7 100644 --- a/upup/pkg/fi/assettasks/copy.go +++ b/upup/pkg/fi/assettasks/copy.go @@ -18,7 +18,9 @@ package assettasks import ( "fmt" + "sort" + "k8s.io/klog/v2" "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/assets" "k8s.io/kops/upup/pkg/fi" @@ -90,10 +92,44 @@ func Copy(imageAssets []*assets.ImageAsset, fileAssets []*assets.FileAsset, clus } defer context.Close() - err = context.RunTasks(options) - if err != nil { - return fmt.Errorf("error running tasks: %v", err) + ch := make(chan error, 5) + for i := 0; i < cap(ch); i++ { + ch <- nil + } + + gotError := false + names := make([]string, 0, len(tasks)) + for name := range tasks { + names = append(names, name) + } + sort.Strings(names) + for _, name := range names { + task := tasks[name] + err := <-ch + if err != nil { + klog.Warning(err) + gotError = true + } + go func(n string, t fi.Task) { + err := t.Run(context) + if err != nil { + err = fmt.Errorf("%s: %v", n, err) + } + ch <- err + }(name, task) } + for i := 0; i < cap(ch); i++ { + err = <-ch + if err != nil { + klog.Warning(err) + gotError = true + } + } + + close(ch) + if gotError { + return fmt.Errorf("not all assets copied successfully") + } return nil } From c08479186e40fb934bfce26e432c4188cce1036d Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 6 Jun 2021 22:45:46 -0700 Subject: [PATCH 3/5] Remove unneeded Task machinery from copy tasks --- upup/pkg/fi/assettasks/BUILD.bazel | 2 - upup/pkg/fi/assettasks/copy.go | 37 +++++----- upup/pkg/fi/assettasks/copyfile.go | 63 +++-------------- upup/pkg/fi/assettasks/copyfile_fitask.go | 51 -------------- upup/pkg/fi/assettasks/copyimage.go | 81 ---------------------- upup/pkg/fi/assettasks/copyimage_fitask.go | 51 -------------- 6 files changed, 30 insertions(+), 255 deletions(-) delete mode 100644 upup/pkg/fi/assettasks/copyfile_fitask.go delete mode 100644 upup/pkg/fi/assettasks/copyimage_fitask.go diff --git a/upup/pkg/fi/assettasks/BUILD.bazel b/upup/pkg/fi/assettasks/BUILD.bazel index bc3ef23fc73d2..148ef3b3d7ef4 100644 --- a/upup/pkg/fi/assettasks/BUILD.bazel +++ b/upup/pkg/fi/assettasks/BUILD.bazel @@ -5,9 +5,7 @@ go_library( srcs = [ "copy.go", "copyfile.go", - "copyfile_fitask.go", "copyimage.go", - "copyimage_fitask.go", "docker_api.go", "docker_cli.go", ], diff --git a/upup/pkg/fi/assettasks/copy.go b/upup/pkg/fi/assettasks/copy.go index 545abe7c127f7..c28f432d21cfd 100644 --- a/upup/pkg/fi/assettasks/copy.go +++ b/upup/pkg/fi/assettasks/copy.go @@ -42,44 +42,45 @@ func Copy(imageAssets []*assets.ImageAsset, fileAssets []*assets.FileAsset, clus for _, imageAsset := range imageAssets { if imageAsset.DownloadLocation != imageAsset.CanonicalLocation { - ctx := &fi.ModelBuilderContext{ - Tasks: tasks, - } - copyImageTask := &CopyImage{ Name: fi.String(imageAsset.DownloadLocation), SourceImage: fi.String(imageAsset.CanonicalLocation), TargetImage: fi.String(imageAsset.DownloadLocation), - Lifecycle: fi.LifecycleSync, } - if err := ctx.EnsureTask(copyImageTask); err != nil { - return fmt.Errorf("error adding image-copy task: %v", err) + if existing, ok := tasks[*copyImageTask.Name]; ok { + if *existing.(*CopyImage).SourceImage != *copyImageTask.SourceImage { + return fmt.Errorf("different sources for same image target %s: %s vs %s", *copyImageTask.Name, *copyImageTask.SourceImage, *existing.(*CopyImage).SourceImage) + } } - tasks = ctx.Tasks + + tasks[*copyImageTask.Name] = copyImageTask } } for _, fileAsset := range fileAssets { - - // test if the asset needs to be copied if fileAsset.DownloadURL.String() != fileAsset.CanonicalURL.String() { - ctx := &fi.ModelBuilderContext{ - Tasks: tasks, - } - copyFileTask := &CopyFile{ Name: fi.String(fileAsset.CanonicalURL.String()), TargetFile: fi.String(fileAsset.DownloadURL.String()), SourceFile: fi.String(fileAsset.CanonicalURL.String()), SHA: fi.String(fileAsset.SHAValue), - Lifecycle: fi.LifecycleSync, } - if err := ctx.EnsureTask(copyFileTask); err != nil { - return fmt.Errorf("error adding file-copy task: %v", err) + if existing, ok := tasks[*copyFileTask.Name]; ok { + e, ok := existing.(*CopyFile) + if !ok { + return fmt.Errorf("different types for copy target %s", *copyFileTask.Name) + } + if *e.TargetFile != *copyFileTask.TargetFile { + return fmt.Errorf("different targets for same file %s: %s vs %s", *copyFileTask.Name, *copyFileTask.TargetFile, *e.TargetFile) + } + if *e.SHA != *copyFileTask.SHA { + return fmt.Errorf("different sha for same file %s: %s vs %s", *copyFileTask.Name, *copyFileTask.SHA, *e.SHA) + } } - tasks = ctx.Tasks + + tasks[*copyFileTask.Name] = copyFileTask } } diff --git a/upup/pkg/fi/assettasks/copyfile.go b/upup/pkg/fi/assettasks/copyfile.go index 6bf1973ba43cc..bdb5ab7a0a1ca 100644 --- a/upup/pkg/fi/assettasks/copyfile.go +++ b/upup/pkg/fi/assettasks/copyfile.go @@ -32,20 +32,11 @@ import ( // CopyFile copies an from a source file repository, to a target repository, // typically used for highly secure clusters. -// +kops:fitask type CopyFile struct { Name *string SourceFile *string TargetFile *string SHA *string - Lifecycle fi.Lifecycle -} - -var _ fi.CompareWithID = &CopyFile{} - -func (e *CopyFile) CompareWithID() *string { - // or should this be the SHA? - return e.Name } // fileExtensionForSHA returns the expected extension for the given hash @@ -61,13 +52,12 @@ func fileExtensionForSHA(sha string) (string, error) { } } -// Find attempts to find a file. -func (e *CopyFile) Find(c *fi.Context) (*CopyFile, error) { +func (e *CopyFile) Run(c *fi.Context) error { expectedSHA := strings.TrimSpace(fi.StringValue(e.SHA)) shaExtension, err := fileExtensionForSHA(expectedSHA) if err != nil { - return nil, err + return err } targetSHAFile := fi.StringValue(e.TargetFile) + shaExtension @@ -77,50 +67,19 @@ func (e *CopyFile) Find(c *fi.Context) (*CopyFile, error) { if os.IsNotExist(err) { klog.V(4).Infof("unable to download: %q, assuming target file is not present, and if not present may not be an error: %v", targetSHAFile, err) - return nil, nil + } else { + klog.V(4).Infof("unable to download: %q, %v", targetSHAFile, err) } - klog.V(4).Infof("unable to download: %q, %v", targetSHAFile, err) - // TODO should we throw err here? - return nil, nil - } - targetSHA := string(targetSHABytes) - - if strings.TrimSpace(targetSHA) == expectedSHA { - actual := &CopyFile{ - Name: e.Name, - TargetFile: e.TargetFile, - SHA: e.SHA, - SourceFile: e.SourceFile, - Lifecycle: e.Lifecycle, - } - klog.V(8).Infof("found matching target sha1 for file: %q", fi.StringValue(e.TargetFile)) - return actual, nil - } - - klog.V(8).Infof("did not find same file, found mismatching target sha1 for file: %q", fi.StringValue(e.TargetFile)) - return nil, nil - -} + } else { + targetSHA := string(targetSHABytes) -// Run is the default run method. -func (e *CopyFile) Run(c *fi.Context) error { - return fi.DefaultDeltaRunMethod(e, c) -} + if strings.TrimSpace(targetSHA) == expectedSHA { + klog.V(8).Infof("found matching target sha for file: %q", fi.StringValue(e.TargetFile)) + return nil + } -func (s *CopyFile) CheckChanges(a, e, changes *CopyFile) error { - if fi.StringValue(e.Name) == "" { - return fi.RequiredField("Name") + klog.V(8).Infof("did not find same file, found mismatching target sha1 for file: %q", fi.StringValue(e.TargetFile)) } - if fi.StringValue(e.SourceFile) == "" { - return fi.RequiredField("SourceFile") - } - if fi.StringValue(e.TargetFile) == "" { - return fi.RequiredField("TargetFile") - } - return nil -} - -func (_ *CopyFile) Render(c *fi.Context, a, e, changes *CopyFile) error { source := fi.StringValue(e.SourceFile) target := fi.StringValue(e.TargetFile) diff --git a/upup/pkg/fi/assettasks/copyfile_fitask.go b/upup/pkg/fi/assettasks/copyfile_fitask.go deleted file mode 100644 index 45fb32191603a..0000000000000 --- a/upup/pkg/fi/assettasks/copyfile_fitask.go +++ /dev/null @@ -1,51 +0,0 @@ -// +build !ignore_autogenerated - -/* -Copyright The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Code generated by fitask. DO NOT EDIT. - -package assettasks - -import ( - "k8s.io/kops/upup/pkg/fi" -) - -// CopyFile - -var _ fi.HasLifecycle = &CopyFile{} - -// GetLifecycle returns the Lifecycle of the object, implementing fi.HasLifecycle -func (o *CopyFile) GetLifecycle() fi.Lifecycle { - return o.Lifecycle -} - -// SetLifecycle sets the Lifecycle of the object, implementing fi.SetLifecycle -func (o *CopyFile) SetLifecycle(lifecycle fi.Lifecycle) { - o.Lifecycle = lifecycle -} - -var _ fi.HasName = &CopyFile{} - -// GetName returns the Name of the object, implementing fi.HasName -func (o *CopyFile) GetName() *string { - return o.Name -} - -// String is the stringer function for the task, producing readable output using fi.TaskAsString -func (o *CopyFile) String() string { - return fi.TaskAsString(o) -} diff --git a/upup/pkg/fi/assettasks/copyimage.go b/upup/pkg/fi/assettasks/copyimage.go index 0edd51388041a..993ada3ca229b 100644 --- a/upup/pkg/fi/assettasks/copyimage.go +++ b/upup/pkg/fi/assettasks/copyimage.go @@ -25,94 +25,13 @@ import ( // CopyImage copies a docker image from a source registry, to a target registry, // typically used for highly secure clusters. -// +kops:fitask type CopyImage struct { Name *string SourceImage *string TargetImage *string - Lifecycle fi.Lifecycle -} - -var _ fi.CompareWithID = &CopyImage{} - -func (e *CopyImage) CompareWithID() *string { - return e.Name -} - -func (e *CopyImage) Find(c *fi.Context) (*CopyImage, error) { - return nil, nil - - // The problem here is that we can tag a local image with the remote tag, but there is no way to know - // if that has actually been pushed to the remote registry without doing a docker push - - // The solution is probably to query the registries directly, but that is a little bit more code... - - // For now, we just always do the copy; it isn't _too_ slow when things have already been pushed - - //d, err := newDocker() - //if err != nil { - // return nil, err - //} - // - //source := fi.StringValue(e.SourceImage) - //target := fi.StringValue(e.TargetImage) - // - //targetImage, err := d.findImage(target) - //if err != nil { - // return nil, err - //} - //if targetImage == nil { - // klog.V(4).Infof("target image %q not found", target) - // return nil, nil - //} - // - //// We want to verify that the target image matches - //if err := d.pullImage(source); err != nil { - // return nil, err - //} - // - //sourceImage, err := d.findImage(source) - //if err != nil { - // return nil, err - //} - //if sourceImage == nil { - // return nil, fmt.Errorf("source image %q not found", source) - //} - // - //if sourceImage.ID == targetImage.ID { - // actual := &CopyImage{} - // actual.Name = e.Name - // actual.SourceImage = e.SourceImage - // actual.TargetImage = e.TargetImage - // klog.Infof("Found image %q = %s", target, sourceImage.ID) - // return actual, nil - //} - // - //klog.V(2).Infof("Target image %q does not match source %q: %q vs %q", - // target, source, - // targetImage.ID, sourceImage.ID) - // - //return nil, nil } func (e *CopyImage) Run(c *fi.Context) error { - return fi.DefaultDeltaRunMethod(e, c) -} - -func (s *CopyImage) CheckChanges(a, e, changes *CopyImage) error { - if fi.StringValue(e.Name) == "" { - return fi.RequiredField("Name") - } - if fi.StringValue(e.SourceImage) == "" { - return fi.RequiredField("SourceImage") - } - if fi.StringValue(e.TargetImage) == "" { - return fi.RequiredField("TargetImage") - } - return nil -} - -func (_ *CopyImage) Render(c *fi.Context, a, e, changes *CopyImage) error { api, err := newDockerAPI() if err != nil { return err diff --git a/upup/pkg/fi/assettasks/copyimage_fitask.go b/upup/pkg/fi/assettasks/copyimage_fitask.go deleted file mode 100644 index 9e8e470d739d3..0000000000000 --- a/upup/pkg/fi/assettasks/copyimage_fitask.go +++ /dev/null @@ -1,51 +0,0 @@ -// +build !ignore_autogenerated - -/* -Copyright The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Code generated by fitask. DO NOT EDIT. - -package assettasks - -import ( - "k8s.io/kops/upup/pkg/fi" -) - -// CopyImage - -var _ fi.HasLifecycle = &CopyImage{} - -// GetLifecycle returns the Lifecycle of the object, implementing fi.HasLifecycle -func (o *CopyImage) GetLifecycle() fi.Lifecycle { - return o.Lifecycle -} - -// SetLifecycle sets the Lifecycle of the object, implementing fi.SetLifecycle -func (o *CopyImage) SetLifecycle(lifecycle fi.Lifecycle) { - o.Lifecycle = lifecycle -} - -var _ fi.HasName = &CopyImage{} - -// GetName returns the Name of the object, implementing fi.HasName -func (o *CopyImage) GetName() *string { - return o.Name -} - -// String is the stringer function for the task, producing readable output using fi.TaskAsString -func (o *CopyImage) String() string { - return fi.TaskAsString(o) -} From 4b05805f9dd53be435ee7ce639cf198089c9ffb5 Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 6 Jun 2021 23:05:13 -0700 Subject: [PATCH 4/5] Remove copy tasks dependency on pkg/fi --- upup/pkg/fi/assettasks/BUILD.bazel | 1 - upup/pkg/fi/assettasks/copy.go | 66 +++++++++++------------------ upup/pkg/fi/assettasks/copyfile.go | 39 ++++++++--------- upup/pkg/fi/assettasks/copyimage.go | 13 +++--- 4 files changed, 51 insertions(+), 68 deletions(-) diff --git a/upup/pkg/fi/assettasks/BUILD.bazel b/upup/pkg/fi/assettasks/BUILD.bazel index 148ef3b3d7ef4..9afa4d2182688 100644 --- a/upup/pkg/fi/assettasks/BUILD.bazel +++ b/upup/pkg/fi/assettasks/BUILD.bazel @@ -15,7 +15,6 @@ go_library( "//pkg/acls:go_default_library", "//pkg/apis/kops:go_default_library", "//pkg/assets:go_default_library", - "//upup/pkg/fi:go_default_library", "//util/pkg/hashing:go_default_library", "//util/pkg/vfs:go_default_library", "//vendor/github.com/docker/docker/api/types:go_default_library", diff --git a/upup/pkg/fi/assettasks/copy.go b/upup/pkg/fi/assettasks/copy.go index c28f432d21cfd..a297b2f381b62 100644 --- a/upup/pkg/fi/assettasks/copy.go +++ b/upup/pkg/fi/assettasks/copy.go @@ -23,76 +23,60 @@ import ( "k8s.io/klog/v2" "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/assets" - "k8s.io/kops/upup/pkg/fi" ) -type copyAssetsTarget struct { -} - -func (c copyAssetsTarget) Finish(taskMap map[string]fi.Task) error { - return nil -} - -func (c copyAssetsTarget) ProcessDeletions() bool { - return false +type assetTask interface { + Run() error } func Copy(imageAssets []*assets.ImageAsset, fileAssets []*assets.FileAsset, cluster *kops.Cluster) error { - tasks := map[string]fi.Task{} + tasks := map[string]assetTask{} for _, imageAsset := range imageAssets { if imageAsset.DownloadLocation != imageAsset.CanonicalLocation { copyImageTask := &CopyImage{ - Name: fi.String(imageAsset.DownloadLocation), - SourceImage: fi.String(imageAsset.CanonicalLocation), - TargetImage: fi.String(imageAsset.DownloadLocation), + Name: imageAsset.DownloadLocation, + SourceImage: imageAsset.CanonicalLocation, + TargetImage: imageAsset.DownloadLocation, } - if existing, ok := tasks[*copyImageTask.Name]; ok { - if *existing.(*CopyImage).SourceImage != *copyImageTask.SourceImage { - return fmt.Errorf("different sources for same image target %s: %s vs %s", *copyImageTask.Name, *copyImageTask.SourceImage, *existing.(*CopyImage).SourceImage) + if existing, ok := tasks[copyImageTask.Name]; ok { + if existing.(*CopyImage).SourceImage != copyImageTask.SourceImage { + return fmt.Errorf("different sources for same image target %s: %s vs %s", copyImageTask.Name, copyImageTask.SourceImage, existing.(*CopyImage).SourceImage) } } - tasks[*copyImageTask.Name] = copyImageTask + tasks[copyImageTask.Name] = copyImageTask } } for _, fileAsset := range fileAssets { if fileAsset.DownloadURL.String() != fileAsset.CanonicalURL.String() { copyFileTask := &CopyFile{ - Name: fi.String(fileAsset.CanonicalURL.String()), - TargetFile: fi.String(fileAsset.DownloadURL.String()), - SourceFile: fi.String(fileAsset.CanonicalURL.String()), - SHA: fi.String(fileAsset.SHAValue), + Name: fileAsset.CanonicalURL.String(), + TargetFile: fileAsset.DownloadURL.String(), + SourceFile: fileAsset.CanonicalURL.String(), + SHA: fileAsset.SHAValue, + Cluster: cluster, } - if existing, ok := tasks[*copyFileTask.Name]; ok { + if existing, ok := tasks[copyFileTask.Name]; ok { e, ok := existing.(*CopyFile) if !ok { - return fmt.Errorf("different types for copy target %s", *copyFileTask.Name) + return fmt.Errorf("different types for copy target %s", copyFileTask.Name) } - if *e.TargetFile != *copyFileTask.TargetFile { - return fmt.Errorf("different targets for same file %s: %s vs %s", *copyFileTask.Name, *copyFileTask.TargetFile, *e.TargetFile) + if e.TargetFile != copyFileTask.TargetFile { + return fmt.Errorf("different targets for same file %s: %s vs %s", copyFileTask.Name, copyFileTask.TargetFile, e.TargetFile) } - if *e.SHA != *copyFileTask.SHA { - return fmt.Errorf("different sha for same file %s: %s vs %s", *copyFileTask.Name, *copyFileTask.SHA, *e.SHA) + if e.SHA != copyFileTask.SHA { + return fmt.Errorf("different sha for same file %s: %s vs %s", copyFileTask.Name, copyFileTask.SHA, e.SHA) } } - tasks[*copyFileTask.Name] = copyFileTask + tasks[copyFileTask.Name] = copyFileTask } } - var options fi.RunTasksOptions - options.InitDefaults() - - context, err := fi.NewContext(©AssetsTarget{}, cluster, nil, nil, nil, nil, true, tasks) - if err != nil { - return fmt.Errorf("error building context: %v", err) - } - defer context.Close() - ch := make(chan error, 5) for i := 0; i < cap(ch); i++ { ch <- nil @@ -111,8 +95,8 @@ func Copy(imageAssets []*assets.ImageAsset, fileAssets []*assets.FileAsset, clus klog.Warning(err) gotError = true } - go func(n string, t fi.Task) { - err := t.Run(context) + go func(n string, t assetTask) { + err := t.Run() if err != nil { err = fmt.Errorf("%s: %v", n, err) } @@ -121,7 +105,7 @@ func Copy(imageAssets []*assets.ImageAsset, fileAssets []*assets.FileAsset, clus } for i := 0; i < cap(ch); i++ { - err = <-ch + err := <-ch if err != nil { klog.Warning(err) gotError = true diff --git a/upup/pkg/fi/assettasks/copyfile.go b/upup/pkg/fi/assettasks/copyfile.go index bdb5ab7a0a1ca..fe669ddc7f469 100644 --- a/upup/pkg/fi/assettasks/copyfile.go +++ b/upup/pkg/fi/assettasks/copyfile.go @@ -25,7 +25,7 @@ import ( "k8s.io/klog/v2" "k8s.io/kops/pkg/acls" - "k8s.io/kops/upup/pkg/fi" + "k8s.io/kops/pkg/apis/kops" "k8s.io/kops/util/pkg/hashing" "k8s.io/kops/util/pkg/vfs" ) @@ -33,10 +33,11 @@ import ( // CopyFile copies an from a source file repository, to a target repository, // typically used for highly secure clusters. type CopyFile struct { - Name *string - SourceFile *string - TargetFile *string - SHA *string + Name string + SourceFile string + TargetFile string + SHA string + Cluster *kops.Cluster } // fileExtensionForSHA returns the expected extension for the given hash @@ -52,15 +53,15 @@ func fileExtensionForSHA(sha string) (string, error) { } } -func (e *CopyFile) Run(c *fi.Context) error { - expectedSHA := strings.TrimSpace(fi.StringValue(e.SHA)) +func (e *CopyFile) Run() error { + expectedSHA := strings.TrimSpace(e.SHA) shaExtension, err := fileExtensionForSHA(expectedSHA) if err != nil { return err } - targetSHAFile := fi.StringValue(e.TargetFile) + shaExtension + targetSHAFile := e.TargetFile + shaExtension targetSHABytes, err := vfs.Context.ReadFile(targetSHAFile) if err != nil { @@ -74,20 +75,20 @@ func (e *CopyFile) Run(c *fi.Context) error { targetSHA := string(targetSHABytes) if strings.TrimSpace(targetSHA) == expectedSHA { - klog.V(8).Infof("found matching target sha for file: %q", fi.StringValue(e.TargetFile)) + klog.V(8).Infof("found matching target sha for file: %q", e.TargetFile) return nil } - klog.V(8).Infof("did not find same file, found mismatching target sha1 for file: %q", fi.StringValue(e.TargetFile)) + klog.V(8).Infof("did not find same file, found mismatching target sha1 for file: %q", e.TargetFile) } - source := fi.StringValue(e.SourceFile) - target := fi.StringValue(e.TargetFile) - sourceSha := fi.StringValue(e.SHA) + source := e.SourceFile + target := e.TargetFile + sourceSha := e.SHA klog.V(2).Infof("copying bits from %q to %q", source, target) - if err := transferFile(c, source, target, sourceSha); err != nil { + if err := transferFile(e.Cluster, source, target, sourceSha); err != nil { return fmt.Errorf("unable to transfer %q to %q: %v", source, target, err) } @@ -96,7 +97,7 @@ func (e *CopyFile) Run(c *fi.Context) error { // transferFile downloads a file from the source location, validates the file matches the SHA, // and uploads the file to the target location. -func transferFile(c *fi.Context, source string, target string, sha string) error { +func transferFile(cluster *kops.Cluster, source string, target string, sha string) error { // TODO drop file to disk, as vfs reads file into memory. We load kubelet into memory for instance. // TODO in s3 can we do a copy file ... would need to test @@ -147,21 +148,21 @@ func transferFile(c *fi.Context, source string, target string, sha string) error } klog.Infof("uploading %q to %q", source, objectStore) - if err := writeFile(c, uploadVFS, data); err != nil { + if err := writeFile(cluster, uploadVFS, data); err != nil { return err } b := []byte(shaHash.Hex()) - if err := writeFile(c, shaVFS, b); err != nil { + if err := writeFile(cluster, shaVFS, b); err != nil { return err } return nil } -func writeFile(c *fi.Context, p vfs.Path, data []byte) error { +func writeFile(cluster *kops.Cluster, p vfs.Path, data []byte) error { - acl, err := acls.GetACL(p, c.Cluster) + acl, err := acls.GetACL(p, cluster) if err != nil { return err } diff --git a/upup/pkg/fi/assettasks/copyimage.go b/upup/pkg/fi/assettasks/copyimage.go index 993ada3ca229b..cdfe037bae537 100644 --- a/upup/pkg/fi/assettasks/copyimage.go +++ b/upup/pkg/fi/assettasks/copyimage.go @@ -20,18 +20,17 @@ import ( "fmt" "k8s.io/klog/v2" - "k8s.io/kops/upup/pkg/fi" ) // CopyImage copies a docker image from a source registry, to a target registry, // typically used for highly secure clusters. type CopyImage struct { - Name *string - SourceImage *string - TargetImage *string + Name string + SourceImage string + TargetImage string } -func (e *CopyImage) Run(c *fi.Context) error { +func (e *CopyImage) Run() error { api, err := newDockerAPI() if err != nil { return err @@ -43,8 +42,8 @@ func (e *CopyImage) Run(c *fi.Context) error { } - source := fi.StringValue(e.SourceImage) - target := fi.StringValue(e.TargetImage) + source := e.SourceImage + target := e.TargetImage klog.Infof("copying docker image from %q to %q", source, target) From a983c65a48919bd49b5d8e044f6000b420b4bc7b Mon Sep 17 00:00:00 2001 From: John Gardiner Myers Date: Sun, 6 Jun 2021 23:09:31 -0700 Subject: [PATCH 5/5] Move assettasks to pkg/assets --- cmd/kops/BUILD.bazel | 1 - cmd/kops/get_assets.go | 4 +-- pkg/assets/BUILD.bazel | 18 +++++++++-- .../pkg/fi/assettasks => pkg/assets}/copy.go | 5 ++- .../fi/assettasks => pkg/assets}/copyfile.go | 2 +- .../assets}/copyfile_test.go | 2 +- .../fi/assettasks => pkg/assets}/copyimage.go | 2 +- .../assettasks => pkg/assets}/docker_api.go | 2 +- .../assettasks => pkg/assets}/docker_cli.go | 2 +- upup/pkg/fi/assettasks/BUILD.bazel | 31 ------------------- 10 files changed, 25 insertions(+), 44 deletions(-) rename {upup/pkg/fi/assettasks => pkg/assets}/copy.go (95%) rename {upup/pkg/fi/assettasks => pkg/assets}/copyfile.go (99%) rename {upup/pkg/fi/assettasks => pkg/assets}/copyfile_test.go (99%) rename {upup/pkg/fi/assettasks => pkg/assets}/copyimage.go (98%) rename {upup/pkg/fi/assettasks => pkg/assets}/docker_api.go (99%) rename {upup/pkg/fi/assettasks => pkg/assets}/docker_cli.go (98%) delete mode 100644 upup/pkg/fi/assettasks/BUILD.bazel diff --git a/cmd/kops/BUILD.bazel b/cmd/kops/BUILD.bazel index b29916970561c..5ec7942d7ec7f 100644 --- a/cmd/kops/BUILD.bazel +++ b/cmd/kops/BUILD.bazel @@ -90,7 +90,6 @@ go_library( "//pkg/util/templater:go_default_library", "//pkg/validation:go_default_library", "//upup/pkg/fi:go_default_library", - "//upup/pkg/fi/assettasks:go_default_library", "//upup/pkg/fi/cloudup:go_default_library", "//upup/pkg/fi/cloudup/awsup:go_default_library", "//upup/pkg/fi/utils:go_default_library", diff --git a/cmd/kops/get_assets.go b/cmd/kops/get_assets.go index d708b5e2fe132..a25f397bc0be8 100644 --- a/cmd/kops/get_assets.go +++ b/cmd/kops/get_assets.go @@ -22,7 +22,7 @@ import ( "fmt" "io" - "k8s.io/kops/upup/pkg/fi/assettasks" + "k8s.io/kops/pkg/assets" "k8s.io/kubectl/pkg/util/i18n" "k8s.io/kubectl/pkg/util/templates" "sigs.k8s.io/yaml" @@ -143,7 +143,7 @@ func RunGetAssets(ctx context.Context, f *util.Factory, out io.Writer, options * } if options.Copy { - err := assettasks.Copy(updateClusterResults.ImageAssets, updateClusterResults.FileAssets, updateClusterResults.Cluster) + err := assets.Copy(updateClusterResults.ImageAssets, updateClusterResults.FileAssets, updateClusterResults.Cluster) if err != nil { return err } diff --git a/pkg/assets/BUILD.bazel b/pkg/assets/BUILD.bazel index 1d004fd682490..aaca334527dba 100644 --- a/pkg/assets/BUILD.bazel +++ b/pkg/assets/BUILD.bazel @@ -2,10 +2,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "go_default_library", - srcs = ["builder.go"], + srcs = [ + "builder.go", + "copy.go", + "copyfile.go", + "copyimage.go", + "docker_api.go", + "docker_cli.go", + ], importpath = "k8s.io/kops/pkg/assets", visibility = ["//visibility:public"], deps = [ + "//pkg/acls:go_default_library", "//pkg/apis/kops:go_default_library", "//pkg/apis/kops/util:go_default_library", "//pkg/kubemanifest:go_default_library", @@ -14,6 +22,9 @@ go_library( "//util/pkg/mirrors:go_default_library", "//util/pkg/vfs:go_default_library", "//vendor/github.com/blang/semver/v4:go_default_library", + "//vendor/github.com/docker/docker/api/types:go_default_library", + "//vendor/github.com/docker/docker/api/types/filters:go_default_library", + "//vendor/github.com/docker/docker/client:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ], @@ -21,7 +32,10 @@ go_library( go_test( name = "go_default_test", - srcs = ["builder_test.go"], + srcs = [ + "builder_test.go", + "copyfile_test.go", + ], data = glob(["testdata/**"]), embed = [":go_default_library"], deps = [ diff --git a/upup/pkg/fi/assettasks/copy.go b/pkg/assets/copy.go similarity index 95% rename from upup/pkg/fi/assettasks/copy.go rename to pkg/assets/copy.go index a297b2f381b62..7167334065161 100644 --- a/upup/pkg/fi/assettasks/copy.go +++ b/pkg/assets/copy.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package assettasks +package assets import ( "fmt" @@ -22,14 +22,13 @@ import ( "k8s.io/klog/v2" "k8s.io/kops/pkg/apis/kops" - "k8s.io/kops/pkg/assets" ) type assetTask interface { Run() error } -func Copy(imageAssets []*assets.ImageAsset, fileAssets []*assets.FileAsset, cluster *kops.Cluster) error { +func Copy(imageAssets []*ImageAsset, fileAssets []*FileAsset, cluster *kops.Cluster) error { tasks := map[string]assetTask{} for _, imageAsset := range imageAssets { diff --git a/upup/pkg/fi/assettasks/copyfile.go b/pkg/assets/copyfile.go similarity index 99% rename from upup/pkg/fi/assettasks/copyfile.go rename to pkg/assets/copyfile.go index fe669ddc7f469..9f31db1d1168a 100644 --- a/upup/pkg/fi/assettasks/copyfile.go +++ b/pkg/assets/copyfile.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package assettasks +package assets import ( "bytes" diff --git a/upup/pkg/fi/assettasks/copyfile_test.go b/pkg/assets/copyfile_test.go similarity index 99% rename from upup/pkg/fi/assettasks/copyfile_test.go rename to pkg/assets/copyfile_test.go index 31bf142813c42..1b8033f4c91e9 100644 --- a/upup/pkg/fi/assettasks/copyfile_test.go +++ b/pkg/assets/copyfile_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package assettasks +package assets import ( "testing" diff --git a/upup/pkg/fi/assettasks/copyimage.go b/pkg/assets/copyimage.go similarity index 98% rename from upup/pkg/fi/assettasks/copyimage.go rename to pkg/assets/copyimage.go index cdfe037bae537..5328b778b076b 100644 --- a/upup/pkg/fi/assettasks/copyimage.go +++ b/pkg/assets/copyimage.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package assettasks +package assets import ( "fmt" diff --git a/upup/pkg/fi/assettasks/docker_api.go b/pkg/assets/docker_api.go similarity index 99% rename from upup/pkg/fi/assettasks/docker_api.go rename to pkg/assets/docker_api.go index 09c3a5870c327..20a75fc966db5 100644 --- a/upup/pkg/fi/assettasks/docker_api.go +++ b/pkg/assets/docker_api.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package assettasks +package assets import ( "context" diff --git a/upup/pkg/fi/assettasks/docker_cli.go b/pkg/assets/docker_cli.go similarity index 98% rename from upup/pkg/fi/assettasks/docker_cli.go rename to pkg/assets/docker_cli.go index 5a9b6e50c8f7a..ec52fd59b4996 100644 --- a/upup/pkg/fi/assettasks/docker_cli.go +++ b/pkg/assets/docker_cli.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package assettasks +package assets import ( "fmt" diff --git a/upup/pkg/fi/assettasks/BUILD.bazel b/upup/pkg/fi/assettasks/BUILD.bazel deleted file mode 100644 index 9afa4d2182688..0000000000000 --- a/upup/pkg/fi/assettasks/BUILD.bazel +++ /dev/null @@ -1,31 +0,0 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") - -go_library( - name = "go_default_library", - srcs = [ - "copy.go", - "copyfile.go", - "copyimage.go", - "docker_api.go", - "docker_cli.go", - ], - importpath = "k8s.io/kops/upup/pkg/fi/assettasks", - visibility = ["//visibility:public"], - deps = [ - "//pkg/acls:go_default_library", - "//pkg/apis/kops:go_default_library", - "//pkg/assets:go_default_library", - "//util/pkg/hashing:go_default_library", - "//util/pkg/vfs:go_default_library", - "//vendor/github.com/docker/docker/api/types:go_default_library", - "//vendor/github.com/docker/docker/api/types/filters:go_default_library", - "//vendor/github.com/docker/docker/client:go_default_library", - "//vendor/k8s.io/klog/v2:go_default_library", - ], -) - -go_test( - name = "go_default_test", - srcs = ["copyfile_test.go"], - embed = [":go_default_library"], -)