Skip to content

Commit

Permalink
Replace latestCreatedAtMapByActor with vector clock
Browse files Browse the repository at this point in the history
1. replace text edit's  latestCreatedAtMapByActor with vector clock.
2. Fix gc timing in gc_test.go, because gc changed to use vectorclock
3. Because it is used in text edit, context includes current vector.
  • Loading branch information
highcloud100 committed Feb 6, 2024
1 parent abc3187 commit 70ba504
Show file tree
Hide file tree
Showing 20 changed files with 704 additions and 648 deletions.
8 changes: 1 addition & 7 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,6 @@ func fromEdit(pbEdit *api.Operation_Edit) (*operations.Edit, error) {
if err != nil {
return nil, err
}
createdAtMapByActor, err := fromCreatedAtMapByActor(
pbEdit.CreatedAtMapByActor,
)
if err != nil {
return nil, err
}
executedAt, err := fromTimeTicket(pbEdit.ExecutedAt)
if err != nil {
return nil, err
Expand All @@ -387,7 +381,7 @@ func fromEdit(pbEdit *api.Operation_Edit) (*operations.Edit, error) {
parentCreatedAt,
from,
to,
createdAtMapByActor,
pbEdit.VectorClock,
pbEdit.Content,
pbEdit.Attributes,
executedAt,
Expand Down
14 changes: 7 additions & 7 deletions api/converter/to_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,13 +309,13 @@ func toRemove(remove *operations.Remove) (*api.Operation_Remove_, error) {
func toEdit(e *operations.Edit) (*api.Operation_Edit_, error) {
return &api.Operation_Edit_{
Edit: &api.Operation_Edit{
ParentCreatedAt: ToTimeTicket(e.ParentCreatedAt()),
From: toTextNodePos(e.From()),
To: toTextNodePos(e.To()),
CreatedAtMapByActor: toCreatedAtMapByActor(e.CreatedAtMapByActor()),
Content: e.Content(),
Attributes: e.Attributes(),
ExecutedAt: ToTimeTicket(e.ExecutedAt()),
ParentCreatedAt: ToTimeTicket(e.ParentCreatedAt()),
From: toTextNodePos(e.From()),
To: toTextNodePos(e.To()),
VectorClock: e.VectorClock(),
Content: e.Content(),
Attributes: e.Attributes(),
ExecutedAt: ToTimeTicket(e.ExecutedAt()),
},
}, nil
}
Expand Down
1,071 changes: 533 additions & 538 deletions api/yorkie/v1/resources.pb.go

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ message Operation {
TimeTicket parent_created_at = 1;
TextNodePos from = 2;
TextNodePos to = 3;
map<string, TimeTicket> created_at_map_by_actor = 4;
map<string, int64> vectorClock = 4;
string content = 5;
TimeTicket executed_at = 6;
map<string, string> attributes = 7;
Expand Down
1 change: 0 additions & 1 deletion client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ func (c *Client) Attach(ctx context.Context, doc *document.Document, options ...
}

doc.SetActor(c.id)

doc.SetSyncedVectorMap(c.id)

if err := doc.Update(func(root *json.Object, p *presence.Presence) error {
Expand Down
14 changes: 10 additions & 4 deletions pkg/document/change/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,16 @@ type Context struct {
delimiter uint32
root *crdt.Root
presenceChange *innerpresence.PresenceChange
vectorClock time.VectorClock
}

// NewContext creates a new instance of Context.
func NewContext(id ID, message string, root *crdt.Root) *Context {
func NewContext(id ID, message string, root *crdt.Root, vectorClock time.VectorClock) *Context {
return &Context{
id: id,
message: message,
root: root,
id: id,
message: message,
root: root,
vectorClock: vectorClock,
}
}

Expand Down Expand Up @@ -94,3 +96,7 @@ func (c *Context) LastTimeTicket() *time.Ticket {
func (c *Context) SetPresenceChange(presenceChange innerpresence.PresenceChange) {
c.presenceChange = &presenceChange
}

func (c *Context) VectorClock() time.VectorClock {

Check failure on line 100 in pkg/document/change/context.go

View workflow job for this annotation

GitHub Actions / build

exported: exported method Context.VectorClock should have comment or be unexported (revive)
return c.vectorClock
}
3 changes: 2 additions & 1 deletion pkg/document/crdt/element.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ type Container interface {
type GCElement interface {
Element
removedNodesLen() int
purgeRemovedNodesBefore(ticket *time.Ticket) (int, error)
//TODO(highcloud100): Check the case where minSeqVector[actor.String()] is nil
purgeRemovedNodesBefore(minSeqVector *time.VectorClock) (int, error)
}

// Element represents JSON element.
Expand Down
48 changes: 22 additions & 26 deletions pkg/document/crdt/rga_tree_split.go
Original file line number Diff line number Diff line change
Expand Up @@ -445,23 +445,23 @@ func (s *RGATreeSplit[V]) findFloorNode(id *RGATreeSplitNodeID) *RGATreeSplitNod
func (s *RGATreeSplit[V]) edit(
from *RGATreeSplitNodePos,
to *RGATreeSplitNodePos,
latestCreatedAtMapByActor map[string]*time.Ticket,
vectorClock *time.VectorClock,
content V,
editedAt *time.Ticket,
) (*RGATreeSplitNodePos, map[string]*time.Ticket, error) {
) (*RGATreeSplitNodePos, error) {
// 01. Split nodes with from and to
toLeft, toRight, err := s.findNodeWithSplit(to, editedAt)
if err != nil {
return nil, nil, err
return nil, err
}
fromLeft, fromRight, err := s.findNodeWithSplit(from, editedAt)
if err != nil {
return nil, nil, err
return nil, err
}

// 02. delete between from and to
nodesToDelete := s.findBetween(fromRight, toRight)
latestCreatedAtMap, removedNodeMapByNodeKey := s.deleteNodes(nodesToDelete, latestCreatedAtMapByActor, editedAt)
removedNodeMapByNodeKey := s.deleteNodes(nodesToDelete, vectorClock, editedAt)

var caretID *RGATreeSplitNodeID
if toRight == nil {
Expand All @@ -482,7 +482,7 @@ func (s *RGATreeSplit[V]) edit(
s.removedNodeMap[key] = removedNode
}

return caretPos, latestCreatedAtMap, nil
return caretPos, nil
}

func (s *RGATreeSplit[V]) findBetween(from, to *RGATreeSplitNode[V]) []*RGATreeSplitNode[V] {
Expand All @@ -497,14 +497,13 @@ func (s *RGATreeSplit[V]) findBetween(from, to *RGATreeSplitNode[V]) []*RGATreeS

func (s *RGATreeSplit[V]) deleteNodes(
candidates []*RGATreeSplitNode[V],
latestCreatedAtMapByActor map[string]*time.Ticket,
vectorClock *time.VectorClock,
editedAt *time.Ticket,
) (map[string]*time.Ticket, map[string]*RGATreeSplitNode[V]) {
createdAtMapByActor := make(map[string]*time.Ticket)
) map[string]*RGATreeSplitNode[V] {
removedNodeMap := make(map[string]*RGATreeSplitNode[V])

if len(candidates) == 0 {
return createdAtMapByActor, removedNodeMap
return removedNodeMap
}

// There are 2 types of nodes in `candidates`: should delete, should not delete.
Expand All @@ -515,27 +514,21 @@ func (s *RGATreeSplit[V]) deleteNodes(
nodesToKeep = append(nodesToKeep, leftEdge)

for _, node := range candidates {
actorIDHex := node.createdAt().ActorIDHex()
actorID := node.createdAt().ActorID()

var latestCreatedAt *time.Ticket
if latestCreatedAtMapByActor == nil {
latestCreatedAt = time.MaxTicket
var latestSyncedAt *time.Ticket
if vectorClock == nil {
latestSyncedAt = time.MaxTicket
} else {
createdAt, ok := latestCreatedAtMapByActor[actorIDHex]
createdAt, ok := (*vectorClock)[actorID.String()]
if ok {
latestCreatedAt = createdAt
latestSyncedAt = time.NewTicket(createdAt, time.MaxDelimiter, actorID)
} else {
latestCreatedAt = time.InitialTicket
latestSyncedAt = time.InitialTicket
}
}

if node.Remove(editedAt, latestCreatedAt) {
latestCreatedAt := createdAtMapByActor[actorIDHex]
createdAt := node.id.createdAt
if latestCreatedAt == nil || createdAt.After(latestCreatedAt) {
createdAtMapByActor[actorIDHex] = createdAt
}

if node.Remove(editedAt, latestSyncedAt) {
removedNodeMap[node.id.key()] = node
} else {
nodesToKeep = append(nodesToKeep, node)
Expand All @@ -544,7 +537,7 @@ func (s *RGATreeSplit[V]) deleteNodes(
nodesToKeep = append(nodesToKeep, rightEdge)
s.deleteIndexNodes(nodesToKeep)

return createdAtMapByActor, removedNodeMap
return removedNodeMap
}

// findEdgesOfCandidates finds the edges outside `candidates`,
Expand Down Expand Up @@ -623,9 +616,12 @@ func (s *RGATreeSplit[V]) removedNodesLen() int {
}

// purgeRemovedNodesBefore physically purges nodes that have been removed.
func (s *RGATreeSplit[V]) purgeRemovedNodesBefore(ticket *time.Ticket) (int, error) {
func (s *RGATreeSplit[V]) purgeRemovedNodesBefore(minSeqVector *time.VectorClock) (int, error) {
count := 0
for _, node := range s.removedNodeMap {
lamport := (*minSeqVector)[node.removedAt.ActorID().String()]
// For now new ticket's actorID is always the same as the node's actorID.
ticket := time.NewTicket(lamport, time.MaxDelimiter, node.removedAt.ActorID())
if node.removedAt != nil && ticket.Compare(node.removedAt) >= 0 {
s.treeByIndex.Delete(node.indexNode)
s.purge(node)
Expand Down
7 changes: 4 additions & 3 deletions pkg/document/crdt/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (r *Root) GarbageCollect(minSeqVector *time.VectorClock) (int, error) {

for _, pair := range r.removedElementPairMapByCreatedAt {
actor := pair.elem.RemovedAt().ActorID()

//TODO(highcloud100): Check the case where minSeqVector[actor.String()] is nil
minTicket := time.NewTicket((*minSeqVector)[actor.String()], time.MaxDelimiter, actor)

if pair.elem.RemovedAt() != nil && minTicket.Compare(pair.elem.RemovedAt()) >= 0 {
Expand All @@ -155,9 +157,8 @@ func (r *Root) GarbageCollect(minSeqVector *time.VectorClock) (int, error) {
}

for _, node := range r.elementHasRemovedNodesSetByCreatedAt {
actor := node.CreatedAt().ActorID()
minTicket := time.NewTicket((*minSeqVector)[actor.String()], time.MaxDelimiter, actor)
purgedNodes, err := node.purgeRemovedNodesBefore(minTicket)

purgedNodes, err := node.purgeRemovedNodesBefore(minSeqVector)
if err != nil {
return 0, err
}
Expand Down
16 changes: 8 additions & 8 deletions pkg/document/crdt/root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,28 @@ func TestRoot(t *testing.T) {
text := crdt.NewText(crdt.NewRGATreeSplit(crdt.InitialTextNode()), ctx.IssueTimeTicket())

fromPos, toPos, _ := text.CreateRange(0, 0)
_, _, err := text.Edit(fromPos, toPos, nil, "Hello World", nil, ctx.IssueTimeTicket())
_, err := text.Edit(fromPos, toPos, nil, "Hello World", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
assert.Equal(t, "Hello World", text.String())
assert.Equal(t, 0, root.GarbageLen())

fromPos, toPos, _ = text.CreateRange(5, 10)
_, _, err = text.Edit(fromPos, toPos, nil, "Yorkie", nil, ctx.IssueTimeTicket())
_, err = text.Edit(fromPos, toPos, nil, "Yorkie", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
assert.Equal(t, "HelloYorkied", text.String())
assert.Equal(t, 1, root.GarbageLen())

fromPos, toPos, _ = text.CreateRange(0, 5)
_, _, err = text.Edit(fromPos, toPos, nil, "", nil, ctx.IssueTimeTicket())
_, err = text.Edit(fromPos, toPos, nil, "", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
assert.Equal(t, "Yorkied", text.String())
assert.Equal(t, 2, root.GarbageLen())

fromPos, toPos, _ = text.CreateRange(6, 7)
_, _, err = text.Edit(fromPos, toPos, nil, "", nil, ctx.IssueTimeTicket())
_, err = text.Edit(fromPos, toPos, nil, "", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
assert.Equal(t, "Yorkie", text.String())
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestRoot(t *testing.T) {

for _, tc := range tests {
fromPos, toPos, _ := text.CreateRange(tc.from, tc.to)
_, _, err := text.Edit(fromPos, toPos, nil, tc.content, nil, ctx.IssueTimeTicket())
_, err := text.Edit(fromPos, toPos, nil, tc.content, nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
assert.Equal(t, tc.want, text.String())
Expand All @@ -146,21 +146,21 @@ func TestRoot(t *testing.T) {
text := crdt.NewText(crdt.NewRGATreeSplit(crdt.InitialTextNode()), ctx.IssueTimeTicket())

fromPos, toPos, _ := text.CreateRange(0, 0)
_, _, err := text.Edit(fromPos, toPos, nil, "Hello World", nil, ctx.IssueTimeTicket())
_, err := text.Edit(fromPos, toPos, nil, "Hello World", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
assert.Equal(t, `[{"val":"Hello World"}]`, text.Marshal())
assert.Equal(t, 0, root.GarbageLen())

fromPos, toPos, _ = text.CreateRange(6, 11)
_, _, err = text.Edit(fromPos, toPos, nil, "Yorkie", nil, ctx.IssueTimeTicket())
_, err = text.Edit(fromPos, toPos, nil, "Yorkie", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
assert.Equal(t, `[{"val":"Hello "},{"val":"Yorkie"}]`, text.Marshal())
assert.Equal(t, 1, root.GarbageLen())

fromPos, toPos, _ = text.CreateRange(0, 6)
_, _, err = text.Edit(fromPos, toPos, nil, "", nil, ctx.IssueTimeTicket())
_, err = text.Edit(fromPos, toPos, nil, "", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
registerElementHasRemovedNodes(fromPos, toPos, root, text)
assert.Equal(t, `[{"val":"Yorkie"}]`, text.Marshal())
Expand Down
10 changes: 5 additions & 5 deletions pkg/document/crdt/text.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,11 +223,11 @@ func (t *Text) CreateRange(from, to int) (*RGATreeSplitNodePos, *RGATreeSplitNod
func (t *Text) Edit(
from,
to *RGATreeSplitNodePos,
latestCreatedAtMapByActor map[string]*time.Ticket,
vectorClock *time.VectorClock,
content string,
attributes map[string]string,
executedAt *time.Ticket,
) (*RGATreeSplitNodePos, map[string]*time.Ticket, error) {
) (*RGATreeSplitNodePos, error) {
val := NewTextValue(content, NewRHT())
for key, value := range attributes {
val.attrs.Set(key, value, executedAt)
Expand All @@ -236,7 +236,7 @@ func (t *Text) Edit(
return t.rgaTreeSplit.edit(
from,
to,
latestCreatedAtMapByActor,
vectorClock,
val,
executedAt,
)
Expand Down Expand Up @@ -323,6 +323,6 @@ func (t *Text) removedNodesLen() int {
}

// purgeRemovedNodesBefore physically purges nodes that have been removed.
func (t *Text) purgeRemovedNodesBefore(ticket *time.Ticket) (int, error) {
return t.rgaTreeSplit.purgeRemovedNodesBefore(ticket)
func (t *Text) purgeRemovedNodesBefore(minSeqVector *time.VectorClock) (int, error) {
return t.rgaTreeSplit.purgeRemovedNodesBefore(minSeqVector)
}
8 changes: 4 additions & 4 deletions pkg/document/crdt/text_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ func TestText(t *testing.T) {
text := crdt.NewText(crdt.NewRGATreeSplit(crdt.InitialTextNode()), ctx.IssueTimeTicket())

fromPos, toPos, _ := text.CreateRange(0, 0)
_, _, err := text.Edit(fromPos, toPos, nil, "Hello World", nil, ctx.IssueTimeTicket())
_, err := text.Edit(fromPos, toPos, nil, "Hello World", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
assert.Equal(t, `[{"val":"Hello World"}]`, text.Marshal())

fromPos, toPos, _ = text.CreateRange(6, 11)
_, _, err = text.Edit(fromPos, toPos, nil, "Yorkie", nil, ctx.IssueTimeTicket())
_, err = text.Edit(fromPos, toPos, nil, "Yorkie", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
assert.Equal(t, `[{"val":"Hello "},{"val":"Yorkie"}]`, text.Marshal())
})
Expand Down Expand Up @@ -70,12 +70,12 @@ func TestText(t *testing.T) {
text := crdt.NewText(crdt.NewRGATreeSplit(crdt.InitialTextNode()), ctx.IssueTimeTicket())

fromPos, toPos, _ := text.CreateRange(0, 0)
_, _, err := text.Edit(fromPos, toPos, nil, "Hello World", nil, ctx.IssueTimeTicket())
_, err := text.Edit(fromPos, toPos, nil, "Hello World", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
assert.Equal(t, `[{"val":"Hello World"}]`, text.Marshal())

fromPos, toPos, _ = text.CreateRange(6, 11)
_, _, err = text.Edit(fromPos, toPos, nil, "Yorkie", nil, ctx.IssueTimeTicket())
_, err = text.Edit(fromPos, toPos, nil, "Yorkie", nil, ctx.IssueTimeTicket())
assert.NoError(t, err)
assert.Equal(t, `[{"val":"Hello "},{"val":"Yorkie"}]`, text.Marshal())

Expand Down
4 changes: 3 additions & 1 deletion pkg/document/crdt/tree.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,11 +427,13 @@ func (t *Tree) removedNodesLen() int {
}

// purgeRemovedNodesBefore physically purges nodes that have been removed.
func (t *Tree) purgeRemovedNodesBefore(ticket *time.Ticket) (int, error) {
func (t *Tree) purgeRemovedNodesBefore(minSeqVector *time.VectorClock) (int, error) {
count := 0
nodesToBeRemoved := make(map[*TreeNode]bool)

for _, node := range t.removedNodeMap {
lamport := (*minSeqVector)[node.RemovedAt.ActorID().String()]
ticket := time.NewTicket(lamport, time.MaxDelimiter, node.RemovedAt.ActorID())
if node.RemovedAt != nil && ticket.Compare(node.RemovedAt) >= 0 {
count++
nodesToBeRemoved[node] = true
Expand Down

0 comments on commit 70ba504

Please sign in to comment.