Skip to content
This repository has been archived by the owner on Jul 8, 2022. It is now read-only.

Commit

Permalink
Version 1.3.0: Resend messages when the connection fails.
Browse files Browse the repository at this point in the history
  • Loading branch information
michaellperry committed Jan 7, 2016
1 parent 75e15cc commit f27cac2
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 12 deletions.
12 changes: 12 additions & 0 deletions callgraph
@@ -0,0 +1,12 @@
digraph "Jinaga" {
rankdir=LR

"JinagaDistributor.onMessage" -> "JinagaCoordinator.onDelivered"
"JinagaCoordinator.onDelivered" -> "MemoryProvider.dequeue"

"Jinaga.fact" -> "JinagaCoordinator.fact"
"JinagaCoordinator.fact" -> "MemoryProvider.save"
"MemoryProvider.save" -> "MemoryProvider.insertNode"
"MemoryProvider.insertNode" -> "JinagaCoordinator.onSaved"
"JinagaCoordinator.onSaved" -> "MemoryProvider.push"
}
2 changes: 1 addition & 1 deletion node/package.json
@@ -1,6 +1,6 @@
{
"name": "jinaga",
"version": "1.2.1",
"version": "1.3.0",
"description": "JSON Messaging Platform",
"main": "jinaga.js",
"repository": {
Expand Down
4 changes: 4 additions & 0 deletions src/interface.ts
Expand Up @@ -258,9 +258,12 @@ export interface Coordinator {
onSaved(fact: Object, source: any);
send(fact: Object, source: any);
onReceived(fact: Object, userFact: Object, source: any);
onDelivered(token: number, destination: any);
onDone(token: number);
onProgress(queueCount: number);
onError(err: string);
onLoggedIn(userFact: Object, profile: Object);
resendMessages();
}

export interface StorageProvider {
Expand All @@ -274,6 +277,7 @@ export interface StorageProvider {
);
sendAllFacts();
push(fact: Object);
dequeue(token: number, destination: any);
}

export interface KeystoreProvider {
Expand Down
48 changes: 43 additions & 5 deletions src/jinaga.distributor.client.ts
Expand Up @@ -4,19 +4,20 @@ import Interface = require("./interface");
import NetworkProvider = Interface.NetworkProvider;
import Query = Interface.Query;
import Coordinator = Interface.Coordinator;
import {computeHash} from "./interface";

class JinagaDistributor implements NetworkProvider {
socket: Socket;
coordinator: Coordinator;
isOpen: boolean = false;
pending: Array<string> = [];

private maxTimeout: number = 1 * 1000;

constructor(
endpoint: string
private endpoint: string
) {
this.socket = new Socket(endpoint);
this.socket.on("open", () => { this.onOpen(); });
this.socket.on("message", (message) => { this.onMessage(message); });
this.createSocket();
}

public init(coordinator: Coordinator) {
Expand All @@ -43,10 +44,17 @@ class JinagaDistributor implements NetworkProvider {
public fact(fact: Object) {
this.send(JSON.stringify({
type: "fact",
fact: fact
fact: fact,
token: computeHash(fact)
}));
}

private createSocket() {
this.socket = new Socket(this.endpoint);
this.socket.on("open", () => { this.onOpen(); });
this.socket.on("error", () => { this.onError(); });
}

private send(message: string) {
if (this.isOpen)
this.socket.send(message);
Expand All @@ -55,25 +63,55 @@ class JinagaDistributor implements NetworkProvider {
}

private onOpen() {
this.socket.on("message", (message) => { this.onMessage(message); });
this.socket.on("close", () => { this.onClose(); });

this.maxTimeout = 1 * 1000;

this.isOpen = true;
this.pending.forEach((message: string) => {
this.socket.send(message);
});
this.pending = [];
}

private onError() {
this.retry();
}

private onMessage(message) {
var messageObj = JSON.parse(message);
if (messageObj.type === "fact") {
this.coordinator.onReceived(messageObj.fact, null, this);
}
if (messageObj.type === "received") {
this.coordinator.onDelivered(messageObj.token, this);
}
if (messageObj.type === "loggedIn") {
this.coordinator.onLoggedIn(messageObj.userFact, messageObj.profile);
}
if (messageObj.type === "done") {
this.coordinator.onDone(messageObj.token);
}
}

private onClose() {
this.isOpen = false;
this.retry();
}

private retry() {
setTimeout(() => { this.resendMessages(); }, Math.random() * this.maxTimeout);
this.maxTimeout *= 2;
if (this.maxTimeout > 30 * 1000)
this.maxTimeout = 30 * 1000;
}

private resendMessages() {
this.createSocket();
if (this.pending.length === 0)
this.coordinator.resendMessages();
}
}

export = JinagaDistributor;
16 changes: 15 additions & 1 deletion src/jinaga.distributor.server.ts
Expand Up @@ -129,6 +129,10 @@ class JinagaConnection {

debug("[" + this.identicon + "] Received " + JSON.stringify(message.fact));
this.distributor.onReceived(message.fact, this.userFact, this);
this.socket.send(JSON.stringify({
type: "received",
token: message.token
}));
}

distribute(fact: Object) {
Expand Down Expand Up @@ -222,7 +226,11 @@ class JinagaDistributor implements Coordinator {
onReceived(fact: Object, userFact: Object, source: any) {
if (this.authorizeWrite(fact, userFact))
this.storage.save(fact, source);
}
}

onDelivered(token:number, destination:any) {
// TODO: Remove the fact from the queue of messages bound for this client.
}

onSaved(fact:Object, source:any) {
this.send(fact, source);
Expand All @@ -231,13 +239,19 @@ class JinagaDistributor implements Coordinator {
onDone(token:number) {
}

onProgress(queueCount:number) {
}

onError(err: string) {
debug(err);
}

onLoggedIn(userFact:Object) {
}

resendMessages() {
}

private authorizeWrite(fact, userFact): boolean {
if (fact.hasOwnProperty("from")) {
if (!_isEqual(userFact, fact["from"])) {
Expand Down
3 changes: 3 additions & 0 deletions src/jinaga.mongo.ts
Expand Up @@ -429,6 +429,9 @@ class MongoProvider implements Interface.StorageProvider, Interface.KeystoreProv
push(fact:Object) {
}

dequeue(token:number, destination:any) {
}

save(fact:Object, source:any) {
var hash = computeHash(fact);
this.withCollection("facts", (facts, done: () => void) => {
Expand Down
34 changes: 31 additions & 3 deletions src/jinaga.ts
Expand Up @@ -56,6 +56,8 @@ class Watch {
}

class JinagaCoordinator implements Coordinator {
private errorHandlers: Array<(message: string) => void> = [];
private progressHandlers: Array<(count: number) => void> = [];
private watches: Array<Watch> = [];
private messages: StorageProvider = null;
private network: NetworkProvider = null;
Expand All @@ -79,6 +81,14 @@ class JinagaCoordinator implements Coordinator {
this.messages.sendAllFacts();
}

addErrorHandler(callback: (message: string) => void) {
this.errorHandlers.push(callback);
}

addProgressHandler(callback: (count: number) => void) {
this.progressHandlers.push(callback);
}

fact(message: Object) {
this.messages.save(message, null);
}
Expand Down Expand Up @@ -162,8 +172,8 @@ class JinagaCoordinator implements Coordinator {
if (source === null) {
this.messages.push(fact);
}
this.watches.map((watch: Watch) => {
watch.inverses.map((inverse: Inverse) => {
this.watches.forEach((watch: Watch) => {
watch.inverses.forEach((inverse: Inverse) => {
this.messages.executeQuery(fact, inverse.affected, this.userFact, (error2: string, affected: Array<Object>) => {
if (!error2) {
if (_some(affected, (obj: Object) => _isEqual(obj, watch.start))) {
Expand Down Expand Up @@ -199,6 +209,10 @@ class JinagaCoordinator implements Coordinator {
this.messages.save(fact, source);
}

onDelivered(token:number, destination:any) {
this.messages.dequeue(token, destination);
}

onDone(token: number) {
var index: number = -1;
for(var i = 0; i < this.queries.length; i++) {
Expand All @@ -213,8 +227,12 @@ class JinagaCoordinator implements Coordinator {
}
}

onProgress(queueCount:number) {
this.progressHandlers.forEach(handler => handler(queueCount));
}

onError(err: string) {
debug(err);
this.errorHandlers.forEach(h => h(err));
}

send(fact: Object, source: any) {
Expand All @@ -231,6 +249,10 @@ class JinagaCoordinator implements Coordinator {
});
this.loginCallbacks = [];
}

resendMessages() {
this.messages.sendAllFacts();
}
}

class WatchProxy {
Expand All @@ -253,6 +275,12 @@ class Jinaga {
this.coordinator.save(new MemoryProvider());
}

public onError(handler: (message: string) => void) {
this.coordinator.addErrorHandler(handler);
}
public onProgress(handler: (queueCount: number) => void) {
this.coordinator.addProgressHandler(handler);
}
public save(storage: StorageProvider) {
this.coordinator.save(storage);
}
Expand Down
13 changes: 12 additions & 1 deletion src/memory.ts
Expand Up @@ -129,8 +129,19 @@ class MemoryProvider implements StorageProvider {

push(fact:Object) {
this.queue.push({ hash: computeHash(fact), fact: fact });
if (this.coordinator)
if (this.coordinator) {
this.coordinator.send(fact, null);
this.coordinator.onProgress(this.queue.length);
}
}

dequeue(token:number, destination:any) {
for (var position = this.queue.length - 1; position >= 0; position--) {
if (this.queue[position].hash === token)
this.queue.splice(position, 1);
}
if (this.coordinator)
this.coordinator.onProgress(this.queue.length);
}

private findNodeWithFact(array: Array<Node>, fact: Object) : Node {
Expand Down
2 changes: 1 addition & 1 deletion web/bower.json
@@ -1,6 +1,6 @@
{
"name": "jinaga",
"version": "1.2.2",
"version": "1.3.0",
"homepage": "https://github.com/michaellperry/jinaga",
"authors": [
"Michael L Perry <michael@qedcode.com>"
Expand Down
1 change: 1 addition & 0 deletions web/index.html
Expand Up @@ -6,6 +6,7 @@
<link rel="stylesheet" href="style.css"/>
</head>
<body>
<p><span id="error"></span>&nbsp;<span id="queue">0</span></p>
<h1>Meetings</h1>
<div id="container">
</div>
Expand Down
12 changes: 12 additions & 0 deletions web/main.js
@@ -1,5 +1,7 @@
var j = new Jinaga();
j.sync(new JinagaDistributor("ws://localhost:8888/"));
j.onProgress(updateProgress);
j.onError(updateError);

function paragraph(className, innerText) {
var p = document.createElement('p');
Expand Down Expand Up @@ -58,6 +60,16 @@ function meetingsInGroup(g) {
}, [futureMeeting]);
}

function updateProgress(queueCount) {
var span = document.getElementById('queue');
span.innerText = queueCount;
}

function updateError(message) {
var span = document.getElementById('error');
error.innerText = message;
}

var group = {
type: "UserGroup",
name: "Papers We Love"
Expand Down

0 comments on commit f27cac2

Please sign in to comment.