Skip to content

Commit

Permalink
feat(kzip): add support for encoding compilation units as proto and/o…
Browse files Browse the repository at this point in the history
…r JSON in kzips. (#3836)

This uses a separate directory for pb encoded units, and validates that a kzip has proto, JSON, or both; if both, ensure the digests are the same. `merge` command supports specifying encoding.
  • Loading branch information
jaysachs committed Jun 26, 2019
1 parent 6f83e91 commit 3643205
Show file tree
Hide file tree
Showing 9 changed files with 289 additions and 47 deletions.
5 changes: 5 additions & 0 deletions kythe/go/platform/kzip/BUILD
Expand Up @@ -14,6 +14,7 @@ go_library(
"//kythe/proto:go_go_proto",
"//kythe/proto:java_go_proto",
"@com_github_golang_protobuf//jsonpb:go_default_library_gen",
"@com_github_golang_protobuf//proto:go_default_library",
"@org_bitbucket_creachadair_stringset//:go_default_library",
"@org_golang_x_sync//errgroup:go_default_library",
],
Expand All @@ -22,6 +23,10 @@ go_library(
go_test(
name = "kzip_test",
srcs = ["kzip_test.go"],
data = [
"missing-pbunit.kzip",
"missing-unit.kzip",
],
deps = [
":kzip",
"//kythe/proto:analysis_go_proto",
Expand Down
214 changes: 176 additions & 38 deletions kythe/go/platform/kzip/kzip.go
Expand Up @@ -74,6 +74,7 @@ import (
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -82,6 +83,7 @@ import (

"bitbucket.org/creachadair/stringset"
"github.com/golang/protobuf/jsonpb"
"github.com/golang/protobuf/proto"
"golang.org/x/sync/errgroup"

apb "kythe.io/kythe/proto/analysis_go_proto"
Expand All @@ -95,6 +97,32 @@ import (
_ "kythe.io/kythe/proto/java_go_proto"
)

// Encoding describes how compilation units will be encoded when written to a kzip.
type Encoding int

const (
	// EncodingJSON writes compilation units as JSON (jsonpb) records.
	EncodingJSON Encoding = 1
	// EncodingProto writes compilation units as wire-format proto records.
	EncodingProto Encoding = 2
	// EncodingAll writes compilation units in both JSON and proto encodings.
	EncodingAll Encoding = EncodingJSON | EncodingProto

	// Archive subdirectory names holding each encoding's compilation units.
	prefixJSON  = "units"
	prefixProto = "pbunits"
)

// String returns a human-readable name for the Encoding.
func (e Encoding) String() string {
	switch e {
	case EncodingAll:
		return "All"
	case EncodingJSON:
		return "JSON"
	case EncodingProto:
		return "Proto"
	default:
		// Fallback for unrecognized values, e.g. "Encoding7".
		return "Encoding" + strconv.FormatInt(int64(e), 10)
	}
}

// A Reader permits reading and scanning compilation records and file contents
// stored in a .kzip archive. The Lookup and Scan methods are mutually safe for
// concurrent use by multiple goroutines.
Expand All @@ -105,6 +133,10 @@ type Reader struct {
// directory, but it's not required by the spec. Use whatever name the
// archive actually specifies in the leading directory.
root string

// The prefix used for the compilation unit directory; one of
// prefixJSON or prefixProto
unitsPrefix string
}

// NewReader constructs a new Reader that consumes zip data from r, whose total
Expand All @@ -124,14 +156,59 @@ func NewReader(r io.ReaderAt, size int64) (*Reader, error) {
} else if fi := archive.File[0].FileInfo(); !fi.IsDir() {
return nil, errors.New("archive root is not a directory")
}

root := archive.File[0].Name
pref, err := unitPrefix(root, archive.File)
if err != nil {
return nil, err
}
return &Reader{
zip: archive,
root: archive.File[0].Name,
zip: archive,
root: root,
unitsPrefix: pref,
}, nil
}

func (r *Reader) unitPath(digest string) string { return path.Join(r.root, "units", digest) }
// unitPrefix determines which unit subdirectory (prefixJSON or prefixProto)
// the reader should consume from the archive's file list fs, which must be
// sorted by name with each entry prefixed by root. If units exist in both
// encodings, the two directories must contain identical digest sets; proto
// is preferred when both are present.
func unitPrefix(root string, fs []*zip.File) (string, error) {
	jsonDir := root + prefixJSON + "/"
	protoDir := root + prefixProto + "/"
	// fs is sorted, so locate the first entry inside each directory by
	// binary search.
	j := sort.Search(len(fs), func(i int) bool {
		return fs[i].Name > jsonDir
	})
	hasJSON := j < len(fs) && strings.HasPrefix(fs[j].Name, jsonDir)
	p := sort.Search(len(fs), func(i int) bool {
		return fs[i].Name > protoDir
	})
	hasProto := p < len(fs) && strings.HasPrefix(fs[p].Name, protoDir)
	if !hasJSON && !hasProto {
		return "", errors.New("no compilation units found")
	}
	if hasJSON && hasProto {
		// Validate that the two directories hold identical units by walking
		// their (sorted) digest lists in lockstep.
		errMismatch := errors.New("both proto and JSON units found but are not identical")
		for p < len(fs) && j < len(fs) {
			ispb := strings.HasPrefix(fs[p].Name, protoDir)
			isjson := strings.HasPrefix(fs[j].Name, jsonDir)
			if ispb != isjson {
				// One directory ran out of entries before the other.
				return "", errMismatch
			}
			if !ispb {
				break // both directories exhausted
			}
			// Entry names have the form "<root>/<dir>/<digest>".
			pdigest := strings.Split(fs[p].Name, "/")[2]
			jdigest := strings.Split(fs[j].Name, "/")[2]
			if pdigest != jdigest {
				return "", errMismatch
			}
			p++
			j++
		}
	}
	if hasProto {
		return prefixProto, nil
	}
	return prefixJSON, nil
}

// unitPath returns the archive path for the compilation unit with the given
// digest, under the unit directory detected for this reader (units or pbunits).
func (r *Reader) unitPath(digest string) string { return path.Join(r.root, r.unitsPrefix, digest) }
// filePath returns the archive path for the file content with the given digest.
func (r *Reader) filePath(digest string) string { return path.Join(r.root, "files", digest) }

// ErrDigestNotFound is returned when a requested compilation unit or file
Expand All @@ -142,15 +219,22 @@ var ErrDigestNotFound = errors.New("digest not found")
// multiple times.
var ErrUnitExists = errors.New("unit already exists")

func readUnit(digest string, f *zip.File) (*Unit, error) {
func (r *Reader) readUnit(digest string, f *zip.File) (*Unit, error) {
rc, err := f.Open()
if err != nil {
return nil, err
}
defer rc.Close()

var msg apb.IndexedCompilation
if err := jsonpb.Unmarshal(rc, &msg); err != nil {
if r.unitsPrefix == prefixProto {
rec := make([]byte, f.UncompressedSize64)
if _, err = io.ReadFull(rc, rec); err != nil {
return nil, err
}
if err := proto.Unmarshal(rec, &msg); err != nil {
return nil, fmt.Errorf("error unmarshaling for %s: %s", digest, err)
}
} else if err := jsonpb.Unmarshal(rc, &msg); err != nil {
return nil, err
}
return &Unit{
Expand All @@ -160,8 +244,8 @@ func readUnit(digest string, f *zip.File) (*Unit, error) {
}, nil
}

// firstIndex returns the first index in the archive's file list whose path is
// greater than or equal to with prefix, or -1 if no such index exists.
// firstIndex returns the first index in the archive's file list whose
// path starts with prefix, or -1 if no such index exists.
func (r *Reader) firstIndex(prefix string) int {
fs := r.zip.File
n := sort.Search(len(fs), func(i int) bool {
Expand All @@ -170,16 +254,20 @@ func (r *Reader) firstIndex(prefix string) int {
if n >= len(fs) {
return -1
}
if !strings.HasPrefix(fs[n].Name, prefix) {
return -1
}
return n
}

// Lookup returns the specified compilation from the archive, if it exists. If
// the requested digest is not in the archive, ErrDigestNotFound is returned.
func (r *Reader) Lookup(unitDigest string) (*Unit, error) {
needle := r.unitPath(unitDigest)
if pos := r.firstIndex(needle); pos >= 0 {
pos := r.firstIndex(needle)
if pos >= 0 {
if f := r.zip.File[pos]; f.Name == needle {
return readUnit(unitDigest, f)
return r.readUnit(unitDigest, f)
}
}
return nil, ErrDigestNotFound
Expand All @@ -198,6 +286,26 @@ func ReadConcurrency(n int) ScanOption {
return readConcurrency(n)
}

// canonicalUnits returns the unit directory prefix in use together with the
// zip entries for the compilation units stored beneath it. If the archive
// contains no unit entries, it returns "" and nil.
func (r *Reader) canonicalUnits() (string, []*zip.File) {
	prefix := r.unitPath("") + "/"
	start := r.firstIndex(prefix)
	if start < 0 {
		return "", nil
	}
	var units []*zip.File
	for i := start; i < len(r.zip.File); i++ {
		entry := r.zip.File[i]
		if !strings.HasPrefix(entry.Name, prefix) {
			break
		}
		if entry.Name == prefix {
			continue // tolerate an empty units directory entry
		}
		units = append(units, entry)
	}
	return prefix, units
}

// Scan scans all the compilations stored in the archive, and invokes f for
// each compilation record. If f reports an error, the scan is terminated and
// that error is propagated to the caller of Scan. At most 1 invocation of f
Expand All @@ -215,9 +323,8 @@ func (r *Reader) Scan(f func(*Unit) error, opts ...ScanOption) error {
}
}

prefix := r.unitPath("") + "/"
pos := r.firstIndex(prefix)
if pos < 0 {
prefix, fileUnits := r.canonicalUnits()
if len(fileUnits) == 0 {
return nil
}

Expand All @@ -226,14 +333,10 @@ func (r *Reader) Scan(f func(*Unit) error, opts ...ScanOption) error {
g, ctx := errgroup.WithContext(ctx)

files := make(chan *zip.File)

g.Go(func() error {
defer close(files)
for _, file := range r.zip.File[pos:] {
if !strings.HasPrefix(file.Name, prefix) {
break
} else if file.Name == prefix {
continue // tolerate an empty units directory entry
}
for _, file := range fileUnits {
select {
case <-ctx.Done():
return nil
Expand All @@ -242,7 +345,6 @@ func (r *Reader) Scan(f func(*Unit) error, opts ...ScanOption) error {
}
return nil
})

units := make(chan *Unit)
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
Expand All @@ -251,7 +353,7 @@ func (r *Reader) Scan(f func(*Unit) error, opts ...ScanOption) error {
defer wg.Done()
for file := range files {
digest := strings.TrimPrefix(file.Name, prefix)
unit, err := readUnit(digest, file)
unit, err := r.readUnit(digest, file)
if err != nil {
return err
}
Expand Down Expand Up @@ -316,11 +418,23 @@ type Writer struct {
fd stringset.Set // file digests already written
ud stringset.Set // unit digests already written
c io.Closer // a closer for the underlying writer (may be nil)

encoding Encoding // What encoding to use
}

// WriterOption describes options when creating a Writer.
type WriterOption func(*Writer)

// WithEncoding sets the encoding to be used by a Writer.
// If not specified, a Writer defaults to EncodingJSON.
func WithEncoding(e Encoding) WriterOption {
	return func(w *Writer) {
		w.encoding = e
	}
}

// NewWriter constructs a new empty Writer that delivers output to w. The
// AddUnit and AddFile methods are safe for use by concurrent goroutines.
func NewWriter(w io.Writer) (*Writer, error) {
func NewWriter(w io.Writer, options ...WriterOption) (*Writer, error) {
archive := zip.NewWriter(w)
// Create an entry for the root directory, which must be first.
root := &zip.FileHeader{
Expand All @@ -334,17 +448,22 @@ func NewWriter(w io.Writer) (*Writer, error) {
}
archive.SetComment("Kythe kzip archive")

return &Writer{
zip: archive,
fd: stringset.New(),
ud: stringset.New(),
}, nil
kw := &Writer{
zip: archive,
fd: stringset.New(),
ud: stringset.New(),
encoding: EncodingJSON,
}
for _, opt := range options {
opt(kw)
}
return kw, nil
}

// NewWriteCloser behaves as NewWriter, but arranges that when the *Writer is
// closed it also closes wc.
func NewWriteCloser(wc io.WriteCloser) (*Writer, error) {
w, err := NewWriter(wc)
func NewWriteCloser(wc io.WriteCloser, options ...WriterOption) (*Writer, error) {
w, err := NewWriter(wc, options...)
if err == nil {
w.c = wc
}
Expand Down Expand Up @@ -374,15 +493,34 @@ func (w *Writer) AddUnit(cu *apb.CompilationUnit, index *apb.IndexedCompilation_
return digest, ErrUnitExists
}

f, err := w.zip.CreateHeader(newFileHeader("root", "units", digest))
if err != nil {
return "", err
if w.encoding&EncodingJSON != 0 {
f, err := w.zip.CreateHeader(newFileHeader("root", prefixJSON, digest))
if err != nil {
return "", err
}
if err := toJSON.Marshal(f, &apb.IndexedCompilation{
Unit: unit.Proto,
Index: index,
}); err != nil {
return "", err
}
}
if err := toJSON.Marshal(f, &apb.IndexedCompilation{
Unit: unit.Proto,
Index: index,
}); err != nil {
return "", err
if w.encoding&EncodingProto != 0 {
f, err := w.zip.CreateHeader(newFileHeader("root", prefixProto, digest))
if err != nil {
return "", err
}
rec, err := proto.Marshal(&apb.IndexedCompilation{
Unit: unit.Proto,
Index: index,
})
if err != nil {
return "", err
}
_, err = f.Write(rec)
if err != nil {
return "", err
}
}
w.ud.Add(digest)
return digest, nil
Expand Down

0 comments on commit 3643205

Please sign in to comment.