Skip to content

Commit

Permalink
Return an empty reader when read a directory (#64)
Browse files Browse the repository at this point in the history
* Return an empty reader when open filesystem directory

* "Fix ReadCloser for Get method of file, sftp hdfs. Add IsDir field for Object struct"

* support sync single object for sftp source

Co-authored-by: chnliyong <liyong@juicedata.io>
  • Loading branch information
2 people authored and davies committed Jan 28, 2021
1 parent f0ec719 commit e049423
Show file tree
Hide file tree
Showing 22 changed files with 76 additions and 27 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
dist
juicesync
.idea
2 changes: 1 addition & 1 deletion object/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func (b *wasb) List(prefix, marker string, limit int64) ([]*Object, error) {
for i := 0; i < n; i++ {
blob := resp.Blobs[i]
mtime := time.Time(blob.Properties.LastModified)
objs[i] = &Object{blob.Name, int64(blob.Properties.ContentLength), mtime}
objs[i] = &Object{blob.Name, int64(blob.Properties.ContentLength), mtime, strings.HasSuffix(blob.Name, "/")}
}
return objs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion object/b2.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (c *b2client) List(prefix, marker string, limit int64) ([]*Object, error) {
attr, err := objects[i].Attrs(ctx)
if err == nil {
// attr.LastModified is not correct
objs[i] = &Object{attr.Name, attr.Size, attr.UploadTimestamp}
objs[i] = &Object{attr.Name, attr.Size, attr.UploadTimestamp, strings.HasSuffix(attr.Name, "/")}
}
}
return objs, nil
Expand Down
2 changes: 1 addition & 1 deletion object/bos.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (q *bosclient) List(prefix, marker string, limit int64) ([]*Object, error)
k := out.Contents[i]
println(k.LastModified)
mod, _ := time.Parse("2006-01-02T15:04:05Z", k.LastModified)
objs[i] = &Object{k.Key, int64(k.Size), mod}
objs[i] = &Object{k.Key, int64(k.Size), mod, strings.HasSuffix(k.Key, "/")}
}
return objs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion object/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (c *COS) List(prefix, marker string, limit int64) ([]*Object, error) {
}
objs := make([]*Object, len(out.Contents))
for i, item := range out.Contents {
objs[i] = &Object{item.Key, item.Size, item.LastModified}
objs[i] = &Object{item.Key, item.Size, item.LastModified, strings.HasSuffix(item.Key, "/")}
}
return objs, nil
}
Expand Down
12 changes: 11 additions & 1 deletion object/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,20 @@ func (d *filestore) path(key string) string {

func (d *filestore) Get(key string, off, limit int64) (io.ReadCloser, error) {
p := d.path(key)

f, err := os.Open(p)
if err != nil {
return nil, err
}

finfo, err := f.Stat()
if err != nil {
return nil, err
}
if finfo.IsDir() {
return ioutil.NopCloser(bytes.NewBuffer([]byte{})), nil
}

if off > 0 {
if _, err := f.Seek(off, 0); err != nil {
f.Close()
Expand Down Expand Up @@ -215,7 +225,7 @@ func (d *filestore) ListAll(prefix, marker string) (<-chan *Object, error) {
return nil
}
owner, group := getOwnerGroup(info)
f := &File{Object{key, info.Size(), info.ModTime()}, owner, group, info.Mode()}
f := &File{Object{key, info.Size(), info.ModTime(), info.IsDir()}, owner, group, info.Mode()}
if info.IsDir() {
f.Size = 0
if f.Key != "" || !strings.HasSuffix(d.root, "/") {
Expand Down
2 changes: 1 addition & 1 deletion object/gs.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (g *gs) List(prefix, marker string, limit int64) ([]*Object, error) {
for i := 0; i < n; i++ {
item := objects.Items[i]
mtime, _ := time.Parse(time.RFC3339, item.Updated)
objs[i] = &Object{item.Name, int64(item.Size), mtime}
objs[i] = &Object{item.Name, int64(item.Size), mtime, strings.HasSuffix(item.Name, "/")}
}
return objs, nil
}
Expand Down
8 changes: 7 additions & 1 deletion object/hdfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ func (h *hdfsclient) Get(key string, off, limit int64) (io.ReadCloser, error) {
if err != nil {
return nil, err
}

finfo := f.Stat()
if finfo.IsDir() {
return ioutil.NopCloser(bytes.NewBuffer([]byte{})), nil
}

if off > 0 {
if _, err := f.Seek(off, io.SeekStart); err != nil {
f.Close()
Expand Down Expand Up @@ -176,7 +182,7 @@ func (h *hdfsclient) ListAll(prefix, marker string) (<-chan *Object, error) {
return nil
}
hinfo := info.(*hdfs.FileInfo)
f := &File{Object{key, info.Size(), info.ModTime()}, hinfo.Owner(), hinfo.OwnerGroup(), info.Mode()}
f := &File{Object{key, info.Size(), info.ModTime(), info.IsDir()}, hinfo.Owner(), hinfo.OwnerGroup(), info.Mode()}
if f.Owner == superuser {
f.Owner = "root"
}
Expand Down
2 changes: 1 addition & 1 deletion object/ks3.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *ks3) List(prefix, marker string, limit int64) ([]*Object, error) {
objs := make([]*Object, n)
for i := 0; i < n; i++ {
o := resp.Contents[i]
objs[i] = &Object{*o.Key, *o.Size, *o.LastModified}
objs[i] = &Object{*o.Key, *o.Size, *o.LastModified, strings.HasSuffix(*o.Key, "/")}
}
return objs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion object/mem.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (m *memStore) List(prefix, marker string, limit int64) ([]*Object, error) {
for k := range m.objects {
if strings.HasPrefix(k, prefix) && k > marker {
obj := m.objects[k]
f := &File{Object{k, int64(len(obj.data)), obj.mtime}, obj.owner, obj.group, obj.mode}
f := &File{Object{k, int64(len(obj.data)), obj.mtime, strings.HasSuffix(k, "/")}, obj.owner, obj.group, obj.mode}
objs = append(objs, (*Object)(unsafe.Pointer(f)))
}
}
Expand Down
2 changes: 1 addition & 1 deletion object/mss.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (c *mss) List(prefix, marker string, limit int64) ([]*Object, error) {
}
objs := make([]*Object, len(out.Contents))
for i, item := range out.Contents {
objs[i] = &Object{item.Key, item.Size, item.LastModified}
objs[i] = &Object{item.Key, item.Size, item.LastModified, strings.HasSuffix(item.Key, "/")}
}
return objs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion object/nos.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *nos) List(prefix, marker string, limit int64) ([]*Object, error) {
if err == nil {
mtime = mtime.Add(-8 * time.Hour)
}
objs[i] = &Object{o.Key, o.Size, mtime}
objs[i] = &Object{o.Key, o.Size, mtime, strings.HasSuffix(o.Key, "/")}
}
return objs, nil
}
Expand Down
1 change: 1 addition & 0 deletions object/object_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type Object struct {
Key string
Size int64
Mtime time.Time // Unix seconds
IsDir bool
}

type MultipartUpload struct {
Expand Down
2 changes: 1 addition & 1 deletion object/obs.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *obsClient) List(prefix, marker string, limit int64) ([]*Object, error)
objs := make([]*Object, n)
for i := 0; i < n; i++ {
o := resp.Contents[i]
objs[i] = &Object{o.Key, o.Size, o.LastModified}
objs[i] = &Object{o.Key, o.Size, o.LastModified, strings.HasSuffix(o.Key, "/")}
}
return objs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion object/oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (o *ossClient) List(prefix, marker string, limit int64) ([]*Object, error)
objs := make([]*Object, n)
for i := 0; i < n; i++ {
o := result.Objects[i]
objs[i] = &Object{o.Key, o.Size, o.LastModified}
objs[i] = &Object{o.Key, o.Size, o.LastModified, strings.HasSuffix(o.Key, "/")}
}
return objs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion object/qingstor.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (q *qingstor) List(prefix, marker string, limit int64) ([]*Object, error) {
objs := make([]*Object, n)
for i := 0; i < n; i++ {
k := out.Keys[i]
objs[i] = &Object{(*k.Key), *k.Size, time.Unix(int64(*k.Modified), 0)}
objs[i] = &Object{(*k.Key), *k.Size, time.Unix(int64(*k.Modified), 0), strings.HasSuffix(*k.Key, "/")}
}
return objs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion object/qiniu.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (q *qiniu) List(prefix, marker string, limit int64) ([]*Object, error) {
for i := 0; i < n; i++ {
entry := entries[i]
if entry.Key > prefix {
objs = append(objs, &Object{entry.Key, entry.Fsize, time.Unix(entry.PutTime/10000000, 0)})
objs = append(objs, &Object{entry.Key, entry.Fsize, time.Unix(entry.PutTime/10000000, 0), strings.HasSuffix(entry.Key, "/")})
}
}
return objs, nil
Expand Down
2 changes: 1 addition & 1 deletion object/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *s3client) List(prefix, marker string, limit int64) ([]*Object, error) {
objs := make([]*Object, n)
for i := 0; i < n; i++ {
o := resp.Contents[i]
objs[i] = &Object{*o.Key, *o.Size, *o.LastModified}
objs[i] = &Object{*o.Key, *o.Size, *o.LastModified, strings.HasSuffix(*o.Key, "/")}
}
return objs, nil
}
Expand Down
46 changes: 38 additions & 8 deletions object/sftp.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,14 @@ func (f *sftpStore) Get(key string, off, limit int64) (io.ReadCloser, error) {
if err != nil {
return nil, err
}
finfo, err := ff.Stat()
if err != nil {
return nil, err
}
if finfo.IsDir() {
return ioutil.NopCloser(bytes.NewBuffer([]byte{})), nil
}

if off > 0 {
if _, err := ff.Seek(off, 0); err != nil {
ff.Close()
Expand Down Expand Up @@ -254,32 +262,54 @@ func (s sortFI) Less(i, j int) bool {
return name1 < name2
}

func (f *sftpStore) find(c *sftp.Client, path, marker string, out chan *Object) {
if path == "" {
path = "."
}
func (f *sftpStore) doFind(c *sftp.Client, path, marker string, out chan *Object) {
infos, err := c.ReadDir(path)
if err != nil {
logger.Errorf("readdir %s: %s", path, err)
return
}

sort.Sort(sortFI(infos))
for _, fi := range infos {
p := path + "/" + fi.Name()
key := p[len(f.root):]
if key >= marker {
if fi.IsDir() {
out <- &Object{key + "/", 0, fi.ModTime()}
} else if fi.Size() > 0 {
out <- &Object{key, fi.Size(), fi.ModTime()}
out <- &Object{key + "/", 0, fi.ModTime(), true}
} else {
out <- &Object{key, fi.Size(), fi.ModTime(), false}
}
}
if fi.IsDir() && (key >= marker || strings.HasPrefix(marker, key)) {
f.find(c, p, marker, out)
f.doFind(c, p, marker, out)
}
}
}

func (f *sftpStore) find(c *sftp.Client, path, marker string, out chan *Object) {
if path == "" {
path = "."
}

if marker != "" {
f.doFind(c, path, marker, out)
return
}

// try the file with file path `path` as an object
fi, err := c.Stat(path)
if err != nil {
logger.Errorf("Stat %s error: %s", path, err)
return
}
if fi.IsDir() {
out <- &Object{"", 0, fi.ModTime(), true}
f.doFind(c, path, marker, out)
} else {
out <- &Object{"", fi.Size(), fi.ModTime(), false}
}
}

func (f *sftpStore) List(prefix, marker string, limit int64) ([]*Object, error) {
return nil, notSupported
}
Expand Down
2 changes: 1 addition & 1 deletion object/speedy.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *speedy) List(prefix, marker string, limit int64) ([]*Object, error) {
if strings.HasSuffix(item.Key, "/.speedycloud_dir_flag") {
continue
}
objs = append(objs, &Object{item.Key, item.Size, item.LastModified})
objs = append(objs, &Object{item.Key, item.Size, item.LastModified, strings.HasSuffix(item.Key, "/")})
}
return objs, nil
}
Expand Down
2 changes: 1 addition & 1 deletion object/ufile.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (u *ufile) List(prefix, marker string, limit int64) ([]*Object, error) {
}
objs := make([]*Object, len(out.DataSet))
for i, item := range out.DataSet {
objs[i] = &Object{item.FileName, item.Size, time.Unix(int64(item.ModifyTime), 0)}
objs[i] = &Object{item.FileName, item.Size, time.Unix(int64(item.ModifyTime), 0), strings.HasSuffix(item.FileName, "/")}
}
return objs, nil
}
Expand Down
3 changes: 2 additions & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,8 +368,9 @@ OUT:
hasMore = false
break
}
if !config.Dirs && obj.Size == 0 && strings.HasSuffix(obj.Key, "/") {
if !config.Dirs && obj.IsDir {
// ignore directories
logger.Debug("Ignore directory ", obj.Key)
continue
}
atomic.AddUint64(&found, 1)
Expand Down

0 comments on commit e049423

Please sign in to comment.