Skip to content

Commit

Permalink
added more error handling for reads/writes, badNode registry on DataC…
Browse files Browse the repository at this point in the history
…lient
  • Loading branch information
jbooth committed Apr 11, 2014
1 parent 20dcca2 commit c57cb59
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 293 deletions.
2 changes: 0 additions & 2 deletions client/openfilemap.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ type openFile struct {
fh uint64
ino *openInode
w *Writer
r *Reader
}

func NewOpenFileMap(leases maggiefs.LeaseService, names maggiefs.NameService, datas maggiefs.DataService, localDnId *uint32, onNotify func(uint64)) *OpenFileMap {
Expand Down Expand Up @@ -124,7 +123,6 @@ func (o *OpenFileMap) Open(inodeid uint64, writable bool) (fh uint64, err error)
fh,
ino,
w,
NewReader(),
}
o.files[f.fh] = f
return fh, nil
Expand Down
216 changes: 36 additions & 180 deletions client/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,24 @@ import (
"fmt"
"github.com/jbooth/maggiefs/fuse"
"github.com/jbooth/maggiefs/maggiefs"
"github.com/jbooth/maggiefs/splice"
"io"
"log"
"sync"
"sync/atomic"
)

// readIocb implements maggiefs.Iocb for reads: on success it commits the
// spliced bytes to the fuse pipe, and in either case it reports completion
// on onDone so the caller blocked in Read can proceed.
type readIocb struct {
	p      fuse.ReadPipe // destination pipe registered with the fuse server
	onDone chan error    // receives nil (or the commit error) on success, the read error on failure
}

// OnSuccess runs once the read's bytes have been spliced into the pipe.
// It commits the pipe to the fuse server and hands the commit result to
// whoever is waiting on onDone. Its own return value is always nil; any
// commit failure travels through the channel instead.
func (r readIocb) OnSuccess() error {
	commitErr := r.p.Commit()
	r.onDone <- commitErr
	return nil
}

// OnErr is invoked when the async read fails; it forwards the error to the
// goroutine blocked on onDone.
func (r readIocb) OnErr(err error) {
	r.onDone <- err
}

// stateless function to serve random reads
func Read(datas maggiefs.DataService, inode *maggiefs.Inode, p fuse.ReadPipe, position uint64, length uint32) (err error) {
if position == inode.Length {
Expand Down Expand Up @@ -58,25 +69,37 @@ func Read(datas maggiefs.DataService, inode *maggiefs.Inode, p fuse.ReadPipe, po
}
// read bytes
//fmt.Printf("reading from block %+v at posInBlock %d, length %d array offset %d \n",block,posInBlock,numBytesFromBlock,offset)

onDone := make(chan error, 1)
// send read request
if numBytesFromBlock == length-nRead {
// if the rest of the read is coming from this block, read and commit
// note this call is async, not done when we return
onDone := make(chan bool, 1)
err = datas.Read(block, p, posInBlock, numBytesFromBlock, func() {
err = datas.Read(block, p, posInBlock, numBytesFromBlock, func(err error) {
// bail on error
if err != nil {
onDone <- err
return
}
// commit to fuse server on success
e1 := p.Commit()
if e1 != nil {
log.Printf("Err committing to pipe from remote read of block %+v, read at posInBlock %d", block, posInBlock)
onDone <- fmt.Errorf("Err (%s) committing to pipe from remote read of block %+v, read at posInBlock %d", e1, block, posInBlock)
} else {
onDone <- nil
}
onDone <- true
})
<-onDone
} else {
// else, read and wait till done, commit next time around
onDone := make(chan bool, 1)
err = datas.Read(block, p, posInBlock, numBytesFromBlock, func() { onDone <- true })
<-onDone
err = datas.Read(block, p, posInBlock, numBytesFromBlock, func(err error) { onDone <- err })
}
// check no err sending read request
if err != nil {
return fmt.Errorf("reader.go error reading from block %+v : %s", block, err.Error())
}
if err != nil && err != io.EOF {
// wait done
err = <-onDone
if err != nil {
return fmt.Errorf("reader.go error reading from block %+v : %s", block, err.Error())
}
nRead += numBytesFromBlock
Expand All @@ -87,170 +110,3 @@ func Read(datas maggiefs.DataService, inode *maggiefs.Inode, p fuse.ReadPipe, po
// sometimes the length can be more bytes than there are in the file, so always just give that back
return nil
}

// NewReader returns a Reader with readahead state zeroed and no pipe
// allocated yet -- the splice pipe is created lazily on the first
// sequential-read detection (see initReadahead).
func NewReader() *Reader {
	// Named fields instead of a positional literal: the zero value is what
	// we want for everything but the mutex, and this won't silently break
	// if fields are added or reordered.
	return &Reader{l: new(sync.Mutex)}
}

// Reader serves file reads, switching into a readahead mode (prefetching
// into a splice pipe) once it sees two back-to-back sequential reads.
// All readahead state is guarded by l.
type Reader struct {
	l                      *sync.Mutex  // guards readahead state
	lastReadEndPos         uint64       // end position of the previous read; a read starting here means sequential access -- 2 sequential reads means readahead
	aheadPipe              *splice.Pair // pipe holding prefetched bytes for readahead (nil until readahead starts)
	aheadPos               uint64       // inode position of first byte in readahead pipe
	aheadPending           uint32       // how many bytes are pending to the pipe -- might be in pipe or might not be yet
	aheadRequestUnfinished uint32       // set >0 (atomically) while an async readahead request is still running
}

// Read serves length bytes at position from inode into the fuse pipe p.
// Random-looking reads are delegated to the stateless Read; when a read
// starts exactly where the previous one ended, we switch into readahead
// mode and serve from the prefetch pipe.
//
// Fixes over the previous version: the lock is now acquired on every path
// that touches readahead state (the old fall-through hit `defer r.l.Unlock()`
// without ever locking, panicking on an unlocked mutex), error returns no
// longer leak the held lock, the splice count is bounded by the bytes still
// owed rather than the full request length, and lastReadEndPos is kept
// current so sequential detection survives block boundaries.
func (r *Reader) Read(datas maggiefs.DataService, inode *maggiefs.Inode, p fuse.ReadPipe, position uint64, length uint32) (err error) {
	if position == inode.Length {
		// write header for OK, 0 bytes at EOF
		log.Printf("Read at EOF, position %d length %d, returning 0", position, inode.Length)
		p.WriteHeader(0, 0)
		return nil
	}
	if position > inode.Length {
		return errors.New("Read past end of file")
	}
	if position+uint64(length) > inode.Length {
		// truncate length to the EOF
		log.Printf("Truncating length from %d to %d", length, inode.Length-position)
		length = uint32(inode.Length - position)
	}
	blockForRead, err := blockForPos(position, inode)
	if err != nil {
		return err
	}
	// A read that crosses the block boundary, or the very first read, must
	// decide between random service and (re)starting readahead. A read that
	// stays inside the block is assumed to continue an active readahead.
	if ((position+uint64(length)) > blockForRead.EndPos) || position == 0 {
		r.l.Lock()
		if r.lastReadEndPos != position {
			// Not sequential: remember where this read ends, then serve the
			// random read without holding the lock.
			r.lastReadEndPos = position + uint64(length)
			r.l.Unlock()
			return Read(datas, inode, p, position, length)
		}
		// Sequential: start (or restart) readahead. The deferred Unlock
		// covers every return below, including the error paths.
		defer r.l.Unlock()
		if err = r.initReadahead(position); err != nil {
			return err
		}
		if err = r.fillMore(datas, inode); err != nil {
			return err
		}
	} else {
		// Serving straight from the readahead pipe; we still need the lock
		// for the readahead state mutated below.
		r.l.Lock()
		defer r.l.Unlock()
	}
	// write response header before splicing the payload
	if err = p.WriteHeader(0, int(length)); err != nil {
		log.Printf("Error writing resp header to splice pipe : %s", err)
		return err
	}
	nRead := uint32(0)
	for nRead < length {
		// Splice at most what is both still owed to the caller and already
		// pending in the pipe.
		remaining := length - nRead
		numToSplice := remaining
		if r.aheadPending < remaining {
			numToSplice = r.aheadPending
		}
		if numToSplice > 0 {
			log.Printf("Splicing %d from readahead", numToSplice)
			numSpliced, err := p.LoadFrom(r.aheadPipe.ReadFd(), int(numToSplice))
			if err != nil {
				return err
			}
			if numSpliced < 0 {
				return fmt.Errorf("Splice returned -1!")
			}
			r.aheadPos += uint64(numSpliced)
			r.aheadPending -= uint32(numSpliced)
			nRead += uint32(numSpliced)
		}
		// Issue a request to send more bytes to the pipe.
		// NOTE(review): if aheadPending is 0 while an async fill is still in
		// flight, this loop busy-waits on fillMore; a condition variable
		// would be better -- confirm against DataService latency behavior.
		if err = r.fillMore(datas, inode); err != nil {
			return err
		}
	}
	// Record where this read ended so the next call at a block boundary is
	// correctly recognized as sequential.
	r.lastReadEndPos = position + uint64(length)
	return p.Commit()
}

// Close tears down readahead state: drains any bytes left in the pipe and
// returns the pipe via splice.Done. Safe to call on a Reader that only ever
// served random reads -- such a Reader never allocates a pipe, and the old
// unconditional splice.Done(r.aheadPipe) handed a nil pair to the pool.
func (r *Reader) Close() error {
	if r.aheadPipe == nil {
		// Readahead never started; nothing to release.
		return nil
	}
	err := r.drain()
	splice.Done(r.aheadPipe)
	return err
}

// pipeSize is the capacity, in bytes, each readahead pipe is grown to.
var pipeSize = 512 * 1024

// initReadahead prepares the readahead pipe to serve bytes starting at
// startPos: it allocates and grows the pipe on first use, or, on reuse,
// discards whatever is still buffered from the previous position. Must be
// called with r.l held.
//
// Fix: the previous version zeroed aheadPending BEFORE calling drain, and
// drain loops while aheadPending > 0 -- so the drain was a no-op and stale
// bytes from the old position were later served as if they belonged to the
// new one. We now drain first, then reset the counters.
func (r *Reader) initReadahead(startPos uint64) (err error) {
	if r.aheadPipe == nil {
		r.aheadPipe, err = splice.Get()
		if err != nil {
			return err
		}
		if err = r.aheadPipe.Grow(pipeSize); err != nil {
			return err
		}
	} else {
		// Reusing the pipe: discard leftovers while aheadPending still
		// reflects how many bytes are in it.
		if err = r.drain(); err != nil {
			return err
		}
	}
	r.aheadPos = startPos
	r.aheadPending = 0
	return nil
}

// drain is a shared scratch buffer for discarding unread readahead bytes.
// Its contents are never inspected, so concurrent use by multiple Readers
// only scribbles over throwaway data.
var drain = make([]byte, 128*1024)

// drain discards the aheadPending bytes still buffered in the readahead
// pipe by reading them into the throwaway drain buffer. Must be called with
// r.l held.
//
// Fix: the previous version called r.aheadPipe.Write(drainTo), which pushes
// bytes INTO the pipe instead of consuming them -- the pending data was
// never removed (and the pipe could fill and block). Discarding requires
// reading from the pipe's read end.
func (r *Reader) drain() (err error) {
	for r.aheadPending > 0 {
		// Re-slice each iteration so we never read past the pending count.
		drainTo := drain[:]
		if r.aheadPending < uint32(len(drainTo)) {
			drainTo = drainTo[:int(r.aheadPending)]
		}
		n, err := r.aheadPipe.Read(drainTo)
		r.aheadPending -= uint32(n)
		if err != nil {
			return err
		}
	}
	return nil
}

// fillMore issues at most one outstanding async read that appends up to a
// pipe's worth of bytes after the currently pending readahead data. It is a
// no-op while a previous fill is still in flight or the pipe is already
// full. Must be called with r.l held (it mutates aheadPending).
//
// Fixes: the completion callback now takes the error argument that
// DataService.Read delivers (matching the func(err error) callbacks used by
// Read and Writer.Write elsewhere in this change), and a failed dispatch
// rolls back the bookkeeping instead of leaving aheadPending and the
// in-flight flag permanently stuck.
func (r *Reader) fillMore(datas maggiefs.DataService, i *maggiefs.Inode) (err error) {
	// if there's already an unfinished request running, don't do anything
	if atomic.LoadUint32(&r.aheadRequestUnfinished) > 0 {
		return nil
	}
	// same if we have enough pending bytes to fill the pipe already
	if r.aheadPending == uint32(pipeSize) {
		return nil
	}
	// first inode position we haven't yet requested
	posToRead := r.aheadPos + uint64(r.aheadPending)
	block, err := blockForPos(posToRead, i)
	if err != nil {
		return err
	}
	posInBlock := posToRead - block.StartPos
	numBytesFromBlock := uint32(block.Length()) - uint32(posInBlock)
	// only request as many bytes as our pipe has room for
	if pipeCapacity := uint32(pipeSize) - r.aheadPending; numBytesFromBlock > pipeCapacity {
		numBytesFromBlock = pipeCapacity
	}
	// mark a read in progress and account for the bytes we expect
	atomic.StoreUint32(&r.aheadRequestUnfinished, 1)
	r.aheadPending += numBytesFromBlock
	// fire the async read; the callback clears the in-flight flag
	err = datas.Read(block, r.aheadPipe, posInBlock, numBytesFromBlock, func(e error) {
		if e != nil {
			// NOTE(review): on async failure the bytes never reach the pipe
			// but aheadPending already counts them, so subsequent splices
			// will stall. Needs a real repair path; for now surface it.
			log.Printf("reader.go: async readahead of block %+v failed: %s", block, e)
		}
		atomic.StoreUint32(&r.aheadRequestUnfinished, 0)
	})
	if err != nil {
		// The request never started: undo our bookkeeping so a later call
		// can retry.
		r.aheadPending -= numBytesFromBlock
		atomic.StoreUint32(&r.aheadRequestUnfinished, 0)
	}
	return err
}
29 changes: 11 additions & 18 deletions client/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,14 +39,15 @@ func NewWriter(inodeid uint64, doNotify func(inodeid uint64, offset int64, lengt
return ret
}

type offAndLen struct {
// writeResult is delivered on a pendingWrite's done channel when the
// underlying block write (or sync marker) completes.
type writeResult struct {
	err    error // non-nil if the block write failed
	off    int64 // inode offset where the write began (block StartPos + posInBlock)
	length int64 // number of bytes written
}

// pendingWrite represents either an outstanding write or a sync request
// queued on the Writer's pipeline.
type pendingWrite struct {
	done   chan writeResult // receives exactly one result when the operation completes
	isSync bool             // true for a sync barrier rather than a data write
}

Expand Down Expand Up @@ -98,6 +99,9 @@ func (w *Writer) process() {
newLen := int64(0)
for _, write := range updates {
newOffAndLen := <-write.done
if newOffAndLen.err != nil {
log.Printf("Error on write! Implement proper behavior here, seriously!")
}
if newOff == 0 && newLen == 0 {
// don't have a previous write we're extending, so set them and reloop
newOff = newOffAndLen.off
Expand Down Expand Up @@ -170,10 +174,10 @@ func (w *Writer) Write(datas maggiefs.DataService, inode *maggiefs.Inode, p []by
}
writes := blockwrites(inode, p, position, length)
for _, wri := range writes {
pending := pendingWrite{make(chan offAndLen, 1), false}
pending := pendingWrite{make(chan writeResult, 1), false}
w.pendingWrites <- pending
err = w.datas.Write(wri.b, wri.p, wri.posInBlock, func() {
finishedWrite := offAndLen{int64(wri.b.StartPos + wri.posInBlock), int64(len(wri.p))}
err = w.datas.Write(wri.b, wri.p, wri.posInBlock, func(err error) {
finishedWrite := writeResult{err, int64(wri.b.StartPos + wri.posInBlock), int64(len(wri.p))}
//log.Printf("Finished write, operation %+v, in callback now \n", finishedWrite)
pending.done <- finishedWrite
})
Expand All @@ -188,17 +192,6 @@ func (w *Writer) Sync() error {
w.l.Lock()
defer w.l.Unlock()
w.doSync()
// questionable optimization, we let go of the lock while waiting on sync,
// so other writes can get in behind us and keep the pipeline full
//syncRequest := pendingWrite{make(chan offAndLen), true}
//if w.closed {
// w.l.Unlock()
// return fmt.Errorf("Can't sync, already closed!")
//}
//w.pendingWrites <- syncRequest
//w.l.Unlock()
//// since syncRequest is unbuffered, this will block until processed
//syncRequest.done <- offAndLen{0, 0}
return nil
}

Expand All @@ -218,10 +211,10 @@ func (w *Writer) doSync() {
if w.closed {
return
}
syncRequest := pendingWrite{make(chan offAndLen), true}
syncRequest := pendingWrite{make(chan writeResult), true}
w.pendingWrites <- syncRequest
// since syncRequest is unbuffered, this will block until processed
syncRequest.done <- offAndLen{0, 0}
syncRequest.done <- writeResult{nil, 0, 0}
}

type blockwrite struct {
Expand Down
Loading

0 comments on commit c57cb59

Please sign in to comment.