Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimise pinging #203

Merged
merged 7 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions js/background.js
Expand Up @@ -223,8 +223,8 @@
textsecure.storage.user.getNumber()
);
window.lokiP2pAPI.on('pingContact', pubKey => {
const forceP2p = true;
window.libloki.api.sendOnlineBroadcastMessage(pubKey, forceP2p);
const isPing = true;
window.libloki.api.sendOnlineBroadcastMessage(pubKey, isPing);
});

// These make key operations available to IPC handlers created in preload.js
Expand Down
26 changes: 20 additions & 6 deletions js/modules/loki_message_api.js
Expand Up @@ -63,11 +63,11 @@ class LokiMessageAPI {
this.messageServerPort = messageServerPort ? `:${messageServerPort}` : '';
}

async sendMessage(pubKey, data, messageTimeStamp, ttl, forceP2p = false) {
async sendMessage(pubKey, data, messageTimeStamp, ttl, isPing = false) {
const data64 = dcodeIO.ByteBuffer.wrap(data).toString('base64');
const timestamp = Math.floor(Date.now() / 1000);
const p2pDetails = lokiP2pAPI.getContactP2pDetails(pubKey);
if (p2pDetails && (forceP2p || p2pDetails.isOnline)) {
if (p2pDetails && (isPing || p2pDetails.isOnline)) {
try {
const port = p2pDetails.port ? `:${p2pDetails.port}` : '';
const url = `${p2pDetails.address}${port}/store`;
Expand All @@ -82,6 +82,10 @@ class LokiMessageAPI {
} catch (e) {
log.warn('Failed to send P2P message, falling back to storage', e);
lokiP2pAPI.setContactOffline(pubKey);
if (isPing) {
// If this was just a ping, we don't bother sending to storage server
return;
}
}
}

Expand Down Expand Up @@ -143,6 +147,7 @@ class LokiMessageAPI {
nodeComplete(nodeUrl);
successfulRequests += 1;
} catch (e) {
log.warn('Send message error:', e);
if (e instanceof NotFoundError) {
canResolve = false;
} else if (e instanceof HTTPError) {
Expand All @@ -155,8 +160,12 @@ class LokiMessageAPI {
// We mark the node as complete as we could still reach it
nodeComplete(nodeUrl);
} else {
log.error('Loki SendMessages:', e);
if (lokiSnodeAPI.unreachableNode(pubKey, nodeUrl)) {
const removeNode = await lokiSnodeAPI.unreachableNode(
pubKey,
nodeUrl
);
if (removeNode) {
log.error('Loki SendMessages:', e);
nodeComplete(nodeUrl);
failedNodes.push(nodeUrl);
}
Expand Down Expand Up @@ -242,6 +251,7 @@ class LokiMessageAPI {
}
successfulRequests += 1;
} catch (e) {
log.warn('Retrieve message error:', e);
if (e instanceof NotFoundError) {
canResolve = false;
} else if (e instanceof HTTPError) {
Expand All @@ -254,8 +264,12 @@ class LokiMessageAPI {
// We mark the node as complete as we could still reach it
nodeComplete(nodeUrl);
} else {
log.error('Loki RetrieveMessages:', e);
if (lokiSnodeAPI.unreachableNode(ourKey, nodeUrl)) {
const removeNode = await lokiSnodeAPI.unreachableNode(
ourKey,
nodeUrl
);
if (removeNode) {
log.error('Loki RetrieveMessages:', e);
nodeComplete(nodeUrl);
}
}
Expand Down
63 changes: 52 additions & 11 deletions js/modules/loki_p2p_api.js
Expand Up @@ -2,6 +2,8 @@

const EventEmitter = require('events');

const offlinePingTime = 2 * 60 * 1000; // 2 minutes

class LokiP2pAPI extends EventEmitter {
constructor(ourKey) {
super();
Expand All @@ -16,43 +18,81 @@ class LokiP2pAPI extends EventEmitter {
});
}

updateContactP2pDetails(pubKey, address, port, isOnline = false) {
updateContactP2pDetails(pubKey, address, port, isPing = false) {
// Stagger the timers so the friends don't ping each other at the same time
const timerDuration =
pubKey < this.ourKey
? 60 * 1000 // 1 minute
: 2 * 60 * 1000; // 2 minutes

if (this.contactP2pDetails[pubKey]) {
clearTimeout(this.contactP2pDetails[pubKey].pingTimer);
if (!this.contactP2pDetails[pubKey]) {
// We didn't have this contact's details
this.contactP2pDetails[pubKey] = {
address,
port,
timerDuration,
pingTimer: null,
isOnline: false,
};
if (isPing) {
this.setContactOnline(pubKey);
return;
}
// Try ping
this.pingContact(pubKey);
return;
}

this.contactP2pDetails[pubKey] = {
address,
port,
timerDuration,
isOnline: false,
pingTimer: null,
};
// We already had this contact's details
const baseDetails = { ...this.contactP2pDetails[pubKey] };

if (isOnline) {
if (isPing) {
// Received a ping
// Update details in case they are new and mark online
this.contactP2pDetails[pubKey].address = address;
this.contactP2pDetails[pubKey].port = port;
this.setContactOnline(pubKey);
return;
}

// Received a storage broadcast message
if (
baseDetails.isOnline ||
baseDetails.address !== address ||
baseDetails.port !== port
) {
// Had the contact marked as online and details we had were the same
this.pingContact(pubKey);
return;
}

// Had the contact marked as offline or got new details
this.contactP2pDetails[pubKey].address = address;
this.contactP2pDetails[pubKey].port = port;
this.setContactOffline(pubKey);
this.pingContact(pubKey);
}

getContactP2pDetails(pubKey) {
return this.contactP2pDetails[pubKey] || null;
}

isContactOnline(pubKey) {
const contactDetails = this.contactP2pDetails[pubKey];
return !!(contactDetails && contactDetails.isOnline);
}

setContactOffline(pubKey) {
this.emit('offline', pubKey);
if (!this.contactP2pDetails[pubKey]) {
return;
}
clearTimeout(this.contactP2pDetails[pubKey].pingTimer);
this.contactP2pDetails[pubKey].pingTimer = setTimeout(
this.pingContact.bind(this),
offlinePingTime,
pubKey
);
this.contactP2pDetails[pubKey].isOnline = false;
}

Expand All @@ -78,6 +118,7 @@ class LokiP2pAPI extends EventEmitter {

pingContact(pubKey) {
if (!this.contactP2pDetails[pubKey]) {
// Don't ping if we don't have their details
return;
}
this.emit('pingContact', pubKey);
Expand Down
23 changes: 6 additions & 17 deletions js/modules/loki_snode_api.js
Expand Up @@ -68,10 +68,11 @@ class LokiSnodeAPI {
} else {
this.ourSwarmNodes[nodeUrl].failureCount += 1;
}
if (this.ourSwarmNodes[nodeUrl].failureCount >= FAILURE_THRESHOLD) {
delete this.ourSwarmNodes[nodeUrl];
if (this.ourSwarmNodes[nodeUrl].failureCount < FAILURE_THRESHOLD) {
return false;
}
return false;
delete this.ourSwarmNodes[nodeUrl];
return true;
}
if (!this.contactSwarmNodes[nodeUrl]) {
this.contactSwarmNodes[nodeUrl] = {
Expand All @@ -85,7 +86,7 @@ class LokiSnodeAPI {
}
const conversation = ConversationController.get(pubKey);
const swarmNodes = [...conversation.get('swarmNodes')];
if (nodeUrl in swarmNodes) {
if (swarmNodes.includes(nodeUrl)) {
const filteredNodes = swarmNodes.filter(node => node !== nodeUrl);
await conversation.updateSwarmNodes(filteredNodes);
delete this.contactSwarmNodes[nodeUrl];
Expand Down Expand Up @@ -161,7 +162,7 @@ class LokiSnodeAPI {
newSwarmNodes = await this.getSwarmNodes(pubKey);
} catch (e) {
// TODO: Handle these errors sensibly
log.error('Failed to get new swarm nodes');
log.error(e);
newSwarmNodes = [];
}
resolve(newSwarmNodes);
Expand Down Expand Up @@ -205,12 +206,6 @@ class LokiSnodeAPI {
try {
response = await fetch(options.url, fetchOptions);
} catch (e) {
log.error(
options.type,
options.url,
0,
`Error getting swarm nodes for ${pubKey}`
);
throw HTTPError('getSwarmNodes fetch error', 0, e.toString());
}

Expand All @@ -229,12 +224,6 @@ class LokiSnodeAPI {
if (response.status >= 0 && response.status < 400) {
return result.nodes;
}
log.error(
options.type,
options.url,
response.status,
`Error getting swarm nodes for ${pubKey}`
);
throw HTTPError('getSwarmNodes: error response', response.status, result);
}
}
Expand Down
4 changes: 2 additions & 2 deletions libloki/api.js
Expand Up @@ -23,7 +23,7 @@
);
}

async function sendOnlineBroadcastMessage(pubKey, forceP2p = false) {
async function sendOnlineBroadcastMessage(pubKey, isPing = false) {
const myLokiAddress = await window.lokiSnodeAPI.getMyLokiAddress();
const lokiAddressMessage = new textsecure.protobuf.LokiAddressMessage({
p2pAddress: `http://${myLokiAddress}`,
Expand All @@ -41,7 +41,7 @@
log.info('Online broadcast message sent successfully');
}
};
const options = { messageType: 'onlineBroadcast', forceP2p };
const options = { messageType: 'onlineBroadcast', isPing };
// Send a empty message with information about how to contact us directly
const outgoingMessage = new textsecure.OutgoingMessage(
null, // server
Expand Down
10 changes: 5 additions & 5 deletions libloki/test/node/loki_p2p_api_test.js
@@ -1,7 +1,7 @@
const { assert } = require('chai');
const LokiP2pAPI = require('../../../js/modules/loki_p2p_api');

describe('LocalLokiServer', () => {
describe('LokiP2pAPI', () => {
const usedKey = 'aPubKey';
const usedAddress = 'anAddress';
const usedPort = 'aPort';
Expand Down Expand Up @@ -64,16 +64,16 @@ describe('LocalLokiServer', () => {
usedKey,
usedAddress,
usedPort,
true
false
);
assert.isTrue(this.lokiP2pAPI.isOnline(usedKey));
assert.isFalse(this.lokiP2pAPI.isOnline(usedKey));
this.lokiP2pAPI.updateContactP2pDetails(
usedKey,
usedAddress,
usedPort,
false
true
);
assert.isFalse(this.lokiP2pAPI.isOnline(usedKey));
assert.isTrue(this.lokiP2pAPI.isOnline(usedKey));
});

it('Should set a contact as offline', () => {
Expand Down
8 changes: 8 additions & 0 deletions libtextsecure/message_receiver.js
Expand Up @@ -948,6 +948,14 @@ MessageReceiver.prototype.extend({
return this.removeFromCache(envelope);
},
handleDataMessage(envelope, msg) {
if (!envelope.isP2p) {
const timestamp = envelope.timestamp.toNumber();
const now = Date.now();
const ageInSeconds = (now - timestamp) / 1000;
if (ageInSeconds <= 120) {
lokiP2pAPI.pingContact(envelope.source);
}
}
window.log.info('data message from', this.getEnvelopeId(envelope));
let p = Promise.resolve();
// eslint-disable-next-line no-bitwise
Expand Down
8 changes: 4 additions & 4 deletions libtextsecure/outgoing_message.js
Expand Up @@ -42,13 +42,13 @@ function OutgoingMessage(
this.failoverNumbers = [];
this.unidentifiedDeliveries = [];

const { numberInfo, senderCertificate, online, messageType, forceP2p } =
const { numberInfo, senderCertificate, online, messageType, isPing } =
options || {};
this.numberInfo = numberInfo;
this.senderCertificate = senderCertificate;
this.online = online;
this.messageType = messageType || 'outgoing';
this.forceP2p = forceP2p || false;
this.isPing = isPing || false;
}

OutgoingMessage.prototype = {
Expand Down Expand Up @@ -192,7 +192,7 @@ OutgoingMessage.prototype = {
data,
timestamp,
ttl,
this.forceP2p
this.isPing
);
} catch (e) {
if (e.name === 'HTTPError' && (e.code !== 409 && e.code !== 410)) {
Expand Down Expand Up @@ -347,7 +347,7 @@ OutgoingMessage.prototype = {
if (this.messageType === 'friend-request') {
ttl = 4 * 24 * 60 * 60; // 4 days for friend request message
} else if (this.messageType === 'onlineBroadcast') {
ttl = 10 * 60; // 10 minutes for online broadcast message
ttl = 60; // 1 minute for online broadcast message
} else {
const hours = window.getMessageTTL() || 24; // 1 day default for any other message
ttl = hours * 60 * 60;
Expand Down