Skip to content

Commit

Permalink
Fix for Bug#99708 (31510398), mysql-connector-java 8.0.20 ASSERTION F…
Browse files Browse the repository at this point in the history
…AILED: Unknown message type: 57 s.close.
  • Loading branch information
fjssilva committed Feb 10, 2021
1 parent 109443b commit 2e73f25
Show file tree
Hide file tree
Showing 6 changed files with 228 additions and 69 deletions.
2 changes: 2 additions & 0 deletions CHANGES
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

Version 8.0.24

- Fix for Bug#99708 (31510398), mysql-connector-java 8.0.20 ASSERTION FAILED: Unknown message type: 57 s.close.

- Fix for Bug#32122553, EXTRA BYTE IN COM_STMT_EXECUTE.

- Fix for Bug#101558 (32141210), NULLPOINTEREXCEPTION WHEN EXECUTING INVALID QUERY WITH USEUSAGEADVISOR ENABLED.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates.
* Copyright (c) 2020, 2021, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 2.0, as published by the
Expand Down Expand Up @@ -133,7 +133,7 @@ public int read(byte[] b) throws IOException {

/**
* Reads bytes from the underlying {@link InputStream} either from the one that gets data directly from the original source {@link InputStream} or from
* a compressorable {@link InputStream}, if reading of a compressed X Protocol frame is in progress.
* a compressor able {@link InputStream}, if reading of a compressed X Protocol frame is in progress.
*
* @see FilterInputStream#read(byte[], int, int)
*/
Expand Down Expand Up @@ -210,7 +210,8 @@ private void peekNextFrame() throws IOException {
* Checks if current X Protocol frame is compressed.
*
* @return
* <code>true</code> if the type of current frame is {@link ServerMessages.Type#COMPRESSION}, <code>false</code> otherwise.
* <code>true</code> if the type of current frame is {@link com.mysql.cj.x.protobuf.Mysqlx.ServerMessages.Type#COMPRESSION}, <code>false</code>
* otherwise.
*/
private boolean isCompressedFrame() {
return ServerMessages.Type.forNumber(this.xMessageHeader.getMessageType()) == ServerMessages.Type.COMPRESSION;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, Oracle and/or its affiliates.
* Copyright (c) 2020, 2021, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 2.0, as published by the
Expand Down Expand Up @@ -42,7 +42,7 @@ public class CompressorStreamsFactory {
private CompressionAlgorithm compressionAlgorithm;

private InputStream compressorInputStreamInstance = null;
private ReusableInputStream underlyingInputStream = null;
private ContinuousInputStream underlyingInputStream = null;

private OutputStream compressorOutputStreamInstance = null;
private ReusableOutputStream underlyingOutputStream = null;
Expand All @@ -55,14 +55,17 @@ public CompressionMode getCompressionMode() {
return this.compressionAlgorithm.getCompressionMode();
}

public boolean areCompressedStreamsReusable() {
public boolean areCompressedStreamsContinuous() {
return getCompressionMode() == CompressionMode.STREAM;
}

/**
* Creates an instance of an {@link InputStream} that wraps around the given {@link InputStream} and knows how to inflate data using the algorithm given in
* this class' constructor.
*
* If the compression algorithm operates in steam mode (continuous) then create and reuse one single instance of the compressor {@link InputStream}, else
* create a new instance every time.
*
* @param in
* the {@link InputStream} to use as source of the bytes to inflate.
* @return
Expand All @@ -71,20 +74,20 @@ public boolean areCompressedStreamsReusable() {
public InputStream getInputStreamInstance(InputStream in) {
InputStream underlyingIn = in;

if (areCompressedStreamsReusable()) {
if (areCompressedStreamsContinuous()) {
if (this.compressorInputStreamInstance != null) {
this.underlyingInputStream.setInputStream(underlyingIn);
this.underlyingInputStream.addInputStream(underlyingIn);
return this.compressorInputStreamInstance;
}
this.underlyingInputStream = new ReusableInputStream(underlyingIn);
this.underlyingInputStream = new ContinuousInputStream(underlyingIn);
underlyingIn = this.underlyingInputStream;
}

InputStream compressionIn = (InputStream) Util.getInstance(this.compressionAlgorithm.getInputStreamClass().getName(),
new Class<?>[] { InputStream.class }, new Object[] { underlyingIn }, null, Messages.getString("Protocol.Compression.IoFactory.0",
new Object[] { this.compressionAlgorithm.getInputStreamClass().getName(), this.compressionAlgorithm }));

if (areCompressedStreamsReusable()) {
if (areCompressedStreamsContinuous()) {
this.compressorInputStreamInstance = compressionIn;
}
return compressionIn;
Expand All @@ -94,6 +97,9 @@ public InputStream getInputStreamInstance(InputStream in) {
* Creates an instance of an {@link OutputStream} that wraps around the given {@link OutputStream} and knows how to deflate data using the algorithm given
* in this class' constructor.
*
* If the compression algorithm operates in steam mode (continuous) then create and reuse one single instance of the compressor {@link OutputStream}, else
* create a new instance every time.
*
* @param out
* the {@link OutputStream} to use as target of the bytes to deflate.
* @return
Expand All @@ -102,7 +108,7 @@ public InputStream getInputStreamInstance(InputStream in) {
public OutputStream getOutputStreamInstance(OutputStream out) {
OutputStream underlyingOut = out;

if (areCompressedStreamsReusable()) {
if (areCompressedStreamsContinuous()) {
if (this.compressorOutputStreamInstance != null) {
this.underlyingOutputStream.setOutputStream(underlyingOut);
return this.compressorOutputStreamInstance;
Expand All @@ -115,7 +121,7 @@ public OutputStream getOutputStreamInstance(OutputStream out) {
new Class<?>[] { OutputStream.class }, new Object[] { underlyingOut }, null, Messages.getString("Protocol.Compression.IoFactory.1",
new Object[] { this.compressionAlgorithm.getOutputStreamClass().getName(), this.compressionAlgorithm }));

if (areCompressedStreamsReusable()) {
if (areCompressedStreamsContinuous()) {
compressionOut = new ContinuousOutputStream(compressionOut);
this.compressorOutputStreamInstance = compressionOut;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Copyright (c) 2021, Oracle and/or its affiliates.
*
* This program is free software; you can redistribute it and/or modify it under
* the terms of the GNU General Public License, version 2.0, as published by the
* Free Software Foundation.
*
* This program is also distributed with certain software (including but not
* limited to OpenSSL) that is licensed under separate terms, as designated in a
* particular file or component or in included license documentation. The
* authors of MySQL hereby grant you an additional permission to link the
* program and your derivative works with the separately licensed software that
* they have included with MySQL.
*
* Without limiting anything contained in the foregoing, this file, which is
* part of MySQL Connector/J, is also subject to the Universal FOSS Exception,
* version 1.0, a copy of which can be found at
* http://oss.oracle.com/licenses/universal-foss-exception.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
* FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
* for more details.
*
* You should have received a copy of the GNU General Public License along with
* this program; if not, write to the Free Software Foundation, Inc.,
* 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
*/

package com.mysql.cj.protocol.x;

import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedList;
import java.util.Queue;

/**
* An {@link InputStream} wrapper that reads from a queue of underlying {@link InputStream}s, giving the impression that all data is coming from a single,
* continuous, source.
*/
public class ContinuousInputStream extends FilterInputStream {
private Queue<InputStream> inputStreams = new LinkedList<>();

private boolean closed = false;

protected ContinuousInputStream(InputStream in) {
super(in);
}

/**
* Returns the number of bytes available in the active underlying {@link InputStream}.
*
* @return the number of bytes available.
* @see FilterInputStream#available()
*/
@Override
public int available() throws IOException {
ensureOpen();
int available = super.available();
if (available == 0 && nextInLine()) {
return available();
}
return available;
}

/**
* Closes this stream and all underlying {@link InputStream}s.
*
* @see FilterInputStream#close()
*/
@Override
public void close() throws IOException {
if (!this.closed) {
this.closed = true;
super.close();
for (InputStream is : this.inputStreams) {
is.close();
}
}
}

/**
* Reads one byte from the underlying {@link InputStream}. When EOF is reached, then reads from the next {@link InputStream} in the queue.
*
* @see FilterInputStream#read()
*/
@Override
public int read() throws IOException {
ensureOpen();
int read = super.read();
if (read >= 0) {
return read;
}
if (nextInLine()) {
return read();
}
return read;
}

/**
* Forwards the read to {@link #read(byte[], int, int)}.
*
* @see FilterInputStream#read(byte[])
*/
@Override
public int read(byte[] b) throws IOException {
ensureOpen();
return read(b, 0, b.length);
}

/**
* Reads bytes from the underlying {@link InputStream}. When EOF is reached, then reads from the next {@link InputStream} in the queue.
*
* @see FilterInputStream#read(byte[], int, int)
*/
@Override
public int read(byte[] b, int off, int len) throws IOException {
ensureOpen();
int toRead = Math.min(len, available());
int read = super.read(b, off, toRead);
if (read > 0) {
return read;
}
if (nextInLine()) {
return read(b, off, len);
}
return read;
}

/**
* Adds another {@link InputStream} to the {@link InputStream}s queue.
*
* @param newIn
* the {@link InputStream} to add.
* @return
* <code>true</code> if the element was added to the {@link InputStream}s queue.
*/
protected boolean addInputStream(InputStream newIn) {
return this.inputStreams.offer(newIn);
}

/**
* Closes the currently active {@link InputStream} and replaces it by the the head of the {@link InputStream}s queue.
*
* @return
* <code>true</code> if the currently active {@link InputStream} was replaced by a new one.
* @throws IOException
* if errors occur while closing the currently active {@link InputStream}.
*/
private boolean nextInLine() throws IOException {
InputStream nextInputStream = this.inputStreams.poll();
if (nextInputStream != null) {
super.close();
this.in = nextInputStream;
return true;
}
return false;
}

/**
* Ensures that this {@link InputStream} wasn't closed yet.
*
* @throws IOException
* if this {@link InputStream} was closed.
*/
private void ensureOpen() throws IOException {
if (this.closed) {
throw new IOException("Stream closed");
}
}
}

This file was deleted.

Loading

0 comments on commit 2e73f25

Please sign in to comment.