Skip to content

Commit

Permalink
Support network auto recovery
Browse files Browse the repository at this point in the history
 - Reconnect stream when network is restored
 - Flush local changes after applying them to the server
 - #4
  • Loading branch information
hackerwins committed Feb 22, 2020
1 parent 2203474 commit e86ab28
Show file tree
Hide file tree
Showing 4 changed files with 269 additions and 210 deletions.
218 changes: 119 additions & 99 deletions dist/yorkie.js

Large diffs are not rendered by default.

12 changes: 12 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: '3.3'

services:
envoy:
build:
context: ./docker
dockerfile: ./envoy.Dockerfile
image: 'grpcweb:envoy'
container_name: 'envoy'
ports:
- '8080:8080'
- '9901:9901'
187 changes: 104 additions & 83 deletions src/core/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,26 @@ interface Attachment {
* to the agent to synchronize with other replicas in remote.
*/
export class Client implements Observable<ClientEvent> {
private client: YorkieClient;
private id: ActorID;
private key: string;
private status: ClientStatus;
private attachmentMap: Map<string, Attachment>;
private inSyncing: boolean;
private syncLoopDuration: number;
private reconnectStreamDelay: number;

private client: YorkieClient;
private remoteChangeEventStream: any;
private eventStream: Observable<ClientEvent>;
private eventStreamObserver: Observer<ClientEvent>;

constructor(rpcAddr: string, key?: string) {
this.client = new YorkieClient(rpcAddr, null, null);
this.key = key ? key : uuid();
this.status = ClientStatus.Deactivated;
this.attachmentMap = new Map();
this.inSyncing = false;
this.syncLoopDuration = 300;
this.reconnectStreamDelay = 500;

this.client = new YorkieClient(rpcAddr, null, null);
this.eventStream = createObservable<ClientEvent>((observer) => {
this.eventStreamObserver = observer;
});
Expand All @@ -68,6 +71,10 @@ export class Client implements Observable<ClientEvent> {
* different clients.
*/
public activate(): Promise<void> {
if (this.isActive()) {
return Promise.resolve();
}

return new Promise((resolve, reject) => {
const req = new ActivateClientRequest();
req.setClientKey(this.key);
Expand All @@ -81,6 +88,8 @@ export class Client implements Observable<ClientEvent> {
this.id = res.getClientId();
this.status = ClientStatus.Activated;
this.runSyncLoop();
this.runWatchLoop();

this.eventStreamObserver.next({
name: ClientEventType.StatusChanged,
value: this.status
Expand All @@ -100,6 +109,11 @@ export class Client implements Observable<ClientEvent> {
return Promise.resolve();
}

if (this.remoteChangeEventStream) {
this.remoteChangeEventStream.close();
this.remoteChangeEventStream = null;
}

return new Promise((resolve, reject) => {
const req = new DeactivateClientRequest();
req.setClientId(this.id);
Expand Down Expand Up @@ -127,16 +141,16 @@ export class Client implements Observable<ClientEvent> {
* this client will synchronize the given document.
*/
public attach(doc: Document, isManualSync?: boolean): Promise<Document> {
if (this.status !== ClientStatus.Activated) {
if (!this.isActive()) {
throw new YorkieError(Code.ClientNotActive, `${this.key} is not active`);
}

doc.setActor(this.id);

const attaching = new Promise((resolve, reject) => {
return new Promise((resolve, reject) => {
const req = new AttachDocumentRequest();
req.setClientId(this.id);
req.setChangePack(converter.toChangePack(doc.flushLocalChanges()));
req.setChangePack(converter.toChangePack(doc.createChangePack()));

this.client.attachDocument(req, {}, (err, res) => {
if (err) {
Expand All @@ -147,31 +161,15 @@ export class Client implements Observable<ClientEvent> {
const pack = converter.fromChangePack(res.getChangePack());
doc.applyChangePack(pack);

logger.info(`[AD] c:"${this.getKey()}" attaches d:"${doc.getKey().toIDString()}"`)
resolve(doc);
});
}) as Promise<Document>;

if (isManualSync) {
return attaching.then((doc) => {
this.attachmentMap.set(doc.getKey().toIDString(), {
doc: doc,
isRealtimeSync: !isManualSync,
});
this.runWatchLoop();

return doc;
});
}

return attaching.then((doc) => {
return this.watch(doc as Document);
}).then((doc) => {
this.attachmentMap.set(doc.getKey().toIDString(), {
doc: doc,
isRealtimeSync: !isManualSync,
logger.info(`[AD] c:"${this.getKey()}" attaches d:"${doc.getKey().toIDString()}"`)
resolve(doc);
});

return doc;
});
}

Expand All @@ -184,10 +182,14 @@ export class Client implements Observable<ClientEvent> {
* document is no longer used by this client, it should be detached.
*/
public detach(doc: Document): Promise<Document> {
if (!this.isActive()) {
throw new YorkieError(Code.ClientNotActive, `${this.key} is not active`);
}

return new Promise((resolve, reject) => {
const req = new DetachDocumentRequest();
req.setClientId(this.id);
req.setChangePack(converter.toChangePack(doc.flushLocalChanges()));
req.setChangePack(converter.toChangePack(doc.createChangePack()));

this.client.detachDocument(req, {}, (err, res) => {
if (err) {
Expand Down Expand Up @@ -224,6 +226,22 @@ export class Client implements Observable<ClientEvent> {
});
}

public subscribe(nextOrObserver, error?, complete?): Unsubscribe {
return this.eventStream.subscribe(nextOrObserver, error, complete);
}

public getID(): string {
return this.id;
}

public getKey(): string {
return this.key;
}

public isActive(): boolean {
return this.status === ClientStatus.Activated;
}

private runSyncLoop(): void {
const doLoop = () => {
if (!this.isActive()) {
Expand All @@ -239,52 +257,39 @@ export class Client implements Observable<ClientEvent> {
}
}

Promise.all(promises).then(() => {
Promise.all(promises).finally(() => {
setTimeout(doLoop, this.syncLoopDuration);
});
};

doLoop();
}

private syncInternal(doc: Document): Promise<Document> {
return new Promise((resolve, reject) => {
private runWatchLoop(): void {
const doLoop = () => {
if (!this.isActive()) {
return;
}

const req = new PushPullRequest();
req.setClientId(this.id);
const localPack = doc.flushLocalChanges();
req.setChangePack(converter.toChangePack(localPack));
if (this.remoteChangeEventStream) {
this.remoteChangeEventStream.close();
this.remoteChangeEventStream = null;
}

this.client.pushPull(req, {}, (err, res) => {
if (err) {
reject(err);
return;
const keys = [];
for (const [_, attachment] of this.attachmentMap) {
if (attachment.isRealtimeSync) {
keys.push(attachment.doc.getKey());
}
}

const remotePack = converter.fromChangePack(res.getChangePack());
doc.applyChangePack(remotePack);

const docKey = doc.getKey().toIDString();
const localSize = localPack.getChangeSize();
const remoteSize = remotePack.getChangeSize();
logger.info(
`[PP] c:"${this.getKey()}" sync d:"${docKey}", push:${localSize} pull:${remoteSize}`
);
resolve(doc);
});
});
}

// TODO replace target with key pattern, not a document.
private watch(doc: Document): Promise<Document> {
if (this.status !== ClientStatus.Activated) {
throw new YorkieError(Code.ClientNotActive, `${this.key} is not active`);
}
if (!keys.length) {
return;
}

return new Promise((resolve, reject) => {
const req = new WatchDocumentsRequest();
req.setClientId(this.id);
req.setDocumentKeysList(converter.toDocumentKeys([doc.getKey()]));
req.setDocumentKeysList(converter.toDocumentKeys(keys));

const stream = this.client.watchDocuments(req, {});
stream.on('data', (response) => {
Expand All @@ -294,38 +299,54 @@ export class Client implements Observable<ClientEvent> {
value: keys,
});

const attachment = this.attachmentMap.get(doc.getKey().toIDString());
attachment.remoteChangeEventReceved = true;
});
stream.on('status', (status) => {
console.log(status.code);
console.log(status.details);
console.log(status.metadata);
for (const key of keys) {
const attachment = this.attachmentMap.get(key.toIDString());
attachment.remoteChangeEventReceved = true;
}
});
stream.on('end', (end) => {
console.log(end);
stream.on('end', () => {
// stream end signal
this.remoteChangeEventStream = null;
setTimeout(doLoop, this.reconnectStreamDelay);
});
this.remoteChangeEventStream = stream;

logger.info(`[WD] "${this.getKey()}", "${doc.getKey().toIDString()}"`)
resolve(doc);
});
}
logger.info(`[WD] "${this.getKey()}", "${keys}"`)
};

public subscribe(nextOrObserver, error?, complete?): Unsubscribe {
return this.eventStream.subscribe(nextOrObserver, error, complete);
doLoop();
}

public getID(): string {
return this.id;
}
private syncInternal(doc: Document): Promise<Document> {
return new Promise((resolve, reject) => {
const req = new PushPullRequest();
req.setClientId(this.id);
const localChangePack = doc.createChangePack();
req.setChangePack(converter.toChangePack(localChangePack));

public getKey(): string {
return this.key;
}
let isRejected = false;
this.client.pushPull(req, {}, (err, res) => {
if (err) {
isRejected = true;
reject(err);
return;
}

public isActive(): boolean {
return this.status === ClientStatus.Activated;
const remoteChangePack = converter.fromChangePack(res.getChangePack());
doc.applyChangePack(remoteChangePack);

const docKey = doc.getKey().toIDString();
const localSize = localChangePack.getChangeSize();
const remoteSize = remoteChangePack.getChangeSize();
logger.info(
`[PP] c:"${this.getKey()}" sync d:"${docKey}", push:${localSize} pull:${remoteSize}`
);
}).on('end', () => {
if (isRejected) {
return;
}
resolve(doc);
});
});
}
}

Loading

0 comments on commit e86ab28

Please sign in to comment.