forked from distribution/distribution
-
Notifications
You must be signed in to change notification settings - Fork 0
/
push.go
137 lines (118 loc) · 3.43 KB
/
push.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package client
import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/docker/distribution/manifest"
)
// simultaneousLayerPushWindow is the size of the parallel layer push window.
// A layer may not be pushed until the layer preceding it by the length of the
// push window has been successfully pushed.
const simultaneousLayerPushWindow = 4
// pushFunction is the signature of a function that takes a single
// manifest.FSLayer and returns an error.
// NOTE(review): not referenced by the code visible in this file; presumably
// intended for functions that push one layer -- confirm before relying on it.
type pushFunction func(fsLayer manifest.FSLayer) error
// Push runs the client-side push workflow for the image identified by name and
// tag. The manifest is loaded from objectStore, each of its FSLayers is pushed
// through pushLayer, and finally the manifest itself is uploaded with
// c.PutImageManifest.
//
// Layer pushes run in goroutines, but layer N is only started once the result
// of layer N-simultaneousLayerPushWindow has been received, so no more than
// simultaneousLayerPushWindow pushes are outstanding at once. Results are
// consumed in layer order; the first non-nil error aborts the push and is
// returned. An error from loading or uploading the manifest is returned as is.
func Push(c Client, objectStore ObjectStore, name, tag string) error {
	m, err := objectStore.Manifest(name, tag)
	if err != nil {
		log.WithFields(log.Fields{
			"error": err,
			"name":  name,
			"tag":   tag,
		}).Info("No image found")
		return err
	}

	layerCount := len(m.FSLayers)

	// One unbuffered result channel per layer, indexed like m.FSLayers.
	results := make([]chan error, layerCount)
	for idx := range results {
		results[idx] = make(chan error)
	}

	// abort is closed when the push is given up so that goroutines still
	// waiting to hand over their result can exit instead of blocking forever.
	abort := make(chan struct{})

	// The loop runs simultaneousLayerPushWindow steps past the last layer so
	// that the results of the final window are collected before the manifest
	// is uploaded.
	for step := 0; step < layerCount+simultaneousLayerPushWindow; step++ {
		if gate := step - simultaneousLayerPushWindow; gate >= 0 {
			if pushErr := <-results[gate]; pushErr != nil {
				log.WithField("error", pushErr).Warn("Push aborted")
				close(abort)
				return pushErr
			}
		}

		if step >= layerCount {
			// Past the last layer: nothing left to start, only to drain.
			continue
		}

		go func(idx int) {
			outcome := pushLayer(c, objectStore, name, m.FSLayers[idx])
			select {
			case results[idx] <- outcome:
			case <-abort: // push was cancelled; nobody will read the result
			}
		}(step)
	}

	if err = c.PutImageManifest(name, tag, m); err != nil {
		log.WithFields(log.Fields{
			"error":    err,
			"manifest": m,
		}).Warn("Unable to upload manifest")
		return err
	}
	return nil
}
// pushLayer pushes the single layer described by fsLayer to the repository
// name through client c.
//
// The layer is looked up in objectStore by fsLayer.BlobSum and opened for
// reading. If the reader's current size differs from its full size the layer
// is treated as incomplete and an error is returned. If c.BlobLength reports a
// non-negative length the layer is considered already present and nothing is
// uploaded. Otherwise an upload is initiated and the layer is sent with
// c.UploadBlob. Every failure is logged as a warning and returned unchanged.
func pushLayer(c Client, objectStore ObjectStore, name string, fsLayer manifest.FSLayer) error {
	log.WithField("layer", fsLayer).Info("Pushing layer")

	// warn logs cause together with the layer under the given message and
	// hands cause back so call sites can return it directly.
	warn := func(message string, cause error) error {
		log.WithFields(log.Fields{
			"error": cause,
			"layer": fsLayer,
		}).Warn(message)
		return cause
	}

	localLayer, err := objectStore.Layer(fsLayer.BlobSum)
	if err != nil {
		return warn("Unable to read local layer", err)
	}

	reader, err := localLayer.Reader()
	if err != nil {
		return warn("Unable to read local layer", err)
	}
	defer reader.Close()

	// Only layers whose current size matches their full size are pushed.
	if reader.CurrentSize() != reader.Size() {
		log.WithFields(log.Fields{
			"layer":       fsLayer,
			"currentSize": reader.CurrentSize(),
			"size":        reader.Size(),
		}).Warn("Local layer incomplete")
		return fmt.Errorf("Local layer incomplete")
	}

	remoteLength, err := c.BlobLength(name, fsLayer.BlobSum)
	if err != nil {
		return warn("Unable to check existence of remote layer", err)
	}
	if remoteLength >= 0 {
		// A non-negative length is taken to mean the blob is already there.
		log.WithField("layer", fsLayer).Info("Layer already exists")
		return nil
	}

	uploadLocation, err := c.InitiateBlobUpload(name)
	if err != nil {
		return warn("Unable to upload layer", err)
	}

	if err = c.UploadBlob(uploadLocation, reader, int(reader.CurrentSize()), fsLayer.BlobSum); err != nil {
		return warn("Unable to upload layer", err)
	}
	return nil
}