Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add peek functionality to index reader and use it in the file source (2.8.x) #9961

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/internal/storage/fileset/index/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func WithDatum(datum string) Option {
}
}

// WithPeek enables peeking on the Reader. With peek set, iteration does not
// stop at the first file index past the end of the filter; that one file
// index is still visited, and iteration stops at the next one past the end.
func WithPeek() Option {
	enablePeek := func(reader *Reader) {
		reader.peek = true
	}
	return enablePeek
}

// WithShardConfig sets the sharding configuration.
func WithShardConfig(config *ShardConfig) Option {
return func(r *Reader) {
Expand Down
11 changes: 8 additions & 3 deletions src/internal/storage/fileset/index/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Reader struct {
topIdx *Index
datum string
shardConfig *ShardConfig
peek bool
}

// NewReader creates a new Reader.
Expand All @@ -56,11 +57,15 @@ func (r *Reader) Iterate(ctx context.Context, cb func(*Index) error) error {
if r.topIdx == nil {
return nil
}
peek := r.peek
traverseCb := func(idx *Index) (bool, error) {
if atEnd(idx.Path, r.filter) {
return false, errutil.ErrBreak
}
if idx.File != nil {
if atEnd(idx.Path, r.filter) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks intentional, but just to confirm: Do we only want to peek if the next file index is past the range?
Otherwise, it looks like range indices will continue to be read until we get to the next file index.

I assume this is needed so we emit the peeked file, maybe that's something worth calling out with a comment so it doesn't have to be inferred and we're aware of that later if we need to modify this code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we want to check the first file index after the end of the path range. This may involve traversing additional range indexes if the end of the path range lines up with a chunk boundary.

if !peek {
return false, errutil.ErrBreak
}
peek = false
}
if !atStart(idx.Path, r.filter) || !(r.datum == "" || r.datum == idx.File.Datum) {
return false, nil
}
Expand Down
39 changes: 26 additions & 13 deletions src/server/pfs/server/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ type Source interface {
}

type source struct {
commitInfo *pfs.CommitInfo
fileSet fileset.FileSet
indexOpts []index.Option
upper string
commitInfo *pfs.CommitInfo
fileSet fileset.FileSet
dirIndexOpts, fileIndexOpts []index.Option
upper string
}

// NewSource creates a Source which emits FileInfos with the information from commit, and the entries returned from fileSet.
Expand All @@ -38,21 +38,34 @@ func NewSource(commitInfo *pfs.CommitInfo, fs fileset.FileSet, opts ...SourceOpt
s := &source{
commitInfo: commitInfo,
fileSet: fileset.NewDirInserter(fs, sc.prefix),
indexOpts: []index.Option{
dirIndexOpts: []index.Option{
index.WithPrefix(sc.prefix),
index.WithDatum(sc.datum),
},
fileIndexOpts: []index.Option{
index.WithPrefix(sc.prefix),
index.WithDatum(sc.datum),
},
}
if sc.pathRange != nil {
s.fileSet = fileset.NewDirInserter(fs, sc.pathRange.Lower)
// The upper for the path range is not set to ensure that we
// emit directories at the end of the path range. For example,
// the files /d1/f1 and /d2/f2 with a path range of [/d1/f1,
// /d2/f2) should emit /d1/f1 and /d2/. The upper bound will be
// applied within the callback of the iteration.
s.indexOpts = append(s.indexOpts, index.WithRange(&index.PathRange{
// The directory index options have no upper bound because a
// directory may extend past the upper bound of the path range.
s.dirIndexOpts = append(s.dirIndexOpts,
index.WithRange(&index.PathRange{
Lower: sc.pathRange.Lower,
}))
s.fileIndexOpts = append(s.fileIndexOpts, index.WithRange(&index.PathRange{
Lower: sc.pathRange.Lower,
Upper: sc.pathRange.Upper,
}))
// WithPeek is set to ensure that we iterate one past the upper
// bound of the path range. This is necessary to ensure that
// directories at the end of the path range are emitted. The
// paths are checked again in the callback to ensure we
// terminate if the next directory / file is past the upper
// bound.
s.fileIndexOpts = append(s.fileIndexOpts, index.WithPeek())
s.upper = sc.pathRange.Upper
}
if sc.filter != nil {
Expand All @@ -66,7 +79,7 @@ func NewSource(commitInfo *pfs.CommitInfo, fs fileset.FileSet, opts ...SourceOpt
func (s *source) Iterate(ctx context.Context, cb func(*pfs.FileInfo, fileset.File) error) error {
ctx, cf := pctx.WithCancel(ctx)
defer cf()
iter := fileset.NewIterator(ctx, s.fileSet.Iterate, s.indexOpts...)
iter := fileset.NewIterator(ctx, s.fileSet.Iterate, s.dirIndexOpts...)
cache := make(map[string]*pfs.FileInfo)
err := s.fileSet.Iterate(ctx, func(f fileset.File) error {
idx := f.Index()
Expand Down Expand Up @@ -103,7 +116,7 @@ func (s *source) Iterate(ctx context.Context, cb func(*pfs.FileInfo, fileset.Fil
return errors.EnsureStack(err)
}
return nil
}, s.indexOpts...)
}, s.fileIndexOpts...)
if errors.Is(err, errutil.ErrBreak) {
err = nil
}
Expand Down