-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
input_resources.go
146 lines (132 loc) · 5.46 KB
/
input_resources.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
/*
Copyright 2019 The Tekton Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package resources
import (
"context"
"fmt"
"path/filepath"
"github.com/tektoncd/pipeline/pkg/apis/pipeline"
"github.com/tektoncd/pipeline/pkg/apis/pipeline/v1beta1"
"github.com/tektoncd/pipeline/pkg/apis/resource/v1alpha1/storage"
"github.com/tektoncd/pipeline/pkg/artifacts"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
)
func getBoundResource(resourceName string, boundResources []v1beta1.TaskResourceBinding) (*v1beta1.TaskResourceBinding, error) {
for _, br := range boundResources {
if br.Name == resourceName {
return &br, nil
}
}
return nil, fmt.Errorf("couldnt find resource named %q in bound resources %v", resourceName, boundResources)
}
// AddInputResource reads the inputs resources and adds the corresponding container steps
// This function reads the `paths` to check if resource copies needs to be fetched from previous tasks output(from PVC)
// 1. If resource has paths declared then serially copies the resource from previous task output paths into current resource destination.
// 2. If resource has custom destination directory using targetPath then that directory is created and resource is fetched / copied
// from previous task
// 3. If resource has paths declared then fresh copy of resource is not fetched
func AddInputResource(
ctx context.Context,
kubeclient kubernetes.Interface,
images pipeline.Images,
taskName string,
taskSpec *v1beta1.TaskSpec,
taskRun *v1beta1.TaskRun,
inputResources map[string]v1beta1.PipelineResourceInterface,
) (*v1beta1.TaskSpec, error) {
if taskSpec == nil || taskSpec.Resources == nil || taskSpec.Resources.Inputs == nil {
return taskSpec, nil
}
taskSpec = taskSpec.DeepCopy()
pvcName := taskRun.GetPipelineRunPVCName()
mountPVC := false
mountSecrets := false
prNameFromLabel := taskRun.Labels[pipeline.PipelineRunLabelKey]
if prNameFromLabel == "" {
prNameFromLabel = pvcName
}
as := artifacts.GetArtifactStorage(ctx, images, prNameFromLabel, kubeclient)
// Iterate in reverse through the list, each element prepends but we want the first one to remain first.
for i := len(taskSpec.Resources.Inputs) - 1; i >= 0; i-- {
input := taskSpec.Resources.Inputs[i]
if taskRun.Spec.Resources == nil {
if input.Optional {
continue
}
return nil, fmt.Errorf("couldnt find resource named %q, no bounded resources", input.Name)
}
boundResource, err := getBoundResource(input.Name, taskRun.Spec.Resources.Inputs)
// Continue if the declared resource is optional and not specified in TaskRun
// boundResource is nil if the declared resource in Task does not have any resource specified in the TaskRun
if input.Optional && boundResource == nil {
continue
} else if err != nil {
// throw an error for required resources, if not specified in the TaskRun
return nil, fmt.Errorf("failed to get bound resource: %w", err)
}
resource, ok := inputResources[boundResource.Name]
if !ok || resource == nil {
return nil, fmt.Errorf("failed to Get Pipeline Resource for task %s with boundResource %v", taskName, boundResource)
}
var copyStepsFromPrevTasks []v1beta1.Step
dPath := destinationPath(input.Name, input.TargetPath)
// if taskrun is fetching resource from previous task then execute copy step instead of fetching new copy
// to the desired destination directory, as long as the resource exports output to be copied
if v1beta1.AllowedOutputResources[resource.GetType()] && taskRun.HasPipelineRunOwnerReference() {
for _, path := range boundResource.Paths {
cpSteps := as.GetCopyFromStorageToSteps(boundResource.Name, path, dPath)
if as.GetType() == pipeline.ArtifactStoragePVCType {
mountPVC = true
for _, s := range cpSteps {
s.VolumeMounts = []corev1.VolumeMount{storage.GetPvcMount(pvcName)}
copyStepsFromPrevTasks = append(copyStepsFromPrevTasks,
storage.CreateDirStep(images.ShellImage, boundResource.Name, dPath),
s)
}
} else {
// bucket
copyStepsFromPrevTasks = append(copyStepsFromPrevTasks, cpSteps...)
}
}
}
// source is copied from previous task so skip fetching download container definition
if len(copyStepsFromPrevTasks) > 0 {
taskSpec.Steps = append(copyStepsFromPrevTasks, taskSpec.Steps...)
mountSecrets = true
} else {
// Allow the resource to mutate the task.
modifier, err := resource.GetInputTaskModifier(taskSpec, dPath)
if err != nil {
return nil, err
}
if err := v1beta1.ApplyTaskModifier(taskSpec, modifier); err != nil {
return nil, fmt.Errorf("unabled to apply Resource %s: %w", boundResource.Name, err)
}
}
}
if mountPVC {
taskSpec.Volumes = append(taskSpec.Volumes, GetPVCVolume(pvcName))
}
if mountSecrets {
taskSpec.Volumes = appendNewSecretsVolumes(taskSpec.Volumes, as.GetSecretsVolumes()...)
}
return taskSpec, nil
}
const workspaceDir = "/workspace"
func destinationPath(name, path string) string {
if path == "" {
return filepath.Join(workspaceDir, name)
}
return filepath.Join(workspaceDir, path)
}