Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add peek functionality to index reader and use it in the file source (2.9.x) #9979

Merged
merged 1 commit into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/internal/storage/fileset/index/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func WithDatum(datum string) Option {
}
}

func WithPeek() Option {
return func(r *Reader) {
r.peek = true
}
}

// WithShardConfig sets the sharding configuration.
func WithShardConfig(config *ShardConfig) Option {
return func(r *Reader) {
Expand Down
11 changes: 8 additions & 3 deletions src/internal/storage/fileset/index/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type Reader struct {
topIdx *Index
datum string
shardConfig *ShardConfig
peek bool
}

// NewReader creates a new Reader.
Expand All @@ -56,11 +57,15 @@ func (r *Reader) Iterate(ctx context.Context, cb func(*Index) error) error {
if r.topIdx == nil {
return nil
}
peek := r.peek
traverseCb := func(idx *Index) (bool, error) {
if atEnd(idx.Path, r.filter) {
return false, errutil.ErrBreak
}
if idx.File != nil {
if atEnd(idx.Path, r.filter) {
if !peek {
return false, errutil.ErrBreak
}
peek = false
}
if !atStart(idx.Path, r.filter) || !(r.datum == "" || r.datum == idx.File.Datum) {
return false, nil
}
Expand Down
39 changes: 26 additions & 13 deletions src/server/pfs/server/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,10 @@ type Source interface {
}

type source struct {
commitInfo *pfs.CommitInfo
fileSet fileset.FileSet
indexOpts []index.Option
upper string
commitInfo *pfs.CommitInfo
fileSet fileset.FileSet
dirIndexOpts, fileIndexOpts []index.Option
upper string
}

// NewSource creates a Source which emits FileInfos with the information from commit, and the entries return from fileSet.
Expand All @@ -38,21 +38,34 @@ func NewSource(commitInfo *pfs.CommitInfo, fs fileset.FileSet, opts ...SourceOpt
s := &source{
commitInfo: commitInfo,
fileSet: fileset.NewDirInserter(fs, sc.prefix),
indexOpts: []index.Option{
dirIndexOpts: []index.Option{
index.WithPrefix(sc.prefix),
index.WithDatum(sc.datum),
},
fileIndexOpts: []index.Option{
index.WithPrefix(sc.prefix),
index.WithDatum(sc.datum),
},
}
if sc.pathRange != nil {
s.fileSet = fileset.NewDirInserter(fs, sc.pathRange.Lower)
// The upper for the path range is not set to ensure that we
// emit directories at the end of the path range. For example,
// the files /d1/f1 and /d2/f2 with a path range of [/d1/f1,
// /d2/f2) should emit /d1/f1 and /d2/. The upper bound will be
// applied within the callback of the iteration.
s.indexOpts = append(s.indexOpts, index.WithRange(&index.PathRange{
// The directory index options have no upper bound because a
// directory may extend past the upper bound of the path range.
s.dirIndexOpts = append(s.dirIndexOpts,
index.WithRange(&index.PathRange{
Lower: sc.pathRange.Lower,
}))
s.fileIndexOpts = append(s.fileIndexOpts, index.WithRange(&index.PathRange{
Lower: sc.pathRange.Lower,
Upper: sc.pathRange.Upper,
}))
// WithPeek is set to ensure that we iterate one past the upper
// bound of the path range. This is necessary to ensure that
// directories at the end of the path range are emitted. The
// paths are checked again in the callback to ensure we
// terminate if the next directory / file is past the upper
// bound.
s.fileIndexOpts = append(s.fileIndexOpts, index.WithPeek())
s.upper = sc.pathRange.Upper
}
if sc.filter != nil {
Expand All @@ -66,7 +79,7 @@ func NewSource(commitInfo *pfs.CommitInfo, fs fileset.FileSet, opts ...SourceOpt
func (s *source) Iterate(ctx context.Context, cb func(*pfs.FileInfo, fileset.File) error) error {
ctx, cf := pctx.WithCancel(ctx)
defer cf()
iter := fileset.NewIterator(ctx, s.fileSet.Iterate, s.indexOpts...)
iter := fileset.NewIterator(ctx, s.fileSet.Iterate, s.dirIndexOpts...)
cache := make(map[string]*pfs.FileInfo)
err := s.fileSet.Iterate(ctx, func(f fileset.File) error {
idx := f.Index()
Expand Down Expand Up @@ -103,7 +116,7 @@ func (s *source) Iterate(ctx context.Context, cb func(*pfs.FileInfo, fileset.Fil
return errors.EnsureStack(err)
}
return nil
}, s.indexOpts...)
}, s.fileIndexOpts...)
if errors.Is(err, errutil.ErrBreak) {
err = nil
}
Expand Down