forked from vmware-archive/atc
/
source_repository.go
153 lines (128 loc) · 4.57 KB
/
source_repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package exec
import (
"fmt"
"io"
"strings"
"sync"
"github.com/concourse/atc/worker"
)
// SourceName identifies an ArtifactSource within a SourceRepository. It is
// just a string, with its own type to make interfaces using it more
// self-documenting.
type SourceName string
// SourceRepository is the mapping from a SourceName to an ArtifactSource.
// Steps will both populate this map with new artifacts (e.g. the resource
// fetched by a Get step), and look up required artifacts (e.g. the inputs
// configured for a Task step).
//
// There is only one SourceRepository for the duration of a build plan's
// execution.
//
// SourceRepository is, itself, an ArtifactSource. As an ArtifactSource it acts
// as the set of all ArtifactSources it contains, as if they were each in
// subdirectories corresponding to their SourceName.
//
// Use NewSourceRepository to obtain a repository whose map is initialized.
type SourceRepository struct {
	// repo holds the registered sources, keyed by their name.
	repo map[SourceName]ArtifactSource
	// repoL guards repo; the methods of SourceRepository take it around every
	// access to the map.
	repoL sync.RWMutex
}
// NewSourceRepository returns an empty SourceRepository whose internal map is
// already allocated and ready to accept registrations.
func NewSourceRepository() *SourceRepository {
	fresh := new(SourceRepository)
	fresh.repo = map[SourceName]ArtifactSource{}
	return fresh
}
// RegisterSource inserts an ArtifactSource into the map under the given
// SourceName, replacing any source previously registered under that name.
// Producers of artifacts, e.g. the Get step and the Task step, will call this
// after they've successfully produced their artifact(s).
//
// It is safe to call on a zero-value SourceRepository: the underlying map is
// allocated on first use.
func (repo *SourceRepository) RegisterSource(name SourceName, source ArtifactSource) {
	repo.repoL.Lock()
	defer repo.repoL.Unlock()

	// A SourceRepository that was not built by NewSourceRepository has a nil
	// map, and assigning into a nil map panics.
	if repo.repo == nil {
		repo.repo = make(map[SourceName]ArtifactSource)
	}

	repo.repo[name] = source
}
// SourceFor returns the ArtifactSource registered under the given SourceName
// and whether one was present. Consumers of artifacts, e.g. the Task step,
// will call this to locate their dependencies.
func (repo *SourceRepository) SourceFor(name SourceName) (ArtifactSource, bool) {
	repo.repoL.RLock()
	defer repo.repoL.RUnlock()

	artifact, ok := repo.repo[name]
	return artifact, ok
}
// StreamTo streams every currently registered artifact to the destination.
// This is used by the Put step, which currently does not have an explicit set
// of dependencies, and instead just pulls in everything.
//
// Each ArtifactSource is streamed into a subdirectory named after its
// SourceName. Streaming stops at, and returns, the first error encountered.
func (repo *SourceRepository) StreamTo(dest ArtifactDestination) error {
	// Take a snapshot under the read lock so the lock is not held while the
	// individual sources are being streamed.
	repo.repoL.RLock()
	snapshot := make(map[SourceName]ArtifactSource, len(repo.repo))
	for key, artifact := range repo.repo {
		snapshot[key] = artifact
	}
	repo.repoL.RUnlock()

	for key, artifact := range snapshot {
		target := subdirectoryDestination{
			destination:  dest,
			subdirectory: string(key),
		}
		if err := artifact.StreamTo(target); err != nil {
			return err
		}
	}

	return nil
}
// StreamFile streams a single file out of the repository, using the leading
// portion of the path to determine the ArtifactSource to stream out of. For
// example, StreamFile("a/b.yml") will look up the "a" ArtifactSource and
// return the result of StreamFile("b.yml") on it.
//
// A source matches when the path starts with its SourceName followed by "/".
// If more than one registered name matches (e.g. both "a" and "a/b" for the
// path "a/b/c.yml"), the longest name wins, so the result does not depend on
// map iteration order.
//
// If no ArtifactSource matches the path, FileNotFoundError will be returned.
func (repo *SourceRepository) StreamFile(path string) (io.ReadCloser, error) {
	var (
		match     ArtifactSource
		matchName SourceName
		found     bool
	)

	// Select the source while holding the read lock, but release the lock
	// before streaming from it.
	repo.repoL.RLock()
	for name, src := range repo.repo {
		if !strings.HasPrefix(path, string(name)+"/") {
			continue
		}

		// Two distinct names that both prefix the path always differ in
		// length, so preferring the longer one yields a unique winner.
		if !found || len(name) > len(matchName) {
			match, matchName, found = src, name, true
		}
	}
	repo.repoL.RUnlock()

	if !found {
		return nil, FileNotFoundError{Path: path}
	}

	// Strip the source name and the "/" separator before delegating.
	return match.StreamFile(path[len(matchName)+1:])
}
// VolumeOn returns nothing, as it's impossible for there to be a single volume
// representing all ArtifactSources.
//
// The worker argument is ignored; the result is always a nil volume, false,
// and a nil error.
func (repo *SourceRepository) VolumeOn(worker worker.Worker) (worker.Volume, bool, error) {
	return nil, false, nil
}
// ScopedTo builds a new SourceRepository containing only the sources
// registered under the given SourceNames. This is used by the Put step to
// stream in the sources that did not have a volume available on its
// destination.
//
// If any of the names is not registered in this repository, no repository is
// returned and the error names the missing source.
func (repo *SourceRepository) ScopedTo(names ...SourceName) (*SourceRepository, error) {
	scoped := NewSourceRepository()

	for i := range names {
		wanted := names[i]

		if artifact, ok := repo.SourceFor(wanted); ok {
			scoped.RegisterSource(wanted, artifact)
			continue
		}

		return nil, fmt.Errorf("source does not exist in repository: %s", wanted)
	}

	return scoped, nil
}
// AsMap copies the current contents of the SourceRepository into a freshly
// allocated map and returns it. The returned map is independent of the
// repository: later changes to either one are not reflected in the other.
func (repo *SourceRepository) AsMap() map[SourceName]ArtifactSource {
	repo.repoL.RLock()
	defer repo.repoL.RUnlock()

	snapshot := make(map[SourceName]ArtifactSource, len(repo.repo))
	for key, artifact := range repo.repo {
		snapshot[key] = artifact
	}
	return snapshot
}
// subdirectoryDestination wraps an ArtifactDestination so that everything
// streamed into it is placed under a fixed subdirectory of the wrapped
// destination.
type subdirectoryDestination struct {
	// destination is the wrapped destination that actually receives the data.
	destination ArtifactDestination
	// subdirectory is prepended, followed by "/", to every path streamed in.
	subdirectory string
}
// StreamIn forwards src to the wrapped destination, with dst rewritten to sit
// beneath this destination's subdirectory. The error from the wrapped
// destination is returned unchanged.
func (dest subdirectoryDestination) StreamIn(dst string, src io.Reader) error {
	nested := dest.subdirectory + "/" + dst
	return dest.destination.StreamIn(nested, src)
}