Skip to content
This repository has been archived by the owner on May 29, 2021. It is now read-only.

Implement dependency injection into workers, resolves #203 #211

Merged
merged 2 commits into from
Nov 5, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion bin/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ function runWorker(host, port) {
totalmem: os.totalmem(),
hostname: hostname || process.env.puuid,
type: 'worker',
wsid: process.env.wsid,
wsid: Number(process.env.wsid),
jobId: ''
}
}, function (err, res) {
Expand All @@ -200,6 +200,8 @@ function runWorker(host, port) {
task.dlog = dlog;
task.lib = {aws: aws, azure: azure, sizeOf: sizeOf, fs: fs, readSplit: readSplit, Lines: Lines, task: task, mkdirp: mkdirp, parquet: parquet, stream: stream, url: url, uuid: uuid, zlib: zlib};
task.grid = grid;
// Indirect eval: evaluate the user dependencies bundle in the worker's global scope
(0, eval)(task.bundle);
task.run(function(result) {
result.workerId = task.workerId;
grid.reply(msg, null, result);
Expand Down
67 changes: 51 additions & 16 deletions lib/context-local.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
var child_process = require('child_process');
var fs = require('fs');
var os = require('os');
var path = require('path');
var url = require('url');
var zlib = require('zlib');

var browserify = require('browserify');
var callsite = require('callsite');
var mkdirp = require('mkdirp');
var rimraf = require('rimraf');
var seedrandom = require('seedrandom');
Expand All @@ -30,8 +33,8 @@ function Context(arg) {
var tmp = process.env.SKALE_TMP || '/tmp';
var self = this;
log('workers:', nworker);
this.bundleDone = true;
this.env = {};
this.modules = {};
this.maxShufflePartitions = arg.maxShufflePartitions || process.env.SKALE_MAX_SHUFFLE_PARTITIONS;
this.blockSize = arg.blockSize || process.env.SKALE_BLOCK_SIZE || 128;
this.worker = new Array(nworker);
Expand Down Expand Up @@ -62,6 +65,23 @@ function Context(arg) {
this.log = log;
this.dlog = dlog;

// Declare user modules to be bundled for task code.
//
// `obj` maps a variable name to a module identifier, e.g.
// {helper: './helper.js'}. Each identifier is resolved relative to the
// directory of the file that called this method (taken from the call stack),
// registered in a browserify instance under its original identifier, and a
// `var <name>=require(<identifier>);` statement is appended to postBundle.
// The bundle is then flagged as stale (bundleDone = false).
// Returns this, so calls can be chained.
this.require = function (obj) {
  const stack = callsite();
  // Frame 1 is used as the caller of this method (frame 0 being this method
  // itself, per the callsite package) -- relative identifiers resolve from there.
  const requester = stack[1].getFileName();
  if (!this.browserify) {
    // Lazily create the bundler on first use.
    this.browserify = browserify();
    this.postBundle = '';
  }
  for (let name in obj) {
    let file = obj[name];
    let pathname = require.resolve(file, {paths: [path.dirname(requester)]});
    this.browserify.require(pathname, {expose: file});
    // JSON.stringify emits a correctly escaped string literal, so identifiers
    // containing quotes or backslashes cannot break the generated statement.
    this.postBundle += 'var ' + name + '=require(' + JSON.stringify(file) + ');';
  }
  this.bundleDone = false;
  return this;
};

this.end = function () {
if (global._scn) {
global._scn--;
Expand Down Expand Up @@ -170,7 +190,6 @@ function Context(arg) {
if (!this.worker[wid].init) {
this.worker[wid].init = true;
task.env = this.env;
task.modules = this.modules;
}

var str = serialize(task);
Expand All @@ -196,25 +215,39 @@ function Context(arg) {
}
};

// Build the dependency bundle if it is stale, then invoke callback (no arguments).
//
// When bundleDone is true, bundle is reset to undefined and callback is
// invoked synchronously. Otherwise browserify builds the bundle
// asynchronously; its output followed by postBundle is stored in bundle,
// bundleDone is set, and callback is invoked.
// A bundling failure is rethrown from the browserify callback.
this.onBundle = function (callback) {
  if (this.bundleDone) {
    this.bundle = undefined;
    return callback();
  }
  this.browserify.bundle(function (err, res) {
    // On failure res is not usable: surface the real bundling error instead
    // of an unrelated TypeError raised by res.toString().
    if (err) throw err;
    self.bundle = res.toString() + self.postBundle;
    self.bundleDone = true;
    callback();
  });
};

this.runJob = function (opt, root, action, callback) {
var jobId = this.jobId++;
var totalStages;

//this.getWorkers(function () {
findShuffleStages(function(shuffleStages) {
totalStages = shuffleStages.length + 1;
if (shuffleStages.length === 0) runResultStage();
else {
var cnt = 0;
runShuffleStage(shuffleStages[cnt], cnt, shuffleDone);
}
function shuffleDone() {
if (++cnt < shuffleStages.length) {
this.onBundle(function () {
//this.getWorkers(function () {
findShuffleStages(function(shuffleStages) {
totalStages = shuffleStages.length + 1;
if (shuffleStages.length === 0) runResultStage();
else {
var cnt = 0;
runShuffleStage(shuffleStages[cnt], cnt, shuffleDone);
} else runResultStage();
}
}
function shuffleDone() {
if (++cnt < shuffleStages.length) {
runShuffleStage(shuffleStages[cnt], cnt, shuffleDone);
} else runResultStage();
}
});
//});
});
//});

function runShuffleStage(stage, stageNum, done) {
var stageStart = Date.now();
Expand All @@ -235,9 +268,10 @@ function Context(arg) {
for (i = 0; i < stage.nShufflePartitions; i++) {
tasks.push(new Task({
basedir: self.basedir,
bundle: self.bundle,
datasetId: stage.id,
jobId: jobId,
nodes: nodes,
datasetId: stage.id,
pid: i
}));
}
Expand Down Expand Up @@ -284,6 +318,7 @@ function Context(arg) {
for (var i = 0; i < root.nPartitions; i++) {
tasks.push(new Task({
basedir: self.basedir,
bundle: self.bundle,
jobId: jobId,
nodes: nodes,
datasetId: root.id,
Expand Down
61 changes: 49 additions & 12 deletions lib/context.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
'use strict';

var fs = require('fs');
var path = require('path');
var util = require('util');
var url = require('url');
var zlib = require('zlib');

var browserify = require('browserify');
var callsite = require('callsite');
var mkdirp = require('mkdirp');
var seedrandom = require('seedrandom');
var uuid = require('uuid');
Expand Down Expand Up @@ -36,6 +39,7 @@ function SkaleContext(arg) {

this.started = this.ended = false;
this.jobId = 0;
this.bundleDone = true;
this.env = arg.env || {};
this.maxShufflePartitions = arg.maxShufflePartitions || process.env.SKALE_MAX_SHUFFLE_PARTITIONS;
this.blockSize = arg.blockSize || process.env.SKALE_BLOCK_SIZE || 128;
Expand Down Expand Up @@ -108,6 +112,23 @@ function SkaleContext(arg) {
fs.createReadStream(msg.path, msg.opt).pipe(self.createStreamTo(msg));
});

// Declare user modules to be bundled for task code.
//
// `obj` maps a variable name to a module identifier, e.g.
// {helper: './helper.js'}. Each identifier is resolved relative to the
// directory of the file that called this method (taken from the call stack),
// registered in a browserify instance under its original identifier, and a
// `var <name>=require(<identifier>);` statement is appended to postBundle.
// The bundle is then flagged as stale (bundleDone = false).
// Returns this, so calls can be chained.
this.require = function (obj) {
  const stack = callsite();
  // Frame 1 is used as the caller of this method (frame 0 being this method
  // itself, per the callsite package) -- relative identifiers resolve from there.
  const requester = stack[1].getFileName();
  if (!this.browserify) {
    // Lazily create the bundler on first use.
    this.browserify = browserify();
    this.postBundle = '';
  }
  for (let name in obj) {
    let file = obj[name];
    let pathname = require.resolve(file, {paths: [path.dirname(requester)]});
    this.browserify.require(pathname, {expose: file});
    // JSON.stringify emits a correctly escaped string literal, so identifiers
    // containing quotes or backslashes cannot break the generated statement.
    this.postBundle += 'var ' + name + '=require(' + JSON.stringify(file) + ');';
  }
  this.bundleDone = false;
  return this;
};

this.end = function () {
if (self.ended) return;
self.ended = true;
Expand Down Expand Up @@ -239,22 +260,36 @@ function SkaleContext(arg) {
}
};

// Build the dependency bundle if it is stale, then invoke callback (no arguments).
//
// When bundleDone is true, bundle is reset to undefined and callback is
// invoked synchronously. Otherwise browserify builds the bundle
// asynchronously; its output followed by postBundle is stored in bundle,
// bundleDone is set, and callback is invoked.
// A bundling failure is rethrown from the browserify callback.
this.onBundle = function (callback) {
  if (this.bundleDone) {
    this.bundle = undefined;
    return callback();
  }
  this.browserify.bundle(function (err, res) {
    // On failure res is not usable: surface the real bundling error instead
    // of an unrelated TypeError raised by res.toString().
    if (err) throw err;
    self.bundle = res.toString() + self.postBundle;
    self.bundleDone = true;
    callback();
  });
};

this.runJob = function(opt, root, action, callback) {
var jobId = this.jobId++;
var totalStages;

this.getWorkers(function () {
findShuffleStages(function(shuffleStages) {
totalStages = shuffleStages.length + 1;
if (shuffleStages.length === 0) runResultStage();
else {
var cnt = 0;
runShuffleStage(shuffleStages[cnt], cnt, shuffleDone);
}
function shuffleDone() {
if (++cnt < shuffleStages.length) runShuffleStage(shuffleStages[cnt], cnt, shuffleDone);
else runResultStage();
}
this.onBundle(function () {
self.getWorkers(function () {
findShuffleStages(function(shuffleStages) {
totalStages = shuffleStages.length + 1;
if (shuffleStages.length === 0) runResultStage();
else {
var cnt = 0;
runShuffleStage(shuffleStages[cnt], cnt, shuffleDone);
}
function shuffleDone() {
if (++cnt < shuffleStages.length) runShuffleStage(shuffleStages[cnt], cnt, shuffleDone);
else runResultStage();
}
});
});
});

Expand All @@ -276,6 +311,7 @@ function SkaleContext(arg) {
for (i = 0; i < stage.nShufflePartitions; i++) {
tasks.push(new Task({
basedir: self.basedir,
bundle: self.bundle,
jobId: jobId,
nodes: nodes,
datasetId: stage.id,
Expand Down Expand Up @@ -323,6 +359,7 @@ function SkaleContext(arg) {
for (var i = 0; i < root.nPartitions; i++) {
tasks.push(new Task({
basedir: self.basedir,
bundle: self.bundle,
jobId: jobId,
nodes: nodes,
datasetId: root.id,
Expand Down
1 change: 1 addition & 0 deletions lib/task.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ module.exports = Task;
// function Task(basedir, jobId, nodes, datasetId, pid, action) {
function Task(init) {
this.basedir = init.basedir;
this.bundle = init.bundle;
this.datasetId = init.datasetId;
this.pid = init.pid;
this.nodes = init.nodes;
Expand Down
4 changes: 3 additions & 1 deletion lib/worker-local.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ function runTask(msg) {
task.lib = {aws: aws, azure: azure, fs: fs, Lines: Lines, mkdirp: mkdirp, mm: mm, parquet: parquet, readSplit: readSplit, stream: stream, url: url, uuid: uuid, zlib: zlib};
task.log = log;
task.dlog = dlog;
// Expose critical core dependencies explicitely for user evaluated code in workers
// Expose system core dependencies explicitly for user evaluated code in workers
global.fs = fs;
// Indirect eval: evaluate the user dependencies bundle in the worker's global scope
(0, eval)(task.bundle);
task.run(function (result) {
delete msg.req.args;
msg.result = result;
Expand Down