Skip to content

Commit

Permalink
fix(extraction): deterministically write files to kzip (#4479)
Browse files Browse the repository at this point in the history
  • Loading branch information
schroederc committed Apr 20, 2020
1 parent 0f9ced3 commit 04e2fc9
Showing 1 changed file with 31 additions and 12 deletions.
43 changes: 31 additions & 12 deletions kythe/go/extractors/bazel/extractor.go
Expand Up @@ -26,7 +26,6 @@ import (
"os"
"path/filepath"
"sort"
"sync"
"time"

"kythe.io/kythe/go/platform/kzip"
Expand Down Expand Up @@ -257,30 +256,50 @@ func (c *Config) extract(ctx context.Context, info *ActionInfo, file fileReader)
}

// fetchInputs concurrently fetches the contents of all the specified file
// paths. An open reader for each file is passed to the file callback along
// with its path's offset in the input slice. If the callback returns an error,
// that error is propagated.
// paths. An open reader for each file is passed sequentially to the file
// callback along with its path's offset in the input slice. If the callback
// returns an error, that error is propagated.
func (c *Config) fetchInputs(ctx context.Context, paths []string, file func(int, io.Reader) error) error {
// Fetch concurrently. Each element of the proto slices is accessed by a
// single goroutine corresponding to its index.

g, gCtx := errgroup.WithContext(ctx)

files := make([]chan io.ReadCloser, len(paths))
for i := range paths {
files[i] = make(chan io.ReadCloser)
}
g.Go(func() error {
// Pass each file Reader to the callback sequentially
for i, ch := range files {
select {
case <-gCtx.Done():
return gCtx.Err()
case rc := <-ch:
err := file(i, rc)
rc.Close()
if err != nil {
return err
}
}
}
return nil
})

throttle := make(chan struct{}, 256)
var g errgroup.Group
var fmu sync.Mutex // coordinates access into the file callback
for i, path := range paths {
i, path := i, path
throttle <- struct{}{}
g.Go(func() error {
throttle <- struct{}{}
defer func() { <-throttle }()
rc, err := c.openRead(ctx, path)
rc, err := c.openRead(gCtx, path)
if err != nil {
log.Printf("ERROR: Reading input file: %v", err)
return err
}
defer rc.Close()
fmu.Lock()
defer fmu.Unlock()
return file(i, rc)
files[i] <- rc
close(files[i])
return nil
})
}
return g.Wait()
Expand Down

0 comments on commit 04e2fc9

Please sign in to comment.