-
Notifications
You must be signed in to change notification settings - Fork 77
/
sync.js
151 lines (122 loc) · 3.27 KB
/
sync.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Seed the module clock through setClock()/makeClock() (both defined outside
// this view) with a Timestamp built from zeros and the id returned by
// makeClientId().
// NOTE(review): the Timestamp arguments look like (millis, counter, node id),
// judging by how sync() builds one — confirm against Timestamp's definition.
setClock(makeClock(new Timestamp(0, 0, makeClientId())));
// Listener registered through onSync(); applyMessages() calls it after it has
// processed a batch. null means no listener is registered.
let _onSync = null;
// When false, sync() returns immediately without contacting the server.
let _syncEnabled = true;
/**
 * Switch server syncing on or off.
 *
 * The value is stored as-is in the module-level `_syncEnabled` flag; `sync()`
 * returns early whenever that flag is falsy.
 *
 * @param {boolean} flag - Truthy to allow syncing, falsy to suppress it.
 */
function setSyncingEnabled(flag) {
  _syncEnabled = flag;
}
/**
 * Send `data` as a JSON POST request to the sync server and unwrap the reply.
 *
 * The reply body is parsed as JSON and is expected to carry a `status` field;
 * only a status of 'ok' counts as success.
 *
 * @param {*} data - Payload to serialize with JSON.stringify.
 * @returns {Promise<*>} The `data` field of the server's reply.
 * @throws {Error} 'API error: <reason>' when the reply status is not 'ok'.
 *   Failures from fetch() or from parsing the body propagate unchanged.
 */
async function post(data) {
  const requestOptions = {
    method: 'POST',
    body: JSON.stringify(data),
    headers: {
      'Content-Type': 'application/json'
    }
  };
  const response = await fetch(
    'https://crdt.jlongster.com/server/sync',
    requestOptions
  );
  const reply = await response.json();

  if (reply.status === 'ok') {
    return reply.data;
  }
  throw new Error('API error: ' + reply.reason);
}
/**
 * Write a single message's value into the in-memory tables held in `_data`.
 *
 * The row whose `id` equals `msg.row` gets `msg.column` set to `msg.value`;
 * if the table has no such row, a new row holding just the id and that one
 * column is appended.
 *
 * @param {{dataset: string, row: *, column: string, value: *}} msg
 * @throws {Error} 'Unknown dataset: <name>' when `_data` has no such table.
 */
function apply(msg) {
  const { dataset, row: rowId, column, value } = msg;
  const table = _data[dataset];
  if (!table) {
    throw new Error('Unknown dataset: ' + dataset);
  }

  const target = table.find(candidate => candidate.id === rowId);
  if (target) {
    target[column] = value;
    return;
  }
  table.push({ id: rowId, [column]: value });
}
/**
 * For every incoming message, look up the newest message already stored in
 * `_messages` for the same dataset/row/column.
 *
 * `_messages` is walked once to build a dataset -> row -> column index of the
 * stored message with the greatest timestamp, instead of copying and sorting
 * the whole log and scanning it again for each incoming message.
 * `_messages` itself is not modified.
 *
 * @param {Array<{dataset: *, row: *, column: *, timestamp: *}>} messages
 *   Incoming messages to look up.
 * @returns {Map<object, object|undefined>} Maps each incoming message object
 *   to the stored message with the greatest timestamp for the same cell, or
 *   to undefined when nothing is stored for that cell. When stored messages
 *   tie on the greatest timestamp, the one appearing first in `_messages`
 *   is used.
 */
function compareMessages(messages) {
  const latestByDataset = new Map();

  for (const stored of _messages) {
    let rows = latestByDataset.get(stored.dataset);
    if (!rows) {
      rows = new Map();
      latestByDataset.set(stored.dataset, rows);
    }

    let columns = rows.get(stored.row);
    if (!columns) {
      columns = new Map();
      rows.set(stored.row, columns);
    }

    const current = columns.get(stored.column);
    // Strictly-greater keeps the first of several equal timestamps, which is
    // what a stable newest-first sort followed by find() would return.
    if (current === undefined || stored.timestamp > current.timestamp) {
      columns.set(stored.column, stored);
    }
  }

  const existingMessages = new Map();
  for (const incoming of messages) {
    const rows = latestByDataset.get(incoming.dataset);
    const columns = rows && rows.get(incoming.row);
    const existing = columns ? columns.get(incoming.column) : undefined;
    existingMessages.set(incoming, existing);
  }
  return existingMessages;
}
/**
 * Merge a batch of messages into local state.
 *
 * For each message, the newest message known for the same
 * dataset/row/column decides what happens:
 *  - the value is written with apply() only when the message is newer than
 *    that known message (or nothing is known for the cell);
 *  - the message is inserted into the clock's merkle trie and appended to
 *    `_messages` unless its timestamp equals the known message's timestamp.
 *
 * "Known" covers both what compareMessages() reports from before the batch
 * and messages handled earlier in this same batch. Without the in-batch
 * tracking, an older message later in the batch would overwrite a newer
 * value, and a message repeated within the batch would be inserted into the
 * trie and the log twice.
 *
 * Calls the listener registered via onSync(), if any, once the whole batch
 * has been processed.
 *
 * @param {Array<{dataset: *, row: *, column: *, value: *, timestamp: *}>} messages
 */
function applyMessages(messages) {
  const existingMessages = compareMessages(messages);
  const clock = getClock();

  // Newest message per cell once this batch starts touching it. Seeded
  // lazily from `existingMessages` the first time a cell is seen.
  const latestByCell = new Map();
  const cellKey = msg => JSON.stringify([msg.dataset, msg.row, msg.column]);

  messages.forEach(msg => {
    const key = cellKey(msg);
    const existingMsg = latestByCell.has(key)
      ? latestByCell.get(key)
      : existingMessages.get(msg);

    const isNewer = !existingMsg || existingMsg.timestamp < msg.timestamp;
    if (isNewer) {
      apply(msg);
    }

    if (!existingMsg || existingMsg.timestamp !== msg.timestamp) {
      clock.merkle = merkle.insert(
        clock.merkle,
        Timestamp.parse(msg.timestamp)
      );
      _messages.push(msg);
    }

    latestByCell.set(key, isNewer ? msg : existingMsg);
  });

  if (_onSync) {
    _onSync();
  }
}
/**
 * Apply locally created messages and push them to the server.
 *
 * The messages are applied synchronously through applyMessages(); the
 * promise from sync() is then returned instead of being dropped, so callers
 * can await completion or handle a failed sync. Callers that ignore the
 * return value see the same behavior as before.
 *
 * @param {Array<object>} messages - Messages to apply and send.
 * @returns {Promise<*>} The promise returned by sync(messages).
 */
function sendMessages(messages) {
  applyMessages(messages);
  return sync(messages);
}
/**
 * Take in messages that came from elsewhere.
 *
 * Each message's parsed timestamp is first passed to Timestamp.recv()
 * together with the current clock; after that the whole batch is handed to
 * applyMessages().
 *
 * @param {Array<{timestamp: string}>} messages - Messages to receive.
 */
function receiveMessages(messages) {
  for (const incoming of messages) {
    // Look the clock up per message, exactly as often as there are messages.
    const clock = getClock();
    const remoteTimestamp = Timestamp.parse(incoming.timestamp);
    Timestamp.recv(clock, remoteTimestamp);
  }

  applyMessages(messages);
}
/**
 * Register the listener that applyMessages() calls after processing a batch.
 *
 * Only one listener is kept: each call replaces the previous value of the
 * module-level `_onSync`. A falsy value leaves no listener registered.
 *
 * @param {Function|null} func - Callback invoked with no arguments.
 */
function onSync(func) {
  _onSync = func;
}
/**
 * Exchange messages with the server until both merkle tries agree.
 *
 * One round posts the outgoing messages together with the local merkle trie,
 * hands any messages in the reply to receiveMessages(), and then diffs the
 * server's trie against the local one. A truthy diff result is used as the
 * `since` time of a follow-up round, which resends every local message whose
 * timestamp is at or after that time.
 *
 * Does nothing when syncing has been disabled via setSyncingEnabled(false).
 *
 * @param {Array<object>} [initialMessages=[]] - Messages to send in the first
 *   round; ignored when `since` is truthy.
 * @param {?number} [since=null] - When truthy, send all of `_messages` whose
 *   timestamp is >= a Timestamp built from this value instead.
 * @returns {Promise<undefined>}
 * @throws {Error} 'network-failure' when the request fails for any reason;
 *   the underlying error is attached as `cause` so it is not lost.
 * @throws {Error} When a follow-up round yields the same diff time it was
 *   started with, i.e. the client cannot converge with the server.
 */
async function sync(initialMessages = [], since = null) {
  if (!_syncEnabled) {
    return;
  }

  let messages = initialMessages;
  if (since) {
    const timestamp = new Timestamp(since, 0, '0').toString();
    messages = _messages.filter(msg => msg.timestamp >= timestamp);
  }

  let result;
  try {
    result = await post({
      group_id: 'my-group',
      client_id: getClock().timestamp.node(),
      messages,
      merkle: getClock().merkle
    });
  } catch (e) {
    // Callers match on this exact message, so keep it and attach the
    // original error rather than discarding it. Assigned by hand so it also
    // works where the Error options argument is not supported.
    const failure = new Error('network-failure');
    failure.cause = e;
    throw failure;
  }

  if (result.messages.length > 0) {
    receiveMessages(result.messages);
  }

  const diffTime = merkle.diff(result.merkle, getClock().merkle);
  if (diffTime) {
    // Getting the same diff time back after resending from it means another
    // round would not make progress; stop instead of looping forever.
    if (since && since === diffTime) {
      throw new Error(
        'A bug happened while syncing and the client ' +
          'was unable to get in sync with the server. ' +
          "This is an internal error that shouldn't happen"
      );
    }
    return sync([], diffTime);
  }
}