Skip to content

Commit

Permalink
[CONJ-1115] Make connector become virtual-thread friendly
Browse files Browse the repository at this point in the history
  • Loading branch information
rusher committed Oct 23, 2023
1 parent 10a1107 commit a177285
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public StandardClient(
OutputStream out = new BufferedOutputStream(socket.getOutputStream(), 16384);
InputStream in =
conf.useReadAheadInput()
? new ReadAheadBufferedStream(socket.getInputStream())
? new ReadAheadBufferedStream(socket.getInputStream(), lock)
: new BufferedInputStream(socket.getInputStream(), 16384);

assignStream(out, in, conf, null);
Expand Down Expand Up @@ -165,7 +165,7 @@ public StandardClient(
out = new BufferedOutputStream(sslSocket.getOutputStream(), 16384);
in =
conf.useReadAheadInput()
? new ReadAheadBufferedStream(sslSocket.getInputStream())
? new ReadAheadBufferedStream(sslSocket.getInputStream(), lock)
: new BufferedInputStream(sslSocket.getInputStream(), 16384);
assignStream(out, in, conf, handshake.getThreadId());
}
Expand Down Expand Up @@ -199,7 +199,7 @@ public StandardClient(
if ((clientCapabilities & Capabilities.COMPRESS) != 0) {
assignStream(
new CompressOutputStream(out, compressionSequence),
new CompressInputStream(in, compressionSequence),
new CompressInputStream(in, compressionSequence, lock),
conf,
handshake.getThreadId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.locks.ReentrantLock;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;
import org.mariadb.jdbc.client.util.MutableByte;
Expand All @@ -22,17 +23,19 @@ public class CompressInputStream extends InputStream {

private int end;
private int pos;
private byte[] buf;
private volatile byte[] buf;
private final ReentrantLock lock;

/**
* Constructor. When this handler is used, driver expect packet with 7 byte compression header
*
* @param in socket input stream
* @param compressionSequence compression sequence
*/
public CompressInputStream(InputStream in, MutableByte compressionSequence) {
public CompressInputStream(InputStream in, MutableByte compressionSequence, ReentrantLock lock) {
this.in = in;
this.sequence = compressionSequence;
this.lock = lock;
}

/**
Expand Down Expand Up @@ -87,19 +90,23 @@ public int read(byte[] b, int off, int len) throws IOException {
}

int totalReads = 0;
lock.lock();
try {
do {
if (end - pos <= 0) {
retrieveBuffer();
}
// copy internal value to buf.
int copyLength = Math.min(len - totalReads, end - pos);
System.arraycopy(buf, pos, b, off + totalReads, copyLength);
pos += copyLength;
totalReads += copyLength;
} while (totalReads < len && super.available() > 0);

do {
if (end - pos <= 0) {
retrieveBuffer();
}
// copy internal value to buf.
int copyLength = Math.min(len - totalReads, end - pos);
System.arraycopy(buf, pos, b, off + totalReads, copyLength);
pos += copyLength;
totalReads += copyLength;
} while (totalReads < len && super.available() > 0);

return totalReads;
return totalReads;
} finally {
lock.unlock();
}
}

private void retrieveBuffer() throws IOException {
Expand Down Expand Up @@ -218,7 +225,12 @@ public long skip(long n) throws IOException {
*/
@Override
public int available() throws IOException {
return in.available();
lock.lock();
try {
return in.available();
} finally {
lock.unlock();
}
}

/**
Expand Down Expand Up @@ -257,8 +269,13 @@ public void close() throws IOException {
* @see InputStream#reset()
*/
@Override
public synchronized void mark(int readlimit) {
in.mark(readlimit);
public void mark(int readlimit) {
lock.lock();
try {
in.mark(readlimit);
} finally {
lock.unlock();
}
}

/**
Expand Down Expand Up @@ -299,8 +316,13 @@ public synchronized void mark(int readlimit) {
* @see IOException
*/
@Override
public synchronized void reset() throws IOException {
in.reset();
public void reset() throws IOException {
lock.lock();
try {
in.reset();
} finally {
lock.unlock();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.locks.ReentrantLock;

/**
* Permit to buf socket data, reading not only asked bytes, but available number of bytes when
Expand All @@ -15,6 +16,7 @@ public class ReadAheadBufferedStream extends FilterInputStream {

private static final int BUF_SIZE = 16384;
private final byte[] buf;
private final ReentrantLock lock;
private int end;
private int pos;

Expand All @@ -23,11 +25,16 @@ public class ReadAheadBufferedStream extends FilterInputStream {
*
* @param in socket input stream
*/
public ReadAheadBufferedStream(InputStream in) {
public ReadAheadBufferedStream(InputStream in, ReentrantLock lock) {
super(in);
buf = new byte[BUF_SIZE];
end = 0;
pos = 0;
this.lock = lock;
}

public ReentrantLock getLock() {
return lock;
}

/**
Expand All @@ -39,45 +46,49 @@ public ReadAheadBufferedStream(InputStream in) {
* @return number of added bytes
* @throws IOException if exception during socket reading
*/
public synchronized int read(byte[] externalBuf, int off, int len) throws IOException {
public int read(byte[] externalBuf, int off, int len) throws IOException {

if (len == 0) {
return 0;
}

int totalReads = 0;
while (true) {

// read
if (end - pos <= 0) {
if (len - totalReads >= buf.length) {
// buf length is less than asked byte and buf is empty
// => filling directly into external buf
int reads = super.read(externalBuf, off + totalReads, len - totalReads);
if (reads <= 0) {
return (totalReads == 0) ? -1 : totalReads;
}
return totalReads + reads;

} else {

// filling internal buf
fillingBuffer(len - totalReads);
if (end <= 0) {
return (totalReads == 0) ? -1 : totalReads;
lock.lock();
try {
int totalReads = 0;
while (true) {

// read
if (end - pos <= 0) {
if (len - totalReads >= buf.length) {
// buf length is less than asked byte and buf is empty
// => filling directly into external buf
int reads = super.read(externalBuf, off + totalReads, len - totalReads);
if (reads <= 0) {
return (totalReads == 0) ? -1 : totalReads;
}
return totalReads + reads;

} else {

// filling internal buf
fillingBuffer(len - totalReads);
if (end <= 0) {
return (totalReads == 0) ? -1 : totalReads;
}
}
}
}

// copy internal value to buf.
int copyLength = Math.min(len - totalReads, end - pos);
System.arraycopy(buf, pos, externalBuf, off + totalReads, copyLength);
pos += copyLength;
totalReads += copyLength;
// copy internal value to buf.
int copyLength = Math.min(len - totalReads, end - pos);
System.arraycopy(buf, pos, externalBuf, off + totalReads, copyLength);
pos += copyLength;
totalReads += copyLength;

if (totalReads >= len || super.available() <= 0) {
return totalReads;
if (totalReads >= len || super.available() <= 0) {
return totalReads;
}
}
} finally{
lock.unlock();
}
}

Expand All @@ -103,8 +114,13 @@ public void close() throws IOException {
pos = 0;
}

public synchronized int available() throws IOException {
return end - pos + super.available();
public int available() throws IOException {
lock.lock();
try {
return end - pos + super.available();
} finally {
lock.unlock();
}
}

public int read() throws IOException {
Expand Down

0 comments on commit a177285

Please sign in to comment.