Skip to content

Commit

Permalink
Changed multiprocess appender to use a single socket per client
Browse files Browse the repository at this point in the history
  • Loading branch information
Gareth Jones committed Jul 3, 2012
1 parent df491c0 commit a33e48c
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 84 deletions.
174 changes: 116 additions & 58 deletions lib/appenders/multiprocess.js
Original file line number Diff line number Diff line change
@@ -1,75 +1,133 @@
var log4js = require('../log4js');
var layouts = require('../layouts');
var net = require('net');
var util = require('util');

var LogServer = function createLogServer(config) {
var actualAppender = config.actualAppender;
var server = net.createServer(function serverCreated(clientSocket) {
clientSocket.on('connect', function clientConnected() {
var logMessage = '';
clientSocket.on('data', function chunkReceived(chunk) {
logMessage += chunk;
});
clientSocket.on('end', function gotChunks() {
var log4js = require('../log4js'),
net = require('net'),
END_MSG = '__LOG4JS__';

/**
* Creates a server, listening on config.loggerPort, config.loggerHost.
* Output goes to config.actualAppender (config.appender is used to
* set up that appender).
*/
function logServer(config) {

/**
* Takes a utf-8 string, returns an object with
* the correct log properties.
*/
function deserializeLoggingEvent(clientSocket, msg) {
var loggingEvent;
try {
var loggingEvent = JSON.parse(logMessage);
deserializeLoggingEvent(loggingEvent);
actualAppender(loggingEvent);
loggingEvent = JSON.parse(msg);
loggingEvent.startTime = new Date(loggingEvent.startTime);
loggingEvent.level.toString = function levelToString() {
return loggingEvent.level.levelStr;
};
} catch (e) {
// JSON.parse failed, just log the contents probably a naughty.
actualAppender(createLoggingEvent('ERROR', 'Unable to parse log: ' + logMessage));
// JSON.parse failed, just log the contents probably a naughty.
loggingEvent = {
startTime: new Date(),
categoryName: 'log4js',
level: { toString: function () {
return 'ERROR';
}
},
data: [ 'Unable to parse log:', msg ]
};
}
});
});
});
server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost');
}

function createLoggingEvent(level, message) {
return {
startTime: new Date(),
categoryName: 'log4js',
level: { toString: function () {
return level;
}},
data: [ message ]
};
}
loggingEvent.remoteAddress = clientSocket.remoteAddress;
loggingEvent.remotePort = clientSocket.remotePort;

return loggingEvent;
}

var actualAppender = config.actualAppender,
server = net.createServer(function serverCreated(clientSocket) {
clientSocket.setEncoding('utf8');
clientSocket.on('connect', function clientConnected() {
var logMessage = '';

function logTheMessage(msg) {
if (logMessage.length > 0) {
actualAppender(deserializeLoggingEvent(clientSocket, msg));
}
}

function chunkReceived(chunk) {
var event;
logMessage += chunk || '';
if (logMessage.indexOf(END_MSG) > -1) {
event = logMessage.substring(0, logMessage.indexOf(END_MSG));
logTheMessage(event);
logMessage = logMessage.substring(event.length + END_MSG.length) || '';
//check for more, maybe it was a big chunk
chunkReceived();
}
}

function deserializeLoggingEvent(loggingEvent) {
loggingEvent.startTime = new Date(loggingEvent.startTime);
loggingEvent.level.toString = function levelToString() {
return loggingEvent.level.levelStr;
};
clientSocket.on('data', chunkReceived);
clientSocket.on('end', chunkReceived);
});
});
server.listen(config.loggerPort || 5000, config.loggerHost || 'localhost');

return actualAppender;
}

function workerAppender(config) {
return function log(loggingEvent) {
var socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
socket.on('connect', function socketConnected() {
socket.end(JSON.stringify(loggingEvent), 'utf8');
});
};
var canWrite = false,
buffer = [],
socket;

createSocket();

function createSocket() {
socket = net.createConnection(config.loggerPort || 5000, config.loggerHost || 'localhost');
socket.on('connect', function() {
emptyBuffer();
canWrite = true;
});
socket.on('timeout', socket.end.bind(socket));
//don't bother listening for 'error', 'close' gets called after that anyway
socket.on('close', createSocket);
}

function emptyBuffer() {
var evt;
while ((evt = buffer.shift())) {
write(evt);
}
}

function write(loggingEvent) {
socket.write(JSON.stringify(loggingEvent), 'utf8');
socket.write(END_MSG, 'utf8');
}

return function log(loggingEvent) {
if (canWrite) {
write(loggingEvent);
} else {
buffer.push(loggingEvent);
}
};
}

function createAppender(config) {
if (config.mode === 'master') {
var server = new LogServer(config);
return config.actualAppender;
} else {
return workerAppender(config);
}
if (config.mode === 'master') {
return logServer(config);
} else {
return workerAppender(config);
}
}

function configure(config) {
var actualAppender;
if (config.appender && config.mode === 'master') {
log4js.loadAppender(config.appender.type);
actualAppender = log4js.appenderMakers[config.appender.type](config.appender);
config.actualAppender = actualAppender;
}
return createAppender(config);
var actualAppender;
if (config.appender && config.mode === 'master') {
log4js.loadAppender(config.appender.type);
actualAppender = log4js.appenderMakers[config.appender.type](config.appender);
config.actualAppender = actualAppender;
}
return createAppender(config);
}

exports.appender = createAppender;
Expand Down
50 changes: 25 additions & 25 deletions lib/layouts.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
var dateFormat = require('./date_format')
, util = require('util')
, replacementRegExp = /%[sdj]/g
, layoutMakers = {
"messagePassThrough": function() { return messagePassThroughLayout; }
, "basic": function() { return basicLayout; }
, "colored": function() { return colouredLayout; }
, "coloured": function() { return colouredLayout; }
, "pattern": function (config) {
var pattern = config.pattern || undefined;
return patternLayout(pattern);
}
}
, colours = {
ALL: "grey"
, TRACE: "blue"
, DEBUG: "cyan"
, INFO: "green"
, WARN: "yellow"
, ERROR: "red"
, FATAL: "magenta"
, OFF: "grey"
};
, util = require('util')
, replacementRegExp = /%[sdj]/g
, layoutMakers = {
"messagePassThrough": function() { return messagePassThroughLayout; }
, "basic": function() { return basicLayout; }
, "colored": function() { return colouredLayout; }
, "coloured": function() { return colouredLayout; }
, "pattern": function (config) {
var pattern = config.pattern || undefined;
return patternLayout(pattern);
}
}
, colours = {
ALL: "grey"
, TRACE: "blue"
, DEBUG: "cyan"
, INFO: "green"
, WARN: "yellow"
, ERROR: "red"
, FATAL: "magenta"
, OFF: "grey"
};


function formatLogData(logData) {
var output = ""
, data = Array.isArray(logData) ? logData.slice() : Array.prototype.slice.call(arguments)
, format = data.shift();
, data = Array.isArray(logData) ? logData.slice() : Array.prototype.slice.call(arguments)
, format = data.shift();

if (typeof format === "string") {
output = format.replace(replacementRegExp, function(match) {
Expand Down Expand Up @@ -257,4 +257,4 @@ module.exports = {
, layout: function(name, config) {
return layoutMakers[name] && layoutMakers[name](config);
}
};
};
1 change: 0 additions & 1 deletion test/logging.js
Original file line number Diff line number Diff line change
Expand Up @@ -576,5 +576,4 @@ vows.describe('log4js').addBatch({
assert.equal(logEvents[1].data[0], 'info3');
}
}

}).export(module);

0 comments on commit a33e48c

Please sign in to comment.