stabilize container hostnames against SessionID (dagger#5474)
* Add test for local dir import cache reuse.

Signed-off-by: Erik Sipsma <erik@sipsma.dev>

* stabilize container hostnames against SessionID

Keeping the SessionID on host directories is important because otherwise
the Buildkit solver's vertex is keyed only on the local client's path,
and the contents at that path may differ when clients run in containers
with different content mounted at the same path. This primarily affects
setups with a long-running external engine.
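
For illustration only (not part of this commit's diff): a host directory import that keys the LLB local source on both the client path and the session could look like the sketch below, assuming a Buildkit gateway client gw, an absolute path absPath, and imports of fmt, github.com/moby/buildkit/client/llb, and bkgw "github.com/moby/buildkit/frontend/gateway/client".

    // Hypothetical helper: the path alone is not a safe cache key, because two
    // clients can mount different content at the same absPath. Folding the
    // session ID into the identifier keeps their solver vertices distinct.
    func hostDir(gw bkgw.Client, absPath string) llb.State {
        sessionID := gw.BuildOpts().SessionID
        localID := fmt.Sprintf("host:%s:%s", sessionID, absPath)
        return llb.Local(
            localID,
            llb.SharedKeyHint(localID), // content-sharing hint includes the session
            llb.SessionID(sessionID),   // sync from this client's session specifically
        )
    }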

To prevent the SessionID from busting caches everywhere, hostnames are
now calculated by taking a stable hash of the *pb.Definition, which is a
pretty invasive process:

First it loops over the inner Ops and removes the relevant ephemeral
metadata from SourceOps. This results in a new Op with a different
digest, so it keeps track of the old and new digests along the way.

Next it loops over the old => new digest mapping and converts any
occurrences of the old digest to the new digest, both in the
definition's OpMetadata and in each inner Op's inputs. Any Ops whose
inputs had to be modified will result in yet another old => new digest
mapping, so this process repeats until everything stabilizes.

Finally, the inner Ops are sorted lexicographically. They would otherwise
be in topological order, where two root Ops may swap places without
changing semantics, but here everything needs to be deterministic.
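
A condensed sketch of the stripping step (illustrative only; the real implementation added below in core/digest.go does this via a source policy plus digest recomputation, though the pb.Attr* constants are the actual Buildkit ones, and the propagation step is elided here):

    // stripEphemeralAttrs removes the per-session attributes from every
    // SourceOp in a definition. Each affected Op's digest changes as a result,
    // so the caller must record oldDigest => newDigest and rewrite any inputs
    // that still refer to oldDigest, repeating until nothing changes.
    func stripEphemeralAttrs(def *pb.Definition) error {
        for _, dt := range def.Def {
            var op pb.Op
            if err := (&op).Unmarshal(dt); err != nil {
                return err
            }
            if src := op.GetSource(); src != nil {
                delete(src.Attrs, pb.AttrLocalSessionID)
                delete(src.Attrs, pb.AttrLocalUniqueID)
                delete(src.Attrs, pb.AttrSharedKeyHint) // contains the session ID
            }
            // ...re-marshal the op, map old digest => new digest, and propagate
            // the mapping into other ops' Inputs and the definition's Metadata
            // (elided here)...
        }
        return nil
    }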

Signed-off-by: Alex Suraci <alex@dagger.io>

* fix up recursion/looping order

Signed-off-by: Alex Suraci <alex@dagger.io>

* use llbsolver.Load + source policy for stable digests

Signed-off-by: Alex Suraci <alex@dagger.io>

* be more selective with repo content that we pull in

otherwise the git add * call can fail if there are .gitignored files or
other unexpected content in your local dir (e.g. bind-mounted repos used
to satisfy go.mod replace rules)
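
For reference, the resulting allow-list in the Go SDK looks like the sketch below (inside any function with a connected *dagger.Client named c and a repo path thisRepoPath):

    repoDir := c.Host().Directory(thisRepoPath, dagger.HostDirectoryOpts{
        // only pull in what the build actually needs; stray .gitignored or
        // bind-mounted content is never imported in the first place
        Include: []string{"core", "sdk", "go.mod", "go.sum"},
    })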

Signed-off-by: Alex Suraci <alex@dagger.io>

* Revert "use llbsolver.Load + source policy for stable digests"

works great, but breaks the Darwin build. :(

This reverts commit 7c20b08.

Signed-off-by: Alex Suraci <alex@dagger.io>

* extract llbsolver.Load from Buildkit

goals: a) replace my hairy yolo'd implementation with a bona-fide one
from upstream, and b) gain memoization along the way.

Signed-off-by: Alex Suraci <alex@dagger.io>

---------

Signed-off-by: Erik Sipsma <erik@sipsma.dev>
Signed-off-by: Alex Suraci <alex@dagger.io>
Co-authored-by: Erik Sipsma <erik@sipsma.dev>
vito and sipsma committed Jul 17, 2023
1 parent 7d2d263 commit c7a440a
Showing 5 changed files with 311 additions and 8 deletions.
11 changes: 7 additions & 4 deletions core/container.go
@@ -1244,11 +1244,14 @@ func (container *Container) WithExec(ctx context.Context, gw bkgw.Client, progSo
execStNoHostname := fsSt.Run(runOpts...)

// next, marshal it to compute a deterministic hostname
- constraints := llb.NewConstraints(llb.Platform(platform))
- rootVtx := execStNoHostname.Root().Output().Vertex(ctx, constraints)
- digest, _, _, _, err := rootVtx.Marshal(ctx, constraints) //nolint:dogsled
+ execDefNoHostname, err := execStNoHostname.Root().Marshal(ctx, llb.Platform(platform))
if err != nil {
- return nil, fmt.Errorf("marshal: %w", err)
+ return nil, fmt.Errorf("marshal root: %w", err)
}
+ // compute a *stable* digest so that hostnames don't change across sessions
+ digest, err := stableDigest(execDefNoHostname.ToPB())
+ if err != nil {
+ return nil, fmt.Errorf("stable digest: %w", err)
+ }
hostname := hostHash(digest)
container.Hostname = hostname
263 changes: 263 additions & 0 deletions core/digest.go
@@ -0,0 +1,263 @@
package core

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"reflect"
"sort"

"github.com/moby/buildkit/solver/pb"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)

func stableDigest(value any) (digest.Digest, error) {
buf := new(bytes.Buffer)

if err := digestInto(value, buf); err != nil {
return "", err
}

return digest.FromReader(buf)
}

func digestInto(value any, dest io.Writer) (err error) {
defer func() {
if err := recover(); err != nil {
panic(fmt.Errorf("digest %T: %v", value, err))
}
}()

switch x := value.(type) {
case *pb.Definition:
if x == nil {
break
}

digest, err := digestLLB(context.TODO(), x, stabilizeSourcePolicy{})
if err != nil {
return err
}

_, err = fmt.Fprintln(dest, digest)
return err

case []byte:
// base64-encode bytes rather than treating it like a slice
return json.NewEncoder(dest).Encode(value)
}

rt := reflect.TypeOf(value)
rv := reflect.ValueOf(value)
if rt.Kind() == reflect.Ptr {
if rv.IsNil() {
_, err := fmt.Fprintln(dest, "nil")
return err
}
rt = rt.Elem()
rv = rv.Elem()
}

switch rt.Kind() {
case reflect.Struct:
if err := digestStructInto(rt, rv, dest); err != nil {
return fmt.Errorf("digest struct: %w", err)
}
case reflect.Slice, reflect.Array:
if err := digestSliceInto(rv, dest); err != nil {
return fmt.Errorf("digest slice/array: %w", err)
}
case reflect.Map:
if err := digestMapInto(rv, dest); err != nil {
return fmt.Errorf("digest map: %w", err)
}
case reflect.String,
reflect.Bool,
reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64,
reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64,
reflect.Float32, reflect.Float64:
if err := json.NewEncoder(dest).Encode(value); err != nil {
return err
}
default:
return fmt.Errorf("don't know how to digest %T", value)
}

return nil
}

func digestStructInto(rt reflect.Type, rv reflect.Value, dest io.Writer) error {
for i := 0; i < rt.NumField(); i++ {
name := rt.Field(i).Name
fmt.Fprintln(dest, name)
if err := digestInto(rv.Field(i).Interface(), dest); err != nil {
return fmt.Errorf("field %s: %w", name, err)
}
}

return nil
}

func digestSliceInto(rv reflect.Value, dest io.Writer) error {
for i := 0; i < rv.Len(); i++ {
fmt.Fprintln(dest, i)
if err := digestInto(rv.Index(i).Interface(), dest); err != nil {
return fmt.Errorf("index %d: %w", i, err)
}
}

return nil
}

func digestMapInto(rv reflect.Value, dest io.Writer) error {
keys := rv.MapKeys()
sort.Slice(keys, func(i, j int) bool {
return keys[i].String() < keys[j].String()
})

for _, k := range keys {
if err := digestInto(k.Interface(), dest); err != nil {
return fmt.Errorf("key %v: %w", k, err)
}
if err := digestInto(rv.MapIndex(k).Interface(), dest); err != nil {
return fmt.Errorf("value for key %v: %w", k, err)
}
}

return nil
}

type sourcePolicyEvaluator interface {
Evaluate(ctx context.Context, op *pb.Op) (bool, error)
}

// TODO(vito): this is an extracted/trimmed down implementation of
// llbsolver.Load from upstream Buildkit. Ideally we would use it directly but
// we have to avoid importing that package because it breaks the Darwin build.
func digestLLB(ctx context.Context, def *pb.Definition, polEngine sourcePolicyEvaluator) (digest.Digest, error) {
if len(def.Def) == 0 {
return "", errors.New("invalid empty definition")
}

allOps := make(map[digest.Digest]*pb.Op)
mutatedDigests := make(map[digest.Digest]digest.Digest) // key: old, val: new

var lastDgst digest.Digest

for _, dt := range def.Def {
var op pb.Op
if err := (&op).Unmarshal(dt); err != nil {
return "", errors.Wrap(err, "failed to parse llb proto op")
}
dgst := digest.FromBytes(dt)
if polEngine != nil {
mutated, err := polEngine.Evaluate(ctx, &op)
if err != nil {
return "", errors.Wrap(err, "error evaluating the source policy")
}
if mutated {
dtMutated, err := op.Marshal()
if err != nil {
return "", err
}
dgstMutated := digest.FromBytes(dtMutated)
mutatedDigests[dgst] = dgstMutated
dgst = dgstMutated
}
}
allOps[dgst] = &op
lastDgst = dgst
}

for dgst := range allOps {
_, err := recomputeDigests(ctx, allOps, mutatedDigests, dgst)
if err != nil {
return "", err
}
}

if len(allOps) < 2 {
return "", errors.Errorf("invalid LLB with %d vertexes", len(allOps))
}

for {
newDgst, ok := mutatedDigests[lastDgst]
if !ok || newDgst == lastDgst {
break
}
lastDgst = newDgst
}

lastOp := allOps[lastDgst]
delete(allOps, lastDgst)
if len(lastOp.Inputs) == 0 {
return "", errors.Errorf("invalid LLB with no inputs on last vertex")
}

dgst := lastOp.Inputs[0].Digest

return dgst, nil
}

func recomputeDigests(ctx context.Context, all map[digest.Digest]*pb.Op, visited map[digest.Digest]digest.Digest, dgst digest.Digest) (digest.Digest, error) {
if dgst, ok := visited[dgst]; ok {
return dgst, nil
}
op := all[dgst]

var mutated bool
for _, input := range op.Inputs {
if ctx.Err() != nil {
return "", ctx.Err()
}

iDgst, err := recomputeDigests(ctx, all, visited, input.Digest)
if err != nil {
return "", err
}
if input.Digest != iDgst {
mutated = true
input.Digest = iDgst
}
}

if !mutated {
visited[dgst] = dgst
return dgst, nil
}

dt, err := op.Marshal()
if err != nil {
return "", err
}
newDgst := digest.FromBytes(dt)
visited[dgst] = newDgst
all[newDgst] = op
delete(all, dgst)
return newDgst, nil
}

// stabilizeSourcePolicy removes ephemeral metadata from ops to prevent it from
// busting caches.
type stabilizeSourcePolicy struct{}

func (stabilizeSourcePolicy) Evaluate(ctx context.Context, op *pb.Op) (bool, error) {
if src := op.GetSource(); src != nil {
var modified bool
for k := range src.Attrs {
switch k {
case pb.AttrLocalSessionID,
pb.AttrLocalUniqueID,
pb.AttrSharedKeyHint: // contains session ID
delete(src.Attrs, k)
modified = true
}
}
return modified, nil
}

return false, nil
}
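
As a usage note (hypothetical values, not part of the commit): for anything other than a *pb.Definition, stableDigest falls back to the reflection walk above, digesting struct fields in declaration order and map entries in sorted-key order, so logically-equal values hash identically. A minimal sketch, assuming it lives in the same core package with fmt imported:

    type example struct {
        Name string
        Tags map[string]string
    }

    func demoStableDigest() {
        d1, _ := stableDigest(example{Name: "a", Tags: map[string]string{"x": "1", "y": "2"}})
        d2, _ := stableDigest(example{Name: "a", Tags: map[string]string{"y": "2", "x": "1"}})
        fmt.Println(d1 == d2) // true: map iteration order never leaks into the digest
    }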
8 changes: 5 additions & 3 deletions core/host.go
@@ -61,7 +61,8 @@ func (host *Host) Directory(ctx context.Context, gw bkgw.Client, dirPath string,
pipelineName := fmt.Sprintf("%s %s", pipelineNamePrefix, absPath)
ctx, subRecorder := progrock.WithGroup(ctx, pipelineName, progrock.Weak())

- localID := fmt.Sprintf("host:%s", absPath)
+ sessionID := gw.BuildOpts().SessionID
+ localID := fmt.Sprintf("host:%s:%s", sessionID, absPath)

localOpts := []llb.LocalOption{
// Custom name
@@ -71,8 +72,9 @@ func (host *Host) Directory(ctx context.Context, gw bkgw.Client, dirPath string,
llb.SharedKeyHint(localID),

// sync this dir from this session specifically, even if this ends up passed
- // to a different session (e.g. a project container)
- llb.SessionID(gw.BuildOpts().SessionID),
+ // to a different session (e.g. a project container). this also helps
+ // protect against cross-session solver collisions in Buildkit.
+ llb.SessionID(sessionID),
}

opName := fmt.Sprintf("copy %s", absPath)
35 changes: 35 additions & 0 deletions core/integration/cache_test.go
@@ -1,8 +1,12 @@
package core

import (
"context"
"os"
"path/filepath"
"testing"

"dagger.io/dagger"
"github.com/dagger/dagger/core"
"github.com/dagger/dagger/internal/testutil"
"github.com/stretchr/testify/require"
@@ -65,3 +69,34 @@ func TestCacheVolume(t *testing.T) {
require.NotEqual(t, idOrig, idDiff)
})
}

func TestLocalImportCacheReuse(t *testing.T) {
t.Parallel()

hostDirPath := t.TempDir()
err := os.WriteFile(filepath.Join(hostDirPath, "foo"), []byte("bar"), 0o644)
require.NoError(t, err)

runExec := func(ctx context.Context, t *testing.T, c *dagger.Client) string {
out, err := c.Container().From("alpine:3.16.2").
WithDirectory("/fromhost", c.Host().Directory(hostDirPath)).
WithExec([]string{"sh", "-c", "head -c 128 /dev/random | sha256sum"}).
Stdout(ctx)
require.NoError(t, err)
return out
}

c1, ctx1 := connect(t)
defer c1.Close()
ctx1, cancel1 := context.WithCancel(ctx1)
defer cancel1()
out1 := runExec(ctx1, t, c1)

c2, ctx2 := connect(t)
defer c2.Close()
ctx2, cancel2 := context.WithCancel(ctx2)
defer cancel2()
out2 := runExec(ctx2, t, c2)

require.Equal(t, out1, out2)
}
2 changes: 1 addition & 1 deletion core/integration/suite_test.go
@@ -309,7 +309,7 @@ func (ctr DaggerCLIContainer) WithLoadedProject(
require.NoError(ctr.t, err)

thisRepoDir := ctr.c.Host().Directory(thisRepoPath, dagger.HostDirectoryOpts{
- Exclude: []string{".git", "bin", "docs", "website"},
+ Include: []string{"core", "sdk", "go.mod", "go.sum"},
})
projectArg := filepath.Join(cliContainerRepoMntPath, projectPath)

