Skip to content
Permalink
Browse files

Merge branch 'stream-management-caching'

  • Loading branch information...
legastero committed Apr 25, 2019
2 parents 65c201e + e4a0e4f commit b4ca56b4841edf8954646998ab2569ab05bbc660
Showing with 65 additions and 8 deletions.
  1. +8 −0 demo.html
  2. +1 −1 src/client.js
  3. +56 −7 src/sm.js
@@ -147,6 +147,14 @@ <h1>Connection Settings</h1>
transports: transports
});
const cachedSM = sessionStorage.cachedSM;
if (cachedSM) {
client.sm.load(JSON.parse(cachedSM));
}
client.sm.cache(state => {
sessionStorage.cachedSM = JSON.stringify(state);
});
client.on('*', log);
client.on('session:started', function() {
@@ -328,7 +328,7 @@ export default class Client extends WildEmitter {
}

send(data) {
this.sm.track(data);
this.sm.track(data._name, data);
if (this.transport) {
this.transport.send(data);
}
@@ -17,9 +17,16 @@ export default class StreamManagement {
this.unacked = [];
this.pendingAck = false;

this.cacheHandler = () => {
return;
};

this.stanzas = {
Ack: client.stanzas.getDefinition('a', NS.SMACKS_3),
Enable: client.stanzas.getDefinition('enable', NS.SMACKS_3),
IQ: client.stanzas.getIQ(),
Message: client.stanzas.getMessage(),
Presence: client.stanzas.getPresence(),
Request: client.stanzas.getDefinition('r', NS.SMACKS_3),
Resume: client.stanzas.getDefinition('resume', NS.SMACKS_3)
};
@@ -36,6 +43,18 @@ export default class StreamManagement {
}
}

load(opts) {
this.id = opts.id;
this.allowResume = true;
this.handled = opts.handled;
this.lastAck = opts.lastAck;
this.unacked = opts.unacked;
}

cache(handler) {
this.cacheHandler = handler;
}

enable() {
const enable = new this.stanzas.Enable();
enable.resume = this.allowResume;
@@ -57,6 +76,8 @@ export default class StreamManagement {
this.id = resp.id;
this.handled = 0;
this.inboundStarted = true;

this._cache();
}

resumed(resp) {
@@ -65,6 +86,8 @@ export default class StreamManagement {
this.process(resp, true);
}
this.inboundStarted = true;

this._cache();
}

failed() {
@@ -74,6 +97,8 @@ export default class StreamManagement {
this.lastAck = 0;
this.handled = 0;
this.unacked = [];

this._cache();
}

ack() {
@@ -96,33 +121,47 @@ export default class StreamManagement {
this.pendingAck = false;

for (let i = 0; i < numAcked && this.unacked.length > 0; i++) {
this.client.emit('stanza:acked', this.unacked.shift());
const [kind, stanza] = this.unacked.shift();
this.client.emit('stanza:acked', stanza, kind);
}
this.lastAck = ack.h;

if (resend) {
const resendUnacked = this.unacked;
this.unacked = [];
for (const stanza of resendUnacked) {
self.client.send(stanza);
for (const [kind, stanza] of resendUnacked) {
let rebuilt;
if (kind === 'message') {
rebuilt = new this.stanzas.Message(stanza);
}
if (kind === 'presence') {
rebuilt = new this.stanzas.Presence(stanza);
}
if (kind === 'iq') {
rebuilt = new this.stanzas.IQ(stanza);
}
self.client.send(rebuilt);
}
}

this._cache();

if (this.needAck()) {
this.request();
}
}

track(stanza) {
const name = stanza._name;
track(kind, stanza) {
const acceptable = {
iq: true,
message: true,
presence: true
};

if (this.outboundStarted && acceptable[name]) {
this.unacked.push(stanza);
if (this.outboundStarted && acceptable[kind]) {
this.unacked.push([kind, stanza.toJSON()]);
this._cache();

if (this.needAck()) {
this.request();
}
@@ -132,10 +171,20 @@ export default class StreamManagement {
handle() {
if (this.inboundStarted) {
this.handled = mod(this.handled + 1, MAX_SEQ);
this._cache();
}
}

needAck() {
return !this.pendingAck && this.unacked.length >= this.windowSize;
}

_cache() {
this.cacheHandler({
handled: this.handled,
id: this.id,
lastAck: this.lastAck,
unacked: this.unacked
});
}
}

0 comments on commit b4ca56b

Please sign in to comment.
You can’t perform that action at this time.