Skip to content

Commit

Permalink
ok blob tests are passing!
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredly committed Feb 5, 2020
1 parent a9dbefd commit b954171
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 23 deletions.
28 changes: 22 additions & 6 deletions examples/simple-example/fault-tolerant/blob/basic-network.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ const syncFetch = async function<Data>(
mergeIntoLocal: (
remote: Blob<Data>,
etag: string,
) => Promise<{ blob: Blob<Data>, stamp: ?string }>,
) => Promise<?{ blob: Blob<Data>, stamp: ?string }>,

updateMeta: (
serverEtag: ?string,
Expand All @@ -108,19 +108,31 @@ const syncFetch = async function<Data>(
let toSend = local ? local.blob : null;
if (remote) {
const response = await mergeIntoLocal(remote.blob, remote.etag);
toSend = response.blob;
dirtyStamp = response.stamp;
if (response) {
toSend = response.blob;
dirtyStamp = response.stamp;
console.log('merged with changes');
} else {
toSend = null;
// TODO dirtyStamp should not be truthy in this case I don't think
// console.log('dirtyStamp', dirtyStamp);
dirtyStamp = null;
}
}
let newServerEtag = null;
if (toSend) {
console.log(remote ? 'pushing up merged' : 'pushing up local');
const t = toSend;
Object.keys(toSend).forEach(colid => {
if (Array.isArray(toSend[colid])) {
if (Array.isArray(t[colid])) {
throw new Error('Array in collection!');
}
});
newServerEtag = await putRemote(toSend);
}
await updateMeta(newServerEtag, dirtyStamp);
if (newServerEtag || dirtyStamp) {
await updateMeta(newServerEtag, dirtyStamp);
}
};

// TODO dedup with polling network
Expand All @@ -141,6 +153,7 @@ const createNetwork = <Delta, Data>(
connectionListeners.forEach(f => f(currentSyncStatus));
},
peerChange => {
console.log('received peer change');
handleCrossTabChanges(peerChange).catch(err =>
console.log('failed', err.message, err.stack),
);
Expand All @@ -166,7 +179,10 @@ const createNetwork = <Delta, Data>(
res.status === 304 ||
res.status === 404
) {
console.log('No changes!');
console.log(
'No changes from server!',
etag,
);
return null;
}
if (res.status !== 200) {
Expand Down
17 changes: 11 additions & 6 deletions examples/simple-example/fault-tolerant/blob/create-client.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,11 @@ function createClient<Delta, Data, SyncStatus>(
// which doesn't seem terrible.
() => persistence.getFull(),
async (full, etag, sendCrossTabChanges) => {
const { merged, changedIds } = await persistence.mergeFull(
full,
etag,
crdt.merge,
);
const result = await persistence.mergeFull(full, etag, crdt.merge);
if (!result) {
return null;
}
const { merged, changedIds } = result;
Object.keys(changedIds).forEach(colid => {
const col = state[colid];
const data = merged.blob[colid];
Expand All @@ -72,13 +72,17 @@ function createClient<Delta, Data, SyncStatus>(
id,
value: crdt.value(data[id]),
}));
changedIds[colid].forEach(id => {
state[colid].cache[id] = data[id];
});
col.listeners.forEach(listener => {
listener(changes);
});
}
changedIds[colid].forEach(id => {
// Only update the cache if the node has already been cached
if (state[colid].cache[id]) {
// Umm is this necessary though?
if (state[colid].cache[id] || col.itemListeners[id]) {
state[colid].cache[id] = data[id];
}
if (col.itemListeners[id]) {
Expand Down Expand Up @@ -125,6 +129,7 @@ function createClient<Delta, Data, SyncStatus>(
state[colid],
getStamp,
network.setDirty,
network.sendCrossTabChanges,
);
},
onSyncStatus(fn) {
Expand Down
21 changes: 18 additions & 3 deletions examples/simple-example/fault-tolerant/blob/idb-persistence.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as hlc from '@local-first/hybrid-logical-clock';
import type { HLC } from '@local-first/hybrid-logical-clock';
import type { Delta, CRDT as Data } from '@local-first/nested-object-crdt';
import { type CursorType } from '../client';
import deepEqual from 'fast-deep-equal';
import deepEqual from '@birchill/json-equalish';
import type { FullPersistence } from '../delta/types';

export const makePersistence = function(
Expand Down Expand Up @@ -78,6 +78,7 @@ export const makePersistence = function(
);
const blob = {};
const changedIds = {};
let anyChanged = false;
await Promise.all(
Object.keys(datas).map(async col => {
const store = tx.objectStore('col:' + col);
Expand All @@ -92,6 +93,13 @@ export const makePersistence = function(
blob[col][key] = datas[col][key];
}
if (!deepEqual(prev, blob[col][key])) {
anyChanged = true;
const a = JSON.stringify(prev);
const b = JSON.stringify(blob[col][key]);
if (a === b) {
throw new Error('Jsonify says they are same');
}
console.log('Different', a, b);
if (!changedIds[col]) {
changedIds[col] = [key];
} else {
Expand All @@ -102,11 +110,18 @@ export const makePersistence = function(
});
}),
);
console.log('After merge, any changed?', anyChanged);
await tx.objectStore('meta').put(etag, 'serverEtag');
const dirty = await tx.objectStore('meta').get('dirty');
// console.log('Merged', blob);
await tx.done;
return { merged: { blob, stamp: dirty }, changedIds };
if (!anyChanged) {
return null;
}
return {
merged: { blob, stamp: dirty },
changedIds,
};
},
async applyDelta<Delta, Data>(
collection: string,
Expand All @@ -128,7 +143,7 @@ export const makePersistence = function(
}

await tx.objectStore('col:' + collection).put({ id, value });
return data;
return value;
},
};
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@ function createClient<Delta, Data, SyncStatus>(
state[colid],
getStamp,
network.setDirty,
network.sendCrossTabChanges,
);
},
onSyncStatus(fn) {
Expand Down
18 changes: 14 additions & 4 deletions examples/simple-example/fault-tolerant/delta/peer-tabs.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ export const peerTabAwareSync = function<SyncStatus>(
};

channel.onmessage = (
msg: { type: 'change', peerChange: PeerChange } | { type: 'sync' },
msg:
| { type: 'change', peerChange: PeerChange }
| { type: 'sync' }
| { type: 'status', status: SyncStatus },
) => {
console.log('got a message');
console.log('got a peer message', msg.type);
if (msg.type === 'sync' && sync !== originalSync) {
sync();
} else if (msg.type === 'change') {
handleCrossTabChange(msg.peerChange);
} else if (msg.type === 'status') {
onStatus(msg.status);
}
console.log('Processed message', msg);
};
Expand All @@ -49,8 +54,13 @@ export const peerTabAwareSync = function<SyncStatus>(
});

return {
sendConnectionStatus: (status: SyncStatus) => {},
sendCrossTabChange: (change: PeerChange) => {},
sendConnectionStatus: (status: SyncStatus) => {
channel.postMessage({ type: 'status', status });
},
sendCrossTabChange: (change: PeerChange) => {
console.log('Sending changes', change);
channel.postMessage({ type: 'change', peerChange: change });
},
sync: () => sync(),
};
};
6 changes: 5 additions & 1 deletion examples/simple-example/fault-tolerant/delta/shared.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// @flow

import type { Client, Collection } from '../types';
import type { Client, Collection, PeerChange } from '../types';
import type {
Persistence,
Network,
Expand Down Expand Up @@ -57,6 +57,7 @@ export const getCollection = function<Delta, Data, T>(
state: CollectionState<Data, T>,
getStamp: () => string,
setDirty: () => void,
sendCrossTabChanges: PeerChange => mixed,
): Collection<T> {
return {
async save(id: string, node: T) {
Expand All @@ -73,6 +74,7 @@ export const getCollection = function<Delta, Data, T>(
crdt.deltas.stamp(delta),
crdt.deltas.apply,
);
sendCrossTabChanges({ col: colid, nodes: [id] });
setDirty();
},

Expand All @@ -99,6 +101,7 @@ export const getCollection = function<Delta, Data, T>(
if (!deepEqual(plain, newPlain)) {
send(state, id, newPlain);
}
sendCrossTabChanges({ col: colid, nodes: [id] });
setDirty();
},

Expand Down Expand Up @@ -130,6 +133,7 @@ export const getCollection = function<Delta, Data, T>(
stamp,
crdt.deltas.apply,
);
sendCrossTabChanges({ col: colid, nodes: [id] });
setDirty();
},

Expand Down
4 changes: 2 additions & 2 deletions examples/simple-example/fault-tolerant/delta/types.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export type FullPersistence = {
full: Blob<Data>,
etag: string,
merge: (Data, Data) => Data,
): Promise<{
): Promise<?{
merged: { blob: Blob<Data>, stamp: ?string },
changedIds: { [colid: string]: Array<string> },
}>,
Expand Down Expand Up @@ -88,7 +88,7 @@ export type BlobNetworkCreator<Data, SyncStatus> = (
remote: Blob<Data>,
etag: string,
(PeerChange) => mixed,
) => Promise<{ blob: Blob<Data>, stamp: ?string }>,
) => Promise<?{ blob: Blob<Data>, stamp: ?string }>,
updateMeta: (
newServerEtag: ?string,
dirtyFlagToClear: ?string,
Expand Down
1 change: 1 addition & 0 deletions examples/simple-example/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"name": "simple-example-client",
"version": "1.0.0",
"dependencies": {
"@birchill/json-equalish": "^1.1.0",
"@local-first/hybrid-logical-clock": "^1.0",
"@local-first/nested-object-crdt": "^1.0",
"better-sqlite3": "^5.4.3",
Expand Down
7 changes: 6 additions & 1 deletion examples/simple-example/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ const setupServer = () => {
}
const blobPath = dataDir + '/blobs/stuff';
if (fs.existsSync(blobPath)) {
console.log('ditching the data stuff');
fs.unlinkSync(blobPath);
}
// Start serevr
Expand Down Expand Up @@ -140,6 +141,7 @@ const contention = async () => {
expect(await getCachedData(pageC), { a: itemA }, 'C 1');

// await wait(1000);
await browser.close();
app.http.close();
};

Expand Down Expand Up @@ -244,6 +246,7 @@ const full = async () => {
);
await triggerSync(pageC);
await wait();
console.log('checking');
expect(
await getCachedData(pageC),
{ a: itemA, b: itemB, c: itemC },
Expand All @@ -252,6 +255,8 @@ const full = async () => {

await addItem(pageC, 'd', itemD);
await wait();
await triggerSync(pageB);
await wait();
expect(
await getCachedData(pageA),
{ a: itemA, b: itemB, c: itemC, d: itemD },
Expand Down Expand Up @@ -357,7 +362,7 @@ const compaction = async () => {
const run = async () => {
await full();
await contention();
// await compaction();
await compaction();
};

run()
Expand Down
5 changes: 5 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -809,6 +809,11 @@
lodash "^4.17.13"
to-fast-properties "^2.0.0"

"@birchill/json-equalish@^1.1.0":
version "1.1.0"
resolved "https://registry.yarnpkg.com/@birchill/json-equalish/-/json-equalish-1.1.0.tgz#cb01ac1ef032209ada516b6a4bf3d7a2306e972d"
integrity sha512-p6XZQQVTsqRh5wYg3Rf4EWuynWANMupIHykDS373RSVPzSJiZ2ID9gRvQs7/MZKdW7Ejaw587WLGV8C+wSitzw==

"@cnakazawa/watch@^1.0.3":
version "1.0.3"
resolved "https://registry.yarnpkg.com/@cnakazawa/watch/-/watch-1.0.3.tgz#099139eaec7ebf07a27c1786a3ff64f39464d2ef"
Expand Down

0 comments on commit b954171

Please sign in to comment.