-
Notifications
You must be signed in to change notification settings - Fork 18
/
utils.go
119 lines (101 loc) · 2.85 KB
/
utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
// SPDX-FileCopyrightText: 2022 SAP SE or an SAP affiliate company and Open Component Model contributors.
//
// SPDX-License-Identifier: Apache-2.0
package ocireg
import (
"context"
"fmt"
"io"
"sync"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
"github.com/containerd/log"
"github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
"github.com/open-component-model/ocm/pkg/blobaccess"
"github.com/open-component-model/ocm/pkg/common/accessio"
"github.com/open-component-model/ocm/pkg/contexts/oci/artdesc"
"github.com/open-component-model/ocm/pkg/contexts/oci/cpi"
"github.com/open-component-model/ocm/pkg/docker/resolve"
"github.com/open-component-model/ocm/pkg/logging"
)
// dataAccess provides the content of a single blob, described by desc and
// obtained through fetcher. It is the type this file registers as a
// cpi.DataAccess implementation.
//
// NOTE(review): a reader that was fetched eagerly by NewDataAccess but never
// handed out through Reader does not appear to be closed anywhere in this
// file — confirm whether that is intended.
//
// TODO: add cache
type dataAccess struct {
// NopCloser is embedded; presumably it supplies a no-op Close — verify
// against the accessio package.
accessio.NopCloser
// lock guards reader while Reader takes it over.
lock sync.Mutex
// fetcher is used to open a stream for desc.
fetcher remotes.Fetcher
// desc identifies the blob (media type, digest, size) passed to fetcher.
desc artdesc.Descriptor
// reader is an already opened stream, or nil. Reader hands it out once and
// resets the field to nil.
reader io.ReadCloser
}
// Compile-time check that *dataAccess satisfies cpi.DataAccess.
var _ cpi.DataAccess = (*dataAccess)(nil)
// NewDataAccess builds a dataAccess for the blob identified by digest and
// mimeType that is read through fetcher. The descriptor size is set to
// accessio.BLOB_UNKNOWN_SIZE.
//
// If delayed is true, nothing is fetched here; the first Reader call opens
// the stream. If delayed is false, the blob is fetched right away, the
// resulting reader is stored for the first Reader call, and a fetch error is
// returned unchanged together with a nil result.
func NewDataAccess(fetcher remotes.Fetcher, digest digest.Digest, mimeType string, delayed bool) (*dataAccess, error) {
	access := &dataAccess{
		fetcher: fetcher,
		desc: artdesc.Descriptor{
			MediaType: mimeType,
			Digest:    digest,
			Size:      accessio.BLOB_UNKNOWN_SIZE,
		},
	}
	if delayed {
		return access, nil
	}

	// Eager mode: open the stream now so fetch problems surface immediately.
	stream, err := fetcher.Fetch(dummyContext, access.desc)
	if err != nil {
		return nil, err
	}
	access.reader = stream
	return access, nil
}
// Get returns the complete blob content as a byte slice. It opens a stream
// via Reader and hands both results to readAll, which reads and closes it.
func (d *dataAccess) Get() ([]byte, error) {
	stream, err := d.Reader()
	return readAll(stream, err)
}
// Reader returns a stream for the blob content. A reader that is already
// stored on d is handed out exactly once and the field is reset to nil;
// whenever no stored reader is available a new stream is fetched.
func (d *dataAccess) Reader() (io.ReadCloser, error) {
	// Take over the stored reader under the lock, but release the lock
	// before any fetch is started.
	d.lock.Lock()
	pending := d.reader
	d.reader = nil
	d.lock.Unlock()

	if pending == nil {
		return d.fetcher.Fetch(dummyContext, d.desc)
	}
	return pending, nil
}
// readAll consumes reader completely and closes it before returning.
//
// The (reader, err) parameter pair mirrors the result of a function that
// opens a stream, so such a call can be forwarded directly. If err is
// non-nil it is returned as is and reader is not touched. If reading fails,
// the read error is returned and no partial data is handed back.
func readAll(reader io.ReadCloser, err error) ([]byte, error) {
	if err != nil {
		return nil, err
	}
	defer reader.Close()

	content, readErr := io.ReadAll(reader)
	if readErr == nil {
		return content, nil
	}
	// Drop whatever was read so far; callers only get complete content.
	return nil, readErr
}
// push uploads blob through p, using the descriptor that
// artdesc.DefaultBlobDescriptor derives from the blob itself. The actual
// transfer is delegated to pushData.
func push(ctx context.Context, p resolve.Pusher, blob blobaccess.BlobAccess) error {
	return pushData(ctx, p, *artdesc.DefaultBlobDescriptor(blob), blob)
}
// pushData uploads data described by desc through p and commits the push
// request with the descriptor's size and digest.
//
// A descriptor size of 0 is replaced by -1 before pushing (presumably the
// marker for an unknown size — confirm against the pusher implementation).
// If Push reports that the blob already exists, this is logged and treated
// as success. Any other Push error is wrapped and returned; the result of
// Commit is returned unchanged.
func pushData(ctx context.Context, p resolve.Pusher, desc artdesc.Descriptor, data blobaccess.DataAccess) error {
	// The reference key is only used for the debug log entry below.
	refKey := remotes.MakeRefKey(ctx, desc)
	if desc.Size == 0 {
		desc.Size = -1
	}
	logging.Logger().Debug("*** push blob", "mediatype", desc.MediaType, "digest", desc.Digest, "key", refKey)

	request, err := p.Push(ctx, desc, data)
	switch {
	case err == nil:
		return request.Commit(ctx, desc.Size, desc.Digest)
	case errdefs.IsAlreadyExists(err):
		logging.Logger().Debug("blob already exists", "mediatype", desc.MediaType, "digest", desc.Digest)
		return nil
	default:
		return fmt.Errorf("failed to push: %w", err)
	}
}
// dummyContext is the context handed to the fetcher.Fetch calls visible in
// this file. It is created once by nologger during package initialization.
var dummyContext = nologger()
// nologger returns a background context that carries a dedicated logrus
// logger whose level is set to logrus.ErrorLevel, attached via
// log.WithLogger.
func nologger() context.Context {
	quiet := logrus.New()
	quiet.Level = logrus.ErrorLevel
	return log.WithLogger(context.Background(), logrus.NewEntry(quiet))
}