Skip to content

Commit

Permalink
Add comments to core class properties
Browse files Browse the repository at this point in the history
  • Loading branch information
jbakse committed Jan 27, 2022
1 parent f0cd1ca commit 9fc2840
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 46 deletions.
4 changes: 4 additions & 0 deletions notes/todo.todo
Expand Up @@ -92,7 +92,11 @@
# Code Review
-- client ids sometimes called "id" sometimes "name", confusing?
-- room has properties that are DS records and properties that are Party Records, rename properties to improve clarity?
-- Room recordList -> userRecordList?
-- compare whenReady() implemenations in Room, Client, Record
-- should room.participants be room.participantIds
-- why use room.participants as cache of #roomDataRecord.participants
might be better to just call #roomDataRecord.get on demand, removs a property, less to think about


-- merging incoming data with shared object
Expand Down
20 changes: 14 additions & 6 deletions src/Client.js
Expand Up @@ -2,14 +2,20 @@ import { DeepstreamClient } from "@deepstream/client";
import { createEmitter } from "./emitter";
import * as log from "./log";

/* !global DeepstreamClient */

/*
* Client
*
* Wraps and manages a deepstream client connection.
* Keeps an up to date list of connected clients.
*
*/
export class Client {
#name;
#name; // uid: provided by deepstream
#deepstreamClient; // ds.Client: wrapped by this Client
#clients = []; // [uid]: currently connected clients

#isReady;
#emitter;
#deepstreamClient;
#clients = [];

constructor(host) {
this.#deepstreamClient = new DeepstreamClient(host);
Expand All @@ -20,6 +26,8 @@ export class Client {
this.#deepstreamClient.on("connectionStateChanged", (connectionState) =>
log.debug("connectionStateChanged", connectionState)
);

// get current connected clients and subscribe for updates
this.#deepstreamClient.presence.getAll((error, clients) => {
this.#clients = clients;
});
Expand Down Expand Up @@ -78,7 +86,7 @@ export class Client {
this.#deepstreamClient.close();
}

// @todo getter
// @todo getter?
name() {
return this.#name;
}
Expand Down
60 changes: 32 additions & 28 deletions src/Record.js
Expand Up @@ -2,33 +2,36 @@ import * as log from "./log";
import onChange from "on-change";
import { createEmitter } from "./emitter";

// const customMergeStrategy = (
// localValue,
// localVersion,
// remoteValue,
// remoteVersion,
// callback
// ) => {
// log.warn("Merge");
// callback(null, remoteValue);
// };
/*
* Record
*
* - creates an object `#shared`
* - wraps and observes `#shared` with `#watchShared` (using onChange)
* // https://github.com/sindresorhus/on-change
* - syncs changes via deepstream record `#record`
* - deep merges incoming changes into `#shared`
*
*/
export class Record {
#client;
#name;
#shared;
#watchedShared;
#emitter;
#client; // party.Client: currently connected client
#name; // string: full name of record (e.g. "appName-roomName/_recordName")
#shared; // {}: internal read/write object to be synced with other clients
#watchedShared; // Proxy: observable object wrapping #shared
#record; // ds.Record: the record this party.Record is managing

#isReady;
#record;
#emitter;

constructor(client, name) {
this.#client = client;
this.#name = name;
this.#shared = {};
this.#watchedShared = onChange(
this.#shared,
this._onClientChangedData.bind(this)
this.#onClientChangedData.bind(this)
);
// create a reference back to this party.Record from the shared object
// property key is a Symbol so its unique and mostly hidden
this.#shared[Symbol.for("Record")] = this;
this.#emitter = createEmitter();
this.#isReady = false;
Expand Down Expand Up @@ -76,14 +79,14 @@ export class Record {
async _connect() {
await this.#client.whenReady();

// subscribe to record
// get and subscribe to record
this.#record = this.#client.getRecord(this.#name);
// this.#record.setMergeStrategy(customMergeStrategy);
this.#record.subscribe("shared", this.#onServerChangedData.bind(this));

this.#record.subscribe("shared", this._onServerChangedData.bind(this));
await this.#record.whenReady();
// this.#record.delete(); // emergency clear it

// initialize shared object
// #todo should we use setWithAck or await #record.whenReady()?
if (!this.#record.get("shared")) this.#record.set("shared", {});

// report
Expand All @@ -93,27 +96,29 @@ export class Record {
this.#isReady = true;
this.#emitter.emit("ready");
}
_onClientChangedData(path, newValue, oldValue) {
// on-change alerts us when the value actually changes

#onClientChangedData(path, newValue, oldValue) {
// on-change alerts us only when the value actually changes
// so we don't need to test if newValue and oldValue are different
this.#record.set("shared." + path, newValue);
}

// _onServerChangedData
// called from deepstreem. this is called when the deepstreem data store is
// updated, even if the update was local. If the update is local
// this.#shared === data -> true
// because this.#shared was updated first, triggering this callback
// if the change originated non-locally, than this.#shared does need to be
// updated

_onServerChangedData(data) {
#onServerChangedData(data) {
// don't replace #shared itself as #watchedShared has a reference to it
// instead patch it to match the incoming data
patchInPlace(this.#shared, data, "shared");
}
}

function getType(value) {
function getMergeType(value) {
if (value === null) return "null";
if (typeof value === "object") return "object";
if (typeof value === "boolean") return "primative";
Expand Down Expand Up @@ -150,8 +155,8 @@ function patchInPlace(_old, _new, _keyPath = "") {
// patch shared object and array keys
for (const key of newKeys) {
if (Object.prototype.hasOwnProperty.call(_old, key)) {
const oldType = getType(_old[key]);
const newType = getType(_new[key]);
const oldType = getMergeType(_old[key]);
const newType = getMergeType(_new[key]);
if (oldType === "unsupported") {
log.error(
`${_keyPath}.${key} is unsupported type: ${typeof _new[key]}`
Expand All @@ -166,7 +171,6 @@ function patchInPlace(_old, _new, _keyPath = "") {
}
if (oldType !== "object" || newType !== "object") {
if (_old[key] !== _new[key]) {
// log.debug(`update ${_keyPath}.${key}`);
_old[key] = _new[key];
}
continue;
Expand Down
31 changes: 19 additions & 12 deletions src/Room.js
Expand Up @@ -5,20 +5,26 @@ import * as log from "./log";
// eslint-disable-next-line
import css from "./party_debug.css";

/*
* Room
*
* Keeps track of particpants and records related to a specific app/room.
*/

export class Room {
#client;
#appName;
#roomName;
#emitter;
#client; // party.Client: currently connected client
#appName; // string: user provide name for the app
#roomName; // string: user provide name for the room

#roomDataRecord; // ds_record {participants: [uid], host: uid}
#participants; // cache of #roomDataRecord.participants
#recordList; // ds_list records created in this room by request (not participants)
#participantRecords; // {uid: party_record}
#participantShareds; // [(watche)shared] cache of #participantRecords.getShared()s
#clientParticpantRecord; // party_record, participant record for currently connected client
#roomDataRecord; // ds.Record: {participants: [uid], host: uid}
#participants; // [uid]: cache of #roomDataRecord.participants
#recordList; // ds.List: user records created in this room
#participantRecords; // {uid: party.Record}: map of particpant records for room
#participantShareds; // [(watched)shared] cache of #participantRecords.getShared()s
#clientParticpantRecord; // party.Record, participant record this client

#isReady;
#emitter;

constructor(client, appName, roomName) {
this.#client = client;
Expand All @@ -35,10 +41,10 @@ export class Room {
);

this.#isReady = false;
this._connect();
this.#connect();
}

async _connect() {
async #connect() {
await this.#client.whenReady();
const connectRoomData = async () => {
// load the _room_data record
Expand All @@ -51,6 +57,7 @@ export class Room {
this.#participants = this.#roomDataRecord.get("participants");
if (!this.#participants) {
this.#participants = [];
// @todo change next two lines to setWithAck?
this.#roomDataRecord.set("participants", []);
await this.#roomDataRecord.whenReady();
}
Expand Down
5 changes: 5 additions & 0 deletions src/emitter.js
Expand Up @@ -5,16 +5,21 @@

export const createEmitter = () => ({
events: {},
// send message to subscribers
emit(event, ...args) {
for (const i of this.events[event] || []) {
i(...args);
}
},

// subscribe to future messages
on(event, cb) {
(this.events[event] = this.events[event] || []).push(cb);
return () =>
(this.events[event] = this.events[event].filter((i) => i !== cb));
},

// subscribe for first future message only
once(event, cb) {
const unbind = this.on(event, (...args) => {
unbind();
Expand Down

0 comments on commit 9fc2840

Please sign in to comment.