-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.go
155 lines (135 loc) · 4.26 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package images
import (
"context"
"sync"
"github.com/containerd/containerd/content"
c8derrdefs "github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/leases"
"github.com/containerd/containerd/log"
"github.com/containerd/containerd/namespaces"
"github.com/docker/docker/distribution"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
func imageKey(dgst digest.Digest) string {
return "moby-image-" + dgst.String()
}
// imageStoreWithLease wraps the configured image store with one that deletes the lease
// reigstered for a given image ID, if one exists
//
// This is used by the main image service to wrap delete calls to the real image store.
type imageStoreWithLease struct {
image.Store
leases leases.Manager
// Normally we'd pass namespace down through a context.Context, however...
// The interface for image store doesn't allow this, so we store it here.
ns string
}
func (s *imageStoreWithLease) Delete(id image.ID) ([]layer.Metadata, error) {
ctx := namespaces.WithNamespace(context.TODO(), s.ns)
if err := s.leases.Delete(ctx, leases.Lease{ID: imageKey(digest.Digest(id))}); err != nil && !c8derrdefs.IsNotFound(err) {
return nil, errors.Wrap(err, "error deleting lease")
}
return s.Store.Delete(id)
}
// iamgeStoreForPull is created for each pull It wraps an underlying image store
// to handle registering leases for content fetched in a single image pull.
type imageStoreForPull struct {
distribution.ImageConfigStore
leases leases.Manager
ingested *contentStoreForPull
}
func (s *imageStoreForPull) Put(ctx context.Context, config []byte) (digest.Digest, error) {
id, err := s.ImageConfigStore.Put(ctx, config)
if err != nil {
return "", err
}
return id, s.updateLease(ctx, id)
}
func (s *imageStoreForPull) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) {
id, err := s.ImageConfigStore.Get(ctx, dgst)
if err != nil {
return nil, err
}
return id, s.updateLease(ctx, dgst)
}
func (s *imageStoreForPull) updateLease(ctx context.Context, dgst digest.Digest) error {
leaseID := imageKey(dgst)
lease, err := s.leases.Create(ctx, leases.WithID(leaseID))
if err != nil {
if !c8derrdefs.IsAlreadyExists(err) {
return errors.Wrap(err, "error creating lease")
}
lease = leases.Lease{ID: leaseID}
}
digested := s.ingested.getDigested()
resource := leases.Resource{
Type: "content",
}
for _, dgst := range digested {
log.G(ctx).WithFields(logrus.Fields{
"digest": dgst,
"lease": lease.ID,
}).Debug("Adding content digest to lease")
resource.ID = dgst.String()
if err := s.leases.AddResource(ctx, lease, resource); err != nil {
return errors.Wrapf(err, "error adding content digest to lease: %s", dgst)
}
}
return nil
}
// contentStoreForPull is used to wrap the configured content store to
// add lease management for a single `pull`
// It stores all committed digests so that `imageStoreForPull` can add
// the digsted resources to the lease for an image.
type contentStoreForPull struct {
distribution.ContentStore
leases leases.Manager
mu sync.Mutex
digested []digest.Digest
}
func (c *contentStoreForPull) addDigested(dgst digest.Digest) {
c.mu.Lock()
c.digested = append(c.digested, dgst)
c.mu.Unlock()
}
func (c *contentStoreForPull) getDigested() []digest.Digest {
c.mu.Lock()
digested := make([]digest.Digest, len(c.digested))
copy(digested, c.digested)
c.mu.Unlock()
return digested
}
func (c *contentStoreForPull) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
w, err := c.ContentStore.Writer(ctx, opts...)
if err != nil {
if c8derrdefs.IsAlreadyExists(err) {
var cfg content.WriterOpts
for _, o := range opts {
if err := o(&cfg); err != nil {
return nil, err
}
}
c.addDigested(cfg.Desc.Digest)
}
return nil, err
}
return &contentWriter{
cs: c,
Writer: w,
}, nil
}
type contentWriter struct {
cs *contentStoreForPull
content.Writer
}
func (w *contentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
err := w.Writer.Commit(ctx, size, expected, opts...)
if err == nil || c8derrdefs.IsAlreadyExists(err) {
w.cs.addDigested(expected)
}
return err
}