Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

update to DBClient/inHub to v0.10.7

  • Loading branch information...
commit 7dad92124215a38776b3a60650109b0863bb5c69 2 parents d76562e + dd79008
@kaven276 authored
View
3  bin/inHub.sh
@@ -0,0 +1,3 @@
+#!/usr/bin/env node
+
+require(require('path').join(__dirname,'..')).inHub.startShell();
View
562 lib/DBClient.js
@@ -0,0 +1,562 @@
+/**
+ * Created by cuccpkfs on 15-2-4.
+ */
+var net = require('net')
+ , sys_cfg = require('./cfg.js')
+ , debug = require('debug')('noradle:DBPool')
+ , C = require('./constant.js')
+ , find = require('./util/util.js').find
+ , logRegular = false
+ , Request = require('./Request.js')
+ , util = require('util')
+ , cfgOverride = require('./util/util.js').override
+ , events = require("events")
+ ;
+
+var EMPTY = 0
+ , FREE = 1
+ , BUSY = 2
+ , FREEING = 3
+ , CLOSED = 4
+ , ERROR = 5
+ , QUITTING = 6
+ ;
+
+/**
+ * parse db names and session info from head of oraSock
+ * @param data Buffer
+ * @constructor
+ */
+function OraSockAttrSet(data){
+ var dbNamesLen = data.readInt32BE(32)
+ , dbNames = data.slice(36).toString().split('/')
+ ;
+
+ this.name = dbNames[0];
+ this.domain = dbNames[1];
+ this.uniqueName = dbNames[2];
+ this.role = dbNames[3];
+
+ this.oraSid = data.readInt32BE(4);
+ this.oraSerial = data.readInt32BE(8);
+ this.oraSpid = data.readInt32BE(12);
+ this.slotID = data.readInt32BE(16);
+ this.stime = Date.now();
+ this.lifeMin = data.readInt32BE(20);
+ this.reqCnt = data.readInt32BE(24);
+ this.instance = data.readInt32BE(28);
+}
+
+/** called when a slot first created on first arrival of OSP connection */
+function Slot(c, oraSockAttrSet, dbPool){
+ // properties below has statistics
+ this.hBytesRead = 0;
+ this.hBytesWritten = 0;
+ this.sockCount = 0;
+ this.reqCount = 0;
+ this.reqTimeAccum = 0; // ms
+ this.bindSock(c, oraSockAttrSet);
+ // properties below has value only when
+ this.bTime = undefined;
+ this.env = undefined;
+ this.response = undefined;
+ this.dbPool = dbPool;
+ this.readQuit = Slot.prototype.readQuit.bind(this);
+}
+/** called when OSP connect to DBPool */
+Slot.prototype.bindSock = function(oraSock, oraSockAttrSet){
+ this.oraSock = oraSock;
+ this.oraSockAttrSet = oraSockAttrSet;
+ this.slotID = oraSockAttrSet.slotID;
+ this.status = FREE;
+ logRegular && this.log();
+ logRegular && (debug((this.sockCount === 0) ? 'new slot and new oraSock' : 'reuse slot and new oraSock'));
+ this.sockCount++;
+};
+/** if got data on FREE status, it must be quit signal from OSP */
+Slot.prototype.readQuit = function(){
+ var slot = this;
+ if (slot.status === FREE) {
+ slot.quit();
+ slot.log();
+ debug(' got quitting signal on free sts!');
+ slot.oraSock.read();
+ } else {
+ slot.log();
+ console.error(new Date(), ' got quitting signal on not free sts!', slot.status, slot.oraSock.read());
+ }
+};
+/** mark slot busy */
+Slot.prototype.goBusy = function(env){
+ this.status = BUSY;
+ this.reqCount++;
+ this.env = env;
+ this.bTime = Date.now();
+ this.response = false;
+ this.overtime = false;
+ this.dbPool.busySet[this.slotID] = this;
+ this.oraSock.removeListener('readable', this.readQuit);
+ logRegular && this.log();
+ logRegular && debug('req#%d socket go busy', this.reqCount);
+};
+/** mark slot free, return back to dbPool's freeList */
+Slot.prototype.goFree = function(){
+ var slot = this
+ , slotID = this.slotID
+ , dbPool = this.dbPool
+ ;
+ this.reqTimeAccum += (Date.now() - this.bTime);
+ logRegular && slot.log();
+ logRegular && debug('req#%d socket go free', slot.reqCount);
+ delete dbPool.busySet[slotID];
+ // WHEN from BUSY to QUITTING
+ if (slot.status === QUITTING) {
+ slot.log();
+ debug(' got quitting signal tight after previous request!');
+ return;
+ }
+ logRegular && this.log();
+ logRegular && debug('req#%d socket go free', this.reqCount);
+ slot.status = FREE;
+ dbPool.freeList.unshift(slotID);
+ if (!dbPool.execQueuedCB()) {
+ slot.oraSock.on('readable', slot.readQuit);
+ }
+};
+Slot.prototype.quit = function(freeList){
+ var freeList = this.dbPool.freeList
+ , pos = freeList.indexOf(this.slotID)
+ ;
+ if (pos >= 0) {
+ freeList.splice(pos, 1);
+ }
+ this.status = QUITTING;
+};
+Slot.prototype.releaseSock = function(cause){
+ var slotID = this.slotID
+ , oraSock = this.oraSock
+ , dbPool = this.dbPool
+ ;
+ if (!oraSock) {
+ this.log();
+ debug(' socket release from pool repeatly');
+ return;
+ }
+ logRegular && this.log();
+ logRegular && debug(' socket release from pool');
+
+ this.hBytesRead += oraSock.bytesRead;
+ this.hBytesWritten += oraSock.bytesWritten;
+ oraSock.removeAllListeners();
+
+ switch (this.status) {
+ case FREE:
+ logRegular && debug(' release from freeList');
+ var freeList = dbPool.freeList;
+ freeList.splice(find(freeList, slotID), 1);
+ break;
+ case FREEING:
+ delete dbPool.busySet[slotID];
+ break;
+ case BUSY:
+ delete dbPool.busySet[slotID];
+ dbPool.waitTimeoutStats.busyEnd++;
+ this.log();
+ debug(' release from busyList', cause, this.status);
+ oraSock.emit('socket_released', Date.now() - this.bTime);
+ break;
+ case QUITTING:
+ this.log();
+ debug(' release from quitting', cause, this.status);
+ break;
+ default:
+ this.log();
+ debug('quit connection not in ether free/freeing/busy state!', cause, this.status);
+ }
+
+ this.oraSock = undefined;
+ this.status = CLOSED;
+ oraSock.end();
+};
+Slot.prototype.log = function(){
+ var o = this.oraSockAttrSet;
+ debug('\npool slot (#%d - %d:%d) of %d @%s.%s', o.slotID, o.oraSid, o.oraSerial, 0, o.name, o.domain);
+};
+
+function DBPool(port, cfg){
+ this.slots = [];
+ this.freeList = [];
+ this.busySet = {};
+ this.waitQueue = [];
+ this.waitTimeoutStats = {
+ conn : 0,
+ resp : 0,
+ fin : 0,
+ busyEnd : 0,
+ cancel : 0
+ };
+ if (port instanceof Array) {
+ // params: from DBPool.connect(address,amount,cfg)
+ this.address = arguments[0];
+ this.amount = arguments[1];
+ this.cfg = cfgOverride(sys_cfg, arguments[2] || {});
+ this.keep = true;
+ DBPool.pools[this.address.join(":")] = this;
+ } else {
+ this.port = port || 1522;
+ this.cfg = cfgOverride(sys_cfg, cfg || {});
+ this.listen();
+ this.checkInterval();
+ DBPool.pools[this.port] = this;
+ }
+}
+
+DBPool.pools = {};
+
+DBPool.prototype.listen = function(){
+ var port = this.port
+ , me = this
+ ;
+ var dbListener = net.createServer({allowHalfOpen : true}, function(c){
+ me.onConnectForDBPool(c);
+ });
+
+ dbListener.listen(port, function(){
+ debug('NodeJS server is listening for oracle connection at port ' + port);
+ });
+};
+
+DBPool.connect = function(address, amount, cfg){
+ debug('DBPool.connect', address, amount, cfg);
+ amount = amount || 1;
+ var dbPool = new DBPool(address, amount, cfg)
+ , actualAmount = 0
+ ;
+
+ for (var i = 0; i < amount; i++) {
+ connect(i);
+ }
+
+ return dbPool;
+
+ function connect(i){
+ var cliSock
+ , retryDelay = 0
+ ;
+
+ one();
+
+ function one(){
+ if (i >= dbPool.amount) return;
+ debug('do connect %s', i);
+ cliSock = net.connect.apply(net, address);
+ cliSock.on('connect', onConnect);
+ cliSock.on('error', onEndError);
+
+ function onConnect(){
+ debug('connected to %s', i);
+ cliSock.removeListener('error', onEndError);
+ cliSock.on('end', onEndError);
+ dbPool.onConnectForDBPool(cliSock, function(){
+ retryDelay = 0;
+ });
+ actualAmount++;
+ }
+
+ function onEndError(err){
+ if (err) {
+ debug('got connect error to %s, %s', i, retryDelay, err);
+ } else {
+ debug('got disconnected to %s', i, retryDelay);
+ actualAmount--;
+ }
+ if (!dbPool.keep) {
+ return;
+ }
+ if (retryDelay) {
+ setTimeout(one, retryDelay * 1000);
+ if (retryDelay < 64) {
+ retryDelay *= 2;
+ }
+ } else {
+ one();
+ retryDelay = 1;
+ }
+ }
+ }
+ }
+};
+
+DBPool.prototype.disconnect = function(){
+ this.keep = false;
+ this.slots.forEach(function(slot){
+ slot.oraSock.end();
+ });
+};
+
+DBPool.prototype.onConnectForDBPool = function(c, cb){
+ var me = this
+ , slots = me.slots
+ , freeList = me.freeList
+ , cfg = me.cfg
+ ;
+ {
+ var slot, slotID, oraSockAttrSet;
+
+ (function(){
+ var head, chunks = [];
+ c.on('readable', onHandshake);
+
+ function onHandshake(){
+ var data = c.read();
+
+ if (data === null) {
+ debug('null data on hand-shake found');
+ return;
+ }
+
+ if (!chunks.length) {
+ try {
+ var ptoken = data.readInt32BE(0);
+ } catch (e) {
+ ptoken = -1;
+ }
+ if (ptoken !== 197610261) {
+ if (ptoken === 197610262) {
+ // not free oracle process
+ debug('no free oracle connection available');
+ //c.end();
+ //c.destroy();
+ } else {
+ console.warn('none oracle connection attempt found', data);
+ c.end();
+ c.destroy();
+ }
+ return;
+ }
+ }
+
+ while (data.length < 7) {
+ if (chunks.length === 0) break;
+ data = Buffer.concat([chunks.pop(), data]);
+ }
+ chunks.push(data);
+
+ if (data.slice(-7).toString('ascii') !== '/080526') {
+ debug('partial oracle connect data', data, data.slice(36), data.slice(36).toString());
+ return;
+ }
+ head = Buffer.concat(chunks);
+ c.removeListener('readable', onHandshake);
+
+ oraSockAttrSet = new OraSockAttrSet(head);
+ debug(oraSockAttrSet);
+ slotID = oraSockAttrSet.slotID;
+ init();
+ cb && cb();
+ }
+ })();
+
+ function init(){
+ slot = slots[slotID];
+ if (slot) {
+ if (slot.oraSock) {
+ // if broken connection is still holding and in use, release it for replacement of new connection
+ debug(' slot replacement with new socket');
+ slot.releaseSock('override');
+ // slot.oraSock.destroy();
+ }
+ slot.bindSock(c, oraSockAttrSet);
+ } else {
+ slot = slots[slotID] = new Slot(c, oraSockAttrSet, me);
+ }
+
+ freeList.push(slotID);
+ if (!me.execQueuedCB()) {
+ slot.oraSock.on('readable', slot.readQuit);
+ }
+
+ c.on('end', function(){
+ if (slot.oraSock !== c) {
+ slot.log();
+ debug(' socket fin received but slot.oraSock is not the same one');
+ return;
+ }
+ logRegular && slot.log();
+ logRegular && debug(' socket fin received');
+ slot.releaseSock('on_end');
+ });
+ c.on('error', function(err){
+ slot.log();
+ debug(' socket error', err);
+ slot.releaseSock('on_error');
+ });
+ if (cfg.oracle_keep_alive) {
+ c.setKeepAlive(true, 1000 * cfg.oracle_keep_alive);
+ } else {
+ c.setKeepAlive(false);
+ }
+ }
+ }
+}
+
+function Interrupter(dbPool, env, dbSelector, cb){
+ events.EventEmitter.call(this);
+ this.dbPool = dbPool;
+ this.env = env;
+ this.cb = cb;
+ this.aborted = false;
+ this.overtime = false;
+ this.sTime = Date.now();
+}
+util.inherits(Interrupter, events.EventEmitter);
+Interrupter.prototype.abort = function(){
+ debug(this.env, 'aborted, catched by db.js');
+ this.aborted = true;
+ var waitQueue = this.dbPool.waitQueue;
+ var index = find(waitQueue, this);
+ if (index >= 0) {
+ debug(this.env, 'interrupted when waiting');
+ waitQueue.splice(index, 1);
+ }
+};
+
+/** got a request object to send request and receive response
+ dbPool.findFree(env, dbSelector, function(err, request) {
+ request.init(PROTOCOL, hprof);
+ request.addHeaders(...);
+ request.end(function(response){
+ response.status;
+ response.headers;
+ response.on('data', function(data){...});
+ response.on('end', function(){...});
+ });
+ });
+ */
+DBPool.prototype.findFree = function(env, dbSelector, cb, interrupter){
+ var freeList = this.freeList
+ , busySet = this.busySet
+ , waitQueue = this.waitQueue
+ ;
+ if (interrupter) {
+ // in the case of called from later queue
+ if (interrupter.aborted) {
+ cb(new Error('request aborted'));
+ return;
+ }
+ if (interrupter.overtime) {
+ cb(new Error('request wait db connection timeout'));
+ return;
+ }
+ } else {
+ interrupter = new Interrupter(this, env, dbSelector, cb);
+ }
+ if (freeList.length > 0) {
+ var slotID = freeList.shift()
+ , slot = this.slots[slotID]
+ , oraSock = slot.oraSock
+ , req = new Request(oraSock, env)
+ ;
+ slot.goBusy(env);
+ req.on('response', function(res){
+ slot.response = true;
+ });
+ cb(null, req);
+
+ req.on('fin', function(){
+ if (req.quitting) {
+ slot.status = QUITTING;
+ }
+ slot.goFree();
+ });
+
+ req.on('error', function(){
+ if (slotID in busySet) {
+ delete busySet[slotID];
+ slot.status = ERROR;
+ } else {
+ console.warn('None busy oraSock is used in db.reportProtocolError !');
+ }
+ slot.goFree();
+ });
+ } else {
+ waitQueue.push(interrupter);
+ logRegular && debug('later push', waitQueue.length);
+ }
+ return interrupter;
+};
+
+DBPool.prototype.execQueuedCB = function(){
+ var waitQueue = this.waitQueue
+ ;
+ while (true) {
+ var w = waitQueue.shift();
+ if (!w) {
+ return false;
+ }
+ if (w.aborted) {
+ debug(w.env, 'abort in later queue');
+ ;
+ continue;
+ }
+ debug('executing a wait queue item', waitQueue.length);
+ this.findFree(w.env, w.dbSelector, w.cb, w);
+ return true;
+ }
+}
+
+// database connection pool monitor
+DBPool.prototype.checkInterval = function(){
+ var dbPool = this
+ , cfg = dbPool.cfg
+ , waitTimeoutStats = this.waitTimeoutStats
+ , slots = this.slots
+ , busySet = this.busySet
+ , waitQueue = this.waitQueue
+ ;
+ setInterval(function(){
+ var now = Date.now()
+ ;
+ //check for long running busy oraSocks, and emit LongRun event for killing, alerting, and etc ...
+ for (var slotID in busySet) {
+ var slot = slots[slotID]
+ , oraSock = slot.oraSock
+ ;
+ if (slot.overtime === false && now - slot.bTime > cfg.ExecTimeout) {
+ if (slot.response) {
+ // todo: find too long executions that has header returned, timeout it
+ // it may use chunked transfer
+ } else {
+ waitTimeoutStats.resp++;
+ slot.overtime = true;
+ slot.log();
+ debug('response_timeout by interval checker', now - slot.bTime);
+ // todo: execute longer than 3s, may do alert, and kill the oracle session
+ }
+ }
+ }
+
+ // check if task wait too long, yes to call timeout callback and remove from wait queue
+ // low index item is waiting longer
+ for (var i = waitQueue.length - 1; i >= 0; i--) {
+ var w = waitQueue[i]
+ ;
+ if (w.overtime === false && now - w.sTime > cfg.FreeConnTimeout) {
+ waitTimeoutStats.conn++;
+ w.overtime = true;
+ // later.splice(i, 1);
+ debug('wait free oraSock timeout by interval checker', now - w.sTime);
+ }
+ }
+ }, cfg.DBPoolCheckInterval);
+};
+
+DBPool.getFirstPool = function(){
+ return DBPool.pools[Object.keys(DBPool.pools)[0] || 0];
+};
+
+exports.DBPool = DBPool;
+
+/*
+ * todo:
+ */
View
20 lib/handlerHTTP.js
@@ -3,6 +3,7 @@ var sysCfg = require('./cfg.js')
, ReqBase = require('./ReqBase.js')
, urlParse = require('url').parse
, urlEncoded = require('./util/urlEncoded.js')
+ , jsonParse = require('./util/jsonParse.js')
, mergeHeaders = require('./util/util.js').mergeHeaders
, mergeAdd = require('./util/util.js').mergeAdd
, override = require('./util/util.js').override
@@ -13,7 +14,6 @@ var sysCfg = require('./cfg.js')
, curFeedbackSeq = 0
, fbBuffer = {}
, cssBuffer = {}
- , upload = require('./middleware/upload.js')
, zip = require('./middleware/zip.js')
, chooseZip = zip.chooseZip
, zipFilter = zip.zipFilter
@@ -54,7 +54,9 @@ function parseCookie(req){
module.exports = function(dbPool, ReqBaseC, customCfg){
- var cfg = override(sysCfg, customCfg || {});
+ var cfg = override(sysCfg, customCfg || {})
+ , upload = require('./middleware/upload.js')(cfg)
+ ;
return function(req, res, next){
next = next || noop;
@@ -294,6 +296,20 @@ module.exports = function(dbPool, ReqBaseC, customCfg){
});
})();
break;
+ case 'application/json' :
+ req.setEncoding('utf8');
+ (function(){
+ var bdy = '';
+ req.on('data', function(chunk){
+ bdy += chunk;
+ });
+ req.on('end', function(){
+ oraReq.addHeaders(jsonParse(bdy), '');
+ console.log('oraReq._buf', oraReq._buf);
+ oraReq.end(onResponse);
+ });
+ })();
+ break;
case 'multipart/form-data' :
upload(req, oraReq, onResponse, next);// todo:
break;
View
182 lib/inHub.js
@@ -0,0 +1,182 @@
+/**
+ * Created with JetBrains WebStorm.
+ * User: kaven276
+ * Date: 13-5-16
+ * Time: 下午2:47
+ */
+
+var net = require('net')
+ , C = require('./constant.js')
+ , debug = require('debug')('noradle:inhub')
+ , gseq = 0
+ , marker = (new Buffer(4)).writeInt32BE(197610262, 0)
+ ;
+
+// for accept front end request
+// may accept from different front nodejs connection request
+exports.server4node = net.createServer({allowHalfOpen : true}, function(c){
+ var seq = ++gseq;
+ debug('node(%d) connected', seq);
+
+ var slotID = freeList.shift()
+ ;
+ if (!slotID) {
+ // no available free slot, disconnect client
+ c.end(marker);
+ c.destroy();
+ debug('node(%d) connected but no free', seq);
+ return;
+ }
+
+ var slot = oraPool[slotID]
+ ;
+ slot.oraSock.removeAllListeners('readable');
+ c.write(slot.head);
+ c.pipe(slot.oraSock, {end : false}).pipe(c, {end : true});
+ debug('node(%d) use free slot %s, %j', seq, slotID, freeList);
+
+ c.on('end', function(){
+ debug('node(%d) disconnected', seq);
+ slot.oraSock.unpipe(c);
+ // c.end();
+ if (!oraPool[slotID]) {
+ // not in oraPool, so oracle is disconnected, and not connected again yet
+ debug('node(%d) oraPool(%d) no data, %j', seq, slotID, freeList);
+ } else if (~freeList.indexOf(slotID)) {
+ // in freeList, it must be another new slot with same slotID
+ debug('node(%d) oraPool(%d) in freelist, %j', seq, slotID, freeList);
+ } else {
+ freeList.unshift(slotID);
+ debug('node(%d) return back slot %s, %j', seq, slotID, freeList);
+ }
+ });
+
+ c.on('error', function(err){
+ console.error(err, slotID);
+ })
+});
+
+
+var oraPool = new Array(1000)
+ , freeList = []
+ , gConnSeq = 0
+ ;
+// for oracle reverse connection
+// check magic number only
+exports.server4oracle = net.createServer(function(c){
+ var slotID, connSeq = ++gConnSeq;
+ debug('oracle(%d) connected', connSeq);
+
+ c.on('readable', onHandshake);
+
+ var head, chunks = [], chunkSeq = 0;
+
+ function onHandshake(){
+ var data = c.read();
+ chunkSeq++;
+
+ if (data === null) {
+ debug('onHandshake(%d,%d): data === null', connSeq, chunkSeq);
+ return;
+ } else {
+ chunks.push(data);
+ }
+
+ if (chunks.length === 1) {
+ try {
+ var ptoken = data.readInt32BE(0);
+ } catch (e) {
+ ptoken = -1;
+ }
+ if (ptoken !== 197610261) {
+ console.warn('none oracle connection attempt found', data);
+ c.end();
+ c.destroy();
+ return;
+ }
+ }
+
+ if (data.length < 7) {
+ debug('onHandshake(%d,%d): data.length < 7', connSeq, chunkSeq);
+ debug(chunks, data);
+ return;
+ }
+ if (data.slice(-7).toString('ascii') !== '/080526') {
+ debug('onHandshake(%d,%d): data.slice(-7).toString("ascii"") !== "/080526"', connSeq, chunkSeq);
+ debug('partial oracle connect data', data, data.slice(36), data.slice(36).toString());
+ return;
+ }
+
+ head = Buffer.concat(chunks);
+ c.removeListener('readable', onHandshake);
+
+ slotID = head.readInt32BE(16);
+ oraPool[slotID] = {oraSock : c, head : head};
+ freeList.push(slotID);
+ debug('oracle(%s,%s) slot add, freeList=%j', connSeq, slotID, freeList);
+
+ c.on('readable', readQuit);
+ }
+
+ function readQuit(){
+ // oraSock must be in freeList
+ debug('oracle(%d,%d) got quit signal', slotID, connSeq);
+ c.read(4);
+ }
+
+ c.on('end', function(){
+ debug('oracle(%d,%d) disconnected', slotID, connSeq);
+ // find free list and remove from free list
+ delete oraPool[slotID];
+ var pos = freeList.indexOf(slotID);
+ if (pos >= 0) {
+ freeList.splice(pos, 1);
+ debug('oracle(%d,%d) slot removed, freeList=%j', connSeq, slotID, freeList);
+ } else {
+ debug('oracle(%d,%d) slot not in freeList.freeList(%j)', connSeq, slotID, freeList);
+ // 应该将对应的 node socket 发送 tcp fin
+ // 而且可能马上就会有新的 oracle 连接建立,取代原先的连接
+ //
+ }
+ });
+});
+
+exports.listenOracle = function(port){
+ pool.listen(port, function(){
+ debug('listening to oracle at port:%d', port);
+ });
+};
+
+exports.listenClient = function(port){
+ server.listen(port, function(){
+ debug('listening to client at port:%d ', port);
+ });
+};
+
+exports.start = function(oraclePort, nodePorts){
+ exports.server4oracle.listen(oraclePort, function(){
+ debug('listening to oracle at port:%d', oraclePort);
+ });
+ nodePorts.forEach(function(nodePort){
+ exports.server4node.listen(nodePort, function(){
+ debug('listening to node at port:%d', nodePort);
+ });
+ });
+};
+
+exports.startShell = function(){
+ var oraclePort = process.argv[2]
+ , nodePorts = process.argv.slice(3)
+ ;
+ if (nodePorts.length === 0) {
+ console.error('usage: inHub.sh oraclePort nodePorts...');
+ return;
+ }
+ exports.start(oraclePort, nodePorts);
+};
+
+// for direct script file execution
+(function(){
+ if ((process.argv[1] !== __filename)) return;
+ require(require('path').join(__dirname, '..')).inHub.startShell();
+})();
View
194 lib/middleware/upload.js
@@ -9,112 +9,120 @@ var CRLF = '\r\n'
try {
var formidable = require('formidable');
- var uploadTrim = cfg.upload_dir.length;
} catch (e) {
console.warn('\n[WARN] Can not find/load "formidable" module, so multipart/form-data file upload is not supported !');
console.info('You can run "npm -g install formidable" to install formidable nodeJS module.\n');
}
-module.exports = function(req, oraReq, onResponse, next){
+module.exports = function(cfg){
- if (!formidable) {
- oraReq.end(onResponse);
- // todo: cleanup and release resource
- next(new Error(req.url + ' has file upload that is not support without formidable module'));
- return;
+ if (cfg.upload_dir.substr(-1) !== '/') {
+ cfg.upload_dir += '/';
}
- var form = new formidable.IncomingForm();
- var fields = {};
- var dirs = {};
- form.uploadDir = cfg.upload_dir;
- form.keepExtensions = true;
- form.on('field', function(field, value){
- if (field.substr(0, 1) === '_') {
- dirs[field.substr(1)] = value;
- } else {
- if (fields[field]) fields[field].push(value);
- else fields[field] = [value];
+ var uploadTrim = cfg.upload_dir.length;
+
+ return function(req, oraReq, onResponse, next){
+
+ if (!formidable) {
+ oraReq.end(onResponse);
+ // todo: cleanup and release resource
+ next(new Error(req.url + ' has file upload that is not support without formidable module'));
+ return;
}
- })
- .on('fileBegin', function(field, file){
- var rpath;
- if (file.name === '') {
- file.path = cfg.upload_dir + 'null';
- return;
- }
- if (dirs[field]) {
- switch (dirs[field].substr(-1)) {
- case '/':
- rpath = dirs[field] + file.name;
- break;
- case '.':
- rpath = dirs[field] + file.name.split('.').pop();
- break;
- default:
- rpath = dirs[field];
- }
+ var form = new formidable.IncomingForm();
+ var fields = {};
+ var dirs = {};
+ form.uploadDir = cfg.upload_dir;
+ form.keepExtensions = true;
+
+ form.on('field', function(field, value){
+ if (field.substr(0, 1) === '_') {
+ dirs[field.substr(1)] = value;
} else {
- rpath = file.path.split('/').pop();
- switch (cfg.upload_depth || 2) {
- case 1:
- break;
- case 2:
- rpath = rpath.substr(0, 16) + '/' + rpath.substr(16);
- break;
- case 3:
- rpath = rpath.substr(0, 10) + '/' + rpath.substr(10, 10) + '/' + rpath.substr(20);
- break;
- case 4:
- rpath = rpath.substr(0, 8) + '/' + rpath.substr(8, 8) + '/' + rpath.substr(16, 8) + '/' + rpath.substr(24);
- break;
- default:
- rpath = rpath.substr(0, 16) + '/' + rpath.substr(16);
- }
- rpath = 'auto/' + rpath;
+ if (fields[field]) fields[field].push(value);
+ else fields[field] = [value];
}
- util.ensureDir(rpath);
- file.path = cfg.upload_dir + rpath;
})
- .on('file', function(field, file){
- var value = file.path.substr(uploadTrim);
- if (value === 'null') {
- value = '';
- }
- if (fields[field]) {
- fields[field].push(value);
- fields[field + '.size'].push(file.size);
- } else {
- fields[field] = [value];
- fields[field + '.size'] = [file.size];
- }
- // strip <script>...</script> for html
- if (!file.name || file.size === 0) return;
- var ext = file.name.split('.').pop();
- if (!ext && file.mime() !== 'text/html') return;
- if (ext.match(/(html|htm)/ || file.mime() === 'text/html')) {
- // console.warn('html file "%s" upload to "%s", it may contain harmful script', file.name, file.path);
- fs.readFile(file.path, 'UTF8', function(err, data){
- if (err) {
- console.error('Can not strip script tag in html file "%s" !', file.path);
- return;
+ .on('fileBegin', function(field, file){
+ var rpath;
+ if (file.name === '') {
+ file.path = cfg.upload_dir + 'null';
+ return;
+ }
+ if (dirs[field]) {
+ switch (dirs[field].substr(-1)) {
+ case '/':
+ rpath = dirs[field] + file.name;
+ break;
+ case '.':
+ rpath = dirs[field] + file.name.split('.').pop();
+ break;
+ default:
+ rpath = dirs[field];
+ }
+ } else {
+ rpath = file.path.split('/').pop();
+ switch (cfg.upload_depth || 2) {
+ case 1:
+ break;
+ case 2:
+ rpath = rpath.substr(0, 16) + '/' + rpath.substr(16);
+ break;
+ case 3:
+ rpath = rpath.substr(0, 10) + '/' + rpath.substr(10, 10) + '/' + rpath.substr(20);
+ break;
+ case 4:
+ rpath = rpath.substr(0, 8) + '/' + rpath.substr(8, 8) + '/' + rpath.substr(16, 8) + '/' + rpath.substr(24);
+ break;
+ default:
+ rpath = rpath.substr(0, 16) + '/' + rpath.substr(16);
}
- data = data.replace(/<\s*script[^<>]+>(.|\n|\r)*?<\/\s*script\s*>/gim,
- "<em>The script tag and it's content has been striped for security reason at the time of file upload!</em>");
- fs.writeFile(file.path, data, function(err){
- if (err) console.warn('html upload file "%s" can not be striped of script tag !');
+ rpath = 'auto/' + rpath;
+ }
+ util.ensureDir(cfg.upload_dir + rpath);
+ file.path = cfg.upload_dir + rpath;
+ })
+ .on('file', function(field, file){
+ var value = file.path.substr(uploadTrim);
+ if (value === 'null') {
+ value = '';
+ }
+ if (fields[field]) {
+ fields[field].push(value);
+ fields[field + '.size'].push(file.size);
+ } else {
+ fields[field] = [value];
+ fields[field + '.size'] = [file.size];
+ }
+ // strip <script>...</script> for html
+ if (!file.name || file.size === 0) return;
+ var ext = file.name.split('.').pop();
+ if (!ext && file.mime() !== 'text/html') return;
+ if (ext.match(/(html|htm)/ || file.mime() === 'text/html')) {
+ // console.warn('html file "%s" upload to "%s", it may contain harmful script', file.name, file.path);
+ fs.readFile(file.path, 'UTF8', function(err, data){
+ if (err) {
+ console.error('Can not strip script tag in html file "%s" !', file.path);
+ return;
+ }
+ data = data.replace(/<\s*script[^<>]+>(.|\n|\r)*?<\/\s*script\s*>/gim,
+ "<em>The script tag and it's content has been striped for security reason at the time of file upload!</em>");
+ fs.writeFile(file.path, data, function(err){
+ if (err) console.warn('html upload file "%s" can not be striped of script tag !');
+ });
});
- });
- }
- })
- .on('end', function(){
- oraReq.addHeaders(fields, '');
- oraReq.end(onResponse);
+ }
+ })
+ .on('end', function(){
+ oraReq.addHeaders(fields, '');
+ oraReq.end(onResponse);
- })
- .on('error', function(e){
- // todo: clean up and release resource
- next(e);
- })
- .parse(req);
+ })
+ .on('error', function(e){
+ // todo: clean up and release resource
+ next(e);
+ })
+ .parse(req);
+ };
};
View
18 lib/poolMonitor.js
@@ -1,4 +1,4 @@
-var db = require('./DBPool.js')
+var db = require('./DBClient.js')
, DBPool = db.DBPool
, urlParse = require('url').parse
;
@@ -15,10 +15,12 @@ exports.stat = stat;
exports.showStatus = function(req, res){
var url = urlParse(req.url)
;
- try {
- var dbPool = DBPool.pools[parseInt(url.query.port)]
- } catch (e) {
- ;
+ if (url.query && url.query.port) {
+ try {
+ var dbPool = DBPool.pools[parseInt(url.query.port)]
+ } catch (e) {
+ ;
+ }
}
if (!dbPool) {
dbPool = DBPool.getFirstPool();
@@ -38,7 +40,11 @@ exports.showStatus = function(req, res){
w('Server started at ' + startTime);
w('');
- w('DBPool listen at ' + dbPool.port);
+ if (dbPool.port) {
+ w('DBPool listen at ' + dbPool.port);
+ } else {
+ w('DBPool connect at ' + dbPool.address);
+ }
w('');
w('[Memory Using]');
View
28 lib/util/jsonParse.js
@@ -0,0 +1,28 @@
+/**
+ * Created by cuccpkfs on 15-3-9.
+ */
+
+var _ = require('underscore');
+
+module.exports = function(text){
+ var data = JSON.parse(text)
+ , obj = {}
+ ;
+ if (data instanceof Array) {
+ var arr = data
+ , keys = Object.keys(arr[0])
+ ;
+ for (var i = 0, len = keys.length; i < len; i++) {
+ var key = keys[i];
+ obj[key] = _.map(_.pluck(arr, key), encodeURIComponent);
+ }
+ } else {
+ // object
+ var keys = Object.keys(data);
+ for (var i = 0, len = keys.length; i < len; i++) {
+ var key = keys[i];
+ obj[key] = encodeURIComponent(data[key]);
+ }
+ }
+ return obj;
+};
View
40 oracle/psp/gateway.bdy
@@ -27,6 +27,7 @@ create or replace package body gateway is
v_last_time date;
v_count pls_integer;
v_sts number;
+ v_open boolean;
v_quitting boolean := false;
v_reconnect boolean := false;
@@ -38,7 +39,10 @@ create or replace package body gateway is
-- private
procedure close_conn is
begin
- utl_tcp.close_connection(pv.c);
+ if v_open then
+ v_open := false;
+ utl_tcp.close_connection(pv.c);
+ end if;
exception
when utl_tcp.network_error then
null;
@@ -63,6 +67,7 @@ create or replace package body gateway is
return utl_raw.cast_from_binary_integer(i);
end;
begin
+ dbms_application_info.set_module('utl_tcp', 'open_connection');
c := utl_tcp.open_connection(remote_host => v_cfg.gw_host,
remote_port => v_cfg.gw_port,
charset => null,
@@ -89,6 +94,7 @@ create or replace package body gateway is
pi2r(v_inst),
pi2r(lengthb(v_all))));
pv.wlen := utl_tcp.write_text(pv.c, v_all);
+ v_open := true;
end;
function get_alert_quit return boolean is
@@ -99,6 +105,7 @@ create or replace package body gateway is
procedure quit is
begin
+ k_debug.trace(st('call quit', v_trc, 'raise_application_error'), 'keep_conn');
raise_application_error(-20526, '');
end;
@@ -130,18 +137,28 @@ create or replace package body gateway is
<<make_connection>>
begin
close_conn;
+ if v_quitting then
+ k_debug.trace(st('v_quitting=true', v_trc, 'quit indirect in conn loop'), 'keep_conn');
+ quit;
+ end if;
make_conn(pv.c, 1);
+ k_debug.trace(st('conn:utl_tcp.connect ok', v_trc), 'keep_conn');
v_last_time := sysdate;
exception
when utl_tcp.network_error then
if get_alert_quit then
+ k_debug.trace(st('conn:utl_tcp.network_error', v_trc, 'quit direct in conn loop'), 'keep_conn');
quit; -- prevent endless connect fail&retry, allow quit
end if;
- dbms_lock.sleep(3);
+ if sysdate > v_svr_stime + v_cfg.max_lifetime then
+ quit;
+ end if;
+ pv.c := null;
goto make_connection;
end;
v_quitting := false;
+ dbms_application_info.set_module('utl_tcp', 'get_line');
loop
-- accept arrival of new request
@@ -153,13 +170,16 @@ create or replace package body gateway is
-- check if stop singal arrived
-- after previous process and wait timeout
if v_quitting then
- -- quit immediately
+ -- close connected connection and quit process
+ close_conn;
+ k_debug.trace(st('v_quitting=true', v_trc, 'quit in read loop'), 'keep_conn');
quit;
elsif sysdate > v_svr_stime + v_cfg.max_lifetime or v_svr_req_cnt >= v_cfg.max_requests or get_alert_quit then
-- signal quit, but allow this loop of read request
pv.wlen := utl_tcp.write_raw(pv.c, utl_raw.cast_from_binary_integer(-1));
utl_tcp.flush(pv.c);
v_quitting := true;
+ k_debug.trace(st('>max_lifetime', v_trc, 'got quit'), 'keep_conn');
end if;
begin
@@ -168,28 +188,33 @@ create or replace package body gateway is
v_reconnect := false;
exception
when utl_tcp.transfer_timeout then
+ if v_quitting then
+ goto read_request;
+ end if;
k_cfg.server_control(v_cfg);
-- if target node or NATs suddenly abort, like lost of power
-- they will not send fin to socket
-- when they restart, ogw will not know and wait silly forever
-- so timeout and reconnect design is needed
if (sysdate - v_last_time) * 24 * 60 * 60 > v_cfg.idle_timeout then
- k_debug.trace(st('utl_tcp.transfer_timeout', v_trc, 'reconnect'), 'keep_conn');
+ k_debug.trace(st('read:utl_tcp.transfer_timeout', v_trc, 'reconnect'), 'keep_conn');
if v_reconnect then
+ v_reconnect := false;
goto make_connection;
end if;
+ -- may received by free oraSock or a sending in-the-way oraSock
pv.wlen := utl_tcp.write_raw(pv.c, utl_raw.cast_from_binary_integer(-1));
utl_tcp.flush(pv.c);
v_reconnect := true;
end if;
goto read_request;
when utl_tcp.end_of_input then
- k_debug.trace(st('utl_tcp.end_of_input', v_trc), 'keep_conn');
+ k_debug.trace(st('read:utl_tcp.end_of_input', v_trc), 'keep_conn');
-- not sleep will cause reconnect raise ORA-29260 TNS no listener
dbms_lock.sleep(1);
goto make_connection;
when utl_tcp.network_error then
- k_debug.trace(st('utl_tcp.network_error', v_trc), 'keep_conn');
+ k_debug.trace(st('read:utl_tcp.network_error', v_trc), 'keep_conn');
goto make_connection;
end;
@@ -271,7 +296,7 @@ create or replace package body gateway is
output.finish;
utl_tcp.flush(pv.c);
- dbms_application_info.set_module('free', null);
+ dbms_application_info.set_module('utl_tcp', 'get_line');
dbms_session.set_identifier(v_clinfo);
dbms_session.clear_context('SERVER_PROCESS', v_clinfo);
@@ -285,6 +310,7 @@ create or replace package body gateway is
exception
when others then
+ k_debug.trace(st('quit at end', v_trc), 'keep_conn');
dbms_application_info.set_module('killed', 'server quit');
utl_tcp.close_all_connections;
if v_sts = 0 then
View
12 oracle/psp/kill.prc
@@ -13,9 +13,15 @@ begin
else
v_clinfo := 'Noradle-' || cfg || '#' || slot;
end if;
- for i in (select a.client_info from v$session a where a.client_info like v_clinfo) loop
- dbms_pipe.pack_message('SIGKILL');
- v_return := dbms_pipe.send_message(i.client_info);
+ for i in (select a.client_info, a.module, a.action, a.sid, a.serial#
+ from v$session a
+ where a.client_info like v_clinfo) loop
+ if i.module = 'utl_tcp' and i.action = 'open_connection' then
+ sys.pw.kill_session(i.sid, i.serial#);
+ else
+ dbms_pipe.pack_message('SIGKILL');
+ v_return := dbms_pipe.send_message(i.client_info);
+ end if;
end loop;
end kill;
/
View
5 oracle/psp/output.bdy
@@ -224,8 +224,9 @@ create or replace package body output is
end if;
end if;
goto print_http_headers;
- elsif pv.feedback or (pv.protocol = 'HTTP' and pv.feedback is null and r.type = 'c' and pv.status_code = 200) then
- -- have content, but have feedback indication or _c sts=200
+ elsif pv.feedback or (pv.protocol = 'HTTP' and pv.feedback is null and r.type = 'c' and pv.status_code = 200 and
+ pv.headers('Content-Type') like 'text/html;%' and r.header('x-requested-with') is null) then
+ -- have content, but have feedback indication or _c sts=200, not XMLHttpRequest
declare
v varchar2(4000);
nl varchar2(2) := chr(13) || chr(10);
View
22 oracle/psp/r.bdy
@@ -165,7 +165,7 @@ create or replace package body r is
-- read post from application/x-www-form-urlencoded or multipart/form-data or other mime types
if pv.protocol = 'HTTP' and v_method = 'POST' then
if header('content-type') like 'application/x-www-form-urlencoded%' or
- header('content-type') like 'multipart/form-data%' then
+ header('content-type') like 'application/json%' or header('content-type') like 'multipart/form-data%' then
null; -- form key-value pairs already got
else
if not is_null('y$instream') then
@@ -490,11 +490,11 @@ create or replace package body r is
) return varchar2 is
begin
if name not like '_$%' then
- return utl_url.unescape(ra.params(name) (idx), pv.cs_req);
+ return nvl(utl_url.unescape(ra.params(name) (idx), pv.cs_req), defval);
elsif substrb(name, 1, 1) = 's' then
- return utl_url.unescape(ra.params(name) (idx), 'AL32UTF8');
+ return nvl(utl_url.unescape(ra.params(name) (idx), 'AL32UTF8'), defval);
else
- return ra.params(name)(idx);
+ return nvl(ra.params(name) (idx), defval);
end if;
exception
when no_data_found then
@@ -509,11 +509,11 @@ create or replace package body r is
) return nvarchar2 is
begin
if name not like '_$%' then
- return utl_url.unescape(to_nchar(ra.params(name) (idx)), pv.cs_req);
+ return nvl(utl_url.unescape(to_nchar(ra.params(name) (idx)), pv.cs_req), defval);
elsif substrb(name, 1, 1) = 's' then
- return utl_url.unescape(to_nchar(ra.params(name) (idx)), 'AL32UTF8');
+ return nvl(utl_url.unescape(to_nchar(ra.params(name) (idx)), 'AL32UTF8'), defval);
else
- return ra.params(name)(idx);
+ return nvl(ra.params(name) (idx), defval);
end if;
exception
when no_data_found then
@@ -529,9 +529,9 @@ create or replace package body r is
) return number is
begin
if format is null then
- return to_number(ra.params(name) (idx));
+ return nvl(to_number(ra.params(name) (idx)), defval);
else
- return to_number(ra.params(name) (idx), format);
+ return nvl(to_number(ra.params(name) (idx), format), defval);
end if;
exception
when no_data_found then
@@ -547,9 +547,9 @@ create or replace package body r is
) return date is
begin
if format is null then
- return to_date(utl_url.unescape(ra.params(name) (idx)), gc_date_fmt);
+ return nvl(to_date(utl_url.unescape(ra.params(name) (idx)), gc_date_fmt), defval);
else
- return to_date(utl_url.unescape(ra.params(name) (idx)), format);
+ return nvl(to_date(utl_url.unescape(ra.params(name) (idx)), format), defval);
end if;
exception
when no_data_found then
View
4 oracle/psp/rs.bdy
@@ -9,10 +9,10 @@ create or replace package body rs is
descrec dbms_sql.desc_rec;
desctab dbms_sql.desc_tab;
colcnt number;
- namevar varchar2(50);
+ namevar varchar2(4000);
numvar number;
datevar date;
- vsize number := 50;
+ vsize number := 4000;
sep varchar2(2);
lsep varchar2(2) := chr(30) || chr(10);
csep varchar2(2) := chr(31) || ',';
View
15 oracle/pw.pck
@@ -6,6 +6,12 @@ create or replace package sys.pw is
procedure recompile(p_sql varchar2);
+ procedure kill_session
+ (
+ p_sid pls_integer,
+ p_serial pls_integer
+ );
+
end pw;
/
create or replace package body sys.pw is
@@ -30,5 +36,14 @@ create or replace package body sys.pw is
execute immediate p_sql;
end;
+ procedure kill_session
+ (
+ p_sid pls_integer,
+ p_serial pls_integer
+ ) is
+ begin
+ execute immediate 'alter system kill session ''' || p_sid || ',' || p_serial || ''' immediate';
+ end;
+
end pw;
/
View
52 package.json
@@ -1,8 +1,8 @@
{
- "author" : "Li Yong <kaven276@vip.sina.com>",
- "name" : "noradle",
- "description" : "A NodeJS and Oracle DB integration, NodeJS act as http gateway for plsql server pages",
- "keywords" : [
+ "author": "Li Yong <kaven276@vip.sina.com>",
+ "name": "noradle",
+ "description": "A NodeJS and Oracle DB integration, NodeJS act as http gateway for plsql server pages",
+ "keywords": [
"oracle",
"sql",
"plsql",
@@ -14,32 +14,34 @@
"ndbc",
"pool"
],
- "version" : "0.10.6",
- "homepage" : "https://github.com/kaven276/noradle",
- "repository" : {
- "type" : "git",
- "url" : "git@github.com:kaven276/noradle.git"
+ "version": "0.10.7",
+ "homepage": "https://github.com/kaven276/noradle",
+ "repository": {
+ "type": "git",
+ "url": "git@github.com:kaven276/noradle.git"
},
- "main" : "server.js",
- "bin" : {
+ "main": "server.js",
+ "bin": {
+ "inhub": "./bin/inHub.sh"
},
- "scripts" : {
- "start" : "node demo/server.js",
- "testDBCall" : "node test/call_plsql_for_result_sets.js"
+ "scripts": {
+ "start": "node demo/server.js",
+ "testDBCall": "node test/call_plsql_for_result_sets.js"
},
- "config" : {
+ "config": {},
+ "engines": {
+ "node": ">=0.8"
},
- "engines" : {
- "node" : ">=0.8"
+ "dependencies": {
+ "underscore": "^1.8.2",
+ "debug": "~2.1.1"
},
- "dependencies" : {},
- "devDependencies" : {},
- "optionalDependencies" : {
- "formidable" : ">1.0.11",
- "debug" : "~2.1.1"
+ "devDependencies": {},
+ "optionalDependencies": {
+ "formidable": ">1.0.11"
},
- "bugs" : {
- "url" : "https://github.com/kaven276/noradle/issues",
- "email" : "kaven276@vip.sina.com"
+ "bugs": {
+ "url": "https://github.com/kaven276/noradle/issues",
+ "email": "kaven276@vip.sina.com"
}
}
View
7 server.js
@@ -34,7 +34,7 @@ Object.defineProperties(exports, {
},
DBPool : {
get : function(){
- return require('./lib/DBPool.js').DBPool;
+ return require('./lib/DBClient.js').DBPool;
}
},
poolMonitor : {
@@ -42,6 +42,11 @@ Object.defineProperties(exports, {
return require('./lib/poolMonitor.js');
}
},
+ inHub : {
+ get : function(){
+ return require('./lib/inHub.js');
+ }
+ },
RSParser : {
get : function(){
Please sign in to comment.
Something went wrong with that request. Please try again.