Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow proxy through master #427

Open
wants to merge 28 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -607,6 +607,10 @@ Setting this parameter to `true` will force Cronicle's Web UI to connect to the

This property only takes effect if [web_direct_connect](#web_direct_connect) is also set to `true`.

#### live_log_poll_interval

The interval at which live logs are polled from the running job.

### socket_io_transports

This is an advanced configuration property that you will probably never need to worry about. This allows you to customize the [socket.io transports](https://socket.io/docs/client-api/) used to connect to the server for real-time updates. By default, this property is set internally to an array containing the `websocket` transport only, e.g.
Expand Down
130 changes: 33 additions & 97 deletions htdocs/js/pages/JobDetails.class.js
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ Class.subclass( Page.Base, "Page.JobDetails", {
],
backgroundColor: [
(cpu_avg < jcm*0.5) ? this.pie_colors.cool :
(cpu_avg < jcm*0.5) ? this.pie_colors.cool :
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for implementing this, really appreciate it! But just as a question is this extra line a typo?

((cpu_avg < jcm*0.75) ? this.pie_colors.warm : this.pie_colors.hot),
this.pie_colors.empty
],
Expand Down Expand Up @@ -864,21 +865,9 @@ Class.subclass( Page.Base, "Page.JobDetails", {

html += '</div>';

// live job log tail
var remote_api_url = app.proto + job.hostname + ':' + app.port + config.base_api_uri;
if (config.custom_live_log_socket_url) {
// custom websocket URL for single-master systems behind an LB
remote_api_url = config.custom_live_log_socket_url + config.base_api_uri;
}
else if (!config.web_socket_use_hostnames && app.servers && app.servers[job.hostname] && app.servers[job.hostname].ip) {
// use ip if available, may work better in some setups
remote_api_url = app.proto + app.servers[job.hostname].ip + ':' + app.port + config.base_api_uri;
}

html += '<div class="subtitle" style="margin-top:15px;">';
html += 'Live Job Event Log';
html += '<div class="subtitle_widget" style="margin-left:2px;"><a href="'+remote_api_url+'/app/get_live_job_log?id='+job.id+'" target="_blank"><i class="fa fa-external-link">&nbsp;</i><b>View Full Log</b></a></div>';
html += '<div class="subtitle_widget"><a href="'+remote_api_url+'/app/get_live_job_log?id='+job.id+'&download=1"><i class="fa fa-download">&nbsp;</i><b>Download Log</b></a></div>';
html += '<div class="subtitle_widget"><a href="/app/get_live_job_log_proxy?id='+job.id+'&download=1"><i class="fa fa-download">&nbsp;</i><b>Download Log</b></a></div>';
html += '<div class="clear"></div>';
html += '</div>';

Expand Down Expand Up @@ -1040,91 +1029,38 @@ Class.subclass( Page.Base, "Page.JobDetails", {
start_live_log_watcher: function(job) {
// open special websocket to target server for live log feed
var self = this;
var $cont = null;
var chunk_count = 0;
var error_shown = false;

var url = app.proto + job.hostname + ':' + app.port;
if (config.custom_live_log_socket_url) {
// custom websocket URL for single-master systems behind an LB
url = config.custom_live_log_socket_url;
}
else if (!config.web_socket_use_hostnames && app.servers && app.servers[job.hostname] && app.servers[job.hostname].ip) {
// use ip if available, may work better in some setups
url = app.proto + app.servers[job.hostname].ip + ':' + app.port;
var $cont = $('#d_live_job_log');

self.curr_live_log_job = job.id;

var previous_data = []

// poll live_console api until job is running or some error occur
function refresh() {
if(self.curr_live_log_job != job.id) return; // prevent double logging
app.api.post('/api/app/get_live_job_log_proxy', { id: job.id }
, (data) => { // success callback
if (!data.data) return; // stop polling if no data

// Prevent short logs from showing duplicate lines
var new_data = data.data.split(/\r?\n/)
var trimmed_data = new_data.filter((item) => previous_data.indexOf(item)< 0)
previous_data = previous_data.concat(trimmed_data)

$cont.append('<pre class="log_chunk">' + trimmed_data.join('\n') + '</pre>');
pollInterval = parseInt(config.live_log_poll_interval)
if(!pollInterval || pollInterval < 1000) pollInterval = 1000;
setTimeout(refresh, 1000);
}
// stop polling on error, report unexpected errors
, (e) => {
if(e.code != 'job') console.error('Live log poll error: ', e)
return
}
)
}

$('#d_live_job_log').append(
'<pre class="log_chunk" style="color:#888">Log Watcher: Connecting to server: ' + url + '...</pre>'
);

this.socket = io( url, {
forceNew: true,
transports: config.socket_io_transports || ['websocket'],
reconnection: true,
reconnectionDelay: 1000,
reconnectionDelayMax: 5000,
reconnectionAttempts: 9999,
timeout: 5000
} );

this.socket.on('connect', function() {
Debug.trace("JobDetails socket.io connected successfully: " + url);

// cache this for later
$cont = $('#d_live_job_log');

$cont.append(
'<pre class="log_chunk" style="color:#888; margin-bottom:14px;">Log Watcher: Connected successfully!</pre>'
);

// get auth token from master server (uses session)
app.api.post( 'app/get_log_watch_auth', { id: job.id }, function(resp) {
// now request log watch stream on target server
self.socket.emit( 'watch_job_log', {
token: resp.token,
id: job.id
} );
}); // api.post
} );
this.socket.on('connect_error', function(err) {
Debug.trace("JobDetails socket.io connect error: " + err);
$('#d_live_job_log').append(
'<pre class="log_chunk">Log Watcher: Server Connect Error: ' + err + ' (' + url + ')</pre>'
);
error_shown = true;
} );
this.socket.on('connect_timeout', function(err) {
Debug.trace("JobDetails socket.io connect timeout");
if (!error_shown) $('#d_live_job_log').append(
'<pre class="log_chunk">Log Watcher: Server Connect Timeout: ' + err + ' (' + url + ')</pre>'
);
} );
this.socket.on('reconnect', function() {
Debug.trace("JobDetails socket.io reconnected successfully");
} );

this.socket.on('log_data', function(lines) {
// received log data, as array of lines
var scroll_y = $cont.scrollTop();
var scroll_max = Math.max(0, $cont.prop('scrollHeight') - $cont.height());
var need_scroll = ((scroll_max - scroll_y) <= 10);

$cont.append(
'<pre class="log_chunk">' +
lines.map( function(line) { return line.replace(/</g, '&lt;'); } ).join("\n").trim() +
'</pre>'
);

// only show newest 1K chunks
chunk_count++;
if (chunk_count >= 1000) {
$cont.children().first().remove();
chunk_count--;
}

if (need_scroll) $cont.scrollTop( $cont.prop('scrollHeight') );
} );

refresh();
},

update_live_progress: function(job) {
Expand Down
1 change: 1 addition & 0 deletions lib/api/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ module.exports = Class.create({
external_user_api: this.usermgr.config.get('external_user_api') || '',
web_socket_use_hostnames: this.server.config.get('web_socket_use_hostnames') || 0,
web_direct_connect: this.server.config.get('web_direct_connect') || 0,
live_log_poll_interval: this.server.config.get('live_log_poll_interval') || 1000,
socket_io_transports: this.server.config.get('socket_io_transports') || 0
} ),
port: args.request.headers.ssl ? this.web.config.get('https_port') : this.web.config.get('http_port'),
Expand Down
72 changes: 72 additions & 0 deletions lib/api/job.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
var fs = require('fs');
var assert = require("assert");
var async = require('async');
var readLastLines = require('read-last-lines');

var Class = require("pixl-class");
var Tools = require("pixl-tools");
Expand Down Expand Up @@ -81,6 +82,77 @@ module.exports = Class.create({
callback( "200 OK", headers, stream );
} );
},

api_get_live_job_log_proxy: function(args, callback) {
// get live job logs from a remote worker
// client API, no auth
var self = this;

var params = Tools.mergeHashes(args.params, args.query);

self.loadSession(args, function (err, session, user) {
if (err) return self.doError('session', err.message, callback);
if (!self.requireValidUser(session, user, callback)) return;

if (!self.requireParams(params, {
id: /^\w+$/
}, callback)) return;

var job = self.findJob(params);
if (!job) return self.doError('job', "Failed to locate job: " + params.id, callback);

var slave = self.slaves[ job.hostname ];
if (!slave) {
this.logError('job', "Failed to locate slave: " + job.hostname + " for job: " + job.id);
slave = { hostname: job.hostname }; // hail mary
}

var api_url = self.getServerBaseAPIURL( slave.hostname, slave.ip ) + '/app/get_live_log_tail';
var tailSize = parseInt(params.tail) || 80;
var auth = Tools.digestHex(params.id + self.server.config.get('secret_key'))
var reqParams = { id: job.id, tail: tailSize, download: params.download || 0, auth: auth }

self.request.json(api_url, reqParams, (err, resp, data) => {
if (err) return self.doError('job', "Failed to fetch live job log: " + err.message, callback);
data.hostname = job.hostname;
data.event_title = job.event_title;
callback(data);
});
});
},

api_get_live_log_tail: function (args, callback) {
// internal api, runs on target machine
var self = this;

let params = Tools.mergeHashes(args.params, args.query);

if (!self.requireParams(params, {
id: /^\w+$/,
auth: /^\w+$/
}, callback)) return;

if (params.auth != Tools.digestHex(params.id + self.server.config.get('secret_key'))) {
return callback("403 Forbidden", {}, "Authentication failure.\n");
}

// see if log file exists on this server
var log_file = self.server.config.get('log_dir') + '/jobs/' + params.id + '.log';

let tailSize = parseInt(params.tail) || 80;
if (params.download == 1) { // read entire file
fs.readFile(log_file, { encoding: 'utf-8' }, (err, data) => {
if (err) return self.doError('job', "Failed to fetch job log: invalid or completed job", callback);
callback({ data: data });
});

}
else {
readLastLines.read(log_file, tailSize )
.then( lines => callback({data: lines}))
.catch(e => { return self.doError('job', "Failed to fetch job log: invalid or completed job", callback)})
}
},

api_get_live_job_log: function(args, callback) {
// get live job job, as it is being written
Expand Down
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "Cronicle",
"version": "0.8.62",
"version": "0.8.63",
"description": "A simple, distributed task scheduler and runner with a web based UI.",
"author": "Joseph Huckaby <jhuckaby@gmail.com>",
"homepage": "https://github.com/jhuckaby/Cronicle",
Expand Down Expand Up @@ -55,7 +55,8 @@
"pixl-server-storage": "^2.0.10",
"pixl-server-web": "^1.1.7",
"pixl-server-api": "^1.0.2",
"pixl-server-user": "^1.0.9"
"pixl-server-user": "^1.0.9",
"read-last-lines": "^1.8.0"
},
"devDependencies": {
"pixl-unit": "^1.0.10"
Expand Down
1 change: 1 addition & 0 deletions sample_conf/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
"server_comm_use_hostnames": false,
"web_direct_connect": false,
"web_socket_use_hostnames": false,
"live_log_poll_interval": 1000,

"job_memory_max": 1073741824,
"job_memory_sustain": 0,
Expand Down