Skip to content

Commit

Permalink
[CONJ-1115] Make connector become virtual-thread friendly - part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Oct 23, 2023
1 parent db65710 commit f5a33b6
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ public StandardClient(
// **********************************************************************
// creating socket
// **********************************************************************
OutputStream out = new BufferedOutputStream(socket.getOutputStream(), 16384);
OutputStream out = socket.getOutputStream();
InputStream in =
conf.useReadAheadInput()
? new ReadAheadBufferedStream(socket.getInputStream(), lock)
? new ReadAheadBufferedStream(socket.getInputStream())
: new BufferedInputStream(socket.getInputStream(), 16384);

assignStream(out, in, conf, null);
Expand Down Expand Up @@ -165,7 +165,7 @@ public StandardClient(
out = new BufferedOutputStream(sslSocket.getOutputStream(), 16384);
in =
conf.useReadAheadInput()
? new ReadAheadBufferedStream(sslSocket.getInputStream(), lock)
? new ReadAheadBufferedStream(sslSocket.getInputStream())
: new BufferedInputStream(sslSocket.getInputStream(), 16384);
assignStream(out, in, conf, handshake.getThreadId());
}
Expand Down Expand Up @@ -199,7 +199,7 @@ public StandardClient(
if ((clientCapabilities & Capabilities.COMPRESS) != 0) {
assignStream(
new CompressOutputStream(out, compressionSequence),
new CompressInputStream(in, compressionSequence, lock),
new CompressInputStream(in, compressionSequence),
conf,
handshake.getThreadId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,19 @@ public class CompressInputStream extends InputStream {
private int end;
private int pos;
private volatile byte[] buf;
private final ReentrantLock lock;

/**
* Constructor. When this handler is used, driver expect packet with 7 byte compression header
*
* Implementation doesn't use synchronized/semaphore because all used are already locked by
* Statement/PreparedStatement Reentrant lock
*
* @param in socket input stream
* @param compressionSequence compression sequence
*/
public CompressInputStream(InputStream in, MutableByte compressionSequence, ReentrantLock lock) {
public CompressInputStream(InputStream in, MutableByte compressionSequence) {
this.in = in;
this.sequence = compressionSequence;
this.lock = lock;
}

/**
Expand Down Expand Up @@ -90,23 +91,18 @@ public int read(byte[] b, int off, int len) throws IOException {
}

int totalReads = 0;
lock.lock();
try {
do {
if (end - pos <= 0) {
retrieveBuffer();
}
// copy internal value to buf.
int copyLength = Math.min(len - totalReads, end - pos);
System.arraycopy(buf, pos, b, off + totalReads, copyLength);
pos += copyLength;
totalReads += copyLength;
} while (totalReads < len && super.available() > 0);
do {
if (end - pos <= 0) {
retrieveBuffer();
}
// copy internal value to buf.
int copyLength = Math.min(len - totalReads, end - pos);
System.arraycopy(buf, pos, b, off + totalReads, copyLength);
pos += copyLength;
totalReads += copyLength;
} while (totalReads < len && super.available() > 0);

return totalReads;
} finally {
lock.unlock();
}
return totalReads;
}

private void retrieveBuffer() throws IOException {
Expand Down Expand Up @@ -225,12 +221,7 @@ public long skip(long n) throws IOException {
*/
@Override
public int available() throws IOException {
lock.lock();
try {
return in.available();
} finally {
lock.unlock();
}
return in.available();
}

/**
Expand Down Expand Up @@ -270,12 +261,7 @@ public void close() throws IOException {
*/
@Override
public void mark(int readlimit) {
lock.lock();
try {
in.mark(readlimit);
} finally {
lock.unlock();
}
in.mark(readlimit);
}

/**
Expand Down Expand Up @@ -317,12 +303,7 @@ public void mark(int readlimit) {
*/
@Override
public void reset() throws IOException {
lock.lock();
try {
in.reset();
} finally {
lock.unlock();
}
in.reset();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,22 @@ public class ReadAheadBufferedStream extends FilterInputStream {

private static final int BUF_SIZE = 16384;
private final byte[] buf;
private final ReentrantLock lock;
private int end;
private int pos;

/**
* Constructor
*
* Implementation doesn't use synchronized/semaphore because all used are already locked by
* Statement/PreparedStatement Reentrant lock
*
* @param in socket input stream
*/
public ReadAheadBufferedStream(InputStream in, ReentrantLock lock) {
public ReadAheadBufferedStream(InputStream in) {
super(in);
buf = new byte[BUF_SIZE];
end = 0;
pos = 0;
this.lock = lock;
}

public ReentrantLock getLock() {
return lock;
}

/**
Expand All @@ -51,44 +48,39 @@ public int read(byte[] externalBuf, int off, int len) throws IOException {
if (len == 0) {
return 0;
}
lock.lock();
try {
int totalReads = 0;
while (true) {

// read
if (end - pos <= 0) {
if (len - totalReads >= buf.length) {
// buf length is less than asked byte and buf is empty
// => filling directly into external buf
int reads = super.read(externalBuf, off + totalReads, len - totalReads);
if (reads <= 0) {
return (totalReads == 0) ? -1 : totalReads;
}
return totalReads + reads;

} else {

// filling internal buf
fillingBuffer(len - totalReads);
if (end <= 0) {
return (totalReads == 0) ? -1 : totalReads;
}
int totalReads = 0;
while (true) {

// read
if (end - pos <= 0) {
if (len - totalReads >= buf.length) {
// buf length is less than asked byte and buf is empty
// => filling directly into external buf
int reads = super.read(externalBuf, off + totalReads, len - totalReads);
if (reads <= 0) {
return (totalReads == 0) ? -1 : totalReads;
}
}
return totalReads + reads;

// copy internal value to buf.
int copyLength = Math.min(len - totalReads, end - pos);
System.arraycopy(buf, pos, externalBuf, off + totalReads, copyLength);
pos += copyLength;
totalReads += copyLength;
} else {

if (totalReads >= len || super.available() <= 0) {
return totalReads;
// filling internal buf
fillingBuffer(len - totalReads);
if (end <= 0) {
return (totalReads == 0) ? -1 : totalReads;
}
}
}
} finally{
lock.unlock();

// copy internal value to buf.
int copyLength = Math.min(len - totalReads, end - pos);
System.arraycopy(buf, pos, externalBuf, off + totalReads, copyLength);
pos += copyLength;
totalReads += copyLength;

if (totalReads >= len || super.available() <= 0) {
return totalReads;
}
}
}

Expand All @@ -115,12 +107,7 @@ public void close() throws IOException {
}

public int available() throws IOException {
lock.lock();
try {
return end - pos + super.available();
} finally {
lock.unlock();
}
return end - pos + super.available();
}

public int read() throws IOException {
Expand Down

0 comments on commit f5a33b6

Please sign in to comment.