Skip to content

Commit

Permalink
Merge pull request #2581 from imeoer/nydus-compression-type
Browse files Browse the repository at this point in the history
Support for exporting nydus compression type
  • Loading branch information
tonistiigi committed Nov 8, 2022
2 parents f39f6b2 + 3e9e898 commit 5b7315c
Show file tree
Hide file tree
Showing 38 changed files with 2,471 additions and 33 deletions.
50 changes: 50 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,56 @@ jobs:
name: coverage
path: ./coverage

  # Integration tests with nydus support compiled in: the buildkitd binary is
  # built with BUILDKITD_TAGS=nydus and the test run is restricted to the
  # nydus-tagged cases via TESTFLAGS below.
  test-nydus:
    runs-on: ubuntu-20.04
    needs: [base]
    strategy:
      fail-fast: false
      matrix:
        pkg:
          - ./client
        worker:
          - containerd
          - oci
        typ:
          - integration
        exclude:
          # NOTE(review): "dockerfile" does not appear in the typ list above,
          # so this exclude entry looks inert — possibly copied from another
          # job; confirm whether it can be dropped.
          - pkg: ./client ./cmd/buildctl ./worker/containerd ./solver ./frontend
            typ: dockerfile
        include:
          # One extra run over all packages with integration tests skipped
          # (unit-style coverage under the nydus build tag).
          - pkg: ./...
            skip-integration-tests: 1
            typ: integration
    steps:
      -
        name: Checkout
        uses: actions/checkout@v3
      -
        name: Expose GitHub Runtime
        uses: crazy-max/ghaction-github-runtime@v2
      -
        name: Set up QEMU
        uses: docker/setup-qemu-action@v2
      -
        name: Set up Docker Buildx
        uses: docker/setup-buildx-action@v2
        with:
          version: ${{ env.BUILDX_VERSION }}
          driver-opts: image=${{ env.REPO_SLUG_ORIGIN }}
          buildkitd-flags: --debug
      -
        name: Test pkg=${{ matrix.pkg }} ; typ=${{ matrix.typ }} ; skipit=${{ matrix.skip-integration-tests }} ; worker=${{ matrix.worker }}
        run: |
          # Scope the run to the selected worker and enable the nydus-tagged
          # test cases; the trailing "$" anchors the worker name exactly.
          if [ -n "${{ matrix.worker }}" ]; then
            export TESTFLAGS="${TESTFLAGS} --tags=nydus --run=//worker=${{ matrix.worker }}$"
          fi
          ./hack/test ${{ matrix.typ }}
        env:
          BUILDKITD_TAGS: nydus
          TESTPKGS: ${{ matrix.pkg }}
          SKIP_INTEGRATION_TESTS: ${{ matrix.skip-integration-tests }}
          CACHE_FROM: type=gha,scope=${{ env.CACHE_GHA_SCOPE_IT }} type=gha,scope=${{ env.CACHE_GHA_SCOPE_BINARIES }}

test-s3:
runs-on: ubuntu-20.04
needs:
Expand Down
10 changes: 10 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ ARG CNI_VERSION=v1.1.0
ARG STARGZ_SNAPSHOTTER_VERSION=v0.12.0
ARG NERDCTL_VERSION=v0.17.1
ARG DNSNAME_VERSION=v1.3.1
ARG NYDUS_VERSION=v2.1.0

# ALPINE_VERSION sets version for the base layers
ARG ALPINE_VERSION=3.15
Expand Down Expand Up @@ -192,6 +193,14 @@ RUN --mount=target=/root/.cache,type=cache \
xx-verify --static /out/containerd-stargz-grpc && \
xx-verify --static /out/ctr-remote

# Stage that fetches the prebuilt static nydus tools; the binaries are copied
# into the integration-test image below via COPY --from=nydus.
FROM gobuild-base AS nydus
ARG NYDUS_VERSION
ARG TARGETOS
ARG TARGETARCH
SHELL ["/bin/bash", "-c"]
# NOTE(review): the release tarball is downloaded without checksum/signature
# verification — TODO confirm this is acceptable for the test image.
RUN wget https://github.com/dragonflyoss/image-service/releases/download/$NYDUS_VERSION/nydus-static-$NYDUS_VERSION-$TARGETOS-$TARGETARCH.tgz
# Tarball unpacks a nydus-static/ directory under /out (consumed later as
# /out/nydus-static/*).
RUN mkdir -p /out/nydus-static && tar xzvf nydus-static-$NYDUS_VERSION-$TARGETOS-$TARGETARCH.tgz -C /out

FROM buildkit-export AS buildkit-linux
COPY --link --from=binaries / /usr/bin/
ENTRYPOINT ["buildkitd"]
Expand Down Expand Up @@ -235,6 +244,7 @@ RUN apk add --no-cache shadow shadow-uidmap sudo vim iptables ip6tables dnsmasq
ENV BUILDKIT_INTEGRATION_CONTAINERD_EXTRA="containerd-1.4=/opt/containerd-alt-14/bin,containerd-1.5=/opt/containerd-alt-15/bin"
ENV BUILDKIT_INTEGRATION_SNAPSHOTTER=stargz
ENV CGO_ENABLED=0
COPY --link --from=nydus /out/nydus-static/* /usr/bin/
COPY --link --from=stargz-snapshotter /out/* /usr/bin/
COPY --link --from=rootlesskit /rootlesskit /usr/bin/
COPY --link --from=containerd-alt-14 /out/containerd* /opt/containerd-alt-14/bin/
Expand Down
2 changes: 1 addition & 1 deletion cache/blobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func computeBlobChain(ctx context.Context, sr *immutableRef, createIfNeeded bool
return nil, errors.WithStack(ErrNoBlobs)
}

compressorFunc, finalize := comp.Type.Compress(comp)
compressorFunc, finalize := comp.Type.Compress(ctx, comp)
mediaType := comp.Type.MediaType()

var lowerRef *immutableRef
Expand Down
9 changes: 6 additions & 3 deletions cache/blobs_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,14 @@ func (sr *immutableRef) tryComputeOverlayBlob(ctx context.Context, lower, upper
if err != nil {
return emptyDesc, false, errors.Wrap(err, "failed to get compressed stream")
}
err = overlay.WriteUpperdir(ctx, io.MultiWriter(compressed, dgstr.Hash()), upperdir, lower)
compressed.Close()
if err != nil {
// Close ensure compressorFunc does some finalization works.
defer compressed.Close()
if err := overlay.WriteUpperdir(ctx, io.MultiWriter(compressed, dgstr.Hash()), upperdir, lower); err != nil {
return emptyDesc, false, errors.Wrap(err, "failed to write compressed diff")
}
if err := compressed.Close(); err != nil {
return emptyDesc, false, errors.Wrap(err, "failed to close compressed diff writer")
}
if labels == nil {
labels = map[string]string{}
}
Expand Down
16 changes: 16 additions & 0 deletions cache/compression.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
//go:build !nydus
// +build !nydus

package cache

import (
"context"

"github.com/containerd/containerd/content"
"github.com/moby/buildkit/cache/config"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
)

// needsForceCompression reports whether the layer must be converted to the
// target compression even if a usable blob already exists. Without the
// "nydus" build tag the only trigger is an explicit user request
// (Compression.Force); ctx, cs and source are unused here but keep the
// signature identical to the nydus build-tag variant of this function.
func needsForceCompression(ctx context.Context, cs content.Store, source ocispecs.Descriptor, refCfg config.RefConfig) bool {
	force := refCfg.Compression.Force
	return force
}
148 changes: 148 additions & 0 deletions cache/compression_nydus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
//go:build nydus
// +build nydus

package cache

import (
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"

"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/moby/buildkit/cache/config"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/util/compression"
digest "github.com/opencontainers/go-digest"
ocispecs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"

nydusify "github.com/containerd/nydus-snapshotter/pkg/converter"
)

func init() {
additionalAnnotations = append(
additionalAnnotations,
nydusify.LayerAnnotationNydusBlob, nydusify.LayerAnnotationNydusBootstrap, nydusify.LayerAnnotationNydusBlobIDs,
)
}

// Nydus compression type can't be mixed with other compression types in the
// same image, so if `source` is this kind of layer but the target is another
// compression type, we should do the forced compression.
func needsForceCompression(ctx context.Context, cs content.Store, source ocispecs.Descriptor, refCfg config.RefConfig) bool {
	// An explicit user request always forces conversion.
	if refCfg.Compression.Force {
		return true
	}
	// Best-effort detection: an error from Is is treated as "not a nydus blob".
	isNydusBlob, _ := compression.Nydus.Is(ctx, cs, source)
	wantNydus := refCfg.Compression.Type == compression.Nydus
	// Conversion must be forced exactly when source and target disagree on
	// whether the layer is nydus (nydus -> other, or other -> nydus).
	return isNydusBlob != wantNydus
}

// MergeNydus does two steps:
// 1. Extracts nydus bootstrap from nydus format (nydus blob + nydus bootstrap) for each layer.
// 2. Merge all nydus bootstraps into a final bootstrap (appended to the image as an extra layer).
// The nydus bootstrap size is very small, so the merge operation is fast.
func MergeNydus(ctx context.Context, ref ImmutableRef, comp compression.Config, s session.Group) (*ocispecs.Descriptor, error) {
	// Only the concrete *immutableRef exposes the layer chain and cache manager.
	iref, ok := ref.(*immutableRef)
	if !ok {
		return nil, fmt.Errorf("unsupported ref")
	}
	refs := iref.layerChain()
	if len(refs) == 0 {
		return nil, fmt.Errorf("refs can't be empty")
	}

	// Extracts nydus bootstrap from nydus format for each layer.
	var cm *cacheManager
	layers := []nydusify.Layer{}
	blobIDs := []string{}
	for _, ref := range refs {
		blobDesc, err := getBlobWithCompressionWithRetry(ctx, ref, comp, s)
		if err != nil {
			return nil, errors.Wrapf(err, "get compression blob %q", comp.Type)
		}
		ra, err := ref.cm.ContentStore.ReaderAt(ctx, blobDesc)
		if err != nil {
			return nil, errors.Wrapf(err, "get reader for compression blob %q", comp.Type)
		}
		// Deliberately deferred inside the loop: every reader must stay open
		// until nydusify.Merge (below) has consumed it, so they are all
		// closed together when MergeNydus returns.
		defer ra.Close()
		if cm == nil {
			cm = ref.cm
		}
		blobIDs = append(blobIDs, blobDesc.Digest.Hex())
		layers = append(layers, nydusify.Layer{
			Digest:   blobDesc.Digest,
			ReaderAt: ra,
		})
	}

	// Merge all nydus bootstraps into a final nydus bootstrap.
	pr, pw := io.Pipe()
	go func() {
		defer pw.Close()
		if _, err := nydusify.Merge(ctx, layers, pw, nydusify.MergeOption{
			WithTar: true,
		}); err != nil {
			// Propagate the failure to the reading side of the pipe so the
			// io.Copy below fails instead of seeing a truncated stream.
			pw.CloseWithError(errors.Wrapf(err, "merge nydus bootstrap"))
		}
	}()

	// Compress final nydus bootstrap to tar.gz and write into content store.
	cw, err := content.OpenWriter(ctx, cm.ContentStore, content.WithRef("nydus-merge-"+iref.getChainID().String()))
	if err != nil {
		return nil, errors.Wrap(err, "open content store writer")
	}
	defer cw.Close()

	gw := gzip.NewWriter(cw)
	// Digest the uncompressed stream in parallel with compression; the value
	// is recorded below as the containerd "uncompressed" label/annotation.
	uncompressedDgst := digest.SHA256.Digester()
	compressed := io.MultiWriter(gw, uncompressedDgst.Hash())
	if _, err := io.Copy(compressed, pr); err != nil {
		return nil, errors.Wrapf(err, "copy bootstrap targz into content store")
	}
	if err := gw.Close(); err != nil {
		return nil, errors.Wrap(err, "close gzip writer")
	}

	compressedDgst := cw.Digest()
	// Expected size 0 — presumably lets the content store skip size
	// validation since the final size isn't known up front; confirm against
	// containerd's content.Writer.Commit contract.
	if err := cw.Commit(ctx, 0, compressedDgst, content.WithLabels(map[string]string{
		containerdUncompressed: uncompressedDgst.Digest().String(),
	})); err != nil {
		// A concurrent writer may have committed identical content already;
		// that is not a failure for our purposes.
		if !errdefs.IsAlreadyExists(err) {
			return nil, errors.Wrap(err, "commit to content store")
		}
	}
	if err := cw.Close(); err != nil {
		return nil, errors.Wrap(err, "close content store writer")
	}

	// Re-read the committed blob's info to obtain its final (compressed) size.
	info, err := cm.ContentStore.Info(ctx, compressedDgst)
	if err != nil {
		return nil, errors.Wrap(err, "get info from content store")
	}

	blobIDsBytes, err := json.Marshal(blobIDs)
	if err != nil {
		return nil, errors.Wrap(err, "marshal blob ids")
	}

	desc := ocispecs.Descriptor{
		Digest:    compressedDgst,
		Size:      info.Size,
		MediaType: ocispecs.MediaTypeImageLayerGzip,
		Annotations: map[string]string{
			containerdUncompressed: uncompressedDgst.Digest().String(),
			// Use this annotation to identify nydus bootstrap layer.
			nydusify.LayerAnnotationNydusBootstrap: "true",
			// Track all blob digests for nydus snapshotter.
			nydusify.LayerAnnotationNydusBlobIDs: string(blobIDsBytes),
		},
	}

	return &desc, nil
}
2 changes: 1 addition & 1 deletion cache/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func getConverter(ctx context.Context, cs content.Store, desc ocispecs.Descripto
}

c := conversion{target: comp}
c.compress, c.finalize = comp.Type.Compress(comp)
c.compress, c.finalize = comp.Type.Compress(ctx, comp)
c.decompress = from.Decompress

return (&c).convert, nil
Expand Down
4 changes: 3 additions & 1 deletion cache/refs.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"golang.org/x/sync/errgroup"
)

var additionalAnnotations = append(compression.EStargzAnnotations, containerdUncompressed)

// Ref is a reference to cacheable objects.
type Ref interface {
Mountable
Expand Down Expand Up @@ -878,7 +880,7 @@ func filterAnnotationsForSave(a map[string]string) (b map[string]string) {
if a == nil {
return nil
}
for _, k := range append(compression.EStargzAnnotations, containerdUncompressed) {
for _, k := range additionalAnnotations {
v, ok := a[k]
if !ok {
continue
Expand Down
2 changes: 1 addition & 1 deletion cache/remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ func (sr *immutableRef) getRemote(ctx context.Context, createIfNeeded bool, refC
}
}

if refCfg.Compression.Force {
if needsForceCompression(ctx, sr.cm.ContentStore, desc, refCfg) {
if needs, err := refCfg.Compression.Type.NeedsConversion(ctx, sr.cm.ContentStore, desc); err != nil {
return nil, err
} else if needs {
Expand Down
Loading

0 comments on commit 5b7315c

Please sign in to comment.