Skip to content

Commit

Permalink
Add synced vector map initialization using snapshots
Browse files Browse the repository at this point in the history
1. server store synced vector map in snapshot_info.
2. The client also receives vector clock when pulling pack from the snapshot, and replace it's vector map with pulled vector map.
  • Loading branch information
highcloud100 committed Feb 8, 2024
1 parent fa4222f commit 80f8987
Show file tree
Hide file tree
Showing 19 changed files with 872 additions and 765 deletions.
1 change: 1 addition & 0 deletions admin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,7 @@ func (c *Client) ListChangeSummaries(
seq,
snapshotMeta.Msg.Lamport,
snapshotMeta.Msg.Snapshot,
snapshotMeta.Msg.SyncedVectorMap,
)

if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions api/converter/from_pb.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,19 @@ func FromChangePack(pbPack *api.ChangePack) (*change.Pack, error) {
return nil, err
}

syncedVectorMap, err := time.NewSyncedVectorMapFromJSON(pbPack.SyncedVectorMap)
if err != nil {
return nil, err
}

return &change.Pack{
DocumentKey: key.Key(pbPack.DocumentKey),
Checkpoint: fromCheckpoint(pbPack.Checkpoint),
Changes: changes,
Snapshot: pbPack.Snapshot,
MinSyncedTicket: minSyncedTicket,
IsRemoved: pbPack.IsRemoved,
SyncedVectorMap: syncedVectorMap,
}, nil
}

Expand Down
215 changes: 113 additions & 102 deletions api/yorkie/v1/admin.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions api/yorkie/v1/admin.proto
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ message GetSnapshotMetaRequest {
message GetSnapshotMetaResponse {
bytes snapshot = 1;
int64 lamport = 2 [jstype = JS_STRING];
string synced_vector_map = 3;
}

message SearchDocumentsRequest {
Expand Down
1,193 changes: 602 additions & 591 deletions api/yorkie/v1/resources.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions api/yorkie/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ message ChangePack {
repeated Change changes = 4;
TimeTicket min_synced_ticket = 5;
bool is_removed = 6;
string synced_vector_map = 7;
}

message Change {
Expand Down
3 changes: 3 additions & 0 deletions pkg/document/change/pack.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type Pack struct {

// IsRemoved is a flag that indicates whether the document is removed.
IsRemoved bool

// SyncedVectorMap is SnapShot's synced vector map information.
SyncedVectorMap time.SyncedVectorMap
}

// NewPack creates a new instance of Pack.
Expand Down
3 changes: 2 additions & 1 deletion pkg/document/document.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,8 @@ func (d *Document) ApplyChangePack(pack *change.Pack) error {
if len(pack.Snapshot) > 0 {
d.cloneRoot = nil
d.clonePresences = nil
if err := d.doc.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq); err != nil {
if err := d.doc.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq,
pack.SyncedVectorMap); err != nil {
return err
}
} else {
Expand Down
19 changes: 15 additions & 4 deletions pkg/document/internal_document.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,14 +125,20 @@ func NewInternalDocumentFromSnapshot(
return nil, err
}

id := change.InitialID.SyncLamport(lamport)

if svm == nil {
svm = time.InitialSyncedVectorMap(id.Lamport())
}

return &InternalDocument{
key: k,
status: StatusDetached,
root: crdt.NewRoot(obj),
presences: presences,
onlineClients: &gosync.Map{},
checkpoint: change.InitialCheckpoint.NextServerSeq(serverSeq),
changeID: change.InitialID.SyncLamport(lamport),
changeID: id,
syncedVectorMap: svm,
}, nil
}
Expand All @@ -156,7 +162,7 @@ func (d *InternalDocument) HasLocalChanges() bool {
func (d *InternalDocument) ApplyChangePack(pack *change.Pack) error {
// 01. Apply remote changes to both the cloneRoot and the document.
if len(pack.Snapshot) > 0 {
if err := d.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq); err != nil {
if err := d.applySnapshot(pack.Snapshot, pack.Checkpoint.ServerSeq, pack.SyncedVectorMap); err != nil {
return err
}
} else {
Expand Down Expand Up @@ -185,7 +191,7 @@ func (d *InternalDocument) ApplyChangePack(pack *change.Pack) error {
}

// GarbageCollect purge elements that were removed before the given time.
func (d *InternalDocument) GarbageCollect(minSeqVector *time.VectorClock) (int, error) {
func (d *InternalDocument) GarbageCollect(minSeqVector time.VectorClock) (int, error) {
return d.root.GarbageCollect(minSeqVector)
}

Expand Down Expand Up @@ -246,7 +252,11 @@ func (d *InternalDocument) RootObject() *crdt.Object {
return d.root.Object()
}

func (d *InternalDocument) applySnapshot(snapshot []byte, serverSeq int64) error {
func (d *InternalDocument) applySnapshot(
snapshot []byte,
serverSeq int64,
syncedVectorMap time.SyncedVectorMap,
) error {
rootObj, presences, err := converter.BytesToSnapshot(snapshot)
if err != nil {
return err
Expand All @@ -255,6 +265,7 @@ func (d *InternalDocument) applySnapshot(snapshot []byte, serverSeq int64) error
d.root = crdt.NewRoot(rootObj)
d.presences = presences
d.changeID = d.changeID.SyncLamport(serverSeq)
d.syncedVectorMap = syncedVectorMap

return nil
}
Expand Down
12 changes: 6 additions & 6 deletions pkg/document/time/vector_clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,20 @@ func (vc VectorClock) CopyAndSet(id string, value int64) VectorClock {
// ChangeActorID change the InitialActorID into New ActorID.
// Move the initial actor's vector clock to the given actor's vector clock.
// And delete the initial actor's vector clock.
func (svm SyncedVectorMap) ChangeActorID(newID *ActorID) {
id := newID.String()
func (svm SyncedVectorMap) ChangeActorID(id *ActorID) {
newID := id.String()
// Already has actor id given from the server.
if svm[id] != nil {
if svm[newID] != nil {
return
}

initID := InitialActorID.String()

// If current actor id is initial actor id
svm[id] = svm[initID]
svm[id][id] = svm[id][initID]
svm[newID] = svm[initID]
svm[newID][newID] = svm[newID][initID]

// delete initialID
delete(svm[id], initID)
delete(svm[newID], initID)
delete(svm, initID)
}
16 changes: 16 additions & 0 deletions pkg/document/time/vector_clock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,20 @@ func TestVectorClock(t *testing.T) {

assert.Equal(t, time.VectorClock{"1": 0, "2": 1, "3": 0}, minVec)
})

t.Run("ChangeActorID test", func(t *testing.T) {
initID := time.InitialActorID.String()
newID, err := time.ActorIDFromHex("111111111111111111111111")
assert.NoError(t, err)

vec := time.InitialSyncedVectorMap(1)
vec["B"] = time.VectorClock{"B": 3}
vec[initID]["B"] = 2

vec.ChangeActorID(newID)

assert.Equal(t, time.SyncedVectorMap{
newID.String(): time.VectorClock{newID.String(): 1, "B": 2},
"B": time.VectorClock{"B": 3}}, vec)
})
}
18 changes: 11 additions & 7 deletions server/backend/database/snapshot_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ type SnapshotInfo struct {

// CreatedAt is the time when the snapshot is created.
CreatedAt time.Time `bson:"created_at"`

// SyncedVectorMap is the synced vector map of the snapshot.
SyncedVectorMap string `bson:"synced_vector_map"`
}

// DeepCopy returns a deep copy of the SnapshotInfo.
Expand All @@ -53,13 +56,14 @@ func (i *SnapshotInfo) DeepCopy() *SnapshotInfo {
}

return &SnapshotInfo{
ID: i.ID,
ProjectID: i.ProjectID,
DocID: i.DocID,
ServerSeq: i.ServerSeq,
Lamport: i.Lamport,
Snapshot: i.Snapshot,
CreatedAt: i.CreatedAt,
ID: i.ID,
ProjectID: i.ProjectID,
DocID: i.DocID,
ServerSeq: i.ServerSeq,
Lamport: i.Lamport,
Snapshot: i.Snapshot,
CreatedAt: i.CreatedAt,
SyncedVectorMap: i.SyncedVectorMap,
}
}

Expand Down
1 change: 1 addition & 0 deletions server/packs/packs.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ func BuildDocumentForServerSeq(
snapshotInfo.ServerSeq,
snapshotInfo.Lamport,
snapshotInfo.Snapshot,
snapshotInfo.SyncedVectorMap,
)
if err != nil {
return nil, err
Expand Down
21 changes: 18 additions & 3 deletions server/packs/pushpull.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/yorkie-team/yorkie/api/converter"
"github.com/yorkie-team/yorkie/api/types"
"github.com/yorkie-team/yorkie/pkg/document/change"
"github.com/yorkie-team/yorkie/pkg/document/time"
"github.com/yorkie-team/yorkie/server/backend"
"github.com/yorkie-team/yorkie/server/backend/database"
"github.com/yorkie-team/yorkie/server/logging"
Expand Down Expand Up @@ -95,7 +96,7 @@ func pullPack(
return NewServerPack(docInfo.Key, change.Checkpoint{
ServerSeq: reqPack.Checkpoint.ServerSeq,
ClientSeq: cpAfterPush.ClientSeq,
}, nil, nil), nil
}, nil, nil, ""), nil
}

if initialServerSeq < reqPack.Checkpoint.ServerSeq {
Expand All @@ -122,7 +123,7 @@ func pullPack(
return nil, err
}

return NewServerPack(docInfo.Key, cpAfterPull, pulledChanges, nil), nil
return NewServerPack(docInfo.Key, cpAfterPull, pulledChanges, nil, ""), nil
}

return pullSnapshot(ctx, be, clientInfo, docInfo, reqPack, cpAfterPush, initialServerSeq)
Expand Down Expand Up @@ -155,13 +156,27 @@ func pullSnapshot(
return nil, err
}
}

// Remove the initial actor's vector clock and replace it with the client's vector clock.
// This is because the initial actor's vector clock is used for the snapshot.
initID := time.InitialActorID.String()
delete(doc.SyncedVectorMap(), string(clientInfo.ID))
delete(doc.SyncedVectorMap()[initID], initID)
doc.SyncedVectorMap()[clientInfo.ID.String()] = doc.SyncedVectorMap()[time.InitialActorID.String()]
delete(doc.SyncedVectorMap(), initID)

cpAfterPull := cpAfterPush.NextServerSeq(docInfo.ServerSeq)

snapshot, err := converter.SnapshotToBytes(doc.RootObject(), doc.AllPresences())
if err != nil {
return nil, err
}

syncedVectorMap, err := doc.SyncedVectorMap().EncodeToString()
if err != nil {
return nil, err
}

logging.From(ctx).Debugf(
"PULL: '%s' build snapshot with changes(%d~%d) from '%s', cp: %s",
clientInfo.Key,
Expand All @@ -171,7 +186,7 @@ func pullSnapshot(
cpAfterPull,
)

return NewServerPack(docInfo.Key, cpAfterPull, nil, snapshot), err
return NewServerPack(docInfo.Key, cpAfterPull, nil, snapshot, syncedVectorMap), err
}

func pullChangeInfos(
Expand Down
14 changes: 10 additions & 4 deletions server/packs/serverpacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ type ServerPack struct {

// IsRemoved is a flag that indicates whether the document is removed.
IsRemoved bool

// SyncedVectorMap is SnapShot's synced information.
SyncedVectorMap string
}

// NewServerPack creates a new instance of ServerPack.
Expand All @@ -57,12 +60,14 @@ func NewServerPack(
cp change.Checkpoint,
changeInfos []*database.ChangeInfo,
snapshot []byte,
syncedVectorMap string,
) *ServerPack {
return &ServerPack{
DocumentKey: key,
Checkpoint: cp,
ChangeInfos: changeInfos,
Snapshot: snapshot,
DocumentKey: key,
Checkpoint: cp,
ChangeInfos: changeInfos,
Snapshot: snapshot,
SyncedVectorMap: syncedVectorMap,
}
}

Expand Down Expand Up @@ -130,6 +135,7 @@ func (p *ServerPack) ToPBChangePack() (*api.ChangePack, error) {
Snapshot: p.Snapshot,
MinSyncedTicket: converter.ToTimeTicket(p.MinSyncedTicket),
IsRemoved: p.IsRemoved,
SyncedVectorMap: p.SyncedVectorMap,
}, nil
}

Expand Down
1 change: 1 addition & 0 deletions server/packs/snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func storeSnapshot(
snapshotInfo.ServerSeq,
snapshotInfo.Lamport,
snapshotInfo.Snapshot,
snapshotInfo.SyncedVectorMap,
)
if err != nil {
return err
Expand Down
10 changes: 8 additions & 2 deletions server/rpc/admin_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,15 @@ func (s *adminServer) GetSnapshotMeta(
return nil, err
}

syncedVectorMap, err := doc.SyncedVectorMap().EncodeToString()
if err != nil {
return nil, err
}

return connect.NewResponse(&api.GetSnapshotMetaResponse{
Lamport: doc.Lamport(),
Snapshot: snapshot,
Lamport: doc.Lamport(),
Snapshot: snapshot,
SyncedVectorMap: syncedVectorMap,
}), nil
}

Expand Down

0 comments on commit 80f8987

Please sign in to comment.