Skip to content

Commit

Permalink
ok broadcast channel looks good
Browse files Browse the repository at this point in the history
  • Loading branch information
jaredly committed Jan 30, 2020
1 parent 11fa1ce commit 76d01aa
Show file tree
Hide file tree
Showing 7 changed files with 138 additions and 64 deletions.
3 changes: 3 additions & 0 deletions examples/simple-example/client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import makeClient, {
getStamp,
syncMessages,
onMessage,
receiveCrossTabChanges,
type ClientState,
type CursorType,
} from '../fault-tolerant/client';
Expand All @@ -29,7 +30,9 @@ const setup = () => {
syncMessages(client.persistence, client.collections, reconnected),
messages =>
Promise.all(messages.map(message => onMessage(client, message))),
peerChange => receiveCrossTabChanges(client, peerChange),
);
client.listeners.push(network.sendCrossTabChanges);
client.setDirty = network.sync;
return { client, onConnection: network.onConnection };
};
Expand Down
77 changes: 53 additions & 24 deletions examples/simple-example/client/poll.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import makeClient, {
debounce,
type ClientState,
type CRDTImpl,
type PeerChange,
} from '../fault-tolerant/client';
import {
type ClientMessage,
Expand All @@ -15,7 +16,7 @@ import type { Persistence } from '../fault-tolerant/clientTypes.js';
import backOff from '../shared/back-off';
import poller from './poller';

const sync = async function<Delta, Data>(
const syncFetch = async function<Delta, Data>(
url: string,
sessionId: string,
getMessages: (
Expand Down Expand Up @@ -46,38 +47,66 @@ export function makeNetwork<Delta, Data>(
reconnected: boolean,
) => Promise<Array<ClientMessage<Delta, Data>>>,
onMessages: (Array<ServerMessage<Delta, Data>>) => Promise<mixed>,
onCrossTabChanges: PeerChange => Promise<void>,
): {
sync: () => void,
onConnection: ((boolean) => void) => void,
sendCrossTabChanges: PeerChange => void,
} {
const listeners = [];
const poll = poller(
3 * 1000,
() =>
new Promise(res => {
backOff(() =>
sync(url, sessionId, getMessages, onMessages).then(
() => {
listeners.forEach(f => f(true));
res();
return true;
},
err => {
console.error('Failed to sync');
console.error(err);
listeners.forEach(f => f(false));
return false;
},
),
);
}),
);

poll();
const {
BroadcastChannel,
createLeaderElection,
} = require('broadcast-channel');
const channel = new BroadcastChannel('local-first', {
webWorkerSupport: false,
});

channel.onmessage = msg => {
console.log('got a message');
onCrossTabChanges(msg).catch(err =>
console.log('failed', err.message, err.stack),
);
console.log('Processed message', JSON.stringify(msg));
};

const elector = createLeaderElection(channel);
let sync = () => {};
elector.awaitLeadership().then(() => {
console.log('Im the leader');
const poll = poller(
3 * 1000,
() =>
new Promise(res => {
backOff(() =>
syncFetch(url, sessionId, getMessages, onMessages).then(
() => {
listeners.forEach(f => f(true));
res();
return true;
},
err => {
console.error('Failed to sync');
console.error(err);
listeners.forEach(f => f(false));
return false;
},
),
);
}),
);
poll();
sync = debounce(poll);
});

return {
onConnection: fn => {
listeners.push(fn);
},
sync: debounce(poll),
sync: () => sync(),
sendCrossTabChanges: peerChange => {
channel.postMessage(peerChange);
},
};
}
2 changes: 2 additions & 0 deletions examples/simple-example/client/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ const setup = (makeNetwork, url) => {
messages.map(message => clientLib.onMessage(client, message)),
);
},
peerChange => clientLib.receiveCrossTabChanges(client, peerChange),
);
client.listeners.push(network.sendCrossTabChanges);
console.log('setting up');
client.setDirty = network.sync;
window.client = client;
Expand Down
107 changes: 69 additions & 38 deletions examples/simple-example/client/ws.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@ import makeClient, {
onMessage,
syncMessages,
debounce,
receiveCrossTabChanges,
type ClientState,
type CRDTImpl,
type PeerChange,
} from '../fault-tolerant/client';
import {
type ClientMessage,
Expand Down Expand Up @@ -58,65 +60,94 @@ const reconnectingSocket = (
return state;
};

// const doThings = (persistence, url, crdt) => {
// const client = makeClient(persistence, crdt, () => {});
// const network = makeNetwork(
// url,
// persistence.getHLC().node,
// () => syncMessages(client.persistence, client.collections),
// messages => messages.forEach(message => onMessage(client, message)),
// );
// client.setDirty = network.sync;
// };

export function makeNetwork<Delta, Data>(
url: string,
sessionId: string,
getMessages: (
reconnected: boolean,
) => Promise<Array<ClientMessage<Delta, Data>>>,
onMessages: (Array<ServerMessage<Delta, Data>>) => Promise<mixed>,
onCrossTabChanges: PeerChange => Promise<void>,
): {
sync: () => void,
onConnection: ((boolean) => void) => void,
sendCrossTabChanges: PeerChange => void,
} {
const listeners = [];
const state = reconnectingSocket(
`${url}?sessionId=${sessionId}`,
() => sync(true),
msg => {
const messages = JSON.parse(msg);
onMessages(messages);
},
listeners,
);

const sync = (reconnected: boolean = false) => {
if (state.socket) {
const socket = state.socket;
getMessages(reconnected).then(
messages => {
if (messages.length) {
socket.send(JSON.stringify(messages));
} else {
console.log('nothing to sync here');
}
},
err => {
console.error('Failed to sync messages folks');
console.error(err);
},
const {
BroadcastChannel,
createLeaderElection,
} = require('broadcast-channel');
const channel = new BroadcastChannel('local-first', {
webWorkerSupport: false,
});

channel.onmessage = msg => {
if (msg.type === 'sync' && sync != followerSync) {
console.log('got peer sync');
sync();
} else if (msg.type === 'change') {
console.log('got a message');
onCrossTabChanges(msg.change).catch(err =>
console.log('failed', err.message, err.stack),
);
} else {
console.log('but no socket');
console.log('Processed message', JSON.stringify(msg.change));
}
};

// client.listeners.push(colChanges => {
// channel.postMessage(colChanges);
// });

const elector = createLeaderElection(channel);
const followerSync = _ignored => {
channel.postMessage({ type: 'sync' });
};
let sync = followerSync;
elector.awaitLeadership().then(() => {
console.log('Im the leader');
const state = reconnectingSocket(
`${url}?sessionId=${sessionId}`,
() => sync(true),
msg => {
const messages = JSON.parse(msg);
onMessages(messages);
},
listeners,
);

sync = (reconnected: boolean = false) => {
if (state.socket) {
const socket = state.socket;
getMessages(reconnected).then(
messages => {
if (messages.length) {
socket.send(JSON.stringify(messages));
} else {
console.log('nothing to sync here');
}
},
err => {
console.error('Failed to sync messages folks');
console.error(err);
},
);
} else {
console.log('but no socket');
}
};
});

// sync();
return {
sync,
sync: () => sync(false),
onConnection: fn => {
listeners.push(fn);
},
sendCrossTabChanges: peerChange => {
console.log('sending cross tab');
channel.postMessage({ type: 'change', change: peerChange });
},
};
}
7 changes: 7 additions & 0 deletions examples/simple-example/fault-tolerant/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,16 @@ import {
validateSet,
} from '@local-first/nested-object-crdt/schema.js';
import type { Persistence } from './clientTypes.js';
export type { Persistence, PeerChange } from './clientTypes.js';

export type { CursorType } from './server.js';

/*
Cross-tab use case to support
- The network thing knows about leader election.
*/

export type CRDTImpl<Delta, Data> = {
createEmpty: () => Data,
applyDelta: (Data, Delta) => Data,
Expand Down
2 changes: 2 additions & 0 deletions examples/simple-example/fault-tolerant/clientTypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import { type ClientState, type CRDTImpl } from './client';
import type { HLC } from '@local-first/hybrid-logical-clock';
import type { CursorType } from './server.js';

export type PeerChange = { col: string, nodes: Array<string> };

export type makeClient = <Delta, Data>(
persistence: Persistence<Delta, Data>,
crdt: CRDTImpl<Delta, Data>,
Expand Down
4 changes: 2 additions & 2 deletions examples/simple-example/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ const full = async () => {
false,
);

await wait();
await wait(500);
expect(await getData(pageB), { a: itemA }, 'B 0');
await addItem(pageB, 'b', itemB);
await wait();
Expand All @@ -209,7 +209,7 @@ const full = async () => {

app = runServer(serverPort, server);
console.log('please reconnect');
await wait(500);
await wait(1000);

expect(await getData(pageA), { a: itemA, b: itemB, c: itemC }, 'A 4');
expect(
Expand Down

0 comments on commit 76d01aa

Please sign in to comment.