Skip to content

Commit

Permalink
Merge pull request #203 from BeaudanBrown/optimise-pinging
Browse files Browse the repository at this point in the history
Optimise pinging
  • Loading branch information
sachaaaaa committed Feb 21, 2019
2 parents 1c4d9b4 + 08dee14 commit 888d5f1
Show file tree
Hide file tree
Showing 8 changed files with 99 additions and 47 deletions.
4 changes: 2 additions & 2 deletions js/background.js
Expand Up @@ -223,8 +223,8 @@
textsecure.storage.user.getNumber()
);
window.lokiP2pAPI.on('pingContact', pubKey => {
const forceP2p = true;
window.libloki.api.sendOnlineBroadcastMessage(pubKey, forceP2p);
const isPing = true;
window.libloki.api.sendOnlineBroadcastMessage(pubKey, isPing);
});

// These make key operations available to IPC handlers created in preload.js
Expand Down
26 changes: 20 additions & 6 deletions js/modules/loki_message_api.js
Expand Up @@ -63,11 +63,11 @@ class LokiMessageAPI {
this.messageServerPort = messageServerPort ? `:${messageServerPort}` : '';
}

async sendMessage(pubKey, data, messageTimeStamp, ttl, forceP2p = false) {
async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) {
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
const timestamp = Math.floor(Date.now() / 1000);
const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey);
if (p2pDetails && (forceP2p || p2pDetails.isOnline)) {
if (p2pDetails && (isPing || p2pDetails.isOnline)) {
try {
const port = p2pDetails.port ? `:${p2pDetails.port}` : '';
const url = `${p2pDetails.address}${port}/store`;
Expand All @@ -82,6 +82,10 @@ class LokiMessageAPI {
} catch (e) {
log.warn('Failed to send P2P message, falling back to storage', e);
lokiP2pAPI.setContactOffline(pubKey);
if (isPing) {
// If this was just a ping, we don't bother sending to storage server
return;
}
}
}

Expand Down Expand Up @@ -143,6 +147,7 @@ class LokiMessageAPI {
nodeComplete(nodeUrl);
successfulRequests += 1;
} catch (e) {
log.warn('Send message error:', e);
if (e instanceof NotFoundError) {
canResolve = false;
} else if (e instanceof HTTPError) {
Expand All @@ -155,8 +160,12 @@ class LokiMessageAPI {
// We mark the node as complete as we could still reach it
nodeComplete(nodeUrl);
} else {
log.error('Loki SendMessages:', e);
if (lokiSnodeAPI.unreachableNode(pubKey, nodeUrl)) {
const removeNode = await lokiSnodeAPI.unreachableNode(
pubKey,
nodeUrl
);
if (removeNode) {
log.error('Loki SendMessages:', e);
nodeComplete(nodeUrl);
failedNodes.push(nodeUrl);
}
Expand Down Expand Up @@ -242,6 +251,7 @@ class LokiMessageAPI {
}
successfulRequests += 1;
} catch (e) {
log.warn('Retrieve message error:', e);
if (e instanceof NotFoundError) {
canResolve = false;
} else if (e instanceof HTTPError) {
Expand All @@ -254,8 +264,12 @@ class LokiMessageAPI {
// We mark the node as complete as we could still reach it
nodeComplete(nodeUrl);
} else {
log.error('Loki RetrieveMessages:', e);
if (lokiSnodeAPI.unreachableNode(ourKey, nodeUrl)) {
const removeNode = await lokiSnodeAPI.unreachableNode(
ourKey,
nodeUrl
);
if (removeNode) {
log.error('Loki RetrieveMessages:', e);
nodeComplete(nodeUrl);
}
}
Expand Down
63 changes: 52 additions & 11 deletions js/modules/loki_p2p_api.js
Expand Up @@ -2,6 +2,8 @@

const EventEmitter = require('events');

const offlinePingTime = 2 * 60 * 1000; // 2 minutes

class LokiP2pAPI extends EventEmitter {
constructor(ourKey) {
super();
Expand All @@ -16,43 +18,81 @@ class LokiP2pAPI extends EventEmitter {
});
}

updateContactP2pDetails(pubKey, address, port, isOnline = false) {
updateContactP2pDetails(pubKey, address, port, isPing = false) {
// Stagger the timers so the friends don't ping each other at the same time
const timerDuration =
pubKey < this.ourKey
? 60 * 1000 // 1 minute
: 2 * 60 * 1000; // 2 minutes

if (this.contactP2pDetails[pubKey]) {
clearTimeout(this.contactP2pDetails[pubKey].pingTimer);
if (!this.contactP2pDetails[pubKey]) {
// We didn't have this contact's details
this.contactP2pDetails[pubKey] = {
address,
port,
timerDuration,
pingTimer: null,
isOnline: false,
};
if (isPing) {
this.setContactOnline(pubKey);
return;
}
// Try ping
this.pingContact(pubKey);
return;
}

this.contactP2pDetails[pubKey] = {
address,
port,
timerDuration,
isOnline: false,
pingTimer: null,
};
// We already had this contact's details
const baseDetails = { ...this.contactP2pDetails[pubKey] };

if (isOnline) {
if (isPing) {
// Received a ping
// Update details in case they are new and mark online
this.contactP2pDetails[pubKey].address = address;
this.contactP2pDetails[pubKey].port = port;
this.setContactOnline(pubKey);
return;
}

// Received a storage broadcast message
if (
baseDetails.isOnline ||
baseDetails.address !== address ||
baseDetails.port !== port
) {
// Had the contact marked as online and details we had were the same
this.pingContact(pubKey);
return;
}

// Had the contact marked as offline or got new details
this.contactP2pDetails[pubKey].address = address;
this.contactP2pDetails[pubKey].port = port;
this.setContactOffline(pubKey);
this.pingContact(pubKey);
}

getContactP2pDetails(pubKey) {
return this.contactP2pDetails[pubKey] || null;
}

isContactOnline(pubKey) {
const contactDetails = this.contactP2pDetails[pubKey];
return !!(contactDetails && contactDetails.isOnline);
}

setContactOffline(pubKey) {
this.emit('offline', pubKey);
if (!this.contactP2pDetails[pubKey]) {
return;
}
clearTimeout(this.contactP2pDetails[pubKey].pingTimer);
this.contactP2pDetails[pubKey].pingTimer = setTimeout(
this.pingContact.bind(this),
offlinePingTime,
pubKey
);
this.contactP2pDetails[pubKey].isOnline = false;
}

Expand All @@ -78,6 +118,7 @@ class LokiP2pAPI extends EventEmitter {

pingContact(pubKey) {
if (!this.contactP2pDetails[pubKey]) {
// Don't ping if we don't have their details
return;
}
this.emit('pingContact', pubKey);
Expand Down
23 changes: 6 additions & 17 deletions js/modules/loki_snode_api.js
Expand Up @@ -68,10 +68,11 @@ class LokiSnodeAPI {
} else {
this.ourSwarmNodes[nodeUrl].failureCount += 1;
}
if (this.ourSwarmNodes[nodeUrl].failureCount >= FAILURE_THRESHOLD) {
delete this.ourSwarmNodes[nodeUrl];
if (this.ourSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) {
return false;
}
return false;
delete this.ourSwarmNodes[nodeUrl];
return true;
}
if (!this.contactSwarmNodes[nodeUrl]) {
this.contactSwarmNodes[nodeUrl] = {
Expand All @@ -85,7 +86,7 @@ class LokiSnodeAPI {
}
const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')];
if (nodeUrl in swarmNodes) {
if (swarmNodes.includes(nodeUrl)) {
const filteredNodes = swarmNodes.filter(node => node !== nodeUrl);
await conversation.updateSwarmNodes(filteredNodes);
delete this.contactSwarmNodes[nodeUrl];
Expand Down Expand Up @@ -161,7 +162,7 @@ class LokiSnodeAPI {
newSwarmNodes = await this.getSwarmNodes(pubKey);
} catch (e) {
// TODO: Handle these errors sensibly
log.error('Failed to get new swarm nodes');
log.error(e);
newSwarmNodes = [];
}
resolve(newSwarmNodes);
Expand Down Expand Up @@ -205,12 +206,6 @@ class LokiSnodeAPI {
try {
response = await fetch(options.url, fetchOptions);
} catch (e) {
log.error(
options.type,
options.url,
0,
`Error getting swarm nodes for ${pubKey}`
);
throw HTTPError('getSwarmNodes fetch error', 0, e.toString());
}

Expand All @@ -229,12 +224,6 @@ class LokiSnodeAPI {
if (response.status >= 0 && response.status < 400) {
return result.nodes;
}
log.error(
options.type,
options.url,
response.status,
`Error getting swarm nodes for ${pubKey}`
);
throw HTTPError('getSwarmNodes: error response', response.status, result);
}
}
Expand Down
4 changes: 2 additions & 2 deletions libloki/api.js
Expand Up @@ -23,7 +23,7 @@
);
}

async function sendOnlineBroadcastMessage(pubKey, forceP2p = false) {
async function sendOnlineBroadcastMessage(pubKey, isPing = false) {
const myLokiAddress = await window.lokiSnodeAPI.getMyLokiAddress();
const lokiAddressMessage = new textsecure.protobuf.LokiAddressMessage({
p2pAddress: `http://${myLokiAddress}`,
Expand All @@ -41,7 +41,7 @@
log.info('Online broadcast message sent successfully');
}
};
const options = { messageType: 'onlineBroadcast', forceP2p };
const options = { messageType: 'onlineBroadcast', isPing };
// Send a empty message with information about how to contact us directly
const outgoingMessage = new textsecure.OutgoingMessage(
null, // server
Expand Down
10 changes: 5 additions & 5 deletions libloki/test/node/loki_p2p_api_test.js
@@ -1,7 +1,7 @@
const { assert } = require('chai');
const LokiP2pAPI = require('../../../js/modules/loki_p2p_api');

describe('LocalLokiServer', () => {
describe('LokiP2pAPI', () => {
const usedKey = 'aPubKey';
const usedAddress = 'anAddress';
const usedPort = 'aPort';
Expand Down Expand Up @@ -64,16 +64,16 @@ describe('LocalLokiServer', () => {
usedKey,
usedAddress,
usedPort,
true
false
);
assert.isTrue(this.lokiP2pAPI.isOnline(usedKey));
assert.isFalse(this.lokiP2pAPI.isOnline(usedKey));
this.lokiP2pAPI.updateContactP2pDetails(
usedKey,
usedAddress,
usedPort,
false
true
);
assert.isFalse(this.lokiP2pAPI.isOnline(usedKey));
assert.isTrue(this.lokiP2pAPI.isOnline(usedKey));
});

it('Should set a contact as offline', () => {
Expand Down
8 changes: 8 additions & 0 deletions libtextsecure/message_receiver.js
Expand Up @@ -948,6 +948,14 @@ MessageReceiver.prototype.extend({
return this.removeFromCache(envelope);
},
handleDataMessage(envelope, msg) {
if (!envelope.isP2p) {
const timestamp = envelope.timestamp.toNumber();
const now = Date.now();
const ageInSeconds = (now - timestamp) / 1000;
if (ageInSeconds <= 120) {
lokiP2pAPI.pingContact(envelope.source);
}
}
window.log.info('data message from', this.getEnvelopeId(envelope));
let p = Promise.resolve();
// eslint-disable-next-line no-bitwise
Expand Down
8 changes: 4 additions & 4 deletions libtextsecure/outgoing_message.js
Expand Up @@ -42,13 +42,13 @@ function OutgoingMessage(
this.failoverNumbers = [];
this.unidentifiedDeliveries = [];

const { numberInfo, senderCertificate, online, messageType, forceP2p } =
const { numberInfo, senderCertificate, online, messageType, isPing } =
options || {};
this.numberInfo = numberInfo;
this.senderCertificate = senderCertificate;
this.online = online;
this.messageType = messageType || 'outgoing';
this.forceP2p = forceP2p || false;
this.isPing = isPing || false;
}

OutgoingMessage.prototype = {
Expand Down Expand Up @@ -192,7 +192,7 @@ OutgoingMessage.prototype = {
data,
timestamp,
ttl,
this.forceP2p
this.isPing
);
} catch (e) {
if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) {
Expand Down Expand Up @@ -347,7 +347,7 @@ OutgoingMessage.prototype = {
if (this.messageType === 'friend-request') {
ttl = 4 * 24 * 60 * 60; // 4 days for friend request message
} else if (this.messageType === 'onlineBroadcast') {
ttl = 10 * 60; // 10 minutes for online broadcast message
ttl = 60; // 1 minute for online broadcast message
} else {
const hours = window.getMessageTTL() || 24; // 1 day default for any other message
ttl = hours * 60 * 60;
Expand Down

0 comments on commit 888d5f1

Please sign in to comment.