Skip to content

Commit

Permalink
Add a new MutableResult type to solver/result package that implements
Browse files Browse the repository at this point in the history
thread-safe Result.
Address review comments:
 * Re-use the input variable in exporters to avoid accidental re-use of
   stale values
  • Loading branch information
a-palchikov committed Aug 2, 2023
1 parent 079a81f commit 5555bd5
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 69 deletions.
14 changes: 6 additions & 8 deletions exporter/containerimage/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,21 +187,19 @@ func (e *imageExporterInstance) Config() *exporter.Config {
return exporter.NewConfigWithCompression(e.opts.RefCfg.Compression)
}

func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source, sessionID string) (map[string]string, exporter.DescriptorReference, error) {
func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, sessionID string) (map[string]string, exporter.DescriptorReference, error) {
return e.ExportImage(ctx, src, exptypes.InlineCache{}, sessionID)
}

func (e *imageExporterInstance) ExportImage(ctx context.Context, src *exporter.Source, inlineCache exptypes.InlineCache, sessionID string) (_ map[string]string, descref exporter.DescriptorReference, err error) {
func (e *imageExporterInstance) ExportImage(ctx context.Context, src exporter.Source, inlineCache exptypes.InlineCache, sessionID string) (_ map[string]string, descref exporter.DescriptorReference, err error) {
meta := make(map[string][]byte)
for k, v := range src.Metadata {
meta[k] = v
}
for k, v := range e.meta {
meta[k] = v
}
inp := *src
inp.Metadata = meta
src = &inp
src.Metadata = meta

opts := e.opts
as, _, err := ParseAnnotations(meta)
Expand All @@ -221,7 +219,7 @@ func (e *imageExporterInstance) ExportImage(ctx context.Context, src *exporter.S
}
}()

desc, err := e.opt.ImageWriter.Commit(ctx, src, sessionID, &opts)
desc, err := e.opt.ImageWriter.Commit(ctx, &src, sessionID, &opts)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -283,7 +281,7 @@ func (e *imageExporterInstance) ExportImage(ctx context.Context, src *exporter.S
tagDone(nil)

if e.unpack {
if err := e.unpackImage(ctx, img, src, session.NewGroup(sessionID)); err != nil {
if err := e.unpackImage(ctx, img, &src, session.NewGroup(sessionID)); err != nil {
return nil, nil, err
}
}
Expand Down Expand Up @@ -319,7 +317,7 @@ func (e *imageExporterInstance) ExportImage(ctx context.Context, src *exporter.S
}
}
if e.push {
err := e.pushImage(ctx, src, sessionID, targetName, desc.Digest)
err := e.pushImage(ctx, &src, sessionID, targetName, desc.Digest)
if err != nil {
return nil, nil, errors.Wrapf(err, "failed to push %v", targetName)
}
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type Exporter interface {
type ExporterInstance interface {
Name() string
Config() *Config
Export(ctx context.Context, src *Source, sessionID string) (map[string]string, DescriptorReference, error)
Export(ctx context.Context, src Source, sessionID string) (map[string]string, DescriptorReference, error)
}

type DescriptorReference interface {
Expand Down
4 changes: 2 additions & 2 deletions exporter/local/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ func (e *localExporter) Config() *exporter.Config {
return exporter.NewConfig()
}

func (e *localExporterInstance) Export(ctx context.Context, inp *exporter.Source, sessionID string) (map[string]string, exporter.DescriptorReference, error) {
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, exporter.DescriptorReference, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

if e.opts.Epoch == nil {
if tm, ok, err := epoch.ParseSource(inp); err != nil {
if tm, ok, err := epoch.ParseSource(&inp); err != nil {
return nil, nil, err
} else if ok {
e.opts.Epoch = tm
Expand Down
10 changes: 4 additions & 6 deletions exporter/oci/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ func (e *imageExporterInstance) Config() *exporter.Config {
return exporter.NewConfigWithCompression(e.opts.RefCfg.Compression)
}

func (e *imageExporterInstance) Export(ctx context.Context, src *exporter.Source, sessionID string) (map[string]string, exporter.DescriptorReference, error) {
func (e *imageExporterInstance) Export(ctx context.Context, src exporter.Source, sessionID string) (map[string]string, exporter.DescriptorReference, error) {
return e.ExportImage(ctx, src, exptypes.InlineCache{}, sessionID)
}

func (e *imageExporterInstance) ExportImage(ctx context.Context, src *exporter.Source, inlineCache exptypes.InlineCache, sessionID string) (_ map[string]string, descref exporter.DescriptorReference, err error) {
func (e *imageExporterInstance) ExportImage(ctx context.Context, src exporter.Source, inlineCache exptypes.InlineCache, sessionID string) (_ map[string]string, descref exporter.DescriptorReference, err error) {
if e.opt.Variant == VariantDocker && len(src.Refs) > 0 {
return nil, nil, errors.Errorf("docker exporter does not currently support exporting manifest lists")
}
Expand All @@ -129,9 +129,7 @@ func (e *imageExporterInstance) ExportImage(ctx context.Context, src *exporter.S
for k, v := range e.meta {
meta[k] = v
}
inp := *src
inp.Metadata = meta
src = &inp
src.Metadata = meta

opts := e.opts
as, _, err := containerimage.ParseAnnotations(meta)
Expand All @@ -151,7 +149,7 @@ func (e *imageExporterInstance) ExportImage(ctx context.Context, src *exporter.S
}
}()

desc, err := e.opt.ImageWriter.Commit(ctx, src, sessionID, &opts)
desc, err := e.opt.ImageWriter.Commit(ctx, &src, sessionID, &opts)
if err != nil {
return nil, nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions exporter/tar/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func (e *localExporterInstance) Config() *exporter.Config {
return exporter.NewConfig()
}

func (e *localExporterInstance) Export(ctx context.Context, inp *exporter.Source, sessionID string) (map[string]string, exporter.DescriptorReference, error) {
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, exporter.DescriptorReference, error) {
var defers []func() error

defer func() {
Expand All @@ -68,7 +68,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp *exporter.Source
}()

if e.opts.Epoch == nil {
if tm, ok, err := epoch.ParseSource(inp); err != nil {
if tm, ok, err := epoch.ParseSource(&inp); err != nil {
return nil, nil, err
} else if ok {
e.opts.Epoch = tm
Expand Down
36 changes: 6 additions & 30 deletions frontend/dockerui/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"sync"

"github.com/containerd/containerd/platforms"
"github.com/moby/buildkit/exporter/containerimage/exptypes"
Expand All @@ -18,7 +17,7 @@ import (
type BuildFunc func(ctx context.Context, platform *ocispecs.Platform, idx int) (client.Reference, *image.Image, error)

func (bc *Client) Build(ctx context.Context, fn BuildFunc) (*ResultBuilder, error) {
res := &mutableResult{res: client.NewResult()}
res := client.NewMutableResult()

targets := make([]*ocispecs.Platform, 0, len(bc.TargetPlatforms))
for _, p := range bc.TargetPlatforms {
Expand Down Expand Up @@ -66,11 +65,11 @@ func (bc *Client) Build(ctx context.Context, fn BuildFunc) (*ResultBuilder, erro
k := platforms.Format(p)

if bc.MultiPlatformRequested {
res.addRef(k, ref)
res.addMeta(fmt.Sprintf("%s/%s", exptypes.ExporterImageConfigKey, k), config)
res.AddRef(k, ref)
res.AddMeta(fmt.Sprintf("%s/%s", exptypes.ExporterImageConfigKey, k), config)
} else {
res.setRef(ref)
res.addMeta(exptypes.ExporterImageConfigKey, config)
res.SetRef(ref)
res.AddMeta(exptypes.ExporterImageConfigKey, config)
}
expPlatforms.Platforms[i] = exptypes.Platform{
ID: k,
Expand All @@ -83,7 +82,7 @@ func (bc *Client) Build(ctx context.Context, fn BuildFunc) (*ResultBuilder, erro
return nil, err
}
return &ResultBuilder{
Result: res.res,
Result: res.Result(),
expPlatforms: expPlatforms,
}, nil
}
Expand Down Expand Up @@ -113,26 +112,3 @@ func (rb *ResultBuilder) EachPlatform(ctx context.Context, fn func(ctx context.C
}
return eg.Wait()
}

func (r *mutableResult) addMeta(k string, v []byte) {
r.mu.Lock()
r.res.AddMeta(k, v)
r.mu.Unlock()
}

func (r *mutableResult) addRef(k string, ref client.Reference) {
r.mu.Lock()
r.res.AddRef(k, ref)
r.mu.Unlock()
}

func (r *mutableResult) setRef(ref client.Reference) {
r.mu.Lock()
r.res.SetRef(ref)
r.mu.Unlock()
}

type mutableResult struct {
mu sync.Mutex
res *client.Result
}
5 changes: 5 additions & 0 deletions frontend/gateway/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
)

type Result = result.Result[Reference]
type MutableResult = result.MutableResult[Reference]

type Attestation = result.Attestation[Reference]

Expand All @@ -25,6 +26,10 @@ func NewResult() *Result {
return &Result{}
}

func NewMutableResult() *MutableResult {
return &MutableResult{}
}

type Client interface {
Solve(ctx context.Context, req SolveRequest) (*Result, error)
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt) (string, digest.Digest, []byte, error)
Expand Down
37 changes: 17 additions & 20 deletions solver/llbsolver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -552,12 +552,12 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro

cacheExporters, inlineCacheExporter := splitCacheExporters(exp.CacheExporters)
var exporterResponse map[string]string
exporterResponse, descref, err = s.runExporters(ctx, exp.Exporters, inlineCacheExporter, j, cached, inp)
exporterResponse, descref, err = s.runExporters(ctx, exp.Exporters, inlineCacheExporter, j, *cached, *inp)
if err != nil {
return nil, err
}

cacheExporterResponse, err := runCacheExporters(ctx, cacheExporters, j, cached, inp)
cacheExporterResponse, err := runCacheExporters(ctx, cacheExporters, j, *cached, *inp)
if err != nil {
return nil, err
}
Expand All @@ -582,7 +582,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro
}, nil
}

func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j *solver.Job, cached *result.Result[solver.CachedResult], inp *result.Result[cache.ImmutableRef]) (map[string]string, error) {
func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j *solver.Job, cached result.Result[solver.CachedResult], inp result.Result[cache.ImmutableRef]) (map[string]string, error) {
eg, ctx := errgroup.WithContext(ctx)
g := session.NewGroup(j.SessionID)
var cacheExporterResponse map[string]string
Expand All @@ -593,7 +593,7 @@ func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j *
id := fmt.Sprint(j.SessionID, "-cache-", i)
err = inBuilderContext(ctx, j, exp.Exporter.Name(), id, func(ctx context.Context, _ session.Group) error {
prepareDone := progress.OneOff(ctx, "preparing build cache for export")
if err := result.EachRef(cached, inp, func(res solver.CachedResult, ref cache.ImmutableRef) error {
if err := result.EachRef(&cached, &inp, func(res solver.CachedResult, ref cache.ImmutableRef) error {
ctx = withDescHandlerCacheOpts(ctx, ref)

// Configure compression
Expand Down Expand Up @@ -634,40 +634,37 @@ func runCacheExporters(ctx context.Context, exporters []RemoteCacheExporter, j *
return cacheExporterResponse, nil
}

func runInlineCacheExporter(ctx context.Context, e exporter.ExporterInstance, inlineExporter inlineCacheExporter, j *solver.Job, cached *result.Result[solver.CachedResult]) (result *exptypes.InlineCache, err error) {
func runInlineCacheExporter(ctx context.Context, e exporter.ExporterInstance, inlineExporter inlineCacheExporter, j *solver.Job, cached result.Result[solver.CachedResult]) (result *exptypes.InlineCache, err error) {
result = &exptypes.InlineCache{
Platforms: make(map[string][]byte),
}
if inlineExporter == nil {
return result, nil
}
if err := inBuilderContext(ctx, j, "preparing layers for inline cache", j.SessionID+"-cache-inline", func(ctx context.Context, _ session.Group) error {
if res := cached.Ref; res != nil {
err := cached.EachPlatformRef(func(platformID string, res solver.CachedResult) error {
dtic, err := inlineCache(ctx, inlineExporter, res, e.Config().Compression(), session.NewGroup(j.SessionID))
if err != nil {
return err
}
if dtic != nil {
result.Cache = dtic
}
}
for k, res := range cached.Refs {
dtic, err := inlineCache(ctx, inlineExporter, res, e.Config().Compression(), session.NewGroup(j.SessionID))
if err != nil {
return err
if dtic == nil {
return nil
}
if dtic != nil {
result.Platforms[k] = dtic
if platformID == "" {
result.Cache = dtic
} else {
result.Platforms[platformID] = dtic
}
}
return nil
return nil
})
return err
}); err != nil {
return nil, err
}
return result, nil
}

func (s *Solver) runExporters(ctx context.Context, exporters []exporter.ExporterInstance, inlineCacheExporter inlineCacheExporter, job *solver.Job, cached *result.Result[solver.CachedResult], inp *result.Result[cache.ImmutableRef]) (exporterResponse map[string]string, descref exporter.DescriptorReference, err error) {
func (s *Solver) runExporters(ctx context.Context, exporters []exporter.ExporterInstance, inlineCacheExporter inlineCacheExporter, job *solver.Job, cached result.Result[solver.CachedResult], inp result.Result[cache.ImmutableRef]) (exporterResponse map[string]string, descref exporter.DescriptorReference, err error) {
eg, ctx := errgroup.WithContext(ctx)
sessionID := job.SessionID
resps := make([]map[string]string, len(exporters))
Expand Down Expand Up @@ -863,7 +860,7 @@ func getProvenance(ref solver.ResultProxy, br *provenanceBridge, id string, reqs

type imageExporterInstance interface {
exporter.ExporterInstance
ExportImage(ctx context.Context, src *exporter.Source, cache exptypes.InlineCache, sessionID string) (map[string]string, exporter.DescriptorReference, error)
ExportImage(ctx context.Context, src exporter.Source, cache exptypes.InlineCache, sessionID string) (map[string]string, exporter.DescriptorReference, error)
}

func asImageExporter(e exporter.ExporterInstance) (imageExporterInstance, bool) {
Expand Down
61 changes: 61 additions & 0 deletions solver/result/result.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package result

import (
"sync"

"github.com/pkg/errors"
)

Expand Down Expand Up @@ -85,6 +87,33 @@ func (r *Result[T]) EachRef(fn func(T) error) (err error) {
return err
}

// EachPlatformRef iterates through all underlying refs and invokes
// the given handler with the platform ID corresponding to the ref.
// For the single ref case, the platform ID is an empty string.
func (r *Result[T]) EachPlatformRef(fn func(string, T) error) (err error) {
var zero T
if r.Ref != zero {
err = fn("", r.Ref)
}
for p, r := range r.Refs {
if r != zero {
if err1 := fn(p, r); err1 != nil && err == nil {
err = err1
}
}
}
for p, as := range r.Attestations {
for _, a := range as {
if a.Ref != zero {
if err1 := fn(p, a.Ref); err1 != nil && err == nil {
err = err1
}
}
}
}
return err
}

// EachRef iterates over references in both a and b.
// a and b are assumed to be of the same size and map their references
// to the same set of keys
Expand Down Expand Up @@ -170,3 +199,35 @@ func ConvertResult[U comparable, V comparable](r *Result[U], fn func(U) (V, erro

return r2, nil
}

// Result returns the underlying Result value
func (r *MutableResult[T]) Result() *Result[T] {
return &r.res
}

// AddMeta adds the metadata specified with the given key/value pair
func (r *MutableResult[T]) AddMeta(k string, v []byte) {
r.mu.Lock()
r.res.AddMeta(k, v)
r.mu.Unlock()
}

// AddRef adds the specified reference for the platform given with k
func (r *MutableResult[T]) AddRef(k string, ref T) {
r.mu.Lock()
r.res.AddRef(k, ref)
r.mu.Unlock()
}

// SetRef sets the specified reference as the single reference
func (r *MutableResult[T]) SetRef(ref T) {
r.mu.Lock()
r.res.SetRef(ref)
r.mu.Unlock()
}

// MutableResult is a thread-safe version of Result
type MutableResult[T comparable] struct {
mu sync.Mutex
res Result[T]
}

0 comments on commit 5555bd5

Please sign in to comment.