Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

DCO use 12 bytes header, more simple

  • Loading branch information...
commit ab7729dc861c65bad89ddc281db12566c3d709db 1 parent 35ab1b5
@kaven276 authored
View
42 lib/dco_proxy.js
@@ -34,9 +34,10 @@ Counter.prototype.finish = function(){
utl.gracefulExit(function(){
// stop accept new request
// detect end of current request and then quit
- var askClosePDU = new Buffer(6);
+ var askClosePDU = new Buffer(12);
askClosePDU.writeInt32BE(6, 0);
- askClosePDU.writeUInt16BE(0, 4);
+ askClosePDU.writeInt32BE(0, 4);
+ askClosePDU.writeInt32BE(0, 8);
extHubSocks.forEach(function(sock){
// tell ext-hub to half close the tcp connection
sock.write(askClosePDU);
@@ -59,6 +60,7 @@ exports.createServer = function(handler){
extHubSock.once('data', function onHandshake(data){
+ // for begin of connection
try {
var ptoken = data.readInt32BE(0);
} catch (e) {
@@ -71,26 +73,28 @@ exports.createServer = function(handler){
return;
}
- extHubSocks.push(extHubSock);
- (function(extHubID){
+ // for end of connection
+ (function(){
+ /** @const */
+ var extHubID = extHubSocks.length;
+ extHubSocks.push(extHubSock);
extHubSock.on('close', function(){
console.log('ext-hub(%d) will not send new request, in-bound socket half closed', extHubID);
- delete extHubSocks[extHubID - 1];
+ delete extHubSocks[extHubID];
});
- })(extHubSocks.length);
- new StreamSpliter(extHubSock, 'readInt32BE', onRequest);
- function onRequest(pdu){
+ })();
+
+ new StreamSpliter(extHubSock, 'readInt32BE', function onRequest(pdu){
counter.start();
if (pdu.readInt32BE(0) > 0) {
- handler(new Request(pdu, true), new Response(extHubSock, pdu.readUInt16BE(4)));
+ handler(new Request(pdu, true), new Response(extHubSock, pdu.slice(0, 12)));
} else {
handler(new Request(pdu, false), new DummyResponse());
}
- }
+ });
- if (data.length !== 4) {
- console.warn('first chunk is not 4 bytes length.');
- extHubSock.emit('data', data.slice(4));
+ if (data.length > 12) {
+ extHubSock.emit('data', data.slice(12));
}
});
});
@@ -107,15 +111,15 @@ DcoWorkerProxy.prototype.listen = function(port, host){
};
function Request(pdu, sync){
- this.content = pdu.slice(6);
+ this.content = pdu.slice(12);
this.sync = sync;
}
-function Response(extHubSock, traceBackSeq){
+function Response(extHubSock, header){
this._extHubSock = extHubSock;
- this._traceBackSeq = traceBackSeq;
+ this._header = header;
this._buffer = [];
- this._length = 6;
+ this._length = 12;
}
Response.prototype.write = function(data){
if (!data) return;
@@ -127,10 +131,8 @@ Response.prototype.write = function(data){
};
Response.prototype.end = function(data){
if (data) this.write(data);
- var header = new Buffer(6);
+ var header = this._header;
header.writeInt32BE(this._length, 0);
- header.writeUInt16BE(this._traceBackSeq, 4);
-
var extHubSock = this._extHubSock;
extHubSock.write(header);
this._buffer.forEach(function(buf){
View
111 lib/ext_hub.js
@@ -7,25 +7,15 @@
var net = require('net')
, StreamSpliter = require('./StreamSpliter').Class
- , oraSocks = []
+ , oraSocks = {}
, wpSocks = []
, utl = require('./util.js')
, logConnWP = utl.dummy
, logConnOra = console.log
, logPDU = console.log
- , logSeqQ = utl.dummy
, pendingReturn = 0
;
-/**
- * A TraceBack record.
- * @constructor
- */
-function TraceBack(oraSock, oraSeq){
- this.oraSock = oraSock;
- this.oraSeq = oraSeq;
-}
-
function gracefulQuit(){
console.log('ext-hub is exiting !');
console.log('current connection is %d', server.connections);
@@ -57,16 +47,18 @@ var onDataCount = 0;
var server = net.createServer(function(oraSock){
logConnOra('oracle connected');
+ console.log('oracle connected');
oraSock.on('data', function(data){
- logConnOra('NO.%d data arrived from oracle(%d,%d) len=%d', oraSock.sid || 0, oraSock.serial || 0, ++onDataCount, data.length);
+ logConnOra('NO.%d data arrived from oracle(%d,%d) len=%d', ++onDataCount, oraSock.sid || 0, oraSock.serial || 0, data.length);
});
- var sid, serial;
+ var sid, serial, asid;
oraSock.once('data', onHandshake);
oraSock.on('close', function(){
+ delete oraSocks[oraSock.asid];
logConnOra('connection closed, now have %d', server.connections);
});
@@ -86,52 +78,44 @@ var server = net.createServer(function(oraSock){
oraSock.sid = sid = data.readInt32BE(4);
oraSock.serial = serial = data.readInt32BE(8);
- oraSock.oraSeq = data.readInt32BE(12);
- oraSocks[sid] = oraSock;
- logConnOra('oracle connected sid = %d, serial = %d, initial oraSeq = %d', sid, serial, oraSock.oraSeq);
+ oraSock.asid = asid = data.readUInt32BE(12);
+ oraSock.oraSeq = data.readInt32BE(16);
+ oraSocks[asid] = oraSock;
+ logConnOra('oracle connected asid=%d, sid = %d, serial = %d, initial oraSeq = %d', asid, sid, serial, oraSock.oraSeq);
new StreamSpliter(oraSock, 'readInt32BE', onOracleRequest);
- if (data.length !== 16) {
- logConnOra('first chunk is not 16 bytes length.');
- oraSock.emit('data', data.slice(16));
+ if (data.length !== 20) {
+ logConnOra('first chunk is not 20 bytes length.');
+ oraSock.emit('data', data.slice(20));
}
}
function onOracleRequest(oraReq){
- var proxyID = oraReq.readUInt16BE(4)
+ var proxyID = oraReq.readInt32BE(4)
, wpw = wpSocks[proxyID]
- , oraSeq = oraSock.oraSeq++
;
if (!wpw) {
console.warn('%s, proxy %d is not exists', new Date(), proxyID);
return;
}
- wpw.whenHaveFreeTraceBackID(oraReq.readInt32BE(0) > 0, function(rpcSeq){
- if (rpcSeq === 0) {
- // for no reply or no rpc requests
- oraReq.writeUInt16BE(0, 4);
- } else {
- oraReq.writeUInt16BE(rpcSeq, 4);
- logPDU('> ora(%d,%d,%d) - wp(%d,%d)', sid, serial, oraSeq, proxyID, rpcSeq);
- wpw.rpcLog[rpcSeq] = new TraceBack(oraSock, oraSeq);
- }
- (wpw.sts === 'opened') ? wpw.wpSock.write(oraReq) : wpw.queue.push(oraReq);
- });
+ /* replace proxyID field with oraSeq */
+ logPDU('> ora(%d,%d,%d,%d) - wp(%d)', asid, sid, serial, oraSock.oraSeq, proxyID);
+
+ oraReq.writeInt32BE(oraSock.oraSeq++, 4);
+ (wpw.sts === 'opened') ? wpw.wpSock.write(oraReq) : wpw.queue.push(oraReq);
}
});
/**
* A worker proxy client wrapper class
* @constructor
- * @param wpSock worker proxy client socket.
* @param {number} id worker proxy client socket slot, start at 0
* @param {string }hostp worker proxy server address, format as "host:port"
- * @param {number} maxPendingRpc worker proxy request queue pending requests limit.
* @param {string} desc worker proxy description.
*/
-function WPWrapper(id, hostp, maxPendingRpc, desc){
+function WPWrapper(id, hostp, desc){
var wpw = this;
var wpSock = this.wpSock = new net.Socket({allowHalfOpen : true});
this.desc = desc;
@@ -139,10 +123,6 @@ function WPWrapper(id, hostp, maxPendingRpc, desc){
this.id = id;
this.sts = 'close';
this.queue = [];
- this.rpcLog = [];
- this.freeList = utl.makeArray(maxPendingRpc || 256);
- this.freeList.shift();
- this.noRpcSeqQueue = [];
wpSock.on('end', function(){
// worker proxy real closed
@@ -171,27 +151,24 @@ function WPWrapper(id, hostp, maxPendingRpc, desc){
function onWorkerProxyReply(proxyResp){
var len = proxyResp.readInt32BE(0);
- if (len === 6) {
+ if (len === 12) {
wpw.setStatus('exiting', 'wp said he will quit');
wpSock.end();
return;
}
- var rpcSeq = proxyResp.readUInt16BE(4);
- var traceBack = wpw.rpcLog[rpcSeq];
+ var oraSeq = proxyResp.readInt32BE(4)
+ , asid = proxyResp.readInt32BE(8)
+ , oraSock = oraSocks[asid]
+ ;
// logPDU('reply for proxySeq=%d, %s', proxySeq, proxyResp.slice(0, 6).toString('hex'));
- if (!traceBack) {
- console.warn('proxySeq:%d have no trace back, may ext-hub restarted and proxy send reply in that break', rpcSeq);
+ if (!oraSock) {
+ console.warn('can not find the sending oracle connection, reply will lose');
return;
}
- var oraSock = traceBack.oraSock
- , oraSeq = traceBack.oraSeq
- ;
- wpw.recycleRpcSeq(rpcSeq);
- proxyResp.writeUInt16BE(oraSeq, 4);
oraSock.write(proxyResp);
- logPDU('< ora(%d,%d,%d) - wp(%d,%d)', oraSock.sid, oraSock.serial, oraSeq, wpw.id, rpcSeq);
+ logPDU('< ora(%d,%d,%d,%d) - wp(%d)', oraSock.asid, oraSock.sid, oraSock.serial, oraSeq, wpw.id);
}
}
@@ -205,43 +182,15 @@ WPWrapper.prototype.setStatus = function(sts, env){
this.sts = sts;
};
-WPWrapper.prototype.whenHaveFreeTraceBackID = function(isRPC, callback){
- if (!isRPC) {
- callback(0);
- } else {
- pendingReturn++;
- var rpcSeq = this.freeList.shift();
- if (rpcSeq === undefined) {
- this.noRpcSeqQueue.push(callback);
- logSeqQ('no free rpc trace back sequence available.');
- } else {
- callback(rpcSeq);
- }
- }
-};
-
-WPWrapper.prototype.recycleRpcSeq = function(rpcSeq){
- pendingReturn--;
- var callback = this.noRpcSeqQueue.shift();
- if (callback) {
- callback(rpcSeq);
- logSeqQ('new free rpc trace back sequence available, and used for queued request.');
- } else {
- delete this.rpcLog[rpcSeq];
- this.freeList.push(rpcSeq);
- // logSeqQ('new free rpc trace back sequence available, just recycled.');
- }
-};
-
-exports.addWorkerProxy = function(slot, addr, desc, maxPendingRpc){
+exports.addWorkerProxy = function(slot, addr, desc){
if (wpSocks[slot]) {
console.warn('The NO.%d slot is already used by %s', wpSocks[slot].desc);
return;
}
- var wpw = new WPWrapper(slot, addr, maxPendingRpc, desc);
+ var wpw = new WPWrapper(slot, addr, desc);
wpSocks[slot] = wpw;
wpw.connect();
- console.log('set NO.%d worker proxy.', slot);
+ console.log('set NO.%d worker proxy : %s', slot, desc);
};
exports.run = function(port){
server.listen(parseInt(port || 1523));
View
10 oracle/psp/k_dco_adm.bdy
@@ -1,5 +1,15 @@
create or replace package body k_dco_adm is
+ procedure signal_reconnect
+ (
+ host varchar2,
+ port number
+ ) is
+ begin
+ dbms_alert.signal('Noradle-DCO-EXTHUB-QUIT', host || ':' || port);
+ commit;
+ end;
+
procedure wait_reconnect_exthub
(
host varchar2,
View
6 oracle/psp/k_dco_adm.spc
@@ -1,5 +1,11 @@
create or replace package k_dco_adm is
+ procedure signal_reconnect
+ (
+ host varchar2,
+ port number
+ );
+
procedure wait_reconnect_exthub
(
host varchar2,
View
187 oracle/psp/k_ext_call.bdy
@@ -6,16 +6,52 @@ create or replace package body k_ext_call is
return utl_raw.cast_from_binary_integer(i);
end;
- -- private
- function trim_raw
+ procedure init is
+ begin
+ dbms_lob.createtemporary(dcopv.msg, cache => true, dur => dbms_lob.session);
+ dcopv.chksz := dbms_lob.getchunksize(dcopv.msg);
+ dcopv.pos_head := 0;
+ dcopv.pos_tail := 12;
+ dcopv.rseq := 1;
+ dcopv.rseq2 := 1;
+ dcopv.onway := 0;
+ dcopv.onbuf := 0;
+ dbms_alert.register('Noradle-DCO-EXTHUB-QUIT');
+ end;
+
+ procedure write(content in out nocopy raw) is
+ v_len pls_integer := utl_raw.length(content);
+ begin
+ dbms_lob.write(dcopv.msg, v_len, dcopv.pos_tail + 1, content);
+ dcopv.pos_tail := dcopv.pos_tail + v_len;
+ end;
+
+ procedure line
(
- int32 pls_integer,
- bytes number := 4
- ) return raw is
+ str varchar2 character set any_cs,
+ nl varchar2 := chr(10),
+ indent pls_integer := null
+ ) is
+ v_out raw(32767);
+ v_len pls_integer;
+ v_cs varchar2(30);
begin
- return utl_raw.substr(utl_raw.cast_from_binary_integer(int32), 5 - bytes);
+ if str is null and nl is null then
+ return;
+ end if;
+
+ v_len := lengthb(str);
+ if v_len = length(str) then
+ v_cs := null;
+ else
+ v_cs := 'AL32UTF8';
+ end if;
+
+ v_out := utl_i18n.string_to_raw(lpad(' ', indent, ' ') || str || nl, v_cs);
+ write(v_out);
end;
+ -- private
procedure close_tcp is
begin
utl_tcp.close_connection(dcopv.con);
@@ -30,15 +66,7 @@ create or replace package body k_ext_call is
v_serial pls_integer;
v_count pls_integer := 0;
begin
- -- k_debug.trace(st('ext_hub try connect'));
- begin
- utl_tcp.close_connection(dcopv.con);
- -- k_debug.trace(st('ext_hub connect closed'));
- exception
- when others then
- -- k_debug.trace(st('ext_hub connect close error'));
- null;
- end;
+ close_tcp;
<<make_connection>>
v_count := v_count + 1;
for i in (select * from exthub_config_t a where a.sts = 'Y' order by a.seq asc nulls last) loop
@@ -50,6 +78,7 @@ create or replace package body k_ext_call is
out_buffer_size => 0,
tx_timeout => 0);
-- k_debug.trace(st('ext_hub connected'));
+ -- record which current connected ext-hub is, used when ask to reconnect the particular ext-hub
dcopv.host := i.host;
dcopv.port := i.port;
goto connected;
@@ -63,55 +92,15 @@ create or replace package body k_ext_call is
goto make_connection;
<<connected>>
select a.sid, a.serial# into v_sid, v_serial from v$session a where a.sid = sys_context('userenv', 'sid');
- dcopv.tmp_pi := utl_tcp.write_raw(dcopv.con, utl_raw.concat(pi2r(197610262), pi2r(v_sid), pi2r(v_serial), pi2r(dcopv.rseq2)));
- end;
-
- procedure init is
- begin
- dbms_lob.createtemporary(dcopv.msg, cache => true, dur => dbms_lob.session);
- dcopv.chksz := dbms_lob.getchunksize(dcopv.msg);
- dcopv.posbk := 0;
- dcopv.pos := 6;
- dcopv.rseq := 1;
- dcopv.rseq2 := 1;
- dcopv.onway := 0;
- dcopv.onbuf := 0;
- dbms_alert.register('Noradle-DCO-EXTHUB-QUIT');
- end;
-
- procedure write(content in out nocopy raw) is
- v_len pls_integer := utl_raw.length(content);
- begin
- dbms_lob.write(dcopv.msg, v_len, dcopv.pos + 1, content);
- dcopv.pos := dcopv.pos + v_len;
- end;
-
- procedure line
- (
- str varchar2 character set any_cs,
- nl varchar2 := chr(10),
- indent pls_integer := null
- ) is
- v_out raw(32767);
- v_len pls_integer;
- v_cs varchar2(30);
- begin
- if str is null and nl is null then
- return;
- end if;
-
- v_len := lengthb(str);
- if v_len = length(str) then
- v_cs := null;
- else
- v_cs := 'AL32UTF8';
- end if;
-
- v_out := utl_i18n.string_to_raw(lpad(' ', indent, ' ') || str || nl, v_cs);
- write(v_out);
+ dcopv.tmp_pi := utl_tcp.write_raw(dcopv.con,
+ utl_raw.concat(pi2r(197610262),
+ pi2r(v_sid),
+ pi2r(v_serial),
+ pi2r(sys_context('userenv', 'sessionid')),
+ pi2r(dcopv.rseq2)));
end;
- -- private
+ -- private, detect ext-hub quit and try reconnect
-- as autonomouse transaction to avoid main code in transaction
procedure check_reconnect is
pragma autonomous_transaction;
@@ -120,7 +109,7 @@ create or replace package body k_ext_call is
dbms_alert.waitone('Noradle-DCO-EXTHUB-QUIT', dcopv.tmp_s, v_sts, 0);
if v_sts = 0 and dcopv.tmp_s = dcopv.host || ':' || dcopv.port then
-- k_debug.trace(st('check_reconnect find exthub quit signal', dcopv.onway));
- -- read all pending reply and then to reconnect
+ -- read all pending reply and then reconnect
loop
-- k_debug.trace(st('reading one pending reply countdown', dcopv.onway));
exit when dcopv.onway = 0;
@@ -131,24 +120,27 @@ create or replace package body k_ext_call is
rollback;
end check_reconnect;
+ -- real network I/O write
procedure flush is
v_raw raw(8132);
v_wlen number(8);
v_pos number := 0;
- v_cnt number(1) := 0;
+ v_err number(1) := 0;
begin
- if dcopv.pos <= 6 then
- return;
+ if dcopv.pos_head <= 12 then
+ return; -- no whole request to send, buffer is empty
+ end if;
+ if dcopv.pos_tail - dcopv.pos_head > 12 then
+ raise_application_error(-20000, 'DCO flush attempt in half filled request, action aborted');
end if;
check_reconnect;
+ v_pos := 0;
<<write_tcp>>
begin
- v_pos := 0;
- for i in 1 .. ceil(dcopv.pos / dcopv.chksz) loop
- if v_pos + dcopv.chksz > dcopv.pos then
- v_wlen := dcopv.pos - v_pos;
- else
- v_wlen := dcopv.chksz;
+ v_wlen := dcopv.chksz;
+ for i in 1 .. ceil(dcopv.pos_head / dcopv.chksz) loop
+ if v_pos + dcopv.chksz > dcopv.pos_head then
+ v_wlen := dcopv.pos_head - v_pos;
end if;
dbms_lob.read(dcopv.msg, v_wlen, v_pos + 1, v_raw);
v_wlen := utl_tcp.write_raw(dcopv.con, v_raw, v_wlen);
@@ -156,19 +148,19 @@ create or replace package body k_ext_call is
end loop;
exception
when utl_tcp.network_error or dcopv.ex_tcp_security then
- if v_cnt > 0 then
+ if v_err > 0 then
raise;
end if;
- v_cnt := v_cnt + 1;
- dbms_output.put_line('auto conn starting2 dcopv.rseq=' || dcopv.rseq);
+ v_err := v_err + 1;
+ -- dbms_output.put_line('auto conn starting2 dcopv.rseq=' || dcopv.rseq);
connect_router_proxy;
goto write_tcp;
end;
- dcopv.posbk := 0;
- dcopv.pos := 6;
- dcopv.rseq2 := dcopv.rseq;
- dcopv.onway := dcopv.onway + dcopv.onbuf;
- dcopv.onbuf := 0;
+ dcopv.pos_head := 0;
+ dcopv.pos_tail := 12;
+ dcopv.rseq2 := dcopv.rseq;
+ dcopv.onway := dcopv.onway + dcopv.onbuf;
+ dcopv.onbuf := 0;
end flush;
-- private
@@ -178,13 +170,18 @@ create or replace package body k_ext_call is
sync pls_integer,
buffered boolean
) return pls_integer is
+ v_len pls_integer := dcopv.pos_tail - dcopv.pos_head;
begin
- dbms_lob.write(dcopv.msg, 4, dcopv.posbk + 1, trim_raw((dcopv.pos - dcopv.posbk) * sync, 4));
- dbms_lob.write(dcopv.msg, 2, dcopv.posbk + 5, trim_raw(proxy_id, 2));
- if buffered then
- dcopv.posbk := dcopv.pos;
- dcopv.pos := dcopv.pos + 6;
- else
+ if v_len = 0 then
+ return 0; -- ignore empty request body
+ end if;
+ dbms_lob.write(dcopv.msg, 4, dcopv.pos_head + 1, pi2r(v_len * sync));
+ dbms_lob.write(dcopv.msg, 4, dcopv.pos_head + 5, pi2r(proxy_id));
+ dbms_lob.write(dcopv.msg, 4, dcopv.pos_head + 9, pi2r(sys_context('userenv', 'sessionid')));
+ dcopv.onbuf := dcopv.onbuf + 1;
+ dcopv.pos_head := dcopv.pos_tail;
+ dcopv.pos_tail := dcopv.pos_tail + 12;
+ if not buffered then
flush;
end if;
dcopv.rsps(dcopv.rseq) := null;
@@ -198,8 +195,6 @@ create or replace package body k_ext_call is
buffered boolean := false
) return pls_integer is
begin
- dcopv.onbuf := dcopv.onbuf + 1;
- -- k_debug.trace(st('send', dcopv.onway, dcopv.onbuf));
return send(proxy_id, 1, buffered);
end;
@@ -219,10 +214,10 @@ create or replace package body k_ext_call is
timeout pls_integer := null
) return boolean is
v_int32 raw(4);
- v_uint16 raw(2) := hextoraw('0');
v_len pls_integer;
v_raw raw(8132);
v_rseq pls_integer;
+ v_asid pls_integer;
v_timeout number(8) := timeout * 100;
v_start number;
begin
@@ -239,10 +234,12 @@ create or replace package body k_ext_call is
dcopv.onway := dcopv.onway - 1;
-- k_debug.trace(st('read', dcopv.onway, dcopv.onbuf));
dcopv.rtcp := utl_tcp.read_raw(dcopv.con, v_int32, 4);
- dcopv.rtcp := utl_tcp.read_raw(dcopv.con, v_uint16, 2);
- v_len := utl_raw.cast_to_binary_integer(v_int32) - 6;
- v_rseq := utl_raw.cast_to_binary_integer(v_uint16);
- k_debug.trace(st('read rseq', v_rseq));
+ v_len := utl_raw.cast_to_binary_integer(v_int32) - 12;
+ dcopv.rtcp := utl_tcp.read_raw(dcopv.con, v_int32, 4);
+ v_rseq := utl_raw.cast_to_binary_integer(v_int32);
+ dcopv.rtcp := utl_tcp.read_raw(dcopv.con, v_int32, 4);
+ v_asid := utl_raw.cast_to_binary_integer(v_int32);
+ k_debug.trace(st('read: rseq,asid', v_rseq, v_asid));
dbms_lob.createtemporary(req_blb, cache => true, dur => dbms_lob.session);
for i in 1 .. floor(v_len / 8132) loop
dcopv.rtcp := utl_tcp.read_raw(dcopv.con, v_raw, 8132);
@@ -275,11 +272,11 @@ create or replace package body k_ext_call is
function call_sync
(
proxy_id pls_integer,
- req_blb blob,
+ req_blb in out nocopy blob,
timeout pls_integer := null
) return boolean is
begin
- return false;
+ return read_response(send_request(proxy_id, false), req_blb, timeout);
end;
begin
View
2  oracle/psp/k_ext_call.spc
@@ -35,7 +35,7 @@ create or replace package k_ext_call is
function call_sync
(
proxy_id pls_integer,
- req_blb blob,
+ req_blb in out nocopy blob,
timeout pls_integer := null
) return boolean;
Please sign in to comment.
Something went wrong with that request. Please try again.