diff --git a/kythe/go/platform/kzip/BUILD b/kythe/go/platform/kzip/BUILD index 9bc656ab5e..3aff9d7ffa 100644 --- a/kythe/go/platform/kzip/BUILD +++ b/kythe/go/platform/kzip/BUILD @@ -14,6 +14,7 @@ go_library( "//kythe/proto:go_go_proto", "//kythe/proto:java_go_proto", "@com_github_golang_protobuf//jsonpb:go_default_library_gen", + "@com_github_golang_protobuf//proto:go_default_library", "@org_bitbucket_creachadair_stringset//:go_default_library", "@org_golang_x_sync//errgroup:go_default_library", ], @@ -22,6 +23,10 @@ go_library( go_test( name = "kzip_test", srcs = ["kzip_test.go"], + data = [ + "missing-pbunit.kzip", + "missing-unit.kzip", + ], deps = [ ":kzip", "//kythe/proto:analysis_go_proto", diff --git a/kythe/go/platform/kzip/kzip.go b/kythe/go/platform/kzip/kzip.go index 06ddf9978a..1e120c51d1 100644 --- a/kythe/go/platform/kzip/kzip.go +++ b/kythe/go/platform/kzip/kzip.go @@ -74,6 +74,7 @@ import ( "os" "path" "sort" + "strconv" "strings" "sync" "time" @@ -82,6 +83,7 @@ import ( "bitbucket.org/creachadair/stringset" "github.com/golang/protobuf/jsonpb" + "github.com/golang/protobuf/proto" "golang.org/x/sync/errgroup" apb "kythe.io/kythe/proto/analysis_go_proto" @@ -95,6 +97,32 @@ import ( _ "kythe.io/kythe/proto/java_go_proto" ) +// Type Encoding describes how compilation units will be encoded when written to a kzip. +type Encoding int + +const ( + EncodingJSON Encoding = 1 + EncodingProto Encoding = 2 + EncodingAll Encoding = EncodingJSON | EncodingProto + + prefixJSON = "units" + prefixProto = "pbunits" +) + +// String stringifies an Encoding +func (e Encoding) String() string { + switch { + case e == EncodingAll: + return "All" + case e == EncodingJSON: + return "JSON" + case e == EncodingProto: + return "Proto" + default: + return "Encoding" + strconv.FormatInt(int64(e), 10) + } +} + // A Reader permits reading and scanning compilation records and file contents // stored in a .kzip archive. The Lookup and Scan methods are mutually safe for // concurrent use by multiple goroutines. @@ -105,6 +133,10 @@ type Reader struct { // directory, but it's not required by the spec. Use whatever name the // archive actually specifies in the leading directory. root string + + // The prefix used for the compilation unit directory; one of + // prefixJSON or prefixProto + unitsPrefix string } // NewReader constructs a new Reader that consumes zip data from r, whose total @@ -124,14 +156,59 @@ func NewReader(r io.ReaderAt, size int64) (*Reader, error) { } else if fi := archive.File[0].FileInfo(); !fi.IsDir() { return nil, errors.New("archive root is not a directory") } - + root := archive.File[0].Name + pref, err := unitPrefix(root, archive.File) + if err != nil { + return nil, err + } return &Reader{ - zip: archive, - root: archive.File[0].Name, + zip: archive, + root: root, + unitsPrefix: pref, }, nil } -func (r *Reader) unitPath(digest string) string { return path.Join(r.root, "units", digest) } +func unitPrefix(root string, fs []*zip.File) (string, error) { + jsonDir := root + prefixJSON + "/" + protoDir := root + prefixProto + "/" + j := sort.Search(len(fs), func(i int) bool { + return fs[i].Name > jsonDir + }) + hasJSON := j < len(fs) && strings.HasPrefix(fs[j].Name, jsonDir) + p := sort.Search(len(fs), func(i int) bool { + return fs[i].Name > protoDir + }) + hasProto := p < len(fs) && strings.HasPrefix(fs[p].Name, protoDir) + if !hasJSON && !hasProto { + return "", fmt.Errorf("no compilation units found") + } + if hasJSON && hasProto { + // validate that they have identical units based on hash + for p < len(fs) && j < len(fs) { + ispb := strings.HasPrefix(fs[p].Name, protoDir) + isjson := strings.HasPrefix(fs[j].Name, jsonDir) + if ispb != isjson { + return "", fmt.Errorf("both proto and JSON units found but are not identical") + } + if !ispb { + break + } + pdigest := strings.Split(fs[p].Name, "/")[2] + jdigest := strings.Split(fs[j].Name, "/")[2] + if pdigest != jdigest { + return "", fmt.Errorf("both proto and JSON units found but are not identical") + } + p++ + j++ + } + } + if hasProto { + return prefixProto, nil + } + return prefixJSON, nil +} + +func (r *Reader) unitPath(digest string) string { return path.Join(r.root, r.unitsPrefix, digest) } func (r *Reader) filePath(digest string) string { return path.Join(r.root, "files", digest) } // ErrDigestNotFound is returned when a requested compilation unit or file @@ -142,15 +219,22 @@ var ErrDigestNotFound = errors.New("digest not found") // multiple times. var ErrUnitExists = errors.New("unit already exists") -func readUnit(digest string, f *zip.File) (*Unit, error) { +func (r *Reader) readUnit(digest string, f *zip.File) (*Unit, error) { rc, err := f.Open() if err != nil { return nil, err } defer rc.Close() - var msg apb.IndexedCompilation - if err := jsonpb.Unmarshal(rc, &msg); err != nil { + if r.unitsPrefix == prefixProto { + rec := make([]byte, f.UncompressedSize64) + if _, err = io.ReadFull(rc, rec); err != nil { + return nil, err + } + if err := proto.Unmarshal(rec, &msg); err != nil { + return nil, fmt.Errorf("error unmarshaling for %s: %s", digest, err) + } + } else if err := jsonpb.Unmarshal(rc, &msg); err != nil { return nil, err } return &Unit{ @@ -160,8 +244,8 @@ func readUnit(digest string, f *zip.File) (*Unit, error) { }, nil } -// firstIndex returns the first index in the archive's file list whose path is -// greater than or equal to with prefix, or -1 if no such index exists. +// firstIndex returns the first index in the archive's file list whose +// path starts with prefix, or -1 if no such index exists. func (r *Reader) firstIndex(prefix string) int { fs := r.zip.File n := sort.Search(len(fs), func(i int) bool { @@ -170,6 +254,9 @@ func (r *Reader) firstIndex(prefix string) int { if n >= len(fs) { return -1 } + if !strings.HasPrefix(fs[n].Name, prefix) { + return -1 + } return n } @@ -177,9 +264,10 @@ func (r *Reader) firstIndex(prefix string) int { // the requested digest is not in the archive, ErrDigestNotFound is returned. func (r *Reader) Lookup(unitDigest string) (*Unit, error) { needle := r.unitPath(unitDigest) - if pos := r.firstIndex(needle); pos >= 0 { + pos := r.firstIndex(needle) + if pos >= 0 { if f := r.zip.File[pos]; f.Name == needle { - return readUnit(unitDigest, f) + return r.readUnit(unitDigest, f) } } return nil, ErrDigestNotFound @@ -198,6 +286,26 @@ func ReadConcurrency(n int) ScanOption { return readConcurrency(n) } +func (r *Reader) canonicalUnits() (string, []*zip.File) { + prefix := r.unitPath("") + "/" + pos := r.firstIndex(prefix) + if pos < 0 { + return "", nil + } + var res []*zip.File + for _, file := range r.zip.File[pos:] { + if !strings.HasPrefix(file.Name, prefix) { + break + } + if file.Name == prefix { + continue // tolerate an empty units directory entry + } + res = append(res, file) + + } + return prefix, res +} + // Scan scans all the compilations stored in the archive, and invokes f for // each compilation record. If f reports an error, the scan is terminated and // that error is propagated to the caller of Scan. At most 1 invocation of f @@ -215,9 +323,8 @@ func (r *Reader) Scan(f func(*Unit) error, opts ...ScanOption) error { } } - prefix := r.unitPath("") + "/" - pos := r.firstIndex(prefix) - if pos < 0 { + prefix, fileUnits := r.canonicalUnits() + if len(fileUnits) == 0 { return nil } @@ -226,14 +333,10 @@ func (r *Reader) Scan(f func(*Unit) error, opts ...ScanOption) error { g, ctx := errgroup.WithContext(ctx) files := make(chan *zip.File) + g.Go(func() error { defer close(files) - for _, file := range r.zip.File[pos:] { - if !strings.HasPrefix(file.Name, prefix) { - break - } else if file.Name == prefix { - continue // tolerate an empty units directory entry - } + for _, file := range fileUnits { select { case <-ctx.Done(): return nil @@ -242,7 +345,6 @@ func (r *Reader) Scan(f func(*Unit) error, opts ...ScanOption) error { } return nil }) - units := make(chan *Unit) var wg sync.WaitGroup for i := 0; i < concurrency; i++ { @@ -251,7 +353,7 @@ func (r *Reader) Scan(f func(*Unit) error, opts ...ScanOption) error { defer wg.Done() for file := range files { digest := strings.TrimPrefix(file.Name, prefix) - unit, err := readUnit(digest, file) + unit, err := r.readUnit(digest, file) if err != nil { return err } @@ -316,11 +418,23 @@ type Writer struct { fd stringset.Set // file digests already written ud stringset.Set // unit digests already written c io.Closer // a closer for the underlying writer (may be nil) + + encoding Encoding // What encoding to use +} + +// WriterOption describes options when creating a Writer +type WriterOption func(*Writer) + +// WithEncoding sets the encoding to be used by a Writer +func WithEncoding(e Encoding) WriterOption { + return func(w *Writer) { + w.encoding = e + } } // NewWriter constructs a new empty Writer that delivers output to w. The // AddUnit and AddFile methods are safe for use by concurrent goroutines. -func NewWriter(w io.Writer) (*Writer, error) { +func NewWriter(w io.Writer, options ...WriterOption) (*Writer, error) { archive := zip.NewWriter(w) // Create an entry for the root directory, which must be first. root := &zip.FileHeader{ @@ -334,17 +448,22 @@ func NewWriter(w io.Writer) (*Writer, error) { } archive.SetComment("Kythe kzip archive") - return &Writer{ - zip: archive, - fd: stringset.New(), - ud: stringset.New(), - }, nil + kw := &Writer{ + zip: archive, + fd: stringset.New(), + ud: stringset.New(), + encoding: EncodingJSON, + } + for _, opt := range options { + opt(kw) + } + return kw, nil } // NewWriteCloser behaves as NewWriter, but arranges that when the *Writer is // closed it also closes wc. -func NewWriteCloser(wc io.WriteCloser) (*Writer, error) { - w, err := NewWriter(wc) +func NewWriteCloser(wc io.WriteCloser, options ...WriterOption) (*Writer, error) { + w, err := NewWriter(wc, options...) if err == nil { w.c = wc } @@ -374,15 +493,34 @@ func (w *Writer) AddUnit(cu *apb.CompilationUnit, index *apb.IndexedCompilation_ return digest, ErrUnitExists } - f, err := w.zip.CreateHeader(newFileHeader("root", "units", digest)) - if err != nil { - return "", err + if w.encoding&EncodingJSON != 0 { + f, err := w.zip.CreateHeader(newFileHeader("root", prefixJSON, digest)) + if err != nil { + return "", err + } + if err := toJSON.Marshal(f, &apb.IndexedCompilation{ + Unit: unit.Proto, + Index: index, + }); err != nil { + return "", err + } } - if err := toJSON.Marshal(f, &apb.IndexedCompilation{ - Unit: unit.Proto, - Index: index, - }); err != nil { - return "", err + if w.encoding&EncodingProto != 0 { + f, err := w.zip.CreateHeader(newFileHeader("root", prefixProto, digest)) + if err != nil { + return "", err + } + rec, err := proto.Marshal(&apb.IndexedCompilation{ + Unit: unit.Proto, + Index: index, + }) + if err != nil { + return "", err + } + _, err = f.Write(rec) + if err != nil { + return "", err + } } w.ud.Add(digest) return digest, nil diff --git a/kythe/go/platform/kzip/kzip_test.go b/kythe/go/platform/kzip/kzip_test.go index 7013f96c8e..a710854725 100644 --- a/kythe/go/platform/kzip/kzip_test.go +++ b/kythe/go/platform/kzip/kzip_test.go @@ -22,6 +22,7 @@ import ( "bytes" "errors" "fmt" + "io/ioutil" "strings" "testing" @@ -32,11 +33,23 @@ import ( spb "kythe.io/kythe/proto/storage_go_proto" ) -func TestRoundTrip(t *testing.T) { +func TestRoundTrip_Proto(t *testing.T) { + testRoundTrip(kzip.EncodingProto, t) +} + +func TestRoundTrip_All(t *testing.T) { + testRoundTrip(kzip.EncodingAll, t) +} + +func TestRoundTrip_JSON(t *testing.T) { + testRoundTrip(kzip.EncodingJSON, t) +} + +func testRoundTrip(encoding kzip.Encoding, t *testing.T) { buf := bytes.NewBuffer(nil) // Create a kzip with some interesting data. - w, err := kzip.NewWriter(buf) + w, err := kzip.NewWriter(buf, kzip.WithEncoding(encoding)) if err != nil { t.Fatalf("NewWriter: unexpected error: %v", err) } @@ -356,3 +369,26 @@ func TestScanConcurrency(t *testing.T) { t.Errorf("Scan found %d units, want %d", numUnits, N) } } + +func TestMissingJSONUnitFails(t *testing.T) { + b, err := ioutil.ReadFile("missing-unit.kzip") + if err != nil { + t.Fatalf("Unable to read test file missing-unit.kzip") + } + _, err = kzip.NewReader(bytes.NewReader(b), int64(len(b))) + if err == nil || err.Error() != "both proto and JSON units found but are not identical" { + t.Errorf("Unexpected error: %s", err) + } + +} + +func TestMissingProtoUnitFails(t *testing.T) { + b, err := ioutil.ReadFile("missing-pbunit.kzip") + if err != nil { + t.Fatalf("Unable to read test file missing-pbunit.kzip") + } + _, err = kzip.NewReader(bytes.NewReader(b), int64(len(b))) + if err == nil || err.Error() != "both proto and JSON units found but are not identical" { + t.Errorf("Unexpected error: %s", err) + } +} diff --git a/kythe/go/platform/kzip/missing-pbunit.kzip b/kythe/go/platform/kzip/missing-pbunit.kzip new file mode 100644 index 0000000000..dc7e6f1d8e Binary files /dev/null and b/kythe/go/platform/kzip/missing-pbunit.kzip differ diff --git a/kythe/go/platform/kzip/missing-unit.kzip b/kythe/go/platform/kzip/missing-unit.kzip new file mode 100644 index 0000000000..722a748bd0 Binary files /dev/null and b/kythe/go/platform/kzip/missing-unit.kzip differ diff --git a/kythe/go/platform/tools/kzip/flags/BUILD b/kythe/go/platform/tools/kzip/flags/BUILD new file mode 100644 index 0000000000..2b296e06a1 --- /dev/null +++ b/kythe/go/platform/tools/kzip/flags/BUILD @@ -0,0 +1,11 @@ +load("//tools:build_rules/shims.bzl", "go_library") + +package(default_visibility = ["//kythe:default_visibility"]) + +go_library( + name = "flags", + srcs = ["encoding.go"], + deps = [ + "//kythe/go/platform/kzip", + ], +) diff --git a/kythe/go/platform/tools/kzip/flags/encoding.go b/kythe/go/platform/tools/kzip/flags/encoding.go new file mode 100644 index 0000000000..4ddc62075d --- /dev/null +++ b/kythe/go/platform/tools/kzip/flags/encoding.go @@ -0,0 +1,48 @@ +/* + * Copyright 2019 The Kythe Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package flags provides type EncodingFlag for use as a flag to specify Encoding. +package flags + +import ( + "fmt" + "strings" + + "kythe.io/kythe/go/platform/kzip" +) + +// EncodingFlag encapsulates an Encoding for use as a flag. +type EncodingFlag struct { + kzip.Encoding +} + +// Set updates an Encoding based on the text value +func (e *EncodingFlag) Set(v string) error { + v = strings.ToUpper(v) + switch { + case v == "ALL": + *e = EncodingFlag{kzip.EncodingAll} + return nil + case v == "JSON": + *e = EncodingFlag{kzip.EncodingJSON} + return nil + case v == "PROTO": + *e = EncodingFlag{kzip.EncodingProto} + return nil + default: + return fmt.Errorf("Unknown encoding %s", e) + } +} diff --git a/kythe/go/platform/tools/kzip/mergecmd/BUILD b/kythe/go/platform/tools/kzip/mergecmd/BUILD index 2ac04cb2c3..1ba4f7c275 100644 --- a/kythe/go/platform/tools/kzip/mergecmd/BUILD +++ b/kythe/go/platform/tools/kzip/mergecmd/BUILD @@ -7,6 +7,7 @@ go_library( srcs = ["mergecmd.go"], deps = [ "//kythe/go/platform/kzip", + "//kythe/go/platform/tools/kzip/flags", "//kythe/go/platform/vfs", "//kythe/go/util/cmdutil", "@com_github_google_subcommands//:go_default_library", diff --git a/kythe/go/platform/tools/kzip/mergecmd/mergecmd.go b/kythe/go/platform/tools/kzip/mergecmd/mergecmd.go index 7bb7f6f971..1094865618 100644 --- a/kythe/go/platform/tools/kzip/mergecmd/mergecmd.go +++ b/kythe/go/platform/tools/kzip/mergecmd/mergecmd.go @@ -26,20 +26,21 @@ import ( "os" "path/filepath" + "bitbucket.org/creachadair/stringset" "kythe.io/kythe/go/platform/kzip" + "kythe.io/kythe/go/platform/tools/kzip/flags" "kythe.io/kythe/go/platform/vfs" "kythe.io/kythe/go/util/cmdutil" - "bitbucket.org/creachadair/stringset" - "github.com/google/subcommands" ) type mergeCommand struct { cmdutil.Info - output string - append bool + output string + append bool + encoding flags.EncodingFlag } // New creates a new subcommand for merging kzip files. @@ -54,6 +55,7 @@ func New() subcommands.Command { func (c *mergeCommand) SetFlags(fs *flag.FlagSet) { fs.StringVar(&c.output, "output", "", "Path to output kzip file") fs.BoolVar(&c.append, "append", false, "Whether to additionally merge the contents of the existing output file, if it exists") + fs.Var(&c.encoding, "encoding", "Encoding to use on output, one of JSON, PROTO, or BOTH") } // Execute implements the subcommands interface and merges the provided files. @@ -61,6 +63,7 @@ func (c *mergeCommand) Execute(ctx context.Context, fs *flag.FlagSet, _ ...inter if c.output == "" { return c.Fail("required --output path missing") } + opt := kzip.WithEncoding(c.encoding.Encoding) dir, file := filepath.Split(c.output) if dir == "" { dir = "." @@ -86,7 +89,7 @@ func (c *mergeCommand) Execute(ctx context.Context, fs *flag.FlagSet, _ ...inter } } } - if err := mergeArchives(ctx, tmpOut, archives); err != nil { + if err := mergeArchives(ctx, tmpOut, archives, opt); err != nil { return c.Fail("Error merging archives: %v", err) } if err := vfs.Rename(ctx, tmpName, c.output); err != nil { @@ -95,8 +98,8 @@ func (c *mergeCommand) Execute(ctx context.Context, fs *flag.FlagSet, _ ...inter return subcommands.ExitSuccess } -func mergeArchives(ctx context.Context, out io.WriteCloser, archives []string) error { - wr, err := kzip.NewWriteCloser(out) +func mergeArchives(ctx context.Context, out io.WriteCloser, archives []string, opts ...kzip.WriterOption) error { + wr, err := kzip.NewWriteCloser(out, opts...) if err != nil { out.Close() return fmt.Errorf("error creating writer: %v", err)