Skip to content

Commit

Permalink
drive: fix missing items when listing using --fast-list / ListR
Browse files Browse the repository at this point in the history
This is caused by a bug in Google drive where, in some circumstances
querying for "(A in parents) or (B in parents)" returns nothing
whereas querying for "A in parents" and "B in parents" separately
works fine.

This has been reported here:

https://issuetracker.google.com/issues/149522397

This workaround detects this condition by seeing if a listing for more
than one directory at once returns nothing.

If it does then it retries each one individually.

This can potentially have a false positive if the user has multiple
empty directories which are queried at once. The consequence of this
will be that ListR is disabled for a while until the directories are
found to be actually empty in which case it will be re-enabled.

Fixes #3114 and Fixes #4289
  • Loading branch information
ncw committed May 31, 2020
1 parent e7bd392 commit cbf3d43
Showing 1 changed file with 74 additions and 16 deletions.
90 changes: 74 additions & 16 deletions backend/drive/drive.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"text/template"
"time"

Expand Down Expand Up @@ -68,6 +69,8 @@ const (
minChunkSize = 256 * fs.KibiByte
defaultChunkSize = 8 * fs.MebiByte
partialFields = "id,name,size,md5Checksum,trashed,modifiedTime,createdTime,mimeType,parents,webViewLink,shortcutDetails"
listRGrouping = 50 // number of IDs to search at once when using ListR
listRInputBuffer = 1000 // size of input buffer when using ListR
)

// Globals
Expand Down Expand Up @@ -558,6 +561,9 @@ type Fs struct {
isTeamDrive bool // true if this is a team drive
fileFields googleapi.Field // fields to fetch file info with
m configmap.Mapper
grouping int32 // number of IDs to search at once in ListR - read with atomic
listRmu *sync.Mutex // protects listRempties
listRempties map[string]struct{} // IDs of supposedly empty directories which triggered grouping disable
}

type baseObject struct {
Expand Down Expand Up @@ -1079,11 +1085,14 @@ func NewFs(name, path string, m configmap.Mapper) (fs.Fs, error) {
}

f := &Fs{
name: name,
root: root,
opt: *opt,
pacer: newPacer(opt),
m: m,
name: name,
root: root,
opt: *opt,
pacer: newPacer(opt),
m: m,
grouping: listRGrouping,
listRmu: new(sync.Mutex),
listRempties: make(map[string]struct{}),
}
f.isTeamDrive = opt.TeamDriveID != ""
f.fileFields = f.getFileFields()
Expand Down Expand Up @@ -1634,15 +1643,17 @@ func (s listRSlices) Less(i, j int) bool {
// In each cycle it will read up to grouping entries from the in channel without blocking.
// If an error occurs it will be send to the out channel and then return. Once the in channel is closed,
// nil is send to the out channel and the function returns.
func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan listREntry, out chan<- error, cb func(fs.DirEntry) error, grouping int) {
func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in chan listREntry, out chan<- error, cb func(fs.DirEntry) error) {
var dirs []string
var paths []string
var grouping int32

for dir := range in {
dirs = append(dirs[:0], dir.id)
paths = append(paths[:0], dir.path)
grouping = atomic.LoadInt32(&f.grouping)
waitloop:
for i := 1; i < grouping; i++ {
for i := int32(1); i < grouping; i++ {
select {
case d, ok := <-in:
if !ok {
Expand All @@ -1655,13 +1666,15 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list
}
listRSlices{dirs, paths}.Sort()
var iErr error
foundItems := false
_, err := f.list(ctx, dirs, "", false, false, false, func(item *drive.File) bool {
// shared with me items have no parents when at the root
if f.opt.SharedWithMe && len(item.Parents) == 0 && len(paths) == 1 && paths[0] == "" {
item.Parents = dirs
}
for _, parent := range item.Parents {
var i int
foundItems = true
earlyExit := false
// If only one item in paths then no need to search for the ID
// assuming google drive is doing its job properly.
Expand Down Expand Up @@ -1702,6 +1715,53 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list
}
return false
})
// Found no items in more than one directory. Retry these as
// individual directories This is to work around a bug in google
// drive where (A in parents) or (B in parents) returns nothing
// sometimes. See #3114, #4289 and
// https://issuetracker.google.com/issues/149522397
if len(dirs) > 1 && !foundItems {
if atomic.SwapInt32(&f.grouping, 1) != 1 {
fs.Logf(f, "Disabling ListR to work around bug in drive as multi listing (%d) returned no entries", len(dirs))
}
var recycled = make([]listREntry, len(dirs))
f.listRmu.Lock()
for i := range dirs {
recycled[i] = listREntry{id: dirs[i], path: paths[i]}
// Make a note of these dirs - if they all turn
// out to be empty then we can re-enable grouping
f.listRempties[dirs[i]] = struct{}{}
}
f.listRmu.Unlock()
// recycle these in the background so we don't deadlock
// the listR runners if they all get here
wg.Add(len(recycled))
go func() {
for _, entry := range recycled {
in <- entry
}
fs.Debugf(f, "Recycled %d entries", len(recycled))
}()
}
// If using a grouping of 1 and dir was empty then check to see if it
// is part of the group that caused grouping to be disabled.
if grouping == 1 && len(dirs) == 1 && !foundItems {
f.listRmu.Lock()
if _, found := f.listRempties[dirs[0]]; found {
// Remove the ID
delete(f.listRempties, dirs[0])
// If no empties left => all the directories that
// triggered the grouping being set to 1 were actually
// empty so must have made a mistake
if len(f.listRempties) == 0 {
if atomic.SwapInt32(&f.grouping, listRGrouping) != listRGrouping {
fs.Logf(f, "Re-enabling ListR as previous detection was in error")
}
}
}
f.listRmu.Unlock()
}

for range dirs {
wg.Done()
}
Expand Down Expand Up @@ -1736,11 +1796,6 @@ func (f *Fs) listRRunner(ctx context.Context, wg *sync.WaitGroup, in <-chan list
// Don't implement this unless you have a more efficient way
// of listing recursively that doing a directory traversal.
func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (err error) {
const (
grouping = 50
inputBuffer = 1000
)

err = f.dirCache.FindRoot(ctx, false)
if err != nil {
return err
Expand All @@ -1753,7 +1808,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (

mu := sync.Mutex{} // protects in and overflow
wg := sync.WaitGroup{}
in := make(chan listREntry, inputBuffer)
in := make(chan listREntry, listRInputBuffer)
out := make(chan error, fs.Config.Checkers)
list := walk.NewListRHelper(callback)
overflow := []listREntry{}
Expand All @@ -1766,6 +1821,9 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
job := listREntry{actualID(d.ID()), d.Remote()}
select {
case in <- job:
// Adding the wg after we've entered the item is
// safe here because we know when the callback
// is called we are holding a waitgroup.
wg.Add(1)
default:
overflow = append(overflow, job)
Expand All @@ -1779,7 +1837,7 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
in <- listREntry{directoryID, dir}

for i := 0; i < fs.Config.Checkers; i++ {
go f.listRRunner(ctx, &wg, in, out, cb, grouping)
go f.listRRunner(ctx, &wg, in, out, cb)
}
go func() {
// wait until the all directories are processed
Expand All @@ -1789,8 +1847,8 @@ func (f *Fs) ListR(ctx context.Context, dir string, callback fs.ListRCallback) (
mu.Lock()
l := len(overflow)
// only fill half of the channel to prevent entries being put into overflow again
if l > inputBuffer/2 {
l = inputBuffer / 2
if l > listRInputBuffer/2 {
l = listRInputBuffer / 2
}
wg.Add(l)
for _, d := range overflow[:l] {
Expand Down

0 comments on commit cbf3d43

Please sign in to comment.