Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
[CONJ-1095] Retrieve session information on first Ok_Packet when avai…
…lable
  • Loading branch information
rusher committed Jul 31, 2023
1 parent 128ad1d commit 16e85bd
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 25 deletions.
2 changes: 2 additions & 0 deletions src/main/java/org/mariadb/jdbc/HostAddress.java
Expand Up @@ -22,6 +22,8 @@ public class HostAddress {
/** primary node */
public Boolean primary;

public Long threadsConnected;

/**
* Constructor.
*
Expand Down
26 changes: 25 additions & 1 deletion src/main/java/org/mariadb/jdbc/client/Context.java
Expand Up @@ -135,7 +135,7 @@ public interface Context {
*
* @return connection transaction isolation level
*/
int getTransactionIsolationLevel();
Integer getTransactionIsolationLevel();

/**
* Set current connection transaction isolation level
Expand Down Expand Up @@ -170,4 +170,28 @@ public interface Context {
* @param state indicate that some connection state has changed
*/
void addStateFlag(int state);

/**
* Indicate server charset change
*
* @param charset server charset
*/
void setCharset(String charset);

/**
* Indicate server connection Id (not truncated)
*
* @param connectionId connection id
*/
void setThreadId(long connectionId);

/** Indicate the number of connection on this server */
void setTreadsConnected(long threadsConnected);

/**
* Retrieve current charset if session state get it
*
* @return current charset
*/
String getCharset();
}
30 changes: 27 additions & 3 deletions src/main/java/org/mariadb/jdbc/client/context/BaseContext.java
Expand Up @@ -7,6 +7,7 @@
import static org.mariadb.jdbc.util.constants.Capabilities.STMT_BULK_OPERATIONS;

import org.mariadb.jdbc.Configuration;
import org.mariadb.jdbc.HostAddress;
import org.mariadb.jdbc.client.Context;
import org.mariadb.jdbc.client.PrepareCache;
import org.mariadb.jdbc.client.ServerVersion;
Expand All @@ -17,7 +18,7 @@
/** Context (current connection state) of a connection */
public class BaseContext implements Context {

private final long threadId;
private long threadId;
private final long serverCapabilities;
private final long clientCapabilities;
private final byte[] seed;
Expand All @@ -28,14 +29,16 @@ public class BaseContext implements Context {
private final Configuration conf;
private final ExceptionFactory exceptionFactory;

private String charset;

/** Server status context */
protected int serverStatus;

/** Server current database */
private String database;

/** Server current transaction isolation level */
private int transactionIsolationLevel;
private Integer transactionIsolationLevel;

/** Server current warning count */
private int warning;
Expand All @@ -46,21 +49,26 @@ public class BaseContext implements Context {
/** Connection state use flag */
private int stateFlag = 0;

private final HostAddress hostAddress;

/**
* Constructor of connection context
*
* @param hostAddress host address
* @param handshake server handshake
* @param clientCapabilities client capabilities
* @param conf connection configuration
* @param exceptionFactory connection exception factory
* @param prepareCache LRU prepare cache
*/
public BaseContext(
HostAddress hostAddress,
InitialHandshakePacket handshake,
long clientCapabilities,
Configuration conf,
ExceptionFactory exceptionFactory,
PrepareCache prepareCache) {
this.hostAddress = hostAddress;
this.threadId = handshake.getThreadId();
this.seed = handshake.getSeed();
this.serverCapabilities = handshake.getCapabilities();
Expand Down Expand Up @@ -144,7 +152,7 @@ public Configuration getConf() {
return conf;
}

public int getTransactionIsolationLevel() {
public Integer getTransactionIsolationLevel() {
return transactionIsolationLevel;
}

Expand All @@ -171,4 +179,20 @@ public void resetStateFlag() {
public void addStateFlag(int state) {
stateFlag |= state;
}

public void setCharset(String charset) {
this.charset = charset;
}

public void setThreadId(long connectionId) {
threadId = connectionId;
}

public void setTreadsConnected(long threadsConnected) {
if (hostAddress != null) hostAddress.threadsConnected = threadsConnected;
}

public String getCharset() {
return charset;
}
}
Expand Up @@ -5,6 +5,7 @@
package org.mariadb.jdbc.client.context;

import org.mariadb.jdbc.Configuration;
import org.mariadb.jdbc.HostAddress;
import org.mariadb.jdbc.client.PrepareCache;
import org.mariadb.jdbc.client.impl.TransactionSaver;
import org.mariadb.jdbc.export.ExceptionFactory;
Expand All @@ -21,19 +22,21 @@ public class RedoContext extends BaseContext {
/**
* Constructor
*
* @param hostAddress host address
* @param handshake server handshake
* @param clientCapabilities client capabilities
* @param conf configuration
* @param exceptionFactory connection exception factory
* @param prepareCache LRU prepare cache
*/
public RedoContext(
HostAddress hostAddress,
InitialHandshakePacket handshake,
long clientCapabilities,
Configuration conf,
ExceptionFactory exceptionFactory,
PrepareCache prepareCache) {
super(handshake, clientCapabilities, conf, exceptionFactory, prepareCache);
super(hostAddress, handshake, clientCapabilities, conf, exceptionFactory, prepareCache);
this.transactionSaver = new TransactionSaver(conf.transactionReplaySize());
}

Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.mariadb.jdbc.message.client.SslRequestPacket;
import org.mariadb.jdbc.message.server.AuthSwitchPacket;
import org.mariadb.jdbc.message.server.ErrorPacket;
import org.mariadb.jdbc.message.server.OkPacket;
import org.mariadb.jdbc.plugin.AuthenticationPlugin;
import org.mariadb.jdbc.plugin.Credential;
import org.mariadb.jdbc.plugin.CredentialPlugin;
Expand Down Expand Up @@ -273,11 +274,7 @@ public static void authenticationHandler(
// OK_Packet -> Authenticated !
// see https://mariadb.com/kb/en/library/ok_packet/
// *************************************************************************************
buf.skip(); // 0x00 OkPacket Header
buf.readLongLengthEncodedNotNull(); // skip affectedRows
buf.readLongLengthEncodedNotNull(); // skip insert id
// insertId
context.setServerStatus(buf.readShort());
new OkPacket(buf, context);
break authentication_loop;

default:
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/org/mariadb/jdbc/client/impl/StandardClient.java
Expand Up @@ -130,12 +130,14 @@ public StandardClient(
this.context =
conf.transactionReplay()
? new RedoContext(
hostAddress,
handshake,
clientCapabilities,
conf,
this.exceptionFactory,
new PrepareCache(conf.prepStmtCacheSize(), this))
: new BaseContext(
hostAddress,
handshake,
clientCapabilities,
conf,
Expand Down Expand Up @@ -314,7 +316,7 @@ private void postConnectionQueries() throws SQLException {
}

String serverTz = conf.timezone() != null ? handleTimezone() : null;
String sessionVariableQuery = createSessionVariableQuery(serverTz);
String sessionVariableQuery = createSessionVariableQuery(serverTz, context);
if (sessionVariableQuery != null) commands.add(sessionVariableQuery);

if (hostAddress != null
Expand All @@ -330,7 +332,8 @@ private void postConnectionQueries() throws SQLException {
commands.add(String.format("CREATE DATABASE IF NOT EXISTS `%s`", escapedDb));
commands.add(String.format("USE `%s`", escapedDb));
}
commands.add("SET NAMES utf8mb4");
if (context.getCharset() == null || !"utf8mb4".equals(context.getCharset()))
commands.add("SET NAMES utf8mb4");

if (conf.initSql() != null) {
commands.add(conf.initSql());
Expand Down Expand Up @@ -397,9 +400,10 @@ private void postConnectionQueries() throws SQLException {
* Create session variable if configuration requires additional commands.
*
* @param serverTz server timezone
* @param context context
* @return sql setting session command
*/
public String createSessionVariableQuery(String serverTz) {
public String createSessionVariableQuery(String serverTz, Context context) {
// In JDBC, connection must start in autocommit mode
// [CONJ-269] we cannot rely on serverStatus & ServerStatus.AUTOCOMMIT before this command to
// avoid this command.
Expand Down Expand Up @@ -459,12 +463,12 @@ public String createSessionVariableQuery(String serverTz) {
&& ((major >= 8 && context.getVersion().versionGreaterOrEqual(8, 0, 3))
|| (major < 8 && context.getVersion().versionGreaterOrEqual(5, 7, 20)))) {
sessionCommands.add(
"transaction_isolation='" + conf.transactionIsolation().getValue() + "'");
"@@session.transaction_isolation='" + conf.transactionIsolation().getValue() + "'");
} else {
sessionCommands.add("tx_isolation='" + conf.transactionIsolation().getValue() + "'");
sessionCommands.add(
"@@session.tx_isolation='" + conf.transactionIsolation().getValue() + "'");
}
}

if (!sessionCommands.isEmpty()) {
return "set " + sessionCommands.stream().collect(Collectors.joining(","));
}
Expand Down
Expand Up @@ -280,9 +280,11 @@ public byte[] readBytesNullEnd() {

public StandardReadableByteBuf readLengthBuffer() {
int len = this.readIntLengthEncodedNotNull();
byte[] tmp = new byte[len];
readBytes(tmp);
return new StandardReadableByteBuf(tmp, len);

StandardReadableByteBuf b = new StandardReadableByteBuf(buf, pos + len);
b.pos = pos;
pos += len;
return b;
}

public String readString(int length) {
Expand Down
26 changes: 20 additions & 6 deletions src/main/java/org/mariadb/jdbc/message/server/OkPacket.java
Expand Up @@ -39,11 +39,25 @@ public OkPacket(ReadableByteBuf buf, Context context) {
while (sessionStateBuf.readableBytes() > 0) {
switch (sessionStateBuf.readByte()) {
case StateChange.SESSION_TRACK_SYSTEM_VARIABLES:
ReadableByteBuf tmpBuf2 = sessionStateBuf.readLengthBuffer();
String variable = tmpBuf2.readString(tmpBuf2.readIntLengthEncodedNotNull());
Integer len = tmpBuf2.readLength();
String value = len == null ? null : tmpBuf2.readString(len);
logger.debug("System variable change: {} = {}", variable, value);
ReadableByteBuf tmpBufsv;
do {
tmpBufsv = sessionStateBuf.readLengthBuffer();
String variableSv = tmpBufsv.readString(tmpBufsv.readIntLengthEncodedNotNull());
Integer lenSv = tmpBufsv.readLength();
String valueSv = lenSv == null ? null : tmpBufsv.readString(lenSv);
logger.debug("System variable change: {} = {}", variableSv, valueSv);
switch (variableSv) {
case "character_set_client":
context.setCharset(valueSv);
break;
case "connection_id":
context.setThreadId(Long.parseLong(valueSv));
break;
case "threads_Connected":
context.setTreadsConnected(Long.parseLong(valueSv));
break;
}
} while (tmpBufsv.readableBytes() > 0);
break;

case StateChange.SESSION_TRACK_SCHEMA:
Expand All @@ -55,7 +69,7 @@ public OkPacket(ReadableByteBuf buf, Context context) {
break;

default:
buf.skip(buf.readIntLengthEncodedNotNull());
// buf.skip(buf.readIntLengthEncodedNotNull());
}
}
}
Expand Down

0 comments on commit 16e85bd

Please sign in to comment.