Skip to content

Commit

Permalink
union: implement optional interfaces (Move, DirMove, Copy etc) - fixes
Browse files Browse the repository at this point in the history
…#2619

Implement optional interfaces
- Purge
- PutStream
- Copy
- Move
- DirMove
- DirCacheFlush
- ChangeNotify
- About

Make Hashes() return the intersection of all the hashes supported by the remotes
  • Loading branch information
ncw committed Oct 2, 2018
1 parent e4538af commit d90eccd
Showing 1 changed file with 148 additions and 7 deletions.
155 changes: 148 additions & 7 deletions backend/union/union.go
Expand Up @@ -42,6 +42,8 @@ type Fs struct {
opt Options // options for this Fs
root string // the path we are working on
remotes []fs.Fs // slice of remotes
wr fs.Fs // writable remote
hashSet hash.Set // intersection of hash types
}

// Name of the remote (as passed into NewFs)
Expand All @@ -66,18 +68,119 @@ func (f *Fs) Features() *fs.Features {

// Rmdir removes the root directory of the Fs object
func (f *Fs) Rmdir(dir string) error {
return f.remotes[len(f.remotes)-1].Rmdir(dir)
return f.wr.Rmdir(dir)
}

// Hashes returns hash.HashNone to indicate remote hashing is unavailable
func (f *Fs) Hashes() hash.Set {
// This could probably be set if all remotes share the same hashing algorithm
return hash.Set(hash.None)
return f.hashSet
}

// Mkdir makes the root directory of the Fs object
func (f *Fs) Mkdir(dir string) error {
return f.remotes[len(f.remotes)-1].Mkdir(dir)
return f.wr.Mkdir(dir)
}

// Purge all files in the root and the root directory
//
// Implement this if you have a way of deleting all the files
// quicker than just running Remove() on the result of List()
//
// Return an error if it doesn't exist
func (f *Fs) Purge() error {
return f.wr.Features().Purge()
}

// Copy src to this remote using server side copy operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantCopy
func (f *Fs) Copy(src fs.Object, remote string) (fs.Object, error) {
if src.Fs() != f.wr {
fs.Debugf(src, "Can't copy - not same remote type")
return nil, fs.ErrorCantCopy
}
return f.wr.Features().Copy(src, remote)
}

// Move src to this remote using server side move operations.
//
// This is stored with the remote path given
//
// It returns the destination Object and a possible error
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantMove
func (f *Fs) Move(src fs.Object, remote string) (fs.Object, error) {
if src.Fs() != f.wr {
fs.Debugf(src, "Can't move - not same remote type")
return nil, fs.ErrorCantMove
}
return f.wr.Features().Move(src, remote)
}

// DirMove moves src, srcRemote to this remote at dstRemote
// using server side move operations.
//
// Will only be called if src.Fs().Name() == f.Name()
//
// If it isn't possible then return fs.ErrorCantDirMove
//
// If destination exists then return fs.ErrorDirExists
func (f *Fs) DirMove(src fs.Fs, srcRemote, dstRemote string) error {
srcFs, ok := src.(*Fs)
if !ok {
fs.Debugf(srcFs, "Can't move directory - not same remote type")
return fs.ErrorCantDirMove
}
return f.wr.Features().DirMove(srcFs.wr, srcRemote, dstRemote)
}

// ChangeNotify calls the passed function with a path
// that has had changes. If the implementation
// uses polling, it should adhere to the given interval.
// At least one value will be written to the channel,
// specifying the initial value and updated values might
// follow. A 0 Duration should pause the polling.
// The ChangeNotify implemantion must empty the channel
// regulary. When the channel gets closed, the implemantion
// should stop polling and release resources.
func (f *Fs) ChangeNotify(fn func(string, fs.EntryType), ch <-chan time.Duration) {
for _, remote := range f.remotes {
if ChangeNotify := remote.Features().ChangeNotify; ChangeNotify != nil {
ChangeNotify(fn, ch)
}
}
}

// DirCacheFlush resets the directory cache - used in testing
// as an optional interface
func (f *Fs) DirCacheFlush() {
for _, remote := range f.remotes {
if DirCacheFlush := remote.Features().DirCacheFlush; DirCacheFlush != nil {
DirCacheFlush()
}
}
}

// PutStream uploads to the remote path with the modTime given of indeterminate size
//
// May create the object even if it returns an error - if so
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) PutStream(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.wr.Features().PutStream(in, src, options...)
}

// About gets quota information from the Fs
func (f *Fs) About() (*fs.Usage, error) {
return f.wr.Features().About()
}

// Put in to the remote path with the modTime given of the given size
Expand All @@ -86,7 +189,7 @@ func (f *Fs) Mkdir(dir string) error {
// will return the object and the error, otherwise will return
// nil and the error
func (f *Fs) Put(in io.Reader, src fs.ObjectInfo, options ...fs.OpenOption) (fs.Object, error) {
return f.remotes[len(f.remotes)-1].Put(in, src, options...)
return f.wr.Put(in, src, options...)
}

// List the objects and directories in dir into entries. The
Expand Down Expand Up @@ -204,6 +307,7 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
root: root,
opt: *opt,
remotes: remotes,
wr: remotes[len(remotes)-1],
}
var features = (&fs.Features{
CaseInsensitive: true,
Expand All @@ -212,16 +316,53 @@ func NewFs(name, root string, m configmap.Mapper) (fs.Fs, error) {
WriteMimeType: true,
CanHaveEmptyDirectories: true,
BucketBased: true,
SetTier: true,
GetTier: true,
}).Fill(f)
features = features.Mask(f.wr) // mask the features just on the writable fs

// FIXME maybe should be masking the bools here?

// Clear ChangeNotify and DirCacheFlush if all are nil
clearChangeNotify := true
clearDirCacheFlush := true
for _, remote := range f.remotes {
features = features.Mask(remote)
remoteFeatures := remote.Features()
if remoteFeatures.ChangeNotify != nil {
clearChangeNotify = false
}
if remoteFeatures.DirCacheFlush != nil {
clearDirCacheFlush = false
}
}
if clearChangeNotify {
features.ChangeNotify = nil
}
if clearDirCacheFlush {
features.DirCacheFlush = nil
}

f.features = features

// Get common intersection of hashes
hashSet := f.remotes[0].Hashes()
for _, remote := range f.remotes[1:] {
hashSet = hashSet.Overlap(remote.Hashes())
}
f.hashSet = hashSet

return f, nil
}

// Check the interfaces are satisfied
var (
_ fs.Fs = &Fs{}
_ fs.Fs = (*Fs)(nil)
_ fs.Purger = (*Fs)(nil)
_ fs.PutStreamer = (*Fs)(nil)
_ fs.Copier = (*Fs)(nil)
_ fs.Mover = (*Fs)(nil)
_ fs.DirMover = (*Fs)(nil)
_ fs.DirCacheFlusher = (*Fs)(nil)
_ fs.ChangeNotifier = (*Fs)(nil)
_ fs.Abouter = (*Fs)(nil)
)

0 comments on commit d90eccd

Please sign in to comment.