Skip to content

Commit

Permalink
Decoupling JDBCBackendConnection from ConnectionSession (apache#13975)
Browse files Browse the repository at this point in the history
  • Loading branch information
TeslaCN committed Dec 7, 2021
1 parent cc98db4 commit baa9eda
Show file tree
Hide file tree
Showing 116 changed files with 895 additions and 621 deletions.
Expand Up @@ -27,7 +27,8 @@
import org.apache.shardingsphere.agent.plugin.tracing.jaeger.constant.JaegerConstants;
import org.apache.shardingsphere.agent.plugin.tracing.jaeger.span.JaegerErrorSpan;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCConnectionSession;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.frontend.command.CommandExecutorTask;

import java.lang.reflect.Field;
Expand All @@ -54,7 +55,7 @@ public void afterMethod(final AdviceTargetObject target, final Method method, fi
ExecutorDataMap.getValue().remove(JaegerConstants.ROOT_SPAN);
Field field = CommandExecutorTask.class.getDeclaredField("connectionSession");
field.setAccessible(true);
JDBCConnectionSession connection = (JDBCConnectionSession) field.get(target);
JDBCBackendConnection connection = (JDBCBackendConnection) ((ConnectionSession) field.get(target)).getBackendConnection();
Scope scope = GlobalTracer.get().scopeManager().active();
scope.span().setTag(JaegerConstants.ShardingSphereTags.CONNECTION_COUNT.getKey(), connection.getConnectionSize());
scope.close();
Expand Down
Expand Up @@ -25,7 +25,6 @@
import lombok.SneakyThrows;
import org.apache.shardingsphere.agent.api.result.MethodInvocationResult;
import org.apache.shardingsphere.db.protocol.payload.PacketPayload;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCConnectionSession;
import org.apache.shardingsphere.proxy.frontend.command.CommandExecutorTask;
import org.junit.Before;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -58,7 +57,7 @@ public static void setup() {
}
FieldReader fieldReader = new FieldReader(GlobalTracer.get(), GlobalTracer.class.getDeclaredField("tracer"));
tracer = (MockTracer) fieldReader.read();
executeCommandMethod = CommandExecutorTask.class.getDeclaredMethod("executeCommand", ChannelHandlerContext.class, PacketPayload.class, JDBCConnectionSession.class);
executeCommandMethod = CommandExecutorTask.class.getDeclaredMethod("executeCommand", ChannelHandlerContext.class, PacketPayload.class);
}

@Before
Expand Down
Expand Up @@ -21,7 +21,8 @@
import lombok.Getter;
import org.apache.shardingsphere.agent.api.advice.AdviceTargetObject;
import org.apache.shardingsphere.agent.plugin.tracing.AgentRunner;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCConnectionSession;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.frontend.command.CommandExecutorTask;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.junit.runner.RunWith;
Expand All @@ -35,7 +36,10 @@ public abstract class AbstractCommandExecutorTaskAdviceTest implements AdviceTes
@SuppressWarnings("ConstantConditions")
@Override
public final void prepare() {
Object executorTask = new CommandExecutorTask(null, new JDBCConnectionSession(TransactionType.BASE, new DefaultAttributeMap()), null, null);
ConnectionSession connectionSession = new ConnectionSession(TransactionType.BASE, new DefaultAttributeMap());
JDBCBackendConnection backendConnection = new JDBCBackendConnection(connectionSession);
connectionSession.setBackendConnection(backendConnection);
Object executorTask = new CommandExecutorTask(null, connectionSession, null, null);
targetObject = (AdviceTargetObject) executorTask;
}
}
Expand Up @@ -25,7 +25,8 @@
import org.apache.shardingsphere.agent.api.result.MethodInvocationResult;
import org.apache.shardingsphere.agent.plugin.tracing.zipkin.constant.ZipkinConstants;
import org.apache.shardingsphere.infra.executor.kernel.model.ExecutorDataMap;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCConnectionSession;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;
import org.apache.shardingsphere.proxy.frontend.command.CommandExecutorTask;

import java.lang.reflect.Field;
Expand All @@ -51,7 +52,7 @@ public void beforeMethod(final AdviceTargetObject target, final Method method, f
public void afterMethod(final AdviceTargetObject target, final Method method, final Object[] args, final MethodInvocationResult result) {
Field field = CommandExecutorTask.class.getDeclaredField("connectionSession");
field.setAccessible(true);
JDBCConnectionSession connection = (JDBCConnectionSession) field.get(target);
JDBCBackendConnection connection = (JDBCBackendConnection) ((ConnectionSession) field.get(target)).getBackendConnection();
Span span = (Span) ExecutorDataMap.getValue().remove(ZipkinConstants.ROOT_SPAN);
span.tag(ZipkinConstants.Tags.CONNECTION_COUNT, String.valueOf(connection.getConnectionSize()));
span.finish();
Expand Down
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.proxy.backend.communication;

import org.apache.shardingsphere.proxy.backend.exception.BackendConnectionException;
import org.apache.shardingsphere.proxy.backend.session.ConnectionSession;

/**
* Backend connection for Proxy.
*/
public interface BackendConnection {

/**
* Get connection session of backend connection.
*
* @return connection session
*/
ConnectionSession getConnectionSession();

/**
* Prepare for task execution.
*
* @throws BackendConnectionException backend connection exception
*/
void prepareForTaskExecution() throws BackendConnectionException;

/**
* Close resources used in execution.
*
* @throws BackendConnectionException backend connection exception
*/
void closeExecutionResources() throws BackendConnectionException;

/**
* Close all resources.
*
* @throws BackendConnectionException backend connection exception
*/
void closeAllResources() throws BackendConnectionException;
}
Expand Up @@ -33,7 +33,7 @@
import org.apache.shardingsphere.infra.merge.result.MergedResult;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.infra.rule.identifier.type.DataNodeContainedRule;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCConnectionSession;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseCell;
import org.apache.shardingsphere.proxy.backend.response.data.QueryResponseRow;
Expand Down Expand Up @@ -83,14 +83,14 @@ public final class DatabaseCommunicationEngine {

private final Collection<ResultSet> cachedResultSets = new CopyOnWriteArrayList<>();

public DatabaseCommunicationEngine(final String driverType, final ShardingSphereMetaData metaData, final LogicSQL logicSQL, final JDBCConnectionSession connectionSession) {
public DatabaseCommunicationEngine(final String driverType, final ShardingSphereMetaData metaData, final LogicSQL logicSQL, final JDBCBackendConnection backendConnection) {
this.driverType = driverType;
this.metaData = metaData;
this.logicSQL = logicSQL;
proxySQLExecutor = new ProxySQLExecutor(driverType, connectionSession, this);
proxySQLExecutor = new ProxySQLExecutor(driverType, backendConnection, this);
kernelProcessor = new KernelProcessor();
metadataRefreshEngine = new MetaDataRefreshEngine(metaData,
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getOptimizerContext().getMetaData().getSchemas().get(connectionSession.getSchemaName()),
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getOptimizerContext().getMetaData().getSchemas().get(backendConnection.getConnectionSession().getSchemaName()),
ProxyContext.getInstance().getContextManager().getMetaDataContexts().getProps());
}

Expand Down
Expand Up @@ -23,7 +23,7 @@
import org.apache.shardingsphere.infra.binder.statement.SQLStatementContext;
import org.apache.shardingsphere.infra.executor.sql.prepare.driver.jdbc.JDBCDriverType;
import org.apache.shardingsphere.infra.metadata.ShardingSphereMetaData;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCConnectionSession;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.JDBCBackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;

import java.util.Collections;
Expand Down Expand Up @@ -51,14 +51,14 @@ public static DatabaseCommunicationEngineFactory getInstance() {
*
* @param sqlStatementContext SQL statement context
* @param sql SQL to be executed
* @param connectionSession connection session
* @param backendConnection backend connection
* @return text protocol backend handler
*/
public DatabaseCommunicationEngine newTextProtocolInstance(final SQLStatementContext<?> sqlStatementContext, final String sql, final JDBCConnectionSession connectionSession) {
ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(connectionSession.getSchemaName());
public DatabaseCommunicationEngine newTextProtocolInstance(final SQLStatementContext<?> sqlStatementContext, final String sql, final JDBCBackendConnection backendConnection) {
ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getConnectionSession().getSchemaName());
LogicSQL logicSQL = new LogicSQL(sqlStatementContext, sql, Collections.emptyList());
DatabaseCommunicationEngine result = new DatabaseCommunicationEngine(JDBCDriverType.STATEMENT, metaData, logicSQL, connectionSession);
connectionSession.add(result);
DatabaseCommunicationEngine result = new DatabaseCommunicationEngine(JDBCDriverType.STATEMENT, metaData, logicSQL, backendConnection);
backendConnection.add(result);
return result;
}

Expand All @@ -68,15 +68,15 @@ public DatabaseCommunicationEngine newTextProtocolInstance(final SQLStatementCon
* @param sqlStatementContext SQL statement context
* @param sql SQL to be executed
* @param parameters SQL parameters
* @param connectionSession connection session
* @param backendConnection backend connection
* @return binary protocol backend handler
*/
public DatabaseCommunicationEngine newBinaryProtocolInstance(final SQLStatementContext<?> sqlStatementContext, final String sql,
final List<Object> parameters, final JDBCConnectionSession connectionSession) {
ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(connectionSession.getSchemaName());
final List<Object> parameters, final JDBCBackendConnection backendConnection) {
ShardingSphereMetaData metaData = ProxyContext.getInstance().getMetaData(backendConnection.getConnectionSession().getSchemaName());
LogicSQL logicSQL = new LogicSQL(sqlStatementContext, sql, parameters);
DatabaseCommunicationEngine result = new DatabaseCommunicationEngine(JDBCDriverType.PREPARED_STATEMENT, metaData, logicSQL, connectionSession);
connectionSession.add(result);
DatabaseCommunicationEngine result = new DatabaseCommunicationEngine(JDBCDriverType.PREPARED_STATEMENT, metaData, logicSQL, backendConnection);
backendConnection.add(result);
return result;
}
}

0 comments on commit baa9eda

Please sign in to comment.