forked from vmware-archive/atc
/
artifact_repository.go
163 lines (137 loc) · 4.97 KB
/
artifact_repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
package worker
import (
"fmt"
"io"
"strings"
"sync"
)
// ArtifactName identifies a single artifact within an ArtifactRepository. It
// is a plain string given its own type so that signatures using it are more
// self-documenting. The repository also uses the name as the subdirectory
// under which the artifact's contents are streamed.
type ArtifactName string
// ArtifactRepository is the mapping from an ArtifactName to an ArtifactSource.
// Steps will both populate this map with new artifacts (e.g. the resource
// fetched by a Get step), and look up required artifacts (e.g. the inputs
// configured for a Task step).
//
// There is only one ArtifactRepository for the duration of a build plan's
// execution.
//
// ArtifactRepository is, itself, an ArtifactSource. As an ArtifactSource it acts
// as the set of all ArtifactSources it contains, as if they were each in
// subdirectories corresponding to their ArtifactName.
type ArtifactRepository struct {
// repo holds the registered sources, keyed by artifact name.
repo map[ArtifactName]ArtifactSource
// repoL guards repo; the methods in this file take it before every read
// or write of the map.
repoL sync.RWMutex
}
// NewArtifactRepository returns an empty repository whose backing map is
// already allocated, so sources can be registered immediately.
func NewArtifactRepository() *ArtifactRepository {
	fresh := new(ArtifactRepository)
	fresh.repo = map[ArtifactName]ArtifactSource{}
	return fresh
}
// RegisterSource inserts an ArtifactSource into the map under the given
// ArtifactName, replacing any source previously registered under that name.
// Producers of artifacts, e.g. the Get step and the Task step, will call this
// after they've successfully produced their artifact(s).
//
// The backing map is allocated on first use, so a zero-value
// ArtifactRepository (one not built by NewArtifactRepository) can be
// registered into instead of panicking on a write to a nil map.
func (repo *ArtifactRepository) RegisterSource(name ArtifactName, source ArtifactSource) {
	repo.repoL.Lock()
	defer repo.repoL.Unlock()

	// The nil check happens under the write lock so concurrent first
	// registrations cannot both allocate and lose an entry.
	if repo.repo == nil {
		repo.repo = make(map[ArtifactName]ArtifactSource)
	}
	repo.repo[name] = source
}
// SourceFor returns the ArtifactSource registered under name, along with a
// flag reporting whether such a registration exists. Consumers of artifacts,
// e.g. the Task step, will call this to locate their dependencies.
func (repo *ArtifactRepository) SourceFor(name ArtifactName) (ArtifactSource, bool) {
	repo.repoL.RLock()
	defer repo.repoL.RUnlock()

	src, ok := repo.repo[name]
	return src, ok
}
// StreamTo streams every artifact registered at the time of the call into the
// destination. This is used by the Put step, which currently does not have an
// explicit set of dependencies, and instead just pulls in everything.
//
// Each ArtifactSource is streamed into a subdirectory named after its
// ArtifactName. Streaming stops at the first source that fails, and that
// source's error is returned as-is.
func (repo *ArtifactRepository) StreamTo(dest ArtifactDestination) error {
	// AsMap returns a private snapshot taken under the read lock, so the
	// lock is not held while the sources are being streamed.
	for artifact, source := range repo.AsMap() {
		target := subdirectoryDestination{
			destination:  dest,
			subdirectory: string(artifact),
		}
		if err := source.StreamTo(target); err != nil {
			return err
		}
	}
	return nil
}
// StreamFile streams a single file out of the repository, using the first path
// segment to determine the ArtifactSource to stream out of. For example,
// StreamFile("a/b.yml") will look up the "a" ArtifactSource and return the
// result of StreamFile("b.yml") on it.
//
// If the path contains no "/" separator, or the ArtifactSource determined by
// the first segment is not present, FileNotFoundError will be returned.
// Errors from the selected source's StreamFile are returned unchanged.
func (repo *ArtifactRepository) StreamFile(path string) (io.ReadCloser, error) {
	// A path without a separator names no file inside any source.
	sep := strings.Index(path, "/")
	if sep < 0 {
		return nil, FileNotFoundError{Path: path}
	}

	// Resolve the source with one keyed lookup on the first segment. This
	// avoids copying the whole map per call, and the result no longer
	// depends on map iteration order when several names could prefix path.
	source, found := repo.SourceFor(ArtifactName(path[:sep]))
	if !found {
		return nil, FileNotFoundError{Path: path}
	}

	// The repository lock is already released here, so the source streams
	// without blocking registrations.
	return source.StreamFile(path[sep+1:])
}
// VolumeOn always reports that no volume was found and no error occurred,
// since a single volume cannot represent every ArtifactSource held by the
// repository. The worker argument is not consulted.
func (repo *ArtifactRepository) VolumeOn(worker Worker) (Volume, bool, error) {
	var (
		noVolume Volume
		noErr    error
	)
	return noVolume, false, noErr
}
// ScopedTo builds a fresh ArtifactRepository holding only the sources
// registered under the given ArtifactNames. This is used by the Put step to
// stream in the sources that did not have a volume available on its
// destination.
//
// If any requested name is not registered, no repository is returned and the
// error identifies the first missing name.
func (repo *ArtifactRepository) ScopedTo(names ...ArtifactName) (*ArtifactRepository, error) {
	scoped := NewArtifactRepository()
	for i := range names {
		wanted := names[i]

		src, ok := repo.SourceFor(wanted)
		if !ok {
			return nil, fmt.Errorf("source does not exist in repository: %s", wanted)
		}

		scoped.RegisterSource(wanted, src)
	}
	return scoped, nil
}
// AsMap copies the current contents of the ArtifactRepository into a newly
// allocated map and returns it. The copy is taken under the read lock; after
// that, changes to the returned map and changes to the ArtifactRepository do
// not affect each other.
func (repo *ArtifactRepository) AsMap() map[ArtifactName]ArtifactSource {
	repo.repoL.RLock()
	defer repo.repoL.RUnlock()

	snapshot := make(map[ArtifactName]ArtifactSource, len(repo.repo))
	for key, value := range repo.repo {
		snapshot[key] = value
	}
	return snapshot
}
// subdirectoryDestination wraps an ArtifactDestination so that everything
// streamed into it lands beneath a fixed subdirectory of that destination.
type subdirectoryDestination struct {
	destination  ArtifactDestination
	subdirectory string
}

// StreamIn forwards src to the wrapped destination, prefixing dst with the
// configured subdirectory and a "/" separator.
func (d subdirectoryDestination) StreamIn(dst string, src io.Reader) error {
	nested := d.subdirectory + "/" + dst
	return d.destination.StreamIn(nested, src)
}
// FileNotFoundError is returned from StreamFile when the requested path does
// not exist. Path carries the path that was asked for.
type FileNotFoundError struct {
	Path string
}

// Error renders a message that includes the missing file's path. The user
// will see this message if e.g. their task config path does not exist.
func (e FileNotFoundError) Error() string {
	const format = "file not found: %s"
	return fmt.Sprintf(format, e.Path)
}