Skip to content

Commit

Permalink
estargz: support lossless compression
Browse files Browse the repository at this point in the history
Signed-off-by: Kohei Tokunaga <ktokunaga.mail@gmail.com>
  • Loading branch information
ktock committed Sep 8, 2021
1 parent 1d81483 commit f5999ba
Show file tree
Hide file tree
Showing 9 changed files with 404 additions and 242 deletions.
2 changes: 1 addition & 1 deletion cmd/ctr-remote/commands/get-toc-digest.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ var GetTOCDigestCommand = cli.Command{
decompressor = new(zstdchunked.Decompressor)
}

tocOff, tocSize, err := decompressor.ParseFooter(footer)
_, tocOff, tocSize, err := decompressor.ParseFooter(footer)
if err != nil {
return errors.Wrapf(err, "error parsing footer")
}
Expand Down
4 changes: 2 additions & 2 deletions estargz/build_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ func TestSort(t *testing.T) {
if tt.allowMissedFiles != nil {
opts = append(opts, WithAllowPrioritizeNotFound(&missedFiles))
}
rc, err := Build(compressBlob(t, buildTarStatic(t, tt.in, tarprefix), srcCompression),
rc, err := Build(compressBlob(t, buildTar(t, tt.in, tarprefix), srcCompression),
append(opts, WithPrioritizedFiles(pfiles))...)
if tt.wantFail {
if err != nil {
Expand Down Expand Up @@ -406,7 +406,7 @@ func TestSort(t *testing.T) {
gotTar := tar.NewReader(zr)

// Compare all
wantTar := tar.NewReader(buildTarStatic(t, tt.want, tarprefix))
wantTar := tar.NewReader(buildTar(t, tt.want, tarprefix))
for {
// Fetch and parse next header.
gotH, wantH, err := next(t, gotTar, wantTar)
Expand Down
76 changes: 68 additions & 8 deletions estargz/estargz.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func Open(sr *io.SectionReader, opt ...OpenOption) (*Reader, error) {
fSize := d.FooterSize()
fOffset := positive(int64(len(footer)) - fSize)
maybeTocBytes := footer[:fOffset]
tocOffset, tocSize, err := d.ParseFooter(footer[fOffset:])
_, tocOffset, tocSize, err := d.ParseFooter(footer[fOffset:])
if err != nil {
allErr = append(allErr, err)
continue
Expand Down Expand Up @@ -187,7 +187,7 @@ func OpenFooter(sr *io.SectionReader) (tocOffset int64, footerSize int64, rErr e
for _, d := range []Decompressor{new(GzipDecompressor), new(legacyGzipDecompressor)} {
fSize := d.FooterSize()
fOffset := positive(int64(len(footer)) - fSize)
tocOffset, _, err := d.ParseFooter(footer[fOffset:])
_, tocOffset, _, err := d.ParseFooter(footer[fOffset:])
if err == nil {
return tocOffset, fSize, err
}
Expand Down Expand Up @@ -591,6 +591,11 @@ type currentCompressionWriter struct{ w *Writer }

// Write feeds p into the uncompressed-diff digest and then into the
// writer's current compression stream. The compression stream is opened
// lazily so that Write is safe to call before any entry has been started.
func (ccw currentCompressionWriter) Write(p []byte) (int, error) {
	w := ccw.w
	w.diffHash.Write(p)
	if w.gz == nil {
		// No compression stream open yet; open one on demand.
		if err := w.condOpenGz(); err != nil {
			return 0, err
		}
	}
	return w.gz.Write(p)
}

Expand All @@ -601,6 +606,25 @@ func (w *Writer) chunkSize() int {
return w.ChunkSize
}

// Unpack decompresses the given estargz blob and returns a ReadCloser of the tar blob.
// TOC JSON and footer are removed.
func Unpack(sr *io.SectionReader, c Decompressor) (io.ReadCloser, error) {
	footerSize := c.FooterSize()
	if sr.Size() < footerSize {
		return nil, fmt.Errorf("blob is too small; %d < %d", sr.Size(), footerSize)
	}
	// Read the fixed-size footer from the tail of the blob.
	footer := make([]byte, footerSize)
	if _, err := sr.ReadAt(footer, sr.Size()-footerSize); err != nil {
		return nil, err
	}
	blobPayloadSize, _, _, err := c.ParseFooter(footer)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to parse footer")
	}
	// Decompress only the payload region; this excludes the TOC and footer.
	return c.Reader(io.LimitReader(sr, blobPayloadSize))
}

// NewWriter returns a new stargz writer (gzip-based) writing to w.
//
// The writer must be closed to write its trailing table of contents.
Expand Down Expand Up @@ -696,17 +720,50 @@ func (w *Writer) condOpenGz() (err error) {
// AppendTar reads the tar or tar.gz file from r and appends
// each of its contents to w.
//
// The input r can optionally be gzip compressed but the output will
// always be compressed by the specified compressor.
func (w *Writer) AppendTar(r io.Reader) error {
	var src io.Reader
	br := bufio.NewReader(r)
	if isGzip(br) {
		// NewReader can't fail if isGzip returned true.
		zr, _ := gzip.NewReader(br)
		src = zr
	} else {
		src = io.Reader(br)
	}
	// write restructured tar which doesn't contain existing TOC entry.
	// (header bytes may change)
	return w.appendTar(src, currentCompressionWriter{w}, true)
}

// AppendTarLossLess reads the tar or tar.gz file from r and appends
// each of its contents to w.
//
// The input r can optionally be gzip compressed but the output will
// always be compressed by the specified compressor.
//
// The difference of this func with AppendTar is that this writes
// the input tar stream into w without any modification (e.g. to header bytes).
//
// Note that if the input tar stream already contains TOC JSON, this returns
// error because w cannot overwrite the TOC JSON to the one generated by w without
// lossy modification. To avoid this error, if the input stream is known to be stargz/estargz,
// you should decompress it and remove TOC JSON in advance.
func (w *Writer) AppendTarLossLess(r io.Reader) error {
	var src io.Reader
	br := bufio.NewReader(r)
	if isGzip(br) {
		// NewReader can't fail if isGzip returned true.
		zr, _ := gzip.NewReader(br)
		src = zr
	} else {
		src = io.Reader(br)
	}
	// Tee the raw input bytes into the compressor so w receives them
	// unmodified; the entries parsed by appendTar are written to Discard
	// because only the TOC bookkeeping (not a rewritten tar) is needed.
	return w.appendTar(io.TeeReader(src, currentCompressionWriter{w}), ioutil.Discard, false)
}

func (w *Writer) appendTar(src io.Reader, dst io.Writer, allowTOCExists bool) error {
tr := tar.NewReader(src)
for {
h, err := tr.Next()
if err == io.EOF {
Expand All @@ -715,10 +772,13 @@ func (w *Writer) AppendTar(r io.Reader) error {
if err != nil {
return fmt.Errorf("error reading from source tar: tar.Reader.Next: %v", err)
}
if h.Name == TOCTarName {
if cleanEntryName(h.Name) == TOCTarName {
// It is possible for a layer to be "stargzified" twice during the
// distribution lifecycle. So we reserve "TOCTarName" here to avoid
// duplicated entries in the resulting layer.
if !allowTOCExists {
return fmt.Errorf("existing TOC JSON is not allowed; decompress layer before append")
}
continue
}

Expand All @@ -744,7 +804,7 @@ func (w *Writer) AppendTar(r io.Reader) error {
if err := w.condOpenGz(); err != nil {
return err
}
tw := tar.NewWriter(currentCompressionWriter{w})
tw := tar.NewWriter(dst)
if err := tw.WriteHeader(h); err != nil {
return err
}
Expand Down
30 changes: 15 additions & 15 deletions estargz/gzip.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,31 +124,31 @@ func (gz *GzipDecompressor) ParseTOC(r io.Reader) (toc *JTOC, tocDgst digest.Dig
return parseTOCEStargz(r)
}

func (gz *GzipDecompressor) ParseFooter(p []byte) (tocOffset, tocSize int64, err error) {
func (gz *GzipDecompressor) ParseFooter(p []byte) (blobPayloadSize, tocOffset, tocSize int64, err error) {
if len(p) != FooterSize {
return 0, 0, fmt.Errorf("invalid length %d cannot be parsed", len(p))
return 0, 0, 0, fmt.Errorf("invalid length %d cannot be parsed", len(p))
}
zr, err := gzip.NewReader(bytes.NewReader(p))
if err != nil {
return 0, 0, err
return 0, 0, 0, err
}
defer zr.Close()
extra := zr.Header.Extra
si1, si2, subfieldlen, subfield := extra[0], extra[1], extra[2:4], extra[4:]
if si1 != 'S' || si2 != 'G' {
return 0, 0, fmt.Errorf("invalid subfield IDs: %q, %q; want E, S", si1, si2)
return 0, 0, 0, fmt.Errorf("invalid subfield IDs: %q, %q; want E, S", si1, si2)
}
if slen := binary.LittleEndian.Uint16(subfieldlen); slen != uint16(16+len("STARGZ")) {
return 0, 0, fmt.Errorf("invalid length of subfield %d; want %d", slen, 16+len("STARGZ"))
return 0, 0, 0, fmt.Errorf("invalid length of subfield %d; want %d", slen, 16+len("STARGZ"))
}
if string(subfield[16:]) != "STARGZ" {
return 0, 0, fmt.Errorf("STARGZ magic string must be included in the footer subfield")
return 0, 0, 0, fmt.Errorf("STARGZ magic string must be included in the footer subfield")
}
tocOffset, err = strconv.ParseInt(string(subfield[:16]), 16, 64)
if err != nil {
return 0, 0, errors.Wrapf(err, "legacy: failed to parse toc offset")
return 0, 0, 0, errors.Wrapf(err, "legacy: failed to parse toc offset")
}
return tocOffset, 0, nil
return tocOffset, tocOffset, 0, nil
}

func (gz *GzipDecompressor) FooterSize() int64 {
Expand All @@ -165,27 +165,27 @@ func (gz *legacyGzipDecompressor) ParseTOC(r io.Reader) (toc *JTOC, tocDgst dige
return parseTOCEStargz(r)
}

func (gz *legacyGzipDecompressor) ParseFooter(p []byte) (tocOffset, tocSize int64, err error) {
func (gz *legacyGzipDecompressor) ParseFooter(p []byte) (blobPayloadSize, tocOffset, tocSize int64, err error) {
if len(p) != legacyFooterSize {
return 0, 0, fmt.Errorf("legacy: invalid length %d cannot be parsed", len(p))
return 0, 0, 0, fmt.Errorf("legacy: invalid length %d cannot be parsed", len(p))
}
zr, err := gzip.NewReader(bytes.NewReader(p))
if err != nil {
return 0, 0, errors.Wrapf(err, "legacy: failed to get footer gzip reader")
return 0, 0, 0, errors.Wrapf(err, "legacy: failed to get footer gzip reader")
}
defer zr.Close()
extra := zr.Header.Extra
if len(extra) != 16+len("STARGZ") {
return 0, 0, fmt.Errorf("legacy: invalid stargz's extra field size")
return 0, 0, 0, fmt.Errorf("legacy: invalid stargz's extra field size")
}
if string(extra[16:]) != "STARGZ" {
return 0, 0, fmt.Errorf("legacy: magic string STARGZ not found")
return 0, 0, 0, fmt.Errorf("legacy: magic string STARGZ not found")
}
tocOffset, err = strconv.ParseInt(string(extra[:16]), 16, 64)
if err != nil {
return 0, 0, errors.Wrapf(err, "legacy: failed to parse toc offset")
return 0, 0, 0, errors.Wrapf(err, "legacy: failed to parse toc offset")
}
return tocOffset, 0, nil
return tocOffset, tocOffset, 0, nil
}

func (gz *legacyGzipDecompressor) FooterSize() int64 {
Expand Down
4 changes: 2 additions & 2 deletions estargz/gzip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func checkFooter(t *testing.T, off int64) {
if len(footer) != FooterSize {
t.Fatalf("for offset %v, footer length was %d, not expected %d. got bytes: %q", off, len(footer), FooterSize, footer)
}
got, _, err := (&GzipDecompressor{}).ParseFooter(footer)
_, got, _, err := (&GzipDecompressor{}).ParseFooter(footer)
if err != nil {
t.Fatalf("failed to parse footer for offset %d, footer: %x: err: %v",
off, footer, err)
Expand All @@ -125,7 +125,7 @@ func checkLegacyFooter(t *testing.T, off int64) {
if len(footer) != legacyFooterSize {
t.Fatalf("for offset %v, footer length was %d, not expected %d. got bytes: %q", off, len(footer), legacyFooterSize, footer)
}
got, _, err := (&legacyGzipDecompressor{}).ParseFooter(footer)
_, got, _, err := (&legacyGzipDecompressor{}).ParseFooter(footer)
if err != nil {
t.Fatalf("failed to parse legacy footer for offset %d, footer: %x: err: %v",
off, footer, err)
Expand Down

0 comments on commit f5999ba

Please sign in to comment.