Skip to content

Commit

Permalink
LocalTokenizer: handle connection failures
Browse files Browse the repository at this point in the history
If the tokenizer disconnects cleanly (e.g. due to updates or pod
rescheduling) or with an error (a crash), reconnect immediately
and reissue any in-flight requests.

Complete with exhaustive tests.
  • Loading branch information
gcampax committed Aug 31, 2019
1 parent 397fe8b commit f7f1460
Show file tree
Hide file tree
Showing 4 changed files with 297 additions and 23 deletions.
6 changes: 4 additions & 2 deletions lib/json_datagram_socket.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@ module.exports = class JsonDatagramSocket extends events.EventEmitter {
}

destroy() {
this._reader.destroy();
this._writer.destroy();
if (this._reader !== null)
this._reader.destroy();
if (this._writer !== null)
this._writer.destroy();
this._reader = null;
this._writer = null;
}
Expand Down
92 changes: 71 additions & 21 deletions lib/tokenizer.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,42 +65,92 @@ function cleanTokens(tokens) {

// Client for a local almond-tokenizer service.
//
// Tokenizer failures are treated as transient, because the tokenizer is
// stateless and should be deployed redundantly and with rolling updates;
// on disconnect (clean or error) the client reconnects immediately and
// reissues every in-flight request, giving up on a request after 3 attempts.
class LocalTokenizer {
    constructor(address = '127.0.0.1:8888') {
        // map from request id to { id, msg, resolve, reject, attempts }
        this._requests = new Map();
        this._nextRequest = 0;

        // sockaddr() is a helper defined elsewhere in this file;
        // presumably it parses "host:port" into net.connect options
        this._address = sockaddr(address);
        this._socket = null;
        this._ended = false;
        this._reconnect();
    }

    // Dispatch one reply from the tokenizer to the matching pending request.
    _onMessage(msg) {
        let req = this._requests.get(msg.req);
        if (!req) // stale reply (e.g. for a request that already failed)
            return;
        this._requests.delete(msg.req);

        if (msg.error) {
            req.reject(new Error(msg.error));
        } else {
            req.resolve({
                tokens: cleanTokens(msg.tokens),
                entities: msg.values,
                raw_tokens: msg.rawTokens,
                pos_tags: msg.pos,
                sentiment: msg.sentiment
            });
        }
    }

    // (Re)establish the connection to the tokenizer and install the
    // data/error/end handlers that drive the retry logic.
    _reconnect() {
        const socket = new net.Socket();
        socket.connect(this._address);
        this._socket = new JsonDatagramSocket(socket, socket, 'utf8');
        this._socket.on('data', this._onMessage.bind(this));
        this._socket.on('error', (e) => {
            console.error(`Error communicating with tokenizer: ${e.message}`);
            this._socket.destroy();
            this._socket = null;

            // tokenizer failures are always transient, because the tokenizer is
            // stateless and should be deployed redundantly and with rolling updates
            // hence, try reconnecting immediately if there are requests in flight
            if (!this._ended && this._requests.size > 0) {
                this._reconnect();
                this._retryAllRequests();
            }
        });
        this._socket.on('end', () => {
            console.error(`Connection to tokenizer closed`);
            this._socket = null;

            if (!this._ended && this._requests.size > 0) {
                this._reconnect();
                this._retryAllRequests();
            }
        });
    }

    // Reissue every pending request on the (fresh) socket, rejecting those
    // that have already failed too many times.
    _retryAllRequests() {
        for (let req of this._requests.values()) {
            req.attempts++;
            if (req.attempts >= 3) {
                req.reject(new Error(`Too many failures in communicating with the tokenizer`));
                // deleting while iterating a Map is safe in JS
                this._requests.delete(req.id);
                continue;
            }

            this._socket.write(req.msg);
        }
    }

    // Shut down the client; no reconnection is attempted after this.
    end() {
        this._ended = true;
        // the socket may already be gone if the tokenizer disconnected
        // and no request was in flight to trigger a reconnect
        if (this._socket !== null)
            this._socket.end();
    }

    // Tokenize `utterance` in the language `locale`; `expect` is forwarded
    // verbatim to the tokenizer service. Returns a promise for an object
    // with tokens, entities, raw_tokens, pos_tags and sentiment.
    tokenize(locale, utterance, expect = null) {
        const reqId = this._nextRequest++;
        return new Promise((resolve, reject) => {
            const msg = { req: reqId, utterance, languageTag: locale, expect };
            const req = { id: reqId, msg, resolve, reject, attempts: 0 };

            this._requests.set(reqId, req);
            // reconnect on demand if the previous connection went away
            if (this._socket === null)
                this._reconnect();
            this._socket.write(msg);
        });
    }
}
Expand Down
1 change: 1 addition & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ seq([
('./test_augment'),
('./test_i18n_chinese'),
('./test_random'),
('./test_tokenizer')
]);
221 changes: 221 additions & 0 deletions test/test_tokenizer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
// -*- mode: js; indent-tabs-mode: nil; js-basic-offset: 4 -*-
//
// This file is part of Genie
//
// Copyright 2019 The Board of Trustees of the Leland Stanford Junior University
//
// Author: Giovanni Campagna <gcampagn@cs.stanford.edu>
//
// See COPYING for details
"use strict";

// we run a "mock" version of almond-tokenizer in process, to test that things go
// back and forth reasonably, and the client can handle network errors

const net = require('net');
const assert = require('assert');
const JsonDatagramSocket = require('../lib/json_datagram_socket');

const { LocalTokenizer } = require('../lib/tokenizer');

// Promise-based sleep for `ms` milliseconds.
// setTimeout never fails, so no reject path is needed.
function delay(ms) {
    return new Promise((resolve) => {
        setTimeout(resolve, ms);
    });
}

// Split `string` on whitespace and basic punctuation, keeping punctuation
// as separate tokens (the regex capture group preserves the delimiters),
// and lowercase everything.
function tokenize(string) {
    const pieces = string.split(/(\s+|[,."'!?])/g);
    const result = [];
    for (const piece of pieces) {
        if (/^\s*$/.test(piece))
            continue;
        result.push(piece.toLowerCase());
    }
    return result;
}

// Mimic almond-tokenizer's entity extraction: replace standalone numbers
// (other than 0, 1 and 911, which stay literal) with NUMBER_n placeholders.
// Returns [rawTokens, tokens, entities].
// NOTE(review): the '*1' suffix appears to mimic the service's wire format;
// presumably the client's cleanTokens() strips it — verify against lib/tokenizer.js.
function mockTokenize(string) {
    const entities = {};
    let num = 0;

    const rawTokens = tokenize(string);
    const tokens = rawTokens.map((token) => {
        if (/^[0-9]+$/.test(token) && token !== '1' && token !== '0' && token !== '911') {
            const tok = `NUMBER_${num}`;
            num++;
            // always pass the radix: digit-only strings parse the same,
            // but explicit base 10 is the safe idiom
            entities[tok] = parseInt(token, 10);
            return tok + '*1';
        } else {
            return token;
        }
    });
    return [rawTokens, tokens, entities];
}

// Number of '$destroyonce' requests seen so far; only the first one
// kills the connection, later ones are tokenized normally.
let cnt = 0;

// Serve one mock-tokenizer connection. Magic utterances trigger failure
// modes: '$destroy' and '$destroyonce' abort the socket, '$error' returns
// an application-level error, '$end' replies then closes cleanly.
function handleConnection(socket) {
    const stream = new JsonDatagramSocket(socket, socket, 'utf8');

    stream.on('data', (msg) => {
        try {
            assert.strictEqual(msg.languageTag, 'en');

            // simulate a hard crash of the tokenizer
            if (msg.utterance === '$destroy') {
                socket.destroy();
                return;
            }
            // simulate a transient crash (first request only)
            if (msg.utterance === '$destroyonce' && cnt === 0) {
                cnt++;
                socket.destroy();
                return;
            }
            // simulate an application-level tokenizer error
            if (msg.utterance === '$error')
                throw new Error('nope');

            const [rawTokens, tokens, values] = mockTokenize(msg.utterance);
            const reply = {
                req: msg.req,
                tokens,
                values,
                rawTokens,
                pos: rawTokens.map(() => 'NN'),
                sentiment: 'neutral'
            };
            stream.write(reply);

            // simulate a clean shutdown (e.g. rolling update) after replying
            if (msg.utterance === '$end')
                socket.end();
        } catch(e) {
            // 'nope' is the deliberate test error; anything else is a real bug
            if (e.message !== 'nope')
                console.error(e);
            stream.write({
                req: msg.req,
                error: e.message
            });
        }
    });
}

// Start the mock tokenizer on the LocalTokenizer default port (8888).
function startServer() {
    // passing the listener to createServer is equivalent to
    // registering it with server.on('connection', ...)
    const server = net.createServer(handleConnection);
    server.listen(8888);
    return server;
}

// Round-trip two simple utterances, one with and one without an entity.
async function testSimple() {
    const tokenizer = new LocalTokenizer();

    const plain = await tokenizer.tokenize('en', 'get a cat picture');
    assert.deepStrictEqual(plain, {
        entities: {},
        pos_tags: ['NN','NN','NN','NN'],
        raw_tokens: ['get', 'a', 'cat', 'picture'],
        sentiment: 'neutral',
        tokens: ['get', 'a', 'cat', 'picture']
    });

    const withNumber = await tokenizer.tokenize('en', 'get 3 cat pictures');
    assert.deepStrictEqual(withNumber, {
        entities: {
            'NUMBER_0': 3
        },
        pos_tags: ['NN','NN','NN','NN'],
        raw_tokens: ['get', '3', 'cat', 'pictures'],
        sentiment: 'neutral',
        tokens: ['get', 'NUMBER_0', 'cat', 'pictures']
    });

    await tokenizer.end();
}


// Issue two requests back-to-back without waiting, to exercise
// request multiplexing on a single connection.
async function testParallel() {
    const tokenizer = new LocalTokenizer();

    const first = tokenizer.tokenize('en', 'get a cat picture');
    const second = tokenizer.tokenize('en', 'get 3 cat pictures');

    assert.deepStrictEqual(await first, {
        entities: {},
        pos_tags: ['NN','NN','NN','NN'],
        raw_tokens: ['get', 'a', 'cat', 'picture'],
        sentiment: 'neutral',
        tokens: ['get', 'a', 'cat', 'picture']
    });

    assert.deepStrictEqual(await second, {
        entities: {
            'NUMBER_0': 3
        },
        pos_tags: ['NN','NN','NN','NN'],
        raw_tokens: ['get', '3', 'cat', 'pictures'],
        sentiment: 'neutral',
        tokens: ['get', 'NUMBER_0', 'cat', 'pictures']
    });

    await tokenizer.end();
}

// Shut the client down while a request is still in flight;
// the request is deliberately never awaited.
async function testInflight() {
    const tokenizer = new LocalTokenizer();
    void tokenizer.tokenize('en', 'get a cat picture');

    // end before the request comes
    tokenizer.end();
}

// Exercise every failure mode of the mock server against one client.
async function testErrors() {
    const tokenizer = new LocalTokenizer();

    // expected result for the plain sentence, used twice below
    const catPicture = {
        entities: {},
        pos_tags: ['NN','NN','NN','NN'],
        raw_tokens: ['get', 'a', 'cat', 'picture'],
        sentiment: 'neutral',
        tokens: ['get', 'a', 'cat', 'picture']
    };

    // application-level error reported by the tokenizer
    await assert.rejects(() => tokenizer.tokenize('en', '$error'), new Error('nope'));

    // repeated hard crashes exhaust the retry budget
    await assert.rejects(() => tokenizer.tokenize('en', '$destroy'),
        new Error('Too many failures in communicating with the tokenizer'));

    // the same tokenizer is still ok for a different sentence
    assert.deepStrictEqual(await tokenizer.tokenize('en', 'get a cat picture'), catPicture);

    // handle transient failures (a single crash is retried transparently)
    assert.deepStrictEqual(await tokenizer.tokenize('en', '$destroyonce'), {
        entities: {},
        pos_tags: ['NN'],
        raw_tokens: ['$destroyonce'],
        sentiment: 'neutral',
        tokens: ['$destroyonce']
    });

    // handle different kind of transient failures (clean server-side close)
    assert.deepStrictEqual(await tokenizer.tokenize('en', '$end'), {
        entities: {},
        pos_tags: ['NN'],
        raw_tokens: ['$end'],
        sentiment: 'neutral',
        tokens: ['$end']
    });

    // wait until the connection actually closed
    await delay(1000);

    // the client reconnects on demand after the clean shutdown
    assert.deepStrictEqual(await tokenizer.tokenize('en', 'get a cat picture'), catPicture);

    await tokenizer.end();
}

// Run all tests against one shared mock server.
// They must run sequentially: the server keeps state ($destroyonce counter)
// and listens on a fixed port.
async function main() {
    const server = startServer();
    try {
        for (const test of [testSimple, testParallel, testInflight, testErrors])
            await test();
    } finally {
        server.close();
    }
}
module.exports = main;
if (!module.parent)
    main();

0 comments on commit f7f1460

Please sign in to comment.