Skip to content

Commit

Permalink
Add local output mode to pod utilities.
Browse files Browse the repository at this point in the history
  • Loading branch information
cjwagner committed Jul 24, 2019
1 parent 53b6b0a commit dbfce42
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 77 deletions.
19 changes: 13 additions & 6 deletions prow/cmd/build/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,9 +708,11 @@ func decorateSteps(steps []coreapi.Container, dc prowjobv1.DecorationConfig, too

// injectedSteps returns initial containers, a final container and an additional volume.
func injectedSteps(encodedJobSpec string, dc prowjobv1.DecorationConfig, injectedSource bool, toolsMount coreapi.VolumeMount, entries []wrapper.Options) ([]coreapi.Container, *coreapi.Container, *coreapi.Volume, error) {
gcsVol, gcsMount, gcsOptions := decorate.GCSOptions(dc)
// localMode parameter is false and outputMount is nil because we don't have a local decoration
// mode for "agent: build" jobs yet. (We need a mkpod equivalent for builds first.)
gcsVol, gcsMount, gcsOptions := decorate.GCSOptions(dc, false)

sidecar, err := decorate.Sidecar(dc.UtilityImages.Sidecar, gcsOptions, gcsMount, logMount, encodedJobSpec, decorate.RequirePassingEntries, entries...)
sidecar, err := decorate.Sidecar(dc.UtilityImages.Sidecar, gcsOptions, gcsMount, logMount, nil, encodedJobSpec, decorate.RequirePassingEntries, entries...)
if err != nil {
return nil, nil, nil, fmt.Errorf("inject sidecar: %v", err)
}
Expand All @@ -719,14 +721,14 @@ func injectedSteps(encodedJobSpec string, dc prowjobv1.DecorationConfig, injecte
if injectedSource {
cloneLogMount = &logMount
}
initUpload, err := decorate.InitUpload(dc.UtilityImages.InitUpload, gcsOptions, gcsMount, cloneLogMount, encodedJobSpec)
initUpload, err := decorate.InitUpload(dc.UtilityImages.InitUpload, gcsOptions, gcsMount, cloneLogMount, nil, encodedJobSpec)
if err != nil {
return nil, nil, nil, fmt.Errorf("inject initupload: %v", err)
}

placer := decorate.PlaceEntrypoint(dc.UtilityImages.Entrypoint, toolsMount)

return []coreapi.Container{placer, *initUpload}, sidecar, &gcsVol, nil
return []coreapi.Container{placer, *initUpload}, sidecar, gcsVol, nil
}

// determineTimeout decides the timeout value used for build
Expand All @@ -753,14 +755,19 @@ func decorateBuild(spec *buildv1alpha1.BuildSpec, encodedJobSpec string, dc prow
return fmt.Errorf("decorate steps: %v", err)
}

befores, after, vol, err := injectedSteps(encodedJobSpec, dc, injectedSource, toolsMount, entries)
befores, after, gcsVol, err := injectedSteps(encodedJobSpec, dc, injectedSource, toolsMount, entries)
if err != nil {
return fmt.Errorf("add injected steps: %v", err)
}

spec.Steps = append(befores, spec.Steps...)
spec.Steps = append(spec.Steps, *after)
spec.Volumes = append(spec.Volumes, toolsVolume, *vol)
spec.Volumes = append(spec.Volumes, toolsVolume)
if gcsVol != nil {
// This check isn't strictly necessary until/unless we add a local mode for build jobs.
// /shrug
spec.Volumes = append(spec.Volumes, *gcsVol)
}
return nil
}

Expand Down
20 changes: 10 additions & 10 deletions prow/cmd/build/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1617,7 +1617,7 @@ func TestInjectedSteps(t *testing.T) {
dc := prowjobv1.DecorationConfig{
UtilityImages: &prowjobv1.UtilityImages{},
}
gcsVol, gcsMount, gcsOptions := decorate.GCSOptions(dc)
gcsVol, gcsMount, gcsOptions := decorate.GCSOptions(dc, false)
_, tm := tools()

cases := []struct {
Expand All @@ -1630,31 +1630,31 @@ func TestInjectedSteps(t *testing.T) {
name: "add logMount to init upload when using source",
src: true,
expected: func(entries []wrapper.Options) ([]corev1.Container, *corev1.Container, *corev1.Volume, error) {
iu, err := decorate.InitUpload(dc.UtilityImages.InitUpload, gcsOptions, gcsMount, &logMount, ejs)
iu, err := decorate.InitUpload(dc.UtilityImages.InitUpload, gcsOptions, gcsMount, &logMount, nil, ejs)
if err != nil {
t.Fatalf("failed to create init upload: %v", err)
}
before := []corev1.Container{decorate.PlaceEntrypoint(dc.UtilityImages.Entrypoint, tm), *iu}
after, err := decorate.Sidecar(dc.UtilityImages.Sidecar, gcsOptions, gcsMount, logMount, ejs, decorate.RequirePassingEntries, entries...)
after, err := decorate.Sidecar(dc.UtilityImages.Sidecar, gcsOptions, gcsMount, logMount, nil, ejs, decorate.RequirePassingEntries, entries...)
if err != nil {
t.Fatalf("failed to create sidecar: %v", err)
}
return before, after, &gcsVol, nil
return before, after, gcsVol, nil
},
},
{
name: "do not add logMount to init upload when not using source",
expected: func(entries []wrapper.Options) ([]corev1.Container, *corev1.Container, *corev1.Volume, error) {
iu, err := decorate.InitUpload(dc.UtilityImages.InitUpload, gcsOptions, gcsMount, nil, ejs)
iu, err := decorate.InitUpload(dc.UtilityImages.InitUpload, gcsOptions, gcsMount, nil, nil, ejs)
if err != nil {
t.Fatalf("failed to create init upload: %v", err)
}
before := []corev1.Container{decorate.PlaceEntrypoint(dc.UtilityImages.Entrypoint, tm), *iu}
after, err := decorate.Sidecar(dc.UtilityImages.Sidecar, gcsOptions, gcsMount, logMount, ejs, decorate.RequirePassingEntries, entries...)
after, err := decorate.Sidecar(dc.UtilityImages.Sidecar, gcsOptions, gcsMount, logMount, nil, ejs, decorate.RequirePassingEntries, entries...)
if err != nil {
t.Fatalf("failed to create sidecar: %v", err)
}
return before, after, &gcsVol, nil
return before, after, gcsVol, nil
},
},
{
Expand All @@ -1674,16 +1674,16 @@ func TestInjectedSteps(t *testing.T) {
},
},
expected: func(entries []wrapper.Options) ([]corev1.Container, *corev1.Container, *corev1.Volume, error) {
iu, err := decorate.InitUpload(dc.UtilityImages.InitUpload, gcsOptions, gcsMount, nil, ejs)
iu, err := decorate.InitUpload(dc.UtilityImages.InitUpload, gcsOptions, gcsMount, nil, nil, ejs)
if err != nil {
t.Fatalf("failed to create init upload: %v", err)
}
before := []corev1.Container{decorate.PlaceEntrypoint(dc.UtilityImages.Entrypoint, tm), *iu}
after, err := decorate.Sidecar(dc.UtilityImages.Sidecar, gcsOptions, gcsMount, logMount, ejs, decorate.RequirePassingEntries, entries...)
after, err := decorate.Sidecar(dc.UtilityImages.Sidecar, gcsOptions, gcsMount, logMount, nil, ejs, decorate.RequirePassingEntries, entries...)
if err != nil {
t.Fatalf("failed to create sidecar: %v", err)
}
return before, after, &gcsVol, nil
return before, after, gcsVol, nil
},
},
}
Expand Down
9 changes: 9 additions & 0 deletions prow/gcsupload/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,18 @@ type Options struct {
// paths that are parsed to get more granular
// fields.
gcsPath gcs.Path

// LocalOutputDir specifies a directory where files should be copied INSTEAD of uploading to GCS.
// This option is useful for testing jobs that use the pod-utilities without actually uploading.
LocalOutputDir string `json:"local_output_dir,omitempty"`
}

// Validate ensures that the set of options are
// self-consistent and valid.
func (o *Options) Validate() error {
if o.LocalOutputDir != "" {
return nil
}
if o.gcsPath.String() != "" {
o.Bucket = o.gcsPath.Bucket()
o.PathPrefix = o.gcsPath.Object()
Expand Down Expand Up @@ -127,6 +134,8 @@ func (o *Options) AddFlags(fs *flag.FlagSet) {
fs.BoolVar(&o.DryRun, "dry-run", true, "do not interact with GCS")

fs.Var(&o.mediaTypes, "media-type", "Optional comma-delimited set of extension media types. Each entry is colon-delimited {extension}:{media-type}, for example, log:text/plain.")

fs.StringVar(&o.LocalOutputDir, "local-output-dir", "", "If specified, files are copied to this dir instead of uploading to GCS.")
}

const (
Expand Down
53 changes: 33 additions & 20 deletions prow/gcsupload/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,23 +46,29 @@ func (o Options) Run(spec *downwardapi.JobSpec, extra map[string]gcs.UploadFunc)

uploadTargets := o.assembleTargets(spec, extra)

if !o.DryRun {
ctx := context.Background()
gcsClient, err := storage.NewClient(ctx, option.WithCredentialsFile(o.GcsCredentialsFile))
if o.DryRun {
for destination := range uploadTargets {
logrus.WithField("dest", destination).Info("Would upload")
}
return nil
}

if o.LocalOutputDir == "" {
gcsClient, err := storage.NewClient(context.Background(), option.WithCredentialsFile(o.GcsCredentialsFile))
if err != nil {
return fmt.Errorf("could not connect to GCS: %v", err)
}

if err := gcs.Upload(gcsClient.Bucket(o.Bucket), uploadTargets); err != nil {
return fmt.Errorf("failed to upload to GCS: %v", err)
}
logrus.Info("Finished upload to GCS")
} else {
for destination := range uploadTargets {
logrus.WithField("dest", destination).Info("Would upload")
if err := gcs.LocalExport(o.LocalOutputDir, uploadTargets); err != nil {
return fmt.Errorf("failed to copy files to %q: %v", o.LocalOutputDir, err)
}
logrus.Infof("Finished copying files to %q.", o.LocalOutputDir)
}

logrus.Info("Finished upload to GCS")
return nil
}

Expand All @@ -71,21 +77,28 @@ func (o Options) assembleTargets(spec *downwardapi.JobSpec, extra map[string]gcs

uploadTargets := map[string]gcs.UploadFunc{}

// ensure that an alias exists for any
// job we're uploading artifacts for
if alias := gcs.AliasForSpec(spec); alias != "" {
fullBasePath := "gs://" + path.Join(o.Bucket, jobBasePath)
uploadTargets[alias] = gcs.DataUploadWithMetadata(strings.NewReader(fullBasePath), map[string]string{
"x-goog-meta-link": fullBasePath,
})
}
// Skip the alias and latest build files in local mode.
if o.LocalOutputDir == "" {
// ensure that an alias exists for any
// job we're uploading artifacts for
if alias := gcs.AliasForSpec(spec); alias != "" {
fullBasePath := "gs://" + path.Join(o.Bucket, jobBasePath)
uploadTargets[alias] = gcs.DataUploadWithMetadata(strings.NewReader(fullBasePath), map[string]string{
"x-goog-meta-link": fullBasePath,
})
}

if latestBuilds := gcs.LatestBuildForSpec(spec, builder); len(latestBuilds) > 0 {
for _, latestBuild := range latestBuilds {
dir, filename := path.Split(latestBuild)
metadataFromFileName, attrs := gcs.AttributesFromFileName(filename)
uploadTargets[path.Join(dir, metadataFromFileName)] = gcs.DataUploadWithAttributes(strings.NewReader(spec.BuildID), attrs)
if latestBuilds := gcs.LatestBuildForSpec(spec, builder); len(latestBuilds) > 0 {
for _, latestBuild := range latestBuilds {
dir, filename := path.Split(latestBuild)
metadataFromFileName, attrs := gcs.AttributesFromFileName(filename)
uploadTargets[path.Join(dir, metadataFromFileName)] = gcs.DataUploadWithAttributes(strings.NewReader(spec.BuildID), attrs)
}
}
} else {
// Remove the gcs path prefix in local mode so that items are rooted in the output dir without
// excessive directory nesting.
gcsPath = ""
}

for _, item := range o.Items {
Expand Down
Loading

0 comments on commit dbfce42

Please sign in to comment.