Skip to content
Browse files

DCO: use 3nd int32 as rseq, tune k_debug, no sync

remove oraSeq sync code, alway in sync
rec in ext-hub is only for debug purpose, so rename to _rec
  • Loading branch information...
1 parent 6df30a9 commit 8b2a60c333f568f1fa1ee59bf7b24ac677adbf91 @kaven276 committed Oct 11, 2012
Showing with 24 additions and 33 deletions.
  1. +13 −18 lib/ext_hub.js
  2. +0 −1 oracle/psp/dcopv.bdy
  3. +11 −14 oracle/psp/k_ext_call.bdy
View
31 lib/ext_hub.js
@@ -48,21 +48,17 @@ function OraSock(oraSock, data){
this.sid = data.readInt32BE(4);
this.serial = data.readInt32BE(8);
this.asid = data.readUInt32BE(12);
- this.oraSeq = data.readInt32BE(16);
}
-/** @define {int} */
-var onDataCount = 0;
-
var server = net.createServer(function(oraSock){
- var rec;
+ var _rec, asid;
logConnOra('oracle connected');
oraSock.once('data', onHandshake);
oraSock.on('close', function(){
- if (rec && rec.asid) {
- delete oraSocks[rec.asid];
+ if (asid) {
+ delete oraSocks[asid];
}
logConnOra('connection closed, now have %d', server.connections);
});
@@ -81,31 +77,30 @@ var server = net.createServer(function(oraSock){
return;
}
- var asid = data.readUInt32BE(12);
- rec = oraSocks[asid] = new OraSock(oraSock, data);
+ asid = data.readUInt32BE(12);
+ _rec = oraSocks[asid] = new OraSock(oraSock, data);
- logConnOra('oracle connected(%d,%d,%d,%d)', rec.asid, rec.sid, rec.serial, rec.oraSeq);
+ logConnOra('oracle connected(%d,%d,%d)', _rec.asid, _rec.sid, _rec.serial);
new StreamSpliter(oraSock, 'readInt32BE', onOracleRequest);
- if (data.length !== 20) {
- logConnOra('first chunk is not 20 bytes length.');
- oraSock.emit('data', data.slice(20));
+ if (data.length !== 16) {
+ logConnOra('first chunk is not 16 bytes length.');
+ oraSock.emit('data', data.slice(16));
}
}
function onOracleRequest(oraReq){
- rec.oraSeq++;
var proxyID = oraReq.readInt32BE(4)
, wpw = wpSocks[proxyID]
;
- logPDU('> ora(%d,%d,%d,%d) - wp(%d)', rec.asid, rec.sid, rec.serial, rec.oraSeq, proxyID);
+ logPDU('> ora(%d,%d,%d,%d) - wp(%d)', _rec.asid, _rec.sid, _rec.serial, oraReq.readInt32BE(8), proxyID);
if (!wpw) {
console.warn('%s, proxy %d is not exists', new Date(), proxyID);
return;
}
/* replace proxyID field with oraSeq */
- oraReq.writeInt32BE(rec.oraSeq, 4);
+ oraReq.writeInt32BE(asid, 4);
(wpw.sts === 'opened') ? wpw.wpSock.write(oraReq) : wpw.queue.push(oraReq);
}
});
@@ -160,9 +155,9 @@ function WPWrapper(id, hostp, desc){
return;
}
- var oraSeq = proxyResp.readInt32BE(4)
- , asid = proxyResp.readInt32BE(8)
+ var asid = proxyResp.readInt32BE(4)
, rec = oraSocks[asid]
+ , oraSeq = proxyResp.readInt32BE(8)
;
if (!rec) {
console.warn('can not find the sending oracle connection, reply will lose');
View
1 oracle/psp/dcopv.bdy
@@ -5,7 +5,6 @@ begin
pos_head := 0;
pos_tail := 12;
rseq := 0;
- rseq2 := 0;
onway := 0;
onbuf := 0;
dbms_alert.register('Noradle-DCO-EXTHUB-QUIT');
View
25 oracle/psp/k_ext_call.bdy
@@ -64,14 +64,13 @@ create or replace package body k_ext_call is
in_buffer_size => 32767,
out_buffer_size => 0,
tx_timeout => 0);
- -- k_debug.trace(st('ext_hub connected'));
-- record which current connected ext-hub is, used when ask to reconnect the particular ext-hub
dcopv.host := i.host;
dcopv.port := i.port;
goto connected;
exception
when utl_tcp.network_error then
- -- k_debug.trace(st('ext_hub connect error'));
+ k_debug.trace(st('ext_hub connect error (host:port)', i.host, i.port));
continue;
end;
end loop;
@@ -83,8 +82,7 @@ create or replace package body k_ext_call is
utl_raw.concat(pi2r(197610262),
pi2r(v_sid),
pi2r(v_serial),
- pi2r(sys_context('userenv', 'sessionid')),
- pi2r(dcopv.rseq2)));
+ pi2r(sys_context('userenv', 'sessionid'))));
end;
-- private, detect ext-hub quit and try reconnect
@@ -95,10 +93,10 @@ create or replace package body k_ext_call is
begin
dbms_alert.waitone('Noradle-DCO-EXTHUB-QUIT', dcopv.tmp_s, v_sts, 0);
if v_sts = 0 and dcopv.tmp_s = dcopv.host || ':' || dcopv.port then
- -- k_debug.trace(st('check_reconnect find exthub quit signal', dcopv.onway));
+ k_debug.trace(st('check_reconnect find quit signal', sys_context('userenv', 'sessionid'), dcopv.onway));
-- read all pending reply and then reconnect
loop
- -- k_debug.trace(st('reading one pending reply countdown', dcopv.onway));
+ k_debug.trace(st('reading one pending reply', sys_context('userenv', 'sessionid'), dcopv.onway));
exit when dcopv.onway = 0;
dcopv.tmp_b := read_response(-1, dcopv.zblb, null);
end loop;
@@ -139,13 +137,11 @@ create or replace package body k_ext_call is
raise;
end if;
v_err := v_err + 1;
- -- dbms_output.put_line('auto conn starting2 dcopv.rseq=' || dcopv.rseq);
connect_router_proxy;
goto write_tcp;
end;
dcopv.pos_head := 0;
dcopv.pos_tail := 12;
- dcopv.rseq2 := dcopv.rseq;
dcopv.onway := dcopv.onway + dcopv.onbuf;
dcopv.onbuf := 0;
end flush;
@@ -162,16 +158,16 @@ create or replace package body k_ext_call is
if v_len = 0 then
return 0; -- ignore empty request body
end if;
+ dcopv.rseq := dcopv.rseq + 1;
dbms_lob.write(dcopv.msg, 4, dcopv.pos_head + 1, pi2r(v_len * sync));
dbms_lob.write(dcopv.msg, 4, dcopv.pos_head + 5, pi2r(proxy_id));
- dbms_lob.write(dcopv.msg, 4, dcopv.pos_head + 9, pi2r(sys_context('userenv', 'sessionid')));
+ dbms_lob.write(dcopv.msg, 4, dcopv.pos_head + 9, pi2r(dcopv.rseq));
dcopv.onbuf := dcopv.onbuf + 1;
dcopv.pos_head := dcopv.pos_tail;
dcopv.pos_tail := dcopv.pos_tail + 12;
if not buffered then
flush;
end if;
- dcopv.rseq := dcopv.rseq + 1;
dcopv.rsps(dcopv.rseq) := null;
return dcopv.rseq;
end;
@@ -218,14 +214,13 @@ create or replace package body k_ext_call is
if utl_tcp.available(dcopv.con, v_timeout / 100) = 0 then
return false;
end if;
- -- k_debug.trace(st('read', dcopv.onway, dcopv.onbuf));
dcopv.rtcp := utl_tcp.read_raw(dcopv.con, v_int32, 4);
v_len := utl_raw.cast_to_binary_integer(v_int32) - 12;
dcopv.rtcp := utl_tcp.read_raw(dcopv.con, v_int32, 4);
- v_rseq := utl_raw.cast_to_binary_integer(v_int32);
- dcopv.rtcp := utl_tcp.read_raw(dcopv.con, v_int32, 4);
v_asid := utl_raw.cast_to_binary_integer(v_int32);
- k_debug.trace(st('read: rseq,asid', v_rseq, v_asid));
+ dcopv.rtcp := utl_tcp.read_raw(dcopv.con, v_int32, 4);
+ v_rseq := utl_raw.cast_to_binary_integer(v_int32);
+
dbms_lob.createtemporary(req_blb, cache => true, dur => dbms_lob.session);
for i in 1 .. floor(v_len / 8132) loop
dcopv.rtcp := utl_tcp.read_raw(dcopv.con, v_raw, 8132);
@@ -247,8 +242,10 @@ create or replace package body k_ext_call is
else
v_timeout := v_timeout - (dbms_utility.get_time - v_start);
if timeout is null or v_timeout > 0 then
+ -- k_debug.trace(st('dco before time out, continue try'));
goto read_response;
else
+ -- k_debug.trace(st('dco after time out, abort'));
req_blb := null;
dcopv.rsps.delete(req_seq);
return false;

0 comments on commit 8b2a60c

Please sign in to comment.
Something went wrong with that request. Please try again.