Skip to content

Commit

Permalink
gateway: pass executor with build and not access worker directly
Browse files Browse the repository at this point in the history
Running interactive container APIs was done by giving
the gateway implementation access to worker controller
directly, but it should be passed with a build job instead.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
(cherry picked from commit 0971dffaab93d91e51af984b44c745b35b3c5b4d)
(cherry picked from commit 564f884e7bb6db9c63e03c3b081ea71e15aa7980)
  • Loading branch information
tonistiigi committed Jan 31, 2024
1 parent 7718bd5 commit 5026d95
Show file tree
Hide file tree
Showing 13 changed files with 98 additions and 39 deletions.
4 changes: 2 additions & 2 deletions cmd/buildkitd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -677,8 +677,8 @@ func newController(c *cli.Context, cfg *config.Config) (*control.Controller, err
return nil, err
}
frontends := map[string]frontend.Frontend{}
frontends["dockerfile.v0"] = forwarder.NewGatewayForwarder(wc, dockerfile.Build)
frontends["gateway.v0"] = gateway.NewGatewayFrontend(wc)
frontends["dockerfile.v0"] = forwarder.NewGatewayForwarder(wc.Infos(), dockerfile.Build)
frontends["gateway.v0"] = gateway.NewGatewayFrontend(wc.Infos())

cacheStorage, err := bboltcachestorage.NewStore(filepath.Join(cfg.Root, "cache.db"))
if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import (
"net"
"syscall"

"github.com/containerd/containerd/mount"
"github.com/docker/docker/pkg/idtools"
resourcestypes "github.com/moby/buildkit/executor/resources/types"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver/pb"
)

Expand All @@ -28,8 +29,13 @@ type Meta struct {
RemoveMountStubsRecursive bool
}

type MountableRef interface {
Mount() ([]mount.Mount, func() error, error)
IdentityMapping() *idtools.IdentityMapping
}

type Mountable interface {
Mount(ctx context.Context, readonly bool) (snapshot.Mountable, error)
Mount(ctx context.Context, readonly bool) (MountableRef, error)
}

type Mount struct {
Expand Down
3 changes: 2 additions & 1 deletion frontend/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/executor"
gw "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
Expand All @@ -17,7 +18,7 @@ type Result = result.Result[solver.ResultProxy]
type Attestation = result.Attestation[solver.ResultProxy]

type Frontend interface {
Solve(ctx context.Context, llb FrontendLLBBridge, opt map[string]string, inputs map[string]*pb.Definition, sid string, sm *session.Manager) (*Result, error)
Solve(ctx context.Context, llb FrontendLLBBridge, exec executor.Executor, opt map[string]string, inputs map[string]*pb.Definition, sid string, sm *session.Manager) (*Result, error)
}

type FrontendLLBBridge interface {
Expand Down
9 changes: 4 additions & 5 deletions frontend/gateway/container/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type Mount struct {
WorkerRef *worker.WorkerRef
}

func NewContainer(ctx context.Context, w worker.Worker, sm *session.Manager, g session.Group, req NewContainerRequest) (client.Container, error) {
func NewContainer(ctx context.Context, cm cache.Manager, exec executor.Executor, sm *session.Manager, g session.Group, req NewContainerRequest) (client.Container, error) {
ctx, cancel := context.WithCancel(ctx)
eg, ctx := errgroup.WithContext(ctx)
platform := opspb.Platform{
Expand All @@ -63,7 +63,7 @@ func NewContainer(ctx context.Context, w worker.Worker, sm *session.Manager, g s
hostname: req.Hostname,
extraHosts: req.ExtraHosts,
platform: platform,
executor: w.Executor(),
executor: exec,
sm: sm,
group: g,
errGroup: eg,
Expand All @@ -86,9 +86,8 @@ func NewContainer(ctx context.Context, w worker.Worker, sm *session.Manager, g s
}

name := fmt.Sprintf("container %s", req.ContainerID)
mm := mounts.NewMountManager(name, w.CacheManager(), sm)
p, err := PrepareMounts(ctx, mm, w.CacheManager(), g, "", mnts, refs, func(m *opspb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) {
cm := w.CacheManager()
mm := mounts.NewMountManager(name, cm, sm)
p, err := PrepareMounts(ctx, mm, cm, g, "", mnts, refs, func(m *opspb.Mount, ref cache.ImmutableRef) (cache.MutableRef, error) {
if m.Input != opspb.Empty {
cm = refs[m.Input].Worker.CacheManager()
}
Expand Down
9 changes: 6 additions & 3 deletions frontend/gateway/forwarder/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

cacheutil "github.com/moby/buildkit/cache/util"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/frontend/gateway/container"
Expand All @@ -26,7 +27,7 @@ import (
"golang.org/x/sync/errgroup"
)

func LLBBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, w worker.Infos, sid string, sm *session.Manager) (*BridgeClient, error) {
func LLBBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, opts map[string]string, inputs map[string]*opspb.Definition, w worker.Infos, sid string, sm *session.Manager) (*BridgeClient, error) {
bc := &BridgeClient{
opts: opts,
inputs: inputs,
Expand All @@ -35,6 +36,7 @@ func LLBBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLL
sm: sm,
workers: w,
workerRefByID: make(map[string]*worker.WorkerRef),
executor: exec,
}
bc.buildOpts = bc.loadBuildOpts()
return bc, nil
Expand All @@ -52,6 +54,7 @@ type BridgeClient struct {
workerRefByID map[string]*worker.WorkerRef
buildOpts client.BuildOpts
ctrs []client.Container
executor executor.Executor
}

func (c *BridgeClient) Solve(ctx context.Context, req client.SolveRequest) (*client.Result, error) {
Expand Down Expand Up @@ -293,13 +296,13 @@ func (c *BridgeClient) NewContainer(ctx context.Context, req client.NewContainer
return nil, err
}

w, err := c.workers.GetDefault()
cm, err := c.workers.DefaultCacheManager()
if err != nil {
return nil, err
}

group := session.NewGroup(c.sid)
ctr, err := container.NewContainer(ctx, w, c.sm, group, ctrReq)
ctr, err := container.NewContainer(ctx, cm, c.executor, c.sm, group, ctrReq)
if err != nil {
return nil, err
}
Expand Down
5 changes: 3 additions & 2 deletions frontend/gateway/forwarder/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package forwarder
import (
"context"

"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/session"
Expand All @@ -22,8 +23,8 @@ type GatewayForwarder struct {
f client.BuildFunc
}

func (gf *GatewayForwarder) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*pb.Definition, sid string, sm *session.Manager) (retRes *frontend.Result, retErr error) {
c, err := LLBBridgeToGatewayClient(ctx, llbBridge, opts, inputs, gf.workers, sid, sm)
func (gf *GatewayForwarder) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, opts map[string]string, inputs map[string]*pb.Definition, sid string, sm *session.Manager) (retRes *frontend.Result, retErr error) {
c, err := LLBBridgeToGatewayClient(ctx, llbBridge, exec, opts, inputs, gf.workers, sid, sm)
if err != nil {
return nil, err
}
Expand Down
29 changes: 13 additions & 16 deletions frontend/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func filterPrefix(opts map[string]string, pfx string) map[string]string {
return m
}

func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*frontend.Result, error) {
func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, opts map[string]string, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*frontend.Result, error) {
source, ok := opts[keySource]
if !ok {
return nil, errors.Errorf("no source specified for gateway")
Expand Down Expand Up @@ -141,7 +141,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
}
}
} else {
c, err := forwarder.LLBBridgeToGatewayClient(ctx, llbBridge, opts, inputs, gf.workers, sid, sm)
c, err := forwarder.LLBBridgeToGatewayClient(ctx, llbBridge, exec, opts, inputs, gf.workers, sid, sm)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -281,18 +281,13 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
}
}

lbf, ctx, err := serveLLBBridgeForwarder(ctx, llbBridge, gf.workers, inputs, sid, sm)
lbf, ctx, err := serveLLBBridgeForwarder(ctx, llbBridge, exec, gf.workers, inputs, sid, sm)
defer lbf.conn.Close() //nolint
if err != nil {
return nil, err
}
defer lbf.Discard()

w, err := gf.workers.GetDefault()
if err != nil {
return nil, err
}

mdmnt, release, err := metadataMount(frontendDef)
if err != nil {
return nil, err
Expand All @@ -305,7 +300,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
mnts = append(mnts, *mdmnt)
}

_, err = w.Executor().Run(ctx, "", container.MountWithSession(rootFS, session.NewGroup(sid)), mnts, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil)
_, err = exec.Run(ctx, "", container.MountWithSession(rootFS, session.NewGroup(sid)), mnts, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil)
if err != nil {
if errdefs.IsCanceled(ctx, err) && lbf.isErrServerClosed {
err = errors.Errorf("frontend grpc server closed unexpectedly")
Expand Down Expand Up @@ -434,11 +429,11 @@ func (lbf *llbBridgeForwarder) Result() (*frontend.Result, error) {
return lbf.result, nil
}

func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) LLBBridgeForwarder {
return newBridgeForwarder(ctx, llbBridge, workers, inputs, sid, sm)
func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) LLBBridgeForwarder {
return newBridgeForwarder(ctx, llbBridge, exec, workers, inputs, sid, sm)
}

func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) *llbBridgeForwarder {
func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) *llbBridgeForwarder {
lbf := &llbBridgeForwarder{
callCtx: ctx,
llbBridge: llbBridge,
Expand All @@ -451,13 +446,14 @@ func newBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridg
sid: sid,
sm: sm,
ctrs: map[string]gwclient.Container{},
executor: exec,
}
return lbf
}

func serveLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*llbBridgeForwarder, context.Context, error) {
func serveLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, exec executor.Executor, workers worker.Infos, inputs map[string]*opspb.Definition, sid string, sm *session.Manager) (*llbBridgeForwarder, context.Context, error) {
ctx, cancel := context.WithCancel(ctx)
lbf := newBridgeForwarder(ctx, llbBridge, workers, inputs, sid, sm)
lbf := newBridgeForwarder(ctx, llbBridge, exec, workers, inputs, sid, sm)
server := grpc.NewServer(grpc.UnaryInterceptor(grpcerrors.UnaryServerInterceptor), grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor))
grpc_health_v1.RegisterHealthServer(server, health.NewServer())
pb.RegisterLLBBridgeServer(server, lbf)
Expand Down Expand Up @@ -552,6 +548,7 @@ type llbBridgeForwarder struct {
isErrServerClosed bool
sid string
sm *session.Manager
executor executor.Executor
*pipe
ctrs map[string]gwclient.Container
ctrsMu sync.Mutex
Expand Down Expand Up @@ -1042,7 +1039,7 @@ func (lbf *llbBridgeForwarder) NewContainer(ctx context.Context, in *pb.NewConta
// and we want the context to live for the duration of the container.
group := session.NewGroup(lbf.sid)

w, err := lbf.workers.GetDefault()
cm, err := lbf.workers.DefaultCacheManager()
if err != nil {
return nil, stack.Enable(err)
}
Expand All @@ -1052,7 +1049,7 @@ func (lbf *llbBridgeForwarder) NewContainer(ctx context.Context, in *pb.NewConta
return nil, stack.Enable(err)
}

ctr, err := container.NewContainer(context.Background(), w, lbf.sm, group, ctrReq)
ctr, err := container.NewContainer(context.Background(), cm, lbf.executor, lbf.sm, group, ctrReq)
if err != nil {
return nil, stack.Enable(err)
}
Expand Down
7 changes: 2 additions & 5 deletions snapshot/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,11 @@ import (
"github.com/containerd/containerd/pkg/userns"
"github.com/containerd/containerd/snapshots"
"github.com/docker/docker/pkg/idtools"
"github.com/moby/buildkit/executor"
"github.com/pkg/errors"
)

type Mountable interface {
// ID() string
Mount() ([]mount.Mount, func() error, error)
IdentityMapping() *idtools.IdentityMapping
}
type Mountable = executor.MountableRef

// Snapshotter defines interface that any snapshot implementation should satisfy
type Snapshotter interface {
Expand Down
32 changes: 32 additions & 0 deletions solver/llbsolver/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/moby/buildkit/cache/remotecache"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/executor"
resourcestypes "github.com/moby/buildkit/executor/resources/types"
"github.com/moby/buildkit/frontend"
gw "github.com/moby/buildkit/frontend/gateway/client"
"github.com/moby/buildkit/identity"
Expand Down Expand Up @@ -39,6 +41,10 @@ type llbBridge struct {
cms map[string]solver.CacheManager
cmsMu sync.Mutex
sm *session.Manager

executorOnce sync.Once
executorErr error
executor executor.Executor
}

func (b *llbBridge) Warn(ctx context.Context, dgst digest.Digest, msg string, opts frontend.WarnOpts) error {
Expand Down Expand Up @@ -159,6 +165,32 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp
return res, nil
}

func (b *llbBridge) Run(ctx context.Context, id string, rootfs executor.Mount, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (resourcestypes.Recorder, error) {
if err := b.loadExecutor(); err != nil {
return nil, err
}
return b.executor.Run(ctx, id, rootfs, mounts, process, started)
}

func (b *llbBridge) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
if err := b.loadExecutor(); err != nil {
return err
}
return b.executor.Exec(ctx, id, process)
}

func (b *llbBridge) loadExecutor() error {
b.executorOnce.Do(func() {
w, err := b.resolveWorker()
if err != nil {
b.executorErr = err
return
}
b.executor = w.Executor()
})
return b.executorErr
}

type resultProxy struct {
id string
b *provenanceBridge
Expand Down
2 changes: 1 addition & 1 deletion solver/llbsolver/provenance.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func (b *provenanceBridge) Solve(ctx context.Context, req frontend.SolveRequest,
return nil, errors.Errorf("invalid frontend: %s", req.Frontend)
}
wb := &provenanceBridge{llbBridge: b.llbBridge, req: &req}
res, err = f.Solve(ctx, wb, req.FrontendOpt, req.FrontendInputs, sid, b.llbBridge.sm)
res, err = f.Solve(ctx, wb, b.llbBridge, req.FrontendOpt, req.FrontendInputs, sid, b.llbBridge.sm)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion solver/llbsolver/solver.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req fro
br := s.bridge(j)
var fwd gateway.LLBBridgeForwarder
if s.gatewayForwarder != nil && req.Definition == nil && req.Frontend == "" {
fwd = gateway.NewBridgeForwarder(ctx, br, s.workerController, req.FrontendInputs, sessionID, s.sm)
fwd = gateway.NewBridgeForwarder(ctx, br, br, s.workerController.Infos(), req.FrontendInputs, sessionID, s.sm)
defer fwd.Discard()
// Register build before calling s.recordBuildHistory, because
// s.recordBuildHistory can block for several seconds on
Expand Down
2 changes: 1 addition & 1 deletion worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,6 @@ type Worker interface {
}

type Infos interface {
GetDefault() (Worker, error)
DefaultCacheManager() (cache.Manager, error)
WorkerInfos() []client.WorkerInfo
}
23 changes: 23 additions & 0 deletions worker/workercontroller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package worker
import (
"github.com/containerd/containerd/filters"
"github.com/hashicorp/go-multierror"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/client"
"github.com/pkg/errors"
)
Expand Down Expand Up @@ -81,3 +82,25 @@ func (c *Controller) WorkerInfos() []client.WorkerInfo {
}
return out
}

func (c *Controller) Infos() Infos {
return &infosController{c: c}
}

type infosController struct {
c *Controller
}

var _ Infos = &infosController{}

func (c *infosController) DefaultCacheManager() (cache.Manager, error) {
w, err := c.c.GetDefault()
if err != nil {
return nil, err
}
return w.CacheManager(), nil
}

func (c *infosController) WorkerInfos() []client.WorkerInfo {
return c.c.WorkerInfos()
}

0 comments on commit 5026d95

Please sign in to comment.