Skip to content

Commit

Permalink
More tweaks.
Browse files Browse the repository at this point in the history
  • Loading branch information
mstone committed Feb 25, 2015
1 parent deaabcc commit b54100a
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 312 deletions.
46 changes: 23 additions & 23 deletions internal/connection/connection.go
Expand Up @@ -61,29 +61,6 @@ func (c *conn) allocFd() int {
return fd
}

func (c *conn) onVppOpen(m msg.Msg) {
srvrReplyChan := make(chan im.Allocdocresp)
c.srvr <- im.Allocdoc{
Reply: srvrReplyChan,
Name: m.Name,
}

srvrResp := <-srvrReplyChan
if srvrResp.Err != nil {
panic("conn unable to Allocdoc")
}

fd := c.allocFd()
doc := srvrResp.Doc
c.setDoc(fd, doc)

doc <- im.Open{
Conn: c.msgs,
Name: m.Name,
Fd: fd,
}
}

func (c *conn) getDoc(fd int) (chan interface{}, bool) {
c.mu.Lock()
defer c.mu.Unlock()
Expand All @@ -108,6 +85,29 @@ func (c *conn) setDoc(fd int, doc chan interface{}) {
c.fds[doc] = fd
}

func (c *conn) onVppOpen(m msg.Msg) {
srvrReplyChan := make(chan im.Allocdocresp)
c.srvr <- im.Allocdoc{
Reply: srvrReplyChan,
Name: m.Name,
}

srvrResp := <-srvrReplyChan
if srvrResp.Err != nil {
panic("conn unable to Allocdoc")
}

fd := c.allocFd()
doc := srvrResp.Doc
c.setDoc(fd, doc)

doc <- im.Open{
Conn: c.msgs,
Name: m.Name,
Fd: fd,
}
}

func (c *conn) onVppWrite(m msg.Msg) {
doc, ok := c.getDoc(m.Fd)
if !ok {
Expand Down
25 changes: 13 additions & 12 deletions internal/document/document.go
Expand Up @@ -6,7 +6,7 @@ package document
import (
im "github.com/mstone/focus/internal/msgs"
"github.com/mstone/focus/ot"
// "gopkg.in/inconshreveable/log15.v2"
"gopkg.in/inconshreveable/log15.v2"
)

// struct doc represents a vaporpad (like a file)
Expand Down Expand Up @@ -72,38 +72,39 @@ func (d *doc) readLoop() {
Rev: len(d.hist),
}
case im.Write:
// log15.Info("recv", "obj", "doc", "rev", v.Rev, "ops", v.Ops, "docrev", len(d.hist), "dochist", d.Body())
rev, ops := d.transform(v.Rev, v.Ops.Clone())
log15.Info("recv", "obj", "doc", "rev", v.Rev, "ops", v.Ops, "docrev", len(d.hist), "dochist", d.Body(), "nrev", rev, "tops", ops)
d.broadcast(v.Conn, rev, ops)
}
}
}

func (d *doc) transform(rev int, ops ot.Ops) (int, ot.Ops) {
func (d *doc) transform(rev int, clientOps ot.Ops) (int, ot.Ops) {
// extract concurrent ops
concurrentOps := []ot.Ops{}
concurrentServerOps := []ot.Ops{}
if rev < len(d.hist) {
concurrentOps = d.hist[rev:]
concurrentServerOps = d.hist[rev:]
}

// compose concurrent ops
composedOps := ot.Ops{}
for _, concurrentOp := range concurrentOps {
composedOps = ot.Compose(composedOps, concurrentOp)
serverOps := ot.Ops{}
for _, concurrentOp := range concurrentServerOps {
serverOps = ot.Compose(serverOps, concurrentOp)
}

// produce transformed ops
transformedOps, _ := ot.Transform(ops, composedOps)
forServer, _ := ot.Transform(clientOps, serverOps)

// update history
d.hist = append(d.hist, transformedOps)
// d.hist = append(d.hist, transformedOps)
d.hist = append(d.hist, forServer)

// update composed ops for new conns
d.comp = ot.Compose(d.comp, transformedOps)
d.comp = ot.Compose(d.comp, forServer)

rev = len(d.hist)

return rev, transformedOps
return rev, forServer
}

func (d *doc) broadcast(conn chan interface{}, rev int, ops ot.Ops) {
Expand Down

0 comments on commit b54100a

Please sign in to comment.