Permalink
Fetching contributors…
Cannot retrieve contributors at this time
executable file 673 lines (601 sloc) 20.5 KB
#!/usr/bin/env node
// -*- mode: js -*-
// vim: set syntax=javascript ts=4 sts=4 sw=4 et:
/*
* Copyright 2017 Joyent, Inc.
*/
var net = require('net');
var url = require('url');
var assert = require('assert-plus');
var bunyan = require('bunyan');
var dashdash = require('dashdash');
var libuuid = require('uuid');
var path = require('path');
var vasync = require('vasync');
var manta = require('../lib');
///--- Globals
var QUIET = false;
var NAME = path.basename(process.argv[1]);
var LOG = bunyan.createLogger({
name: NAME,
level: process.env.LOG_LEVEL || bunyan.INFO,
stream: process.stderr
});
var JOB_POLL_PERIOD = 1000 * 5;
var MANTA_ASSET = '/poseidon/public/medusa/agent.sh';
var ESCAPE = '~';
var CC_ESCAPE = ESCAPE.charCodeAt(0);
var CC_HELP = '?'.charCodeAt(0);
var CC_INFO = 'i'.charCodeAt(0);
var CC_EXIT = '.'.charCodeAt(0);
var CC_CR = '\r'.charCodeAt(0);
var CC_NL = '\n'.charCodeAt(0);
var OPTIONS_PARSER = dashdash.createParser({
options: manta.DEFAULT_CLI_OPTIONS.concat([
{
group: NAME + ' options'
},
{
names: [ 'command', 'c' ],
type: 'string',
help: 'command to run instead of default shell',
helpArg: 'COMMAND'
},
{
names: [ 'asset', 's' ],
type: 'arrayOfString',
help: 'asset(s) to place in zones',
helpArg: 'ASSET_PATH'
},
{
names: [ 'quiet', 'q' ],
type: 'bool',
help: 'disable advisory/progress messages'
},
{
names: [ 'disk' ],
type: 'positiveInteger',
help: 'amount of disk space available for all phases (gigabytes)',
helpArg: 'DISK_GB'
},
{
names: [ 'memory' ],
type: 'positiveInteger',
help: 'amount of memory available for all phases (megabytes)',
helpArg: 'MEMORY_MB'
},
{
names: [ 'init' ],
type: 'string',
help: 'command to run before execution; must be valid bash script',
helpArg: 'INIT_COMMAND'
},
{
names: [ 'image' ],
type: 'string',
help: 'Allowed compute image string (semver)',
helpArg: 'IMAGE_SEMVER'
},
{
names: [ 'escape', 'e' ],
type: 'string',
help: 'Escape character (single non-newline character)',
helpArg: 'ESCAPE_CHARACTER'
}
])
});
///--- Functions
var endl_needed = false;
function _endl() {
if (!endl_needed)
return;
endl_needed = false;
process.stdout.write('\n\n');
}
function _log(str) {
if (QUIET)
return;
_endl();
process.stdout.write(' * ' + str + '\n');
}
function parseOptions() {
var opts, usercmd;
try {
opts = OPTIONS_PARSER.parse(process.argv);
} catch (e) {
manta.cli_usage(OPTIONS_PARSER, e.message, 'path...');
}
manta.cli_logger(opts, LOG);
if (opts.help)
manta.cli_usage(OPTIONS_PARSER, false, 'path...');
manta.cliVersionCheckPrintAndExit(opts);
manta.cliCompletionCheckPrintAndExit(opts, OPTIONS_PARSER, NAME, ['mpath']);
if (opts._args.length > 1)
manta.cli_usage(OPTIONS_PARSER, 'path required', 'path...');
if (opts._args.length === 0) {
// Keyless job
opts.keyless = true;
} else {
opts.keyless = false;
if (manta.assertPath(opts._args[0], true)) {
console.error('mlogin: "' + opts._args[0] +
'" is not a valid path');
process.exit(1);
}
opts.path = manta.path(opts._args[0], true);
}
/*
* The default behavior in Manta jobs is that if a process dumps core, then
* the whole task is aborted. That's not appropriate (and pretty annoying)
* for interactive use, so we run the command in our own contract that will
* ignore when a process dumps core.
*/
usercmd = opts.command;
opts.command = '/usr/bin/ctrun';
opts.arguments = [ '-i', 'core', '-l', 'child', '/bin/bash' ];
if (usercmd) {
opts.arguments.push('-c', usercmd);
} else {
opts.arguments.push('--norc');
}
opts.running = true;
opts.cancel_job = false;
if (opts.quiet)
QUIET = true;
if (opts.escape) {
if (opts.escape === 'none') {
// We accept, as does ssh(1), the value "none" to disable
// escape character recognition altogether.
ESCAPE = CC_ESCAPE = null;
} else if (opts.escape.length !== 1) {
console.error('mlogin: escape character sequences must ' +
'be a single character');
process.exit(1);
} else {
if (opts.escape === '\r' || opts.escape === '\n') {
console.error('mlogin: escape character must not be ' +
'carriage return or linefeed');
process.exit(1);
}
ESCAPE = opts.escape;
CC_ESCAPE = ESCAPE.charCodeAt(0);
}
}
opts.config_key = '/' + opts.account + '/stor/medusa-config-' +
libuuid.v4() + '.json';
try {
manta.checkBinEnv(opts);
} catch (e) {
manta.cli_usage(OPTIONS_PARSER, e.message, 'path...');
}
return (opts);
}
// Create the job in which our interactive agent will run
function create_job(opts, next) {
var the_job = {
name: 'interactive compute job',
phases: [
{
exec: '/assets' + MANTA_ASSET,
type: opts.keyless ? 'reduce' : 'map',
assets: [
MANTA_ASSET,
opts.config_key
]
}
]
};
if (opts.memory)
the_job.phases[0].memory = opts.memory;
if (opts.disk)
the_job.phases[0].disk = opts.disk;
if (opts.init)
the_job.phases[0].init = opts.init;
if (opts.image)
the_job.phases[0].image = opts.image;
if (opts.asset) {
opts.asset.forEach(function (asset) {
the_job.phases[0].assets.push(
manta.path(asset, true));
});
}
opts.mclient.createJob(the_job, function (err, jobid) {
if (!err) {
_log('created interactive job -- ' + jobid);
opts.job_id = jobid;
opts.cancel_job = true;
}
next(err);
});
}
// If this is a map job (on the single key provided as arguments to mlogin)
// then add it to the job. Otherwise we are a keyless reduce job, so do
// nothing.
function add_job_key(opts, next) {
if (opts.keyless) {
// This is a reduce job, so skip over this phase.
next();
return;
} else {
opts.mclient.addJobKey(opts.job_id, opts.path, next);
}
}
// Complete key input for our job
function end_job(opts, next) {
opts.mclient.endJob(opts.job_id, next);
}
// Poll every 15 seconds to keep the connection alive.
function start_server_pinger(opts) {
var pinger = function () {
if (!opts.running)
return;
if (opts.shed) {
opts.shed.send('ping');
}
setTimeout(pinger, 15 * 1000);
};
pinger();
}
// Try and print out some minimal extra information from the errors
// the job generated, if any:
function print_job_error_messages(opts, callback) {
opts.mclient.jobErrors(opts.job_id, function (err, res) {
if (err) {
callback();
return;
}
res.on('err', function (e) {
_log('IN-JOB ERROR: ' + e.code + ': ' + e.message);
});
res.once('end', function () {
callback();
});
});
}
// Set up a daemon to poll for Job status and detect completion.
function poll_for_completion(opts, next) {
var poller = function () {
opts.poll_timeout = null;
opts.mclient.job(opts.job_id, {}, function (err, job) {
if (err || job.state !== 'done') {
opts.poll_timeout = setTimeout(poller, JOB_POLL_PERIOD);
} else {
var stats = job.stats || {};
var retries = stats.retries || 0;
var errors = stats.errors || 0;
// The job is finished, so trigger a cleanup.
if (retries > 0) {
_log('Job experienced transient failures in the ' +
'Manta compute engine. Please try mlogin again.');
opts.cleanup();
} else if (errors > 0) {
_log('Error during job run; please check errors.');
print_job_error_messages(opts, function () {
opts.cleanup();
});
} else {
_log('Job is now "done"; cleaning up...');
opts.cleanup();
}
}
});
};
opts.poll_timeout = setTimeout(poller, JOB_POLL_PERIOD);
next();
}
// Pre-sign a URL for the agent to use in the context of the marlin job
// to connect back to the public manta endpoint. NOTE: This function
// requires job_id to be value, and so must not be run until the job
// itself has been created, as the job_id is included in the URL.
function get_signed_url(opts, next) {
opts.mclient.signURL({
expires: Math.floor((new Date().getTime() / 1000) + 300), // 5 Minutes
method: 'GET',
path: '/' + opts.account + '/medusa/attach/' + opts.job_id + '/slave'
}, function (err, res) {
if (!err) {
// Remove trailing slashes from MANTA_URL before combining with
// the signed path:
/* JSSTYLED */
var uu = opts.url.replace(/\/*$/, '');
opts.signed_url = uu + res;
}
next(err);
});
}
// Put a short-lived temporary object into Manta that we can pass to the
// agent via the asset mechanism.
function put_config_object(opts, next) {
assert.string(opts.signed_url, 'opts.signed_url');
var sobj = JSON.stringify({
signed_url: opts.signed_url,
insecure_tls: opts.insecure ? true : false
});
var putopts = {
copies: 1,
size: sobj.length
};
var inputstream = new manta.StringStream(sobj);
opts.mclient.put(opts.config_key, inputstream, putopts, next);
}
function connect_medusa(opts, next) {
opts.mclient.medusaAttach(opts.job_id, function (err, shed) {
if (err) {
next(err);
return;
}
opts.shed = shed;
var spinner = ['-', '\\', '|', '/'];
var waitcount = 0;
shed.on('text', function (msg) {
var m = msg.match(/^medusa:(.*)$/);
if (m) {
var o = JSON.parse(m[1]);
if (o.type === 'linkup') {
// We don't want the pre-session listeners anymore:
shed.removeAllListeners('text');
shed.removeAllListeners('end');
if (waitcount === 0)
_log('session established');
else
process.stdout.write(' established\n');
endl_needed = false;
start_session(opts);
} else if (o.type === 'wait') {
if (QUIET)
return;
endl_needed = true;
if (waitcount++ === 0) {
process.stdout.write(' * waiting for session... ');
} else {
process.stdout.write('\b' + spinner[
waitcount % spinner.length]);
}
}
}
});
shed.on('end', function (code, reason) {
_log('connection ended unexpectedly (' + code + '/' + reason + ')');
opts.cleanup(1);
});
next();
});
}
// Do some initial sanity checks to ensure the path we are passed on the
// command line is valid -- i.e. an extant object.
function check_path(opts, next) {
if (opts.keyless) {
// We don't have a path, so there is nothing to check.
next();
return;
}
opts.mclient.info(opts.path, function (err, info) {
if (err) {
if (err.code === 'NotFoundError') {
err = new Error('object not found in Manta');
} else if (err.code === 'ForbiddenError') {
err = new Error('you do not have permission to operate ' +
'on that object.');
}
next(err);
return;
}
if (info.extension === 'directory') {
next(new Error('mlogin only works on objects, not directories'));
return;
}
next();
});
}
// Once the Medusa reflector informs us that we are attached to the agent
// running in the marlin job, we send the agent some terminal details and
// a command and start relaying between the remote agent and the local
// terminal.
function start_session(opts) {
var shed = opts.shed;
// Request that the agent start a new process with the command
// we want, and the current local terminal configuration.
shed.send('mlogin:' + JSON.stringify({
type: 'start',
cwd: '/',
term: process.env.TERM || 'xterm',
columns: process.stdout.columns,
lines: process.stdout.rows,
command: opts.command,
arguments: opts.arguments
}));
start_server_pinger(opts);
var saw_cr = false;
var saw_escape = false;
var user_input = function (buf) {
var outbuf = new Buffer(buf.length);
var obpos = 0;
if (CC_ESCAPE === null)
return (buf);
for (var i = 0; i < buf.length; i++) {
if (saw_escape) {
// If the previous character we saw was an escape character,
// then we must decide what special action to take now.
if (buf[i] === CC_ESCAPE) {
// The user wants us to send the escape character
// to the remote host.
outbuf[obpos++] = CC_ESCAPE;
} else if (buf[i] === CC_HELP) {
// Print the help menu:
process.stdout.write('\n' + [
'Supported escape sequences:',
' !. - terminate session',
' !i - print job information',
' !? - this message',
' !! - send the escape character by typing ' +
'it twice',
'(Note that escapes are only recognized ' +
'immediately after newline.)'
].join('\n').replace(/!/g, ESCAPE) + '\n');
} else if (buf[i] === CC_INFO) {
// Print out job statistics
process.stdout.write('\n' + [
'Information about this Manta job:',
' Job ID: ' + opts.job_id
].join('\n') + '\n');
} else if (buf[i] === CC_EXIT) {
opts.cancel_job = true;
opts.cleanup();
}
saw_escape = false;
continue;
} else if (saw_cr) {
// If the previous character we saw was a carriage return,
// then this is the beginning of a new line. Check for
// the escape character.
if (buf[i] === CC_ESCAPE) {
saw_escape = true;
continue;
}
}
// If this character is a carriage return or line feed,
// remember it:
saw_cr = (buf[i] === CC_CR || buf[i] === CC_NL);
// Copy character into output buffer:
outbuf[obpos++] = buf[i];
}
return (outbuf.slice(0, obpos));
};
shed.on('text', function (x) {
if (!opts.running) {
shed.destroy();
return;
}
// Process the incoming message to look for control messages
// from the remote agent:
var m = x.match(/^mlogin:(.*)$/);
if (!m)
return;
var o = JSON.parse(m[1]);
if (o.type === 'started') {
endl_needed = true;
process.stdin.setRawMode(true);
process.stdin.on('data', function (ch) {
var buf = Buffer.isBuffer(ch) ? ch : new Buffer(ch);
var outbuf = user_input(buf);
shed.send(outbuf);
});
process.stdin.resume();
process.stdout.on('resize', function () {
shed.send('mlogin:' + JSON.stringify({
type: 'resize',
columns: process.stdout.columns,
lines: process.stdout.rows
}));
});
} else if (o.type === 'error') {
_log('remote error: ' + o.error);
shed.end('aborting');
opts.cleanup(1);
return;
} else if (o.type === 'exit') {
_log('remote process exited');
opts.cancel_job = false;
shed.end('done');
opts.cleanup(0);
return;
}
});
shed.on('binary', function (x) {
process.stdout.write(x);
});
shed.on('connectionReset', function () {
_log('connection reset by peer');
opts.cleanup(1);
});
shed.on('end', function (code, reason) {
_log('session complete');
opts.cancel_job = false;
opts.cleanup(0);
});
}
function wrap_run(func) {
return (function () {
if (!arguments[0].running) {
arguments[1](true);
return;
}
func.apply(this, arguments);
});
}
///--- Mainline
(function main() {
var options = parseOptions();
// As this is an interactive process, if we are not attached to
// a controlling terminal then we cannot usefully run. Bail out
// up front, rather than fail later. If node could be coerced to
// open("/dev/tty"), we could grab the controlling TTY even if the
// standard descriptors have been redirected, a la SSH, etc.
//
// Note that we explicitly don't check "stderr" here, so that
// the user can redirect bunyan logs from an mlogin session
// without interfering with the regular interactive use of
// the session.
[ 'stdin', 'stdout' ].forEach(function (ttyname) {
if (!Boolean(process[ttyname].isTTY)) {
_log('ERROR: ' + ttyname + ' is not a TTY; aborting.');
process.exit(1);
}
});
options.mclient = manta.createBinClient(options);
var done = function (err) {
// If we receive an error at this stage, then attempt to clean up
// the job we may have created, and the config key we may have put,
// etc. Otherwise, the job state poller will notice if the job
// completes early for some reason, and we'll otherwise clean up
// naturally at the end of the session.
if (err) {
_log('ERROR: ' + (err.message || err.code || 'Unknown Error'));
if (options.verbose)
_log(' ' + err.stack);
options.cleanup(1);
}
};
options.running = true;
options.cleanup = function (rc) {
if (!options.running)
return;
options.running = false;
_log('cleaning up resources...');
var b = vasync.barrier();
if (options.mclient) {
if (options.cancel_job) {
b.start('cancelJob');
options.mclient.cancelJob(options.job_id, function (err) {
b.done('cancelJob');
});
}
b.start('unlink');
options.mclient.unlink(options.config_key, function (err) {
b.done('unlink');
});
}
b.on('drain', function () {
process.exit(rc);
});
};
// This pipeline sets up the configuration object and the job that
// runs out interactive session agent:
process.once('SIGINT', function () {
_log('caught interrupt; aborting...');
options.cleanup(1);
});
vasync.pipeline({
arg: options,
funcs: [
wrap_run(check_path),
wrap_run(create_job),
wrap_run(poll_for_completion),
wrap_run(connect_medusa),
wrap_run(get_signed_url),
wrap_run(put_config_object),
wrap_run(add_job_key),
wrap_run(end_job)
]
}, done);
})();