Skip to content

Commit

Permalink
added new offset/length to inode data notifications
Browse files Browse the repository at this point in the history
  • Loading branch information
jbooth committed Mar 17, 2014
1 parent 8d8ae18 commit 40660ba
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 43 deletions.
20 changes: 13 additions & 7 deletions leaseserver/leaseclient.go
Expand Up @@ -18,8 +18,8 @@ func NewLeaseClient(hostAddr string) (*LeaseClient, error) {
}

// sends notification
func (lc LeaseClient) Notify(nodeid uint64) (err error) {
req := request{OP_NOTIFY, 0, nodeid, 0}
func (lc LeaseClient) Notify(nodeid uint64, off int64, length int64) (err error) {
req := request{OP_NOTIFY, 0, nodeid, 0, off, length}
_, err = lc.c.doRequest(req)
return err
}
Expand All @@ -29,7 +29,7 @@ func (lc LeaseClient) Notify(nodeid uint64) (err error) {
// also registers a callback for when the node is remotely changed, this will be triggered
// upon the file changing *unless* we've cancelled this lease. Recommend
func (lc LeaseClient) ReadLease(nodeid uint64) (l maggiefs.ReadLease, err error) {
req := request{OP_READLEASE, 0, nodeid, 0}
req := request{OP_READLEASE, 0, nodeid, 0, 0, 0}
resp, err := lc.c.doRequest(req)
if err != nil {
return nil, err
Expand All @@ -47,7 +47,7 @@ func (lc LeaseClient) GetNotifier() chan maggiefs.NotifyEvent {

// blocks until all leases are released for the given node
func (lc LeaseClient) WaitAllReleased(nodeid uint64) error {
req := request{OP_CHECKLEASES, 0, nodeid, 0}
req := request{OP_CHECKLEASES, 0, nodeid, 0, 0, 0}
resp, err := lc.c.doRequest(req)
if err != nil {
return err
Expand All @@ -63,15 +63,17 @@ func (lc LeaseClient) WaitAllReleased(nodeid uint64) error {
}

type NotifyEvent struct {
inodeid uint64
ackid uint64
inodeid uint64
offset int64
length int64
c *rawclient
}

func (n NotifyEvent) Ack() error {
// send ack message to server

req := request{OP_ACKNOWLEDGE, n.ackid, n.inodeid, n.ackid}
req := request{OP_ACKNOWLEDGE, n.ackid, n.inodeid, n.ackid, 0, 0}
n.c.sendRequestNoResponse(req)
return nil
}
Expand All @@ -80,6 +82,10 @@ func (n NotifyEvent) Inodeid() uint64 {
return n.inodeid
}

func (n NotifyEvent) OffAndLength() (int64, int64) {
return n.offset, n.length
}

type Lease struct {
leaseid uint64
inodeid uint64
Expand All @@ -89,7 +95,7 @@ type Lease struct {
// lets go of lock, committing our changes to all open readleases
func (l *Lease) Release() error {
var op = OP_READLEASE_RELEASE
req := request{op, l.leaseid, l.inodeid, 0}
req := request{op, l.leaseid, l.inodeid, 0, 0, 0}
_, err := l.c.doRequest(req)
return err
}
45 changes: 22 additions & 23 deletions leaseserver/leaseserver.go
Expand Up @@ -96,13 +96,15 @@ func (c *clientConn) ack(ackId uint64) error {
}

// sends a notification, returning a pendingAck which will wait for acknowledgement
func (c *clientConn) notify(nodeid uint64, leaseid uint64, ackId uint64) pendingAck {
func (c *clientConn) notify(nodeid uint64, leaseid uint64, ackId uint64, offset int64, length int64) pendingAck {
// send notification to client
r := response{
Reqno: ackId,
Leaseid: leaseid,
Inodeid: nodeid,
Status: STATUS_NOTIFY,
Reqno: ackId,
Leaseid: leaseid,
Inodeid: nodeid,
Status: STATUS_NOTIFY,
NotifyStartPos: offset,
NotifyLength: length,
}
c.resp <- r
// create pending ack
Expand Down Expand Up @@ -139,13 +141,13 @@ type pendingAck struct {
func (p pendingAck) waitAcknowledged() {
// timeout hardcoded for now
fmt.Printf("Waiting acknowledged conn id %d ackid %d\n", p.c.id, p.ackId)
timeout := time.After(time.Second * 5)
timeout := time.After(time.Second * 120)
select {
case <-p.ack:
return
case <-timeout:
// client lease is EXPIRED, KILL IT
fmt.Printf("Conn %d timed out waiting for ackid %d\n", p.c.id, p.ackId)
log.Printf("Conn %d timed out after 120s waiting for ackid %d\n", p.c.id, p.ackId)
p.c.closeAndDie()
}
return
Expand Down Expand Up @@ -266,19 +268,14 @@ func (ls *LeaseServer) process() {
case OP_READLEASE:
resp, err = ls.createLease(qr.req, qr.conn)
case OP_NOTIFY:
// notify will dispatch a goroutine and eventually queue OP_NOTIFY_DONE
err = ls.notify(qr.req, qr.conn)
// no response from this loop, notify spins off a goroutine to respond
// no response from this loop, notify spins off a goroutine to respond directly to qr.conn
continue
case OP_READLEASE_RELEASE:
resp, err = ls.releaseLease(qr.req, qr.conn)
if err != nil {
log.Printf("Error releasing readlease: %s\n", err.Error())
}
case OP_NOTIFY_DONE:
// this comes from a previous invocation of NOTIFY
// response finally goes to the original calling client
resp = response{qr.req.Reqno, 0, 0, STATUS_OK}
case OP_ACKNOWLEDGE:
//fmt.Printf("got ack for client id %d, ackid %d\n", qr.conn.id, qr.req.Leaseid)
qr.conn.ack(qr.req.Leaseid)
Expand All @@ -299,7 +296,7 @@ func (ls *LeaseServer) process() {
// send responses
if err != nil {
fmt.Printf("error processing request %+v, error: %s", qr.req, err)
qr.conn.resp <- response{0, 0, 0, STATUS_ERR}
qr.conn.resp <- response{0, 0, 0, STATUS_ERR, 0, 0}
} else {
//fmt.Printf("processed request %+v, sending response %+v\n", qr.req, resp)
qr.conn.resp <- resp
Expand All @@ -322,7 +319,7 @@ func (ls *LeaseServer) createLease(r request, c *clientConn) (response, error) {
ls.leasesByInode[r.Inodeid] = leasesForInode
// record in ls.leases under lease Id
ls.leasesById[l.leaseid] = l
return response{r.Reqno, leaseid, r.Inodeid, STATUS_OK}, nil
return response{r.Reqno, leaseid, r.Inodeid, STATUS_OK, 0, 0}, nil
}

func (ls *LeaseServer) releaseLease(r request, c *clientConn) (response, error) {
Expand All @@ -345,7 +342,7 @@ func (ls *LeaseServer) releaseLease(r request, c *clientConn) (response, error)
delete(ls.leasesByInode, inodeid)
}
// done
resp := response{r.Reqno, 0, 0, STATUS_OK}
resp := response{r.Reqno, 0, 0, STATUS_OK, 0, 0}
return resp, nil
}

Expand Down Expand Up @@ -373,20 +370,22 @@ func (ls *LeaseServer) notify(r request, c *clientConn) error {
// find all readleases attached to this inode id
readLeases := ls.leasesByInode[r.Inodeid]

pendingAcks := make([]pendingAck, len(readLeases))
pendingAcks := make([]pendingAck, 0, len(readLeases)-1)
// notify them all
idx := 0
for _, rl := range readLeases {
ls.ackIdCounter += 1
pendingAcks[idx] = rl.client.notify(r.Inodeid, rl.leaseid, ls.ackIdCounter)
if rl.client.id != c.id {
ls.ackIdCounter += 1
pendingAcks = append(pendingAcks, rl.client.notify(r.Inodeid, rl.leaseid, ls.ackIdCounter, r.NotifyStartPos, r.NotifyLength))
}
}
// spin off function which responds to committer after all readleases have acked
go func() {
// wait all readers acknowledged
for _, ack := range pendingAcks {
ack.waitAcknowledged()
}
// queue response to be sent to the client who committed
c.resp <- response{r.Reqno, r.Leaseid, r.Inodeid, STATUS_OK}
c.resp <- response{r.Reqno, r.Leaseid, r.Inodeid, STATUS_OK, r.NotifyStartPos, r.NotifyLength}
}()
return nil
}
Expand All @@ -395,9 +394,9 @@ func (ls *LeaseServer) checkLeases(r request, c *clientConn) (response, error) {
inodeLeases := ls.leasesByInode[r.Inodeid]
//fmt.Printf("Checking leases for inode %d : %+v\n", r.Inodeid, inodeLeases)
if inodeLeases != nil && len(inodeLeases) > 0 {
return response{r.Reqno, r.Leaseid, r.Inodeid, STATUS_WAIT}, nil
return response{r.Reqno, r.Leaseid, r.Inodeid, STATUS_WAIT, 0, 0}, nil
}
return response{r.Reqno, r.Leaseid, r.Inodeid, STATUS_OK}, nil
return response{r.Reqno, r.Leaseid, r.Inodeid, STATUS_OK, 0, 0}, nil
}

// atomically adds incr to val, returns new val
Expand Down
21 changes: 16 additions & 5 deletions leaseserver/leaseserver_test.go
Expand Up @@ -58,7 +58,7 @@ func TestCommit(t *testing.T) {
}
fmt.Println("committing")
go func() {
err := ls.Notify(nodeid)
err := ls.Notify(nodeid, 4096, 8192)
if err != nil {
panic(err)
}
Expand All @@ -83,7 +83,7 @@ func TestCommit(t *testing.T) {
func TestWaitForAck(t *testing.T) {
nodeid := uint64(10)
o.Do(startServer)
fmt.Printf("testCommit getting readlease\n")
fmt.Printf("testCommit getting readlease ls2\n")
rl, _ := ls2.ReadLease(nodeid)
fmt.Printf("got lease %+v, cli id %d\n", rl, 0)
fmt.Println("asserting no notification so far")
Expand All @@ -99,14 +99,15 @@ func TestWaitForAck(t *testing.T) {
fmt.Println("committing")
notifyDone := make(chan bool)
go func() {
err := ls.Notify(nodeid)
err := ls.Notify(nodeid, 4096, 8192)
if err != nil {
panic(err)
}
notifyDone <- true
fmt.Println("done committing")
fmt.Println("done committing ls1")
}()
fmt.Println("waiting for notification")

fmt.Println("checking that ls2 got notify")
threeSecondTimeout = time.After(time.Duration(3 * 1e9))
var n maggiefs.NotifyEvent
select {
Expand Down Expand Up @@ -138,6 +139,16 @@ func TestWaitForAck(t *testing.T) {
fmt.Println("notify did NOT return after ack!")
t.Fail()
}
fmt.Println("Testing that committer didn't get its own notify: ")
threeSecondTimeout = time.After(time.Duration(3 * 1e9))
select {
case <-threeSecondTimeout:
fmt.Println("ls1 didn't receive notify for its own commit, we're good")
break
case <-ls.GetNotifier():
fmt.Println("ls1 got its own notify! not supposed to!")
t.Fail()
}
fmt.Println("releasing readlease")
rl.Release()
}
Expand Down
2 changes: 1 addition & 1 deletion leaseserver/rawclient.go
Expand Up @@ -83,7 +83,7 @@ func (c *rawclient) mux() {
case resp := <-c.responses:
if resp.Status == STATUS_NOTIFY {
// this is a notification so forward to the notification chan
c.notifier <- NotifyEvent{inodeid: resp.Inodeid, ackid: resp.Reqno, c: c}
c.notifier <- NotifyEvent{inodeid: resp.Inodeid, ackid: resp.Reqno, offset: resp.NotifyStartPos, length: resp.NotifyLength, c: c}
} else {
// response to a request, forward to it's response chan
k := resp.Reqno
Expand Down
13 changes: 9 additions & 4 deletions leaseserver/reqresp.go
Expand Up @@ -23,11 +23,16 @@ type request struct {
Leaseid uint64
Inodeid uint64
Reqno uint64 // sent back with response so we know which request it was, overridden with ackID if an ack
// notify startpos and length, only valid if OP=OP_NOTIFY this is the range of positions in the file to throw out cache for
NotifyStartPos int64
NotifyLength int64
}

type response struct {
Reqno uint64 // reqno that was sent with the request, overridden with ackID if a notify
Leaseid uint64
Inodeid uint64
Status byte // ok, err, or we're a notify
Reqno uint64 // reqno that was sent with the request, overridden with ackID if a notify
Leaseid uint64
Inodeid uint64
Status byte // ok, err, or we're a notify
NotifyStartPos int64 // startpos and length of changes for a notification if this is STATUS_NOTIFY
NotifyLength int64
}
10 changes: 7 additions & 3 deletions maggiefs/nameservice.go
Expand Up @@ -12,8 +12,10 @@ type LeaseService interface {
// upon the file changing *unless* we've cancelled this lease.
ReadLease(nodeid uint64) (l ReadLease, err error)

// sends a notification to all holders of ReadLease for this nodeid
Notify(nodeid uint64) (err error)
// sends a notification to all holders of ReadLease for this nodeid that its bytes has changed at the
// offset and length we're advertising here
// readers must either ack in a timely manner or their lease will be interrupted
Notify(nodeid uint64, off int64, length int64) (err error)

// returns a chan which will contain an event every time any inode in the system is changed
// used for cache coherency
Expand All @@ -31,8 +33,10 @@ type ReadLease interface {
type NotifyEvent interface {
// client MUST call ack for every received notifyEvent. If not, your client lease will expire.
Ack() error
// the inode id we're notifying a change for
// the inode id we're notifying a change for,
Inodeid() uint64
// the offset and length of the file that were changed
OffAndLength() (int64, int64)
}

type NameService interface {
Expand Down

0 comments on commit 40660ba

Please sign in to comment.