Skip to content

Commit

Permalink
mount: read data that is just written
Browse files Browse the repository at this point in the history
able read on data not flushed
multiple file open shares the same file handle

fix #1182 on linux
  • Loading branch information
chrislusf committed Jan 22, 2020
1 parent 09f4cee commit 6b48d24
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 22 deletions.
3 changes: 1 addition & 2 deletions weed/filesys/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
}

file := node.(*File)
file.isOpen = true
file.isOpen++
fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
fh.dirtyMetadata = true
return file, fh, nil

}
Expand Down
23 changes: 23 additions & 0 deletions weed/filesys/dirty_page.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,3 +222,26 @@ func max(x, y int64) int64 {
}
return y
}
func min(x, y int64) int64 {
if x < y {
return x
}
return y
}

func (pages *ContinuousDirtyPages) ReadDirtyData(ctx context.Context, data []byte, startOffset int64) (offset int64, size int, err error) {
bufSize := int64(len(data))
if startOffset+bufSize < pages.Offset {
return
}
if startOffset >= pages.Offset+pages.Size {
return
}

offset = max(pages.Offset, startOffset)
stopOffset := min(pages.Offset+pages.Size, startOffset+bufSize)
size = int(stopOffset - offset)
copy(data[offset-startOffset:], pages.Data[offset-pages.Offset:stopOffset-pages.Offset])

return
}
12 changes: 6 additions & 6 deletions weed/filesys/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type File struct {
wfs *WFS
entry *filer_pb.Entry
entryViewCache []filer2.VisibleInterval
isOpen bool
isOpen int
}

func (file *File) fullpath() filer2.FullPath {
Expand All @@ -42,7 +42,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {

glog.V(4).Infof("file Attr %s, open:%v, existing attr: %+v", file.fullpath(), file.isOpen, attr)

if !file.isOpen {
if file.isOpen <=0 {
if err := file.maybeLoadEntry(ctx); err != nil {
return err
}
Expand All @@ -52,7 +52,7 @@ func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Valid = time.Second
attr.Mode = os.FileMode(file.entry.Attributes.FileMode)
attr.Size = filer2.TotalSize(file.entry.Chunks)
if file.isOpen {
if file.isOpen > 0 {
attr.Size = file.entry.Attributes.FileSize
}
attr.Crtime = time.Unix(file.entry.Attributes.Crtime, 0)
Expand Down Expand Up @@ -81,7 +81,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op

glog.V(4).Infof("file %v open %+v", file.fullpath(), req)

file.isOpen = true
file.isOpen++

handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid)

Expand Down Expand Up @@ -140,7 +140,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f
file.entry.Attributes.Mtime = req.Mtime.Unix()
}

if file.isOpen {
if file.isOpen > 0 {
return nil
}

Expand Down Expand Up @@ -218,7 +218,7 @@ func (file *File) Forget() {
}

func (file *File) maybeLoadEntry(ctx context.Context) error {
if file.entry == nil || !file.isOpen {
if file.entry == nil || file.isOpen <= 0{
entry, err := file.wfs.maybeLoadEntry(ctx, file.dir.Path, file.Name)
if err != nil {
return err
Expand Down
57 changes: 43 additions & 14 deletions weed/filesys/filehandle.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"path"
"time"

"github.com/gabriel-vasile/mimetype"

"github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/gabriel-vasile/mimetype"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
)
Expand Down Expand Up @@ -50,29 +51,50 @@ func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fus

glog.V(4).Infof("%s read fh %d: [%d,%d)", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size))

buff := make([]byte, req.Size)

totalRead, err := fh.readFromChunks(ctx, buff, req.Offset)
if err == nil {
dirtyOffset, dirtySize, dirtyReadErr := fh.readFromDirtyPages(ctx, buff, req.Offset)
if dirtyReadErr == nil && totalRead+req.Offset < dirtyOffset+int64(dirtySize) {
totalRead = dirtyOffset + int64(dirtySize) - req.Offset
}
}

resp.Data = buff[:totalRead]

if err != nil {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
}

return err
}

func (fh *FileHandle) readFromDirtyPages(ctx context.Context, buff []byte, startOffset int64) (offset int64, size int, err error) {
return fh.dirtyPages.ReadDirtyData(ctx, buff, startOffset)
}

func (fh *FileHandle) readFromChunks(ctx context.Context, buff []byte, offset int64) (int64, error) {

// this value should come from the filer instead of the old f
if len(fh.f.entry.Chunks) == 0 {
glog.V(1).Infof("empty fh %v/%v", fh.f.dir.Path, fh.f.Name)
return nil
return 0, nil
}

buff := make([]byte, req.Size)

if fh.f.entryViewCache == nil {
fh.f.entryViewCache = filer2.NonOverlappingVisibleIntervals(fh.f.entry.Chunks)
}

chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, req.Offset, req.Size)

totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, req.Offset)
chunkViews := filer2.ViewFromVisibleIntervals(fh.f.entryViewCache, offset, len(buff))

resp.Data = buff[:totalRead]
totalRead, err := filer2.ReadIntoBuffer(ctx, fh.f.wfs, fh.f.fullpath(), buff, chunkViews, offset)

if err != nil {
glog.Errorf("file handle read %s: %v", fh.f.fullpath(), err)
}

return err
return totalRead, err
}

// Write to the file handle
Expand Down Expand Up @@ -115,11 +137,12 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err

glog.V(4).Infof("%v release fh %d", fh.f.fullpath(), fh.handle)

fh.dirtyPages.releaseResource()
fh.f.isOpen--

fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))

fh.f.isOpen = false
if fh.f.isOpen <= 0 {
fh.dirtyPages.releaseResource()
fh.f.wfs.ReleaseHandle(fh.f.fullpath(), fuse.HandleID(fh.handle))
}

return nil
}
Expand All @@ -141,7 +164,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
return nil
}

return fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {
err = fh.f.wfs.WithFilerClient(ctx, func(client filer_pb.SeaweedFilerClient) error {

if fh.f.entry.Attributes != nil {
fh.f.entry.Attributes.Mime = fh.contentType
Expand Down Expand Up @@ -178,4 +201,10 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {

return nil
})

if err == nil {
fh.dirtyMetadata = false
}

return err
}
11 changes: 11 additions & 0 deletions weed/filesys/wfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type WFS struct {
// contains all open handles, protected by handlesLock
handlesLock sync.Mutex
handles []*FileHandle
pathToHandleIndex map[filer2.FullPath]int

bufPool sync.Pool

Expand All @@ -68,6 +69,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
wfs := &WFS{
option: option,
listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(option.DirListCacheLimit * 3).ItemsToPrune(100)),
pathToHandleIndex: make(map[filer2.FullPath]int),
bufPool: sync.Pool{
New: func() interface{} {
return make([]byte, option.ChunkSizeLimit)
Expand Down Expand Up @@ -102,18 +104,26 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHand
wfs.handlesLock.Lock()
defer wfs.handlesLock.Unlock()

index, found := wfs.pathToHandleIndex[fullpath]
if found && wfs.handles[index] != nil {
glog.V(2).Infoln(fullpath, "found fileHandle id", index)
return wfs.handles[index]
}

fileHandle = newFileHandle(file, uid, gid)
for i, h := range wfs.handles {
if h == nil {
wfs.handles[i] = fileHandle
fileHandle.handle = uint64(i)
wfs.pathToHandleIndex[fullpath] = i
glog.V(4).Infof( "%s reuse fh %d", fullpath,fileHandle.handle)
return
}
}

wfs.handles = append(wfs.handles, fileHandle)
fileHandle.handle = uint64(len(wfs.handles) - 1)
wfs.pathToHandleIndex[fullpath] = int(fileHandle.handle)
glog.V(4).Infof( "%s new fh %d", fullpath,fileHandle.handle)

return
Expand All @@ -124,6 +134,7 @@ func (wfs *WFS) ReleaseHandle(fullpath filer2.FullPath, handleId fuse.HandleID)
defer wfs.handlesLock.Unlock()

glog.V(4).Infof("%s ReleaseHandle id %d current handles length %d", fullpath, handleId, len(wfs.handles))
delete(wfs.pathToHandleIndex, fullpath)
if int(handleId) < len(wfs.handles) {
wfs.handles[int(handleId)] = nil
}
Expand Down

0 comments on commit 6b48d24

Please sign in to comment.