Skip to content

Commit

Permalink
Merge pull request #97 from stanford-oval/wip/parallel-augment
Browse files Browse the repository at this point in the history
Parallelize dataset augmentation as well
  • Loading branch information
gcampax committed Oct 10, 2019
2 parents 0355aa5 + 5ba81fb commit 66fa24c
Show file tree
Hide file tree
Showing 6 changed files with 612 additions and 571 deletions.
25 changes: 18 additions & 7 deletions lib/binary_ppdb.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,13 @@
const assert = require('assert');
const fs = require('fs');
const util = require('util');
// mmap-io is a native optional dependency: it may fail to install (or fail
// to load) on some platforms. Fall back to plain file reads when it is
// missing; `mmap === null` is checked at each use site.
let mmap;
try {
    mmap = require('mmap-io');
} catch(e) {
    console.error('mmap-io not available, falling back to normal read for PPDB');
    mmap = null;
}

// Compact, binary representation of a PPDB database, suitable for mmap'ing
//
Expand Down Expand Up @@ -190,12 +196,17 @@ module.exports = class BinaryPPDB {
}

static async mapFile(filename) {
const fd = await util.promisify(fs.open)(filename, 'r');
const stats = await util.promisify(fs.fstat)(fd);

const buffer = mmap.map(Math.ceil(stats.size / mmap.PAGESIZE) * mmap.PAGESIZE,
mmap.PROT_READ, mmap.MAP_SHARED | mmap.MAP_POPULATE, fd, 0, mmap.MADV_RANDOM);
return new BinaryPPDB(buffer);
if (mmap) {
const fd = await util.promisify(fs.open)(filename, 'r');
const stats = await util.promisify(fs.fstat)(fd);

const buffer = mmap.map(Math.ceil(stats.size / mmap.PAGESIZE) * mmap.PAGESIZE,
mmap.PROT_READ, mmap.MAP_SHARED | mmap.MAP_POPULATE, fd, 0, mmap.MADV_RANDOM);
return new BinaryPPDB(buffer);
} else {
const buffer = await util.promisify(fs.readFile)(filename);
return new BinaryPPDB(buffer);
}
}
};
module.exports.Builder = BinaryPPDBBuilder;
Expand Down
62 changes: 32 additions & 30 deletions lib/parallelize-worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,40 +9,42 @@
// See COPYING for details
"use strict";

// Entry point of a parallelize() worker thread.
//
// Loads the worker module named by workerData.workerPath, obtains its
// (possibly async) Duplex/Transform stream for this shard, and bridges it
// to the parent thread: `{ data }` / `{ end }` messages from the parent
// feed the stream, and the stream's output is posted back in the same
// `{ data, end }` envelope.
(async function main() {
    const Stream = require('stream');
    const { workerData, parentPort } = require('worker_threads');

    const worker = require(workerData.workerPath);
    // the worker factory may be async (e.g. it needs to load a PPDB or
    // other resources before it can construct the stream)
    const stream = await worker(workerData.args, workerData.shard);

    // Readable side: objects pushed to us by the parent thread.
    const input = new Stream.Readable({
        objectMode: true,

        read() {}
    });
    parentPort.on('message', (msg) => {
        if (msg.data)
            input.push(msg.data);
        else if (msg.end)
            input.push(null);
        else
            throw new Error('unrecognized message: ' + JSON.stringify(msg));
    });

    // Writable side: results posted back to the parent thread.
    const output = new Stream.Writable({
        objectMode: true,

        write(data, encoding, callback) {
            parentPort.postMessage({ data, end: false });
            callback();
        },
        final(callback) {
            parentPort.postMessage({ data: undefined, end: true });
            // close the port on the next tick so the end message is
            // delivered before the port shuts down
            process.nextTick(() => {
                parentPort.close();
                callback();
            });
        }
    });

    input.pipe(stream).pipe(output);
})().catch((e) => {
    // an async IIFE returns a floating promise: without this handler a
    // failure (e.g. the worker module failing to load, or the factory
    // rejecting) would surface as an unhandled rejection rather than a
    // clean worker exit the parent can observe
    console.error(e);
    process.exit(1);
});
Loading

0 comments on commit 66fa24c

Please sign in to comment.