Skip to content

Commit

Permalink
feat(serving): add option to read xref pages concurrently (#5469)
Browse files Browse the repository at this point in the history
  • Loading branch information
schroederc committed Dec 6, 2022
1 parent 8b5bc95 commit 9493a8d
Show file tree
Hide file tree
Showing 5 changed files with 332 additions and 47 deletions.
4 changes: 2 additions & 2 deletions external.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1128,8 +1128,8 @@ def _go_dependencies():
patches = [
"@io_kythe//third_party/go:add_export_license.patch",
],
sum = "h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=",
version = "v0.0.0-20210220032951-036812b2e83c",
sum = "h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=",
version = "v0.1.0",
)

go_repository(
Expand Down
32 changes: 21 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,43 +3,53 @@ module kythe.io
require (
bitbucket.org/creachadair/shell v0.0.6
bitbucket.org/creachadair/stringset v0.0.9
cloud.google.com/go v0.90.0 // indirect
cloud.google.com/go/storage v1.16.0 // indirect
github.com/DataDog/zstd v1.4.8
github.com/apache/beam v2.31.0+incompatible
github.com/bazelbuild/rules_go v0.28.0
github.com/beevik/etree v1.1.0
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2
github.com/golang/snappy v0.0.4
github.com/google/brotli/go/cbrotli v0.0.0-20210804124202-19d86fb9a60a
github.com/google/codesearch v1.2.0 // indirect
github.com/google/codesearch v1.2.0
github.com/google/go-cmp v0.5.6
github.com/google/orderedcode v0.0.1
github.com/google/subcommands v1.2.0
github.com/google/uuid v1.3.0 // indirect
github.com/hanwen/go-fuse v1.0.0
github.com/jmhodges/levigo v1.0.0
github.com/minio/highwayhash v1.0.2
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
github.com/pkg/errors v0.9.1
github.com/sergi/go-diff v1.2.0
github.com/sourcegraph/go-langserver v2.0.0+incompatible
github.com/sourcegraph/jsonrpc2 v0.1.0
github.com/syndtr/goleveldb v1.0.0
golang.org/x/net v0.0.0-20210726213435-c6fcb2dbf985
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
golang.org/x/sync v0.1.0
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
golang.org/x/text v0.3.6
golang.org/x/tools v0.1.5
google.golang.org/api v0.52.0
google.golang.org/genproto v0.0.0-20210803142424-70bd63adacf2 // indirect
google.golang.org/grpc v1.39.0
google.golang.org/protobuf v1.27.1
gopkg.in/yaml.v2 v2.4.0 // indirect
sigs.k8s.io/yaml v1.2.0
)

go 1.13
require (
cloud.google.com/go v0.90.0 // indirect
cloud.google.com/go/storage v1.16.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/googleapis/gax-go/v2 v2.0.5 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/onsi/ginkgo v1.8.0 // indirect
github.com/onsi/gomega v1.5.0 // indirect
go.opencensus.io v0.23.0 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.4.2 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20210803142424-70bd63adacf2 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

go 1.18
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.m
github.com/envoyproxy/go-control-plane v0.9.9-0.20210217033140-668b12f5399d/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
Expand Down Expand Up @@ -344,8 +343,9 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
210 changes: 178 additions & 32 deletions kythe/go/serving/xrefs/xrefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"path/filepath"
"regexp"
"strings"
"sync"
"time"

"kythe.io/kythe/go/services/xrefs"
Expand Down Expand Up @@ -67,6 +68,8 @@ var (
// TODO(schroederc): remove once relevant clients specify their required quality
defaultTotalsQuality = flag.String("experimental_default_totals_quality", "APPROXIMATE_TOTALS", "Default TotalsQuality when unspecified in CrossReferencesRequest")

pageReadAhead = flag.Uint("page_read_ahead", 0, "How many xref pages to read ahead concurrently (0 disables readahead)")

responseLeewayTime = flag.Duration("xrefs_response_leeway_time", 50*time.Millisecond, "If possible, leave this much time at the end of a CrossReferencesRequest to return any results already read")
)

Expand Down Expand Up @@ -624,6 +627,53 @@ func decorationToReference(norm *span.Normalizer, d *srvpb.FileDecorations_Decor
}
}

type xrefCategory int

const (
xrefCategoryNone xrefCategory = iota
xrefCategoryDef
xrefCategoryDecl
xrefCategoryRef
xrefCategoryCall
xrefCategoryRelated
xrefCategoryIndirection
)

func (c xrefCategory) AddCount(reply *xpb.CrossReferencesReply, idx *srvpb.PagedCrossReferences_PageIndex, pageSet *pageSet) {
switch c {
case xrefCategoryDef:
if pageSet.Contains(idx) {
reply.Total.Definitions += int64(idx.Count)
} else {
reply.Filtered.Definitions += int64(idx.Count)
}
case xrefCategoryDecl:
if pageSet.Contains(idx) {
reply.Total.Declarations += int64(idx.Count)
} else {
reply.Filtered.Declarations += int64(idx.Count)
}
case xrefCategoryRef:
if pageSet.Contains(idx) {
reply.Total.References += int64(idx.Count)
} else {
reply.Filtered.References += int64(idx.Count)
}
case xrefCategoryRelated:
if pageSet.Contains(idx) {
reply.Total.RelatedNodesByRelation[idx.Kind] += int64(idx.Count)
} else {
reply.Filtered.RelatedNodesByRelation[idx.Kind] += int64(idx.Count)
}
case xrefCategoryCall:
if pageSet.Contains(idx) {
reply.Total.Callers += int64(idx.Count)
} else {
reply.Filtered.Callers += int64(idx.Count)
}
}
}

// CrossReferences implements part of the xrefs.Service interface.
func (t *Table) CrossReferences(ctx context.Context, req *xpb.CrossReferencesRequest) (*xpb.CrossReferencesReply, error) {
tickets, err := xrefs.FixTickets(req.Ticket)
Expand All @@ -645,11 +695,22 @@ func (t *Table) CrossReferences(ctx context.Context, req *xpb.CrossReferencesReq
return nil, status.Errorf(codes.InvalidArgument, "invalid corpus_path_filters %s: %v", strings.ReplaceAll(req.GetCorpusPathFilters().String(), "\n", " "), err)
}

pageReadGroup, pageReadGroupCtx := errgroup.WithContext(ctx)
pageReadGroup.SetLimit(int(*pageReadAhead) + 1)
single := new(syncCache[*srvpb.PagedCrossReferences_Page])

getCachedPage := func(ctx context.Context, pageKey string) (*srvpb.PagedCrossReferences_Page, error) {
return single.Get(pageKey, func() (*srvpb.PagedCrossReferences_Page, error) {
return t.crossReferencesPage(ctx, pageKey)
})
}
getFilteredPage := func(ctx context.Context, pageKey string) (*srvpb.PagedCrossReferences_Page, int, error) {
p, err := t.crossReferencesPage(ctx, pageKey)
p, err := getCachedPage(ctx, pageKey)
if err != nil {
return nil, 0, err
}
// Clear page from cache; it should only be used once.
single.Delete(pageKey)
return p, filter.FilterGroup(p.GetGroup()), nil
}

Expand Down Expand Up @@ -874,24 +935,83 @@ readLoop:
}

pageSet := filter.PageSet(cr)
for _, idx := range cr.GetPageIndex() {

pageCategory := func(idx *srvpb.PagedCrossReferences_PageIndex) xrefCategory {
// Filter anchor pages based on requested build configs
if len(buildConfigs) != 0 && !buildConfigs.Contains(idx.BuildConfig) && !xrefs.IsRelatedNodeKind(relatedKinds, idx.Kind) {
return xrefCategoryNone
}

switch {
case xrefs.IsDefKind(req.DefinitionKind, idx.Kind, cr.Incomplete):
return xrefCategoryDef
case xrefs.IsDeclKind(req.DeclarationKind, idx.Kind, cr.Incomplete):
return xrefCategoryDecl
case xrefs.IsRefKind(req.ReferenceKind, idx.Kind):
return xrefCategoryRef
case len(req.Filter) > 0 && xrefs.IsRelatedNodeKind(relatedKinds, idx.Kind):
return xrefCategoryRelated
case indirections.Contains(idx.Kind):
return xrefCategoryIndirection
case xrefs.IsCallerKind(req.CallerKind, idx.Kind):
return xrefCategoryCall
default:
return xrefCategoryNone
}
}

// Find the first unskipped page index so proper read ahead.
firstUnskippedPage := len(cr.GetPageIndex())
for i, idx := range cr.GetPageIndex() {
c := pageCategory(idx)
if c == xrefCategoryNone {
continue
}

if !stats.skipPage(idx) {
firstUnskippedPage = i
break
}
c.AddCount(reply, idx, pageSet)
}

// If enabled, start reading pages concurrently starting from the first
// unskipped page.
if *pageReadAhead > 0 {
pageReadGroup.Go(func() error {
ctx := pageReadGroupCtx
for _, idx := range cr.GetPageIndex()[firstUnskippedPage:] {
if err := ctx.Err(); err != nil {
return err
}
if pageCategory(idx) == xrefCategoryNone || !pageSet.Contains(idx) {
continue
}

idx := idx
pageReadGroup.Go(func() error {
_, err := getCachedPage(ctx, idx.PageKey)
return err
})
}
return nil
})
}

for _, idx := range cr.GetPageIndex()[firstUnskippedPage:] {
if !leewayTime.IsZero() && time.Now().After(leewayTime) {
log.Println("WARNING: hit soft deadline; trying to return already read xrefs")
log.Printf("WARNING: hit soft deadline; trying to return already read xrefs: %s", time.Now().Sub(leewayTime))
break readLoop
}

switch {
case xrefs.IsDefKind(req.DefinitionKind, idx.Kind, cr.Incomplete):
if !pageSet.Contains(idx) {
reply.Filtered.Definitions += int64(idx.Count)
continue
}
reply.Total.Definitions += int64(idx.Count)
c := pageCategory(idx)
c.AddCount(reply, idx, pageSet)
if c != xrefCategoryIndirection && c != xrefCategoryRelated && !pageSet.Contains(idx) {
continue
}

switch c {
case xrefCategoryDef:
if wantMoreCrossRefs && !stats.skipPage(idx) {
p, filtered, err := getFilteredPage(ctx, idx.PageKey)
if err != nil {
Expand All @@ -901,12 +1021,7 @@ readLoop:
reply.Filtered.Definitions += int64(filtered)
stats.addAnchors(&crs.Definition, p.Group)
}
case xrefs.IsDeclKind(req.DeclarationKind, idx.Kind, cr.Incomplete):
if !pageSet.Contains(idx) {
reply.Filtered.Declarations += int64(idx.Count)
continue
}
reply.Total.Declarations += int64(idx.Count)
case xrefCategoryDecl:
if wantMoreCrossRefs && !stats.skipPage(idx) {
p, filtered, err := getFilteredPage(ctx, idx.PageKey)
if err != nil {
Expand All @@ -916,12 +1031,7 @@ readLoop:
reply.Filtered.Declarations += int64(filtered)
stats.addAnchors(&crs.Declaration, p.Group)
}
case xrefs.IsRefKind(req.ReferenceKind, idx.Kind):
if !pageSet.Contains(idx) {
reply.Filtered.References += int64(idx.Count)
continue
}
reply.Total.References += int64(idx.Count)
case xrefCategoryRef:
if wantMoreCrossRefs && !stats.skipPage(idx) {
p, filtered, err := getFilteredPage(ctx, idx.PageKey)
if err != nil {
Expand All @@ -931,12 +1041,11 @@ readLoop:
reply.Filtered.References += int64(filtered)
stats.addAnchors(&crs.Reference, p.Group)
}
case xrefs.IsRelatedNodeKind(nil, idx.Kind):
case xrefCategoryRelated, xrefCategoryIndirection:
var p *srvpb.PagedCrossReferences_Page

if len(req.Filter) > 0 && xrefs.IsRelatedNodeKind(relatedKinds, idx.Kind) {
if pageSet.Contains(idx) {
reply.Total.RelatedNodesByRelation[idx.Kind] += int64(idx.Count)
if wantMoreCrossRefs && !stats.skipPage(idx) {
var filtered int
p, filtered, err = getFilteredPage(ctx, idx.PageKey)
Expand All @@ -947,8 +1056,6 @@ readLoop:
reply.Filtered.RelatedNodesByRelation[idx.Kind] += int64(filtered)
stats.addRelatedNodes(crs, p.Group)
}
} else {
reply.Filtered.RelatedNodesByRelation[idx.Kind] += int64(idx.Count)
}
}

Expand All @@ -966,12 +1073,7 @@ readLoop:
}
}
}
case xrefs.IsCallerKind(req.CallerKind, idx.Kind):
if !pageSet.Contains(idx) {
reply.Filtered.Callers += int64(idx.Count)
continue
}
reply.Total.Callers += int64(idx.Count)
case xrefCategoryCall:
if wantMoreCrossRefs && !stats.skipPage(idx) {
p, filtered, err := getFilteredPage(ctx, idx.PageKey)
if err != nil {
Expand Down Expand Up @@ -1524,3 +1626,47 @@ func isNonContextError(err error) bool {
err = canonicalError(err, "", "")
return err != nil && err != xrefs.ErrCanceled && err != xrefs.ErrDeadlineExceeded
}

// call is an in-flight or completed Get call
type call[T any] struct {
wg sync.WaitGroup
val T
err error
}

type syncCache[T any] struct {
mu sync.Mutex
m map[string]*call[T]
}

// Get executes and returns the results of the given function, making sure that
// there is only one execution for a given key (until Delete is called). If a
// duplicate comes in, the duplicate caller waits for the original to complete
// and receives the same results.
func (g *syncCache[T]) Get(key string, fn func() (T, error)) (T, error) {
g.mu.Lock()
if g.m == nil {
g.m = make(map[string]*call[T])
}
if c, ok := g.m[key]; ok {
g.mu.Unlock()
c.wg.Wait()
return c.val, c.err
}
c := new(call[T])
c.wg.Add(1)
g.m[key] = c
g.mu.Unlock()

c.val, c.err = fn()
c.wg.Done()

return c.val, c.err
}

// Delete removes the given key from the cache.
func (g *syncCache[T]) Delete(key string) {
g.mu.Lock()
delete(g.m, key)
g.mu.Unlock()
}

0 comments on commit 9493a8d

Please sign in to comment.