From 8b4d925d99b495cf1ccb79164b31da3851b5dc57 Mon Sep 17 00:00:00 2001 From: Honghua Zhu Date: Mon, 1 Oct 2018 22:22:19 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=A0=E9=99=A4CommandBase=E7=B1=BB=EF=BC=8C?= =?UTF-8?q?=E5=9C=A8QUERY/UPDATE=E5=8D=8F=E8=AE=AE=E5=8C=85=E4=B8=AD?= =?UTF-8?q?=E6=90=BA=E5=B8=A6=E5=8F=AF=E9=80=89=E7=9A=84pageKeys=EF=BC=8C?= =?UTF-8?q?=E4=BC=98=E5=8C=96executeQuery/executeUpdate=E6=96=B9=E6=B3=95?= =?UTF-8?q?=E7=9A=84=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lealone/client/ClientBatchCommand.java | 3 +- .../org/lealone/client/ClientCommand.java | 155 +++----- .../src/main/java/org/lealone/db/Command.java | 64 +++- .../main/java/org/lealone/db/CommandBase.java | 47 --- .../src/main/java/org/lealone/db/Session.java | 18 +- .../org/lealone/db/async/AsyncHandler.java | 2 +- .../replication/ReplicationCommand.java | 13 +- .../java/org/lealone/db/ServerCommand.java | 2 +- .../java/org/lealone/net/TcpConnection.java | 214 ++++------- .../java/org/lealone/sql/StatementBase.java | 45 +-- .../org/lealone/sql/StatementWrapper.java | 353 +++++++++++------- .../main/java/org/lealone/sql/dml/Query.java | 12 - .../org/lealone/sql/router/SQLRouter.java | 4 +- 13 files changed, 425 insertions(+), 507 deletions(-) delete mode 100644 lealone-common/src/main/java/org/lealone/db/CommandBase.java diff --git a/lealone-client/src/main/java/org/lealone/client/ClientBatchCommand.java b/lealone-client/src/main/java/org/lealone/client/ClientBatchCommand.java index 982a7361e..867fb207f 100644 --- a/lealone-client/src/main/java/org/lealone/client/ClientBatchCommand.java +++ b/lealone-client/src/main/java/org/lealone/client/ClientBatchCommand.java @@ -24,7 +24,6 @@ import org.lealone.common.exceptions.DbException; import org.lealone.common.trace.Trace; import org.lealone.db.Command; -import org.lealone.db.CommandBase; import org.lealone.db.CommandParameter; import org.lealone.db.CommandUpdateResult; import org.lealone.db.Session; @@ -33,7 +32,7 @@ import org.lealone.net.AsyncCallback; import org.lealone.net.Transfer; -public class ClientBatchCommand extends CommandBase { +public class ClientBatchCommand implements Command { private ClientSession session; private Transfer transfer; private final Trace trace; diff --git a/lealone-client/src/main/java/org/lealone/client/ClientCommand.java b/lealone-client/src/main/java/org/lealone/client/ClientCommand.java index bf2bc7bcb..862c38789 100644 --- a/lealone-client/src/main/java/org/lealone/client/ClientCommand.java +++ b/lealone-client/src/main/java/org/lealone/client/ClientCommand.java @@ -20,7 +20,6 @@ import org.lealone.common.trace.Trace; import org.lealone.common.util.Utils; import org.lealone.db.Command; -import org.lealone.db.CommandBase; import org.lealone.db.CommandParameter; import org.lealone.db.CommandUpdateResult; import org.lealone.db.Session; @@ -44,7 +43,7 @@ * @author H2 Group * @author zhh */ -public class ClientCommand extends CommandBase implements StorageCommand { +public class ClientCommand implements StorageCommand { private final Transfer transfer; private final ArrayList parameters; @@ -65,6 +64,11 @@ public ClientCommand(ClientSession session, Transfer transfer, String sql, int f this.session = session; } + @Override + public int getType() { + return CLIENT_COMMAND; + } + @Override public Command prepare() { prepare(session, true); @@ -166,27 +170,32 @@ public void runInternal() { @Override public Result executeQuery(int maxRows) { - return executeQuery(maxRows, false, null, false); + return query(maxRows, false, null, null); } @Override public Result executeQuery(int maxRows, boolean scrollable) { - return executeQuery(maxRows, scrollable, null, false); + return query(maxRows, scrollable, null, null); + } + + @Override + public Result executeQuery(int maxRows, boolean scrollable, List pageKeys) { + return query(maxRows, scrollable, pageKeys, null); } @Override public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { - executeQuery(maxRows, scrollable, handler, true); + query(maxRows, scrollable, null, handler); } @Override - public void executeQueryAsync(int maxRows, boolean scrollable, ArrayList pageKeys, + public void executeQueryAsync(int maxRows, boolean scrollable, List pageKeys, AsyncHandler> handler) { - executeQuery(maxRows, scrollable, handler, true); + query(maxRows, scrollable, pageKeys, handler); } - private Result executeQuery(int maxRows, boolean scrollable, AsyncHandler> handler, - boolean async) { + private Result query(int maxRows, boolean scrollable, List pageKeys, + AsyncHandler> handler) { if (prepared) { checkParameters(); prepareIfRequired(); @@ -207,7 +216,6 @@ private Result executeQuery(int maxRows, boolean scrollable, AsyncHandler pageKeys) throws IOException { + if (pageKeys == null) { + transfer.writeInt(0); + } else { + int size = pageKeys.size(); + transfer.writeInt(size); + for (int i = 0; i < size; i++) { + PageKey pk = pageKeys.get(i); + transfer.writeValue((Value) pk.key); + transfer.writeBoolean(pk.first); + } + } + } + private Result getQueryResult(boolean isDistributedQuery, int fetch, int resultId, - AsyncHandler> handler, boolean async) throws IOException { + AsyncHandler> handler) throws IOException { isQuery = true; AsyncCallback ac = new AsyncCallback() { @Override @@ -263,12 +288,12 @@ public void runInternal() { } } }; - if (async) + if (handler != null) ac.setAsyncHandler(handler); transfer.addAsyncCallback(id, ac); transfer.flush(); - if (async) { + if (handler != null) { return null; } else { Result result = ac.getResult(); @@ -278,21 +303,26 @@ public void runInternal() { @Override public int executeUpdate() { - return executeUpdate(null, null, false, null); + return update(null, null, null, null); + } + + @Override + public int executeUpdate(List pageKeys) { + return update(null, null, pageKeys, null); } @Override public int executeUpdate(String replicationName, CommandUpdateResult commandUpdateResult) { - return executeUpdate(replicationName, null, false, commandUpdateResult); + return update(replicationName, commandUpdateResult, null, null); } @Override public void executeUpdateAsync(AsyncHandler> handler) { - executeUpdate(null, handler, true, null); + update(null, null, null, handler); } - private int executeUpdate(String replicationName, AsyncHandler> handler, boolean async, - CommandUpdateResult commandUpdateResult) { + private int update(String replicationName, CommandUpdateResult commandUpdateResult, List pageKeys, + AsyncHandler> handler) { if (prepared) { checkParameters(); prepareIfRequired(); @@ -327,23 +357,26 @@ private int executeUpdate(String replicationName, AsyncHandler> handler, - boolean async, CommandUpdateResult commandUpdateResult) throws IOException { + private int getUpdateCount(boolean isDistributedUpdate, int id, CommandUpdateResult commandUpdateResult, + AsyncHandler> handler) throws IOException { isQuery = false; AsyncCallback ac = new AsyncCallback() { @Override @@ -369,13 +402,13 @@ public void runInternal() { } } }; - if (async) + if (handler != null) ac.setAsyncHandler(handler); transfer.addAsyncCallback(id, ac); transfer.flush(); int updateCount; - if (async) { + if (handler != null) { updateCount = -1; } else { updateCount = ac.getResult(); @@ -436,11 +469,6 @@ public String toString() { return sql + Trace.formatParams(getParameters()); } - @Override - public int getType() { - return CLIENT_COMMAND; - } - int getId() { return id; } @@ -770,65 +798,4 @@ public void runInternal() { } return null; } - - @Override - public Result executeQuery(int maxRows, boolean scrollable, List pageKeys) { - if (prepared) { - checkParameters(); - prepareIfRequired(); - } else { - id = session.getNextId(); - } - int resultId = session.getNextId(); - Result result = null; - try { - boolean isDistributedQuery = session.getParentTransaction() != null - && !session.getParentTransaction().isAutoCommit(); - - if (prepared) { - if (isDistributedQuery) { - session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY_WITH_PAGE_KEYS", id); - transfer.writeRequestHeader(id, - Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY_WITH_PAGE_KEYS); - } else { - session.traceOperation("COMMAND_PREPARED_QUERY_WITH_PAGE_KEYS", id); - transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_QUERY_WITH_PAGE_KEYS); - } - transfer.writeInt(resultId).writeInt(maxRows); - } else { - if (isDistributedQuery) { - session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_QUERY_WITH_PAGE_KEYS", id); - transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY_WITH_PAGE_KEYS); - } else { - session.traceOperation("COMMAND_QUERY_WITH_PAGE_KEYS", id); - transfer.writeRequestHeader(id, Session.COMMAND_QUERY_WITH_PAGE_KEYS); - } - transfer.writeString(sql).writeInt(resultId).writeInt(maxRows); - } - int fetch; - if (scrollable) { - fetch = Integer.MAX_VALUE; - } else { - fetch = fetchSize; - } - transfer.writeInt(fetch); - if (prepared) - sendParameters(transfer); - if (pageKeys == null) { - transfer.writeInt(0); - } else { - int size = pageKeys.size(); - transfer.writeInt(size); - for (int i = 0; i < size; i++) { - PageKey pk = pageKeys.get(i); - transfer.writeValue((Value) pk.key); - transfer.writeBoolean(pk.first); - } - } - result = getQueryResult(isDistributedQuery, fetch, resultId, null, false); - } catch (Exception e) { - session.handleException(e); - } - return result; - } } diff --git a/lealone-common/src/main/java/org/lealone/db/Command.java b/lealone-common/src/main/java/org/lealone/db/Command.java index 7562c1dea..cec96a4fd 100644 --- a/lealone-common/src/main/java/org/lealone/db/Command.java +++ b/lealone-common/src/main/java/org/lealone/db/Command.java @@ -5,7 +5,6 @@ */ package org.lealone.db; -import java.util.ArrayList; import java.util.List; import org.lealone.db.async.AsyncHandler; @@ -24,6 +23,7 @@ public interface Command { int CLIENT_COMMAND = -1; int CLIENT_BATCH_COMMAND = -2; int SERVER_COMMAND = -3; + int REPLICATION_COMMAND = -4; /** * Get command type. @@ -32,6 +32,16 @@ public interface Command { */ int getType(); + /** + * Cancel the command if it is still processing. + */ + void cancel(); + + /** + * Close the command. + */ + void close(); + /** * Get the parameters (if any). * @@ -70,11 +80,22 @@ public interface Command { */ Result executeQuery(int maxRows, boolean scrollable); - void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler); + default Result executeQuery(int maxRows, boolean scrollable, List pageKeys) { + return executeQuery(maxRows, scrollable); + } + + default void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { + executeQueryAsync(maxRows, scrollable, null, handler); + } - default void executeQueryAsync(int maxRows, boolean scrollable, ArrayList pageKeys, + default void executeQueryAsync(int maxRows, boolean scrollable, List pageKeys, AsyncHandler> handler) { - executeQueryAsync(maxRows, scrollable, handler); + Result result = executeQuery(maxRows, scrollable, pageKeys); + if (handler != null) { + AsyncResult r = new AsyncResult<>(); + r.setResult(result); + handler.handle(r); + } } /** @@ -84,7 +105,9 @@ default void executeQueryAsync(int maxRows, boolean scrollable, ArrayList> handler); + default int executeUpdate(List pageKeys) { + return executeUpdate(); + } /** * Execute the update command @@ -94,23 +117,26 @@ default void executeQueryAsync(int maxRows, boolean scrollable, ArrayList> handler) { + executeUpdateAsync(null, handler); + } - Command prepare(); + default void executeUpdateAsync(List pageKeys, AsyncHandler> handler) { + int updateCount = executeUpdate(pageKeys); + if (handler != null) { + AsyncResult r = new AsyncResult<>(); + r.setResult(updateCount); + handler.handle(r); + } + } - void replicationCommit(long validKey, boolean autoCommit); + default Command prepare() { + return this; + } - void replicationRollback(); + default void replicationCommit(long validKey, boolean autoCommit) { + } - default Result executeQuery(int maxRows, boolean scrollable, List pageKeys) { - return executeQuery(maxRows, scrollable); + default void replicationRollback() { } } diff --git a/lealone-common/src/main/java/org/lealone/db/CommandBase.java b/lealone-common/src/main/java/org/lealone/db/CommandBase.java deleted file mode 100644 index 07db00a43..000000000 --- a/lealone-common/src/main/java/org/lealone/db/CommandBase.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.lealone.db; - -import org.lealone.db.async.AsyncHandler; -import org.lealone.db.async.AsyncResult; -import org.lealone.db.result.Result; - -public abstract class CommandBase implements Command { - - @Override - public Command prepare() { - return this; - } - - @Override - public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { - } - - @Override - public void executeUpdateAsync(AsyncHandler> handler) { - } - - @Override - public void replicationCommit(long validKey, boolean autoCommit) { - } - - @Override - public void replicationRollback() { - } - -} diff --git a/lealone-common/src/main/java/org/lealone/db/Session.java b/lealone-common/src/main/java/org/lealone/db/Session.java index 496f9c0eb..197dd9d52 100644 --- a/lealone-common/src/main/java/org/lealone/db/Session.java +++ b/lealone-common/src/main/java/org/lealone/db/Session.java @@ -37,16 +37,12 @@ public interface Session extends Closeable, Transaction.Participant { public static final int RESULT_CLOSE = 23; public static final int COMMAND_QUERY = 40; - public static final int COMMAND_QUERY_WITH_PAGE_KEYS = 41; - public static final int COMMAND_UPDATE = 42; - public static final int COMMAND_UPDATE_WITH_PAGE_KEYS = 43; + public static final int COMMAND_UPDATE = 41; public static final int COMMAND_PREPARE = 50; public static final int COMMAND_PREPARE_READ_PARAMS = 51; public static final int COMMAND_PREPARED_QUERY = 52; - public static final int COMMAND_PREPARED_QUERY_WITH_PAGE_KEYS = 53; - public static final int COMMAND_PREPARED_UPDATE = 54; - public static final int COMMAND_PREPARED_UPDATE_WITH_PAGE_KEYS = 55; + public static final int COMMAND_PREPARED_UPDATE = 53; public static final int COMMAND_GET_META_DATA = 70; public static final int COMMAND_READ_LOB = 71; @@ -58,13 +54,9 @@ public interface Session extends Closeable, Transaction.Participant { public static final int COMMAND_REPLICATION_ROLLBACK = 83; public static final int COMMAND_DISTRIBUTED_TRANSACTION_QUERY = 100; - public static final int COMMAND_DISTRIBUTED_TRANSACTION_QUERY_WITH_PAGE_KEYS = 101; - public static final int COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY = 102; - public static final int COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY_WITH_PAGE_KEYS = 103; - public static final int COMMAND_DISTRIBUTED_TRANSACTION_UPDATE = 104; - public static final int COMMAND_DISTRIBUTED_TRANSACTION_UPDATE_WITH_PAGE_KEYS = 105; - public static final int COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE = 106; - public static final int COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE_WITH_PAGE_KEYS = 107; + public static final int COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY = 101; + public static final int COMMAND_DISTRIBUTED_TRANSACTION_UPDATE = 102; + public static final int COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE = 103; public static final int COMMAND_DISTRIBUTED_TRANSACTION_COMMIT = 120; public static final int COMMAND_DISTRIBUTED_TRANSACTION_ROLLBACK = 121; diff --git a/lealone-common/src/main/java/org/lealone/db/async/AsyncHandler.java b/lealone-common/src/main/java/org/lealone/db/async/AsyncHandler.java index 5e7b136f4..47692e357 100644 --- a/lealone-common/src/main/java/org/lealone/db/async/AsyncHandler.java +++ b/lealone-common/src/main/java/org/lealone/db/async/AsyncHandler.java @@ -22,7 +22,7 @@ public interface AsyncHandler { /** * Something has happened, so handle it. * - * @param event the event to handle + * @param event the event to handle */ void handle(E event); } \ No newline at end of file diff --git a/lealone-common/src/main/java/org/lealone/storage/replication/ReplicationCommand.java b/lealone-common/src/main/java/org/lealone/storage/replication/ReplicationCommand.java index c831dfddc..f4c9ae3df 100644 --- a/lealone-common/src/main/java/org/lealone/storage/replication/ReplicationCommand.java +++ b/lealone-common/src/main/java/org/lealone/storage/replication/ReplicationCommand.java @@ -27,7 +27,6 @@ import java.util.concurrent.Future; import org.lealone.db.Command; -import org.lealone.db.CommandBase; import org.lealone.db.CommandParameter; import org.lealone.db.CommandUpdateResult; import org.lealone.db.result.Result; @@ -38,7 +37,7 @@ import org.lealone.storage.replication.exceptions.WriteFailureException; import org.lealone.storage.replication.exceptions.WriteTimeoutException; -public class ReplicationCommand extends CommandBase implements StorageCommand { +public class ReplicationCommand implements StorageCommand { private static final Random random = new Random(System.currentTimeMillis()); @@ -50,6 +49,11 @@ public ReplicationCommand(ReplicationSession session, Command[] commands) { this.commands = commands; } + @Override + public int getType() { + return REPLICATION_COMMAND; + } + private Command getRandomNode(HashSet seen) { while (true) { // 随机选择一个节点,但是不能跟前面选过的重复 @@ -63,11 +67,6 @@ private Command getRandomNode(HashSet seen) { } } - @Override - public int getType() { - return commands[0].getType(); - } - @Override public boolean isQuery() { return commands[0].isQuery(); diff --git a/lealone-db/src/main/java/org/lealone/db/ServerCommand.java b/lealone-db/src/main/java/org/lealone/db/ServerCommand.java index 978c3d322..3c6292589 100644 --- a/lealone-db/src/main/java/org/lealone/db/ServerCommand.java +++ b/lealone-db/src/main/java/org/lealone/db/ServerCommand.java @@ -27,7 +27,7 @@ import org.lealone.storage.StorageMap; import org.lealone.transaction.Transaction; -public class ServerCommand extends CommandBase implements StorageCommand { +public class ServerCommand implements StorageCommand { private final ServerSession session; diff --git a/lealone-net/src/main/java/org/lealone/net/TcpConnection.java b/lealone-net/src/main/java/org/lealone/net/TcpConnection.java index 87e3c20a6..f1382e68e 100644 --- a/lealone-net/src/main/java/org/lealone/net/TcpConnection.java +++ b/lealone-net/src/main/java/org/lealone/net/TcpConnection.java @@ -343,61 +343,60 @@ private static void writeBatchResult(Transfer transfer, Session session, int id, transfer.flush(); } - private void executeQueryAsync(Transfer transfer, Session session, int sessionId, int id, PreparedStatement command, - int operation, int objectId, int maxRows, int fetchSize) throws IOException { - PreparedCommand pc = new PreparedCommand(id, command, transfer, session, new Runnable() { - @Override - public void run() { - command.executeQueryAsync(maxRows, false, res -> { - if (res.isSucceeded()) { - Result result = res.getResult(); - cache.addObject(objectId, result); - try { - transfer.writeResponseHeader(id, getStatus(session)); - if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY - || operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY) { - transfer.writeString(session.getTransaction().getLocalTransactionNames()); - } - if (session.isRunModeChanged()) { - transfer.writeInt(sessionId).writeString(session.getNewTargetEndpoints()); - } - int columnCount = result.getVisibleColumnCount(); - transfer.writeInt(columnCount); - int rowCount = result.getRowCount(); - transfer.writeInt(rowCount); - for (int i = 0; i < columnCount; i++) { - writeColumn(transfer, result, i); - } - int fetch = fetchSize; - if (rowCount != -1) - fetch = Math.min(rowCount, fetchSize); - writeRow(transfer, result, fetch); - transfer.flush(); - } catch (Exception e) { - sendError(transfer, id, e); - } - } else { - sendError(transfer, id, res.getCause()); - } - }); + private static List readPageKeys(Transfer transfer) throws IOException { + ArrayList pageKeys; + int size = transfer.readInt(); + if (size > 0) { + pageKeys = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + Object value = transfer.readValue(); + boolean first = transfer.readBoolean(); + PageKey pk = new PageKey(value, first); + pageKeys.add(pk); } - }); - addPreparedCommandToQueue(pc, sessionId); + } else { + pageKeys = null; + } + return pageKeys; } - private void executeQueryAsync(Transfer transfer, Session session, int sessionId, int id, PreparedStatement command, - int operation, int objectId, int maxRows, int fetchSize, ArrayList pageKeys) throws IOException { + private void executeQueryAsync(Transfer transfer, Session session, int sessionId, int id, int operation, + boolean prepared) throws IOException { + int resultId = transfer.readInt(); + int maxRows = transfer.readInt(); + int fetchSize = transfer.readInt(); + boolean scrollable = transfer.readBoolean(); + + if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY + || operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY) { + session.setAutoCommit(false); + session.setRoot(false); + } + + PreparedStatement command; + if (prepared) { + command = (PreparedStatement) cache.getObject(id, false); + setParameters(transfer, command); + } else { + String sql = transfer.readString(); + command = session.prepareStatement(sql, fetchSize); + cache.addObject(id, command); + } + command.setFetchSize(fetchSize); + + List pageKeys = readPageKeys(transfer); + PreparedCommand pc = new PreparedCommand(id, command, transfer, session, new Runnable() { @Override public void run() { - command.executeQueryAsync(maxRows, false, pageKeys, res -> { + command.executeQueryAsync(maxRows, scrollable, pageKeys, res -> { if (res.isSucceeded()) { Result result = res.getResult(); - cache.addObject(objectId, result); + cache.addObject(resultId, result); try { transfer.writeResponseHeader(id, getStatus(session)); - if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY_WITH_PAGE_KEYS - || operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY_WITH_PAGE_KEYS) { + if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY + || operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY) { transfer.writeString(session.getTransaction().getLocalTransactionNames()); } if (session.isRunModeChanged()) { @@ -427,12 +426,33 @@ public void run() { addPreparedCommandToQueue(pc, sessionId); } - private void executeUpdateAsync(Transfer transfer, Session session, int sessionId, int id, - PreparedStatement command, int operation) throws IOException { + private void executeUpdateAsync(Transfer transfer, Session session, int sessionId, int id, int operation, + boolean prepared) throws IOException { + if (operation == Session.COMMAND_REPLICATION_UPDATE + || operation == Session.COMMAND_REPLICATION_PREPARED_UPDATE) { + session.setReplicationName(transfer.readString()); + } else if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE + || operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE) { + session.setAutoCommit(false); + session.setRoot(false); + } + + PreparedStatement command; + if (prepared) { + command = (PreparedStatement) cache.getObject(id, false); + setParameters(transfer, command); + } else { + String sql = transfer.readString(); + command = session.prepareStatement(sql, -1); + cache.addObject(id, command); + } + + List pageKeys = readPageKeys(transfer); + PreparedCommand pc = new PreparedCommand(id, command, transfer, session, new Runnable() { @Override public void run() { - command.executeUpdateAsync(res -> { + command.executeUpdateAsync(pageKeys, res -> { if (res.isSucceeded()) { int updateCount = res.getResult(); try { @@ -521,114 +541,24 @@ protected void handleRequest(Transfer transfer, int id, int operation) throws IO } case Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY: case Session.COMMAND_QUERY: { - String sql = transfer.readString(); - int objectId = transfer.readInt(); - int maxRows = transfer.readInt(); - int fetchSize = transfer.readInt(); - if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY) { - session.setAutoCommit(false); - session.setRoot(false); - } - PreparedStatement command = session.prepareStatement(sql, fetchSize); - command.setFetchSize(fetchSize); - cache.addObject(id, command); - executeQueryAsync(transfer, session, sessionId, id, command, operation, objectId, maxRows, fetchSize); + executeQueryAsync(transfer, session, sessionId, id, operation, false); break; } case Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY: case Session.COMMAND_PREPARED_QUERY: { - int objectId = transfer.readInt(); - int maxRows = transfer.readInt(); - int fetchSize = transfer.readInt(); - PreparedStatement command = (PreparedStatement) cache.getObject(id, false); - command.setFetchSize(fetchSize); - setParameters(transfer, command); - if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY) { - session.setAutoCommit(false); - session.setRoot(false); - } - executeQueryAsync(transfer, session, sessionId, id, command, operation, objectId, maxRows, fetchSize); - break; - } - - case Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY_WITH_PAGE_KEYS: - case Session.COMMAND_QUERY_WITH_PAGE_KEYS: { - String sql = transfer.readString(); - int objectId = transfer.readInt(); - int maxRows = transfer.readInt(); - int fetchSize = transfer.readInt(); - if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY_WITH_PAGE_KEYS) { - session.setAutoCommit(false); - session.setRoot(false); - } - int size = transfer.readInt(); - ArrayList pageKeys = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - Object value = transfer.readValue(); - boolean first = transfer.readBoolean(); - PageKey pk = new PageKey(value, first); - pageKeys.add(pk); - } - PreparedStatement command = session.prepareStatement(sql, fetchSize); - command.setFetchSize(fetchSize); - cache.addObject(id, command); - executeQueryAsync(transfer, session, sessionId, id, command, operation, objectId, maxRows, fetchSize, - pageKeys); - break; - } - case Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY_WITH_PAGE_KEYS: - case Session.COMMAND_PREPARED_QUERY_WITH_PAGE_KEYS: { - int objectId = transfer.readInt(); - int maxRows = transfer.readInt(); - int fetchSize = transfer.readInt(); - PreparedStatement command = (PreparedStatement) cache.getObject(id, false); - command.setFetchSize(fetchSize); - setParameters(transfer, command); - if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY_WITH_PAGE_KEYS) { - session.setAutoCommit(false); - session.setRoot(false); - } - int size = transfer.readInt(); - ArrayList pageKeys = new ArrayList<>(size); - for (int i = 0; i < size; i++) { - Object value = transfer.readValue(); - boolean first = transfer.readBoolean(); - PageKey pk = new PageKey(value, first); - pageKeys.add(pk); - } - executeQueryAsync(transfer, session, sessionId, id, command, operation, objectId, maxRows, fetchSize, - pageKeys); + executeQueryAsync(transfer, session, sessionId, id, operation, true); break; } case Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE: case Session.COMMAND_UPDATE: case Session.COMMAND_REPLICATION_UPDATE: { - String sql = transfer.readString(); - if (operation == Session.COMMAND_REPLICATION_UPDATE) { - session.setReplicationName(transfer.readString()); - session.setRoot(false); - } - if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE) { - session.setAutoCommit(false); - session.setRoot(false); - } - PreparedStatement command = session.prepareStatement(sql, -1); - cache.addObject(id, command); - executeUpdateAsync(transfer, session, sessionId, id, command, operation); + executeUpdateAsync(transfer, session, sessionId, id, operation, false); break; } case Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE: case Session.COMMAND_PREPARED_UPDATE: case Session.COMMAND_REPLICATION_PREPARED_UPDATE: { - if (operation == Session.COMMAND_REPLICATION_PREPARED_UPDATE) - session.setReplicationName(transfer.readString()); - PreparedStatement command = (PreparedStatement) cache.getObject(id, false); - setParameters(transfer, command); - if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE) { - session.setAutoCommit(false); - session.setRoot(false); - } - executeUpdateAsync(transfer, session, sessionId, id, command, operation); + executeUpdateAsync(transfer, session, sessionId, id, operation, true); break; } case Session.COMMAND_REPLICATION_COMMIT: { diff --git a/lealone-sql/src/main/java/org/lealone/sql/StatementBase.java b/lealone-sql/src/main/java/org/lealone/sql/StatementBase.java index cf175eb16..5a3290ec4 100644 --- a/lealone-sql/src/main/java/org/lealone/sql/StatementBase.java +++ b/lealone-sql/src/main/java/org/lealone/sql/StatementBase.java @@ -12,7 +12,6 @@ import org.lealone.common.exceptions.DbException; import org.lealone.common.trace.Trace; import org.lealone.common.util.StatementBuilder; -import org.lealone.db.CommandBase; import org.lealone.db.CommandParameter; import org.lealone.db.CommandUpdateResult; import org.lealone.db.Database; @@ -20,8 +19,6 @@ import org.lealone.db.SysProperties; import org.lealone.db.api.DatabaseEventListener; import org.lealone.db.api.ErrorCode; -import org.lealone.db.async.AsyncHandler; -import org.lealone.db.async.AsyncResult; import org.lealone.db.result.Result; import org.lealone.db.value.Value; import org.lealone.sql.expression.Expression; @@ -35,7 +32,7 @@ * @author H2 Group * @author zhh */ -public abstract class StatementBase extends CommandBase implements PreparedStatement, ParsedStatement { +public abstract class StatementBase implements PreparedStatement, ParsedStatement { /** * The session. @@ -543,43 +540,33 @@ public Result executeQuery(int maxRows) { @Override public Result executeQuery(int maxRows, boolean scrollable) { - return executeQuery(maxRows); - } - - @Override - public int executeUpdate() { - return update(); + return query(maxRows); } @Override - public int executeUpdate(String replicationName, CommandUpdateResult commandUpdateResult) { - return executeUpdate(); + public Result executeQuery(int maxRows, boolean scrollable, List pageKeys) { + TableFilter tf = getTableFilter(); + if (tf != null) + tf.setPageKeys(pageKeys); + return query(maxRows); } @Override - public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { - Result result = executeQuery(maxRows, scrollable); - if (handler != null) { - AsyncResult r = new AsyncResult<>(); - r.setResult(result); - handler.handle(r); - } + public int executeUpdate() { + return update(); } @Override - public void executeQueryAsync(int maxRows, boolean scrollable, ArrayList pageKeys, - AsyncHandler> handler) { - executeQueryAsync(maxRows, scrollable, handler); + public int executeUpdate(List pageKeys) { + TableFilter tf = getTableFilter(); + if (tf != null) + tf.setPageKeys(pageKeys); + return update(); } @Override - public void executeUpdateAsync(AsyncHandler> handler) { - int updateCount = executeUpdate(); - if (handler != null) { - AsyncResult r = new AsyncResult<>(); - r.setResult(updateCount); - handler.handle(r); - } + public int executeUpdate(String replicationName, CommandUpdateResult commandUpdateResult) { + return update(); } @Override diff --git a/lealone-sql/src/main/java/org/lealone/sql/StatementWrapper.java b/lealone-sql/src/main/java/org/lealone/sql/StatementWrapper.java index c4ff4340f..e137ff328 100644 --- a/lealone-sql/src/main/java/org/lealone/sql/StatementWrapper.java +++ b/lealone-sql/src/main/java/org/lealone/sql/StatementWrapper.java @@ -8,6 +8,7 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.List; +import java.util.Map; import org.lealone.common.exceptions.DbException; import org.lealone.common.trace.Trace; @@ -25,6 +26,7 @@ import org.lealone.db.value.Value; import org.lealone.db.value.ValueNull; import org.lealone.sql.expression.Parameter; +import org.lealone.sql.optimizer.TableFilter; import org.lealone.sql.router.SQLRouter; import org.lealone.storage.PageKey; @@ -59,9 +61,17 @@ public StatementWrapper(ServerSession session, StatementBase statement) { trace = session.getDatabase().getTrace(Trace.COMMAND); } + // ========================================== 只实现了以下方法 ============================================= + @Override - public Result getMetaData() { - return statement.getMetaData(); + public PreparedStatement prepare() { + statement.prepare(); + return this; + } + + @Override + public PreparedStatement getWrappedStatement() { + return statement; } /** @@ -69,6 +79,7 @@ public Result getMetaData() { * * @throws DbException if the statement has been canceled */ + @Override public void checkCanceled() { if (cancel) { @@ -77,11 +88,6 @@ public void checkCanceled() { } } - @Override - public void close() { - statement.close(); - } - @Override public void cancel() { this.cancel = true; @@ -93,154 +99,85 @@ public String toString() { return "StatementWrapper[" + statement.toString() + "]"; } - /** - * Whether the command is already closed (in which case it can be re-used). - * - * @return true if it can be re-used - */ - @Override - public boolean canReuse() { - return statement.canReuse(); - } - - /** - * The command is now re-used, therefore reset the canReuse flag, and the - * parameter values. - */ - @Override - public void reuse() { - statement.reuse(); - } - - @Override - public boolean isCacheable() { - return statement.isCacheable(); - } - - @Override - public int getType() { - return statement.getType(); - } - - @Override - public int hashCode() { - return statement.hashCode(); - } - - @Override - public boolean needRecompile() { - return statement.needRecompile(); - } - - @Override - public boolean equals(Object obj) { - return statement.equals(obj); - } - - @Override - public void setParameterList(ArrayList parameters) { - statement.setParameterList(parameters); - } - - @Override - public ArrayList getParameters() { - return statement.getParameters(); - } - - @Override - public boolean isQuery() { - return statement.isQuery(); - } - - @Override - public PreparedStatement prepare() { - statement.prepare(); - return this; - } - - @Override - public void setSQL(String sql) { - statement.setSQL(sql); - } - @Override - public String getSQL() { - return statement.getSQL(); + public void replicationCommit(long validKey, boolean autoCommit) { + session.replicationCommit(validKey, autoCommit); } @Override - public String getPlanSQL() { - return statement.getPlanSQL(); + public void replicationRollback() { + session.rollback(); } @Override - public void setObjectId(int i) { - statement.setObjectId(i); + public Result executeQuery(int maxRows) { + return executeQuery(maxRows, false); } @Override - public void setSession(ServerSession currentSession) { - statement.setSession(currentSession); + public Result executeQuery(int maxRows, boolean scrollable) { + return executeQuery(maxRows, scrollable, null); } - @Override - public void setPrepareAlways(boolean prepareAlways) { - statement.setPrepareAlways(prepareAlways); - } + // scrollable参数被忽略了 @Override - public int getCurrentRowNumber() { - return statement.getCurrentRowNumber(); + public Result executeQuery(int maxRows, boolean scrollable, List pageKeys) { + return (Result) executeQuery0(maxRows, pageKeys, null); } @Override - public boolean isLocal() { - return statement.isLocal(); + public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { + executeQuery0(maxRows, null, handler); } @Override - public void setLocal(boolean local) { - statement.setLocal(local); + public void executeQueryAsync(int maxRows, boolean scrollable, List pageKeys, + AsyncHandler> handler) { + executeQuery0(maxRows, pageKeys, handler); } @Override - public int getFetchSize() { - return statement.getFetchSize(); + public int executeUpdate() { + return executeUpdate(null); } @Override - public void setFetchSize(int fetchSize) { - statement.setFetchSize(fetchSize); + public int executeUpdate(List pageKeys) { + return ((Integer) executeUpdate0(pageKeys, null)).intValue(); } @Override - public ServerSession getSession() { - return statement.getSession(); + public int executeUpdate(String replicationName, CommandUpdateResult commandUpdateResult) { + int updateCount = executeUpdate(); + if (commandUpdateResult != null) { + commandUpdateResult.setUpdateCount(updateCount); + commandUpdateResult.addResult(this, session.getLastRowKey()); + } + return updateCount; } @Override - public PreparedStatement getWrappedStatement() { - return statement; + public void executeUpdateAsync(AsyncHandler> handler) { + executeUpdate0(null, handler); } @Override - public Result query(int maxRows) { - return statement.query(maxRows); + public void executeUpdateAsync(List pageKeys, AsyncHandler> handler) { + executeUpdate0(pageKeys, handler); } - @Override - public Result query(int maxRows, boolean scrollable) { - return statement.query(maxRows, scrollable); + private Object executeQuery0(int maxRows, List pageKeys, AsyncHandler> queryHandler) { + return execute(null, queryHandler, pageKeys, maxRows, false); } - @Override - public int update() { - return statement.update(); + private Object executeUpdate0(List pageKeys, AsyncHandler> updateHandler) { + return execute(updateHandler, null, pageKeys, 0, true); } @SuppressWarnings({ "rawtypes", "unchecked" }) - private Object execute(int maxRows, boolean isUpdate, AsyncHandler> updateHandler, - AsyncHandler> queryHandler, ArrayList pageKeys) { + private Object execute(AsyncHandler> updateHandler, + AsyncHandler> queryHandler, List pageKeys, int maxRows, boolean isUpdate) { boolean async = (updateHandler != null) || (queryHandler != null); startTimeNanos = 0; long start = 0; @@ -268,7 +205,11 @@ private Object execute(int maxRows, boolean isUpdate, AsyncHandler ar = new AsyncResult<>(); @@ -441,54 +382,190 @@ private void stop(boolean async, AsyncResult ar, AsyncHandler ah) { } } + // ========================================== 以下方法只是简单的委派 ============================================= + @Override - public Result executeQuery(int maxRows, boolean scrollable, List pageKeys) { - return statement.executeQuery(maxRows, scrollable, pageKeys); + public int hashCode() { + return statement.hashCode(); } @Override - public Result executeQuery(int maxRows) { - return (Result) execute(maxRows, false, null, null, null); + public boolean isLocal() { + return statement.isLocal(); } @Override - public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { - execute(0, false, null, handler, null); + public void setLocal(boolean local) { + statement.setLocal(local); } @Override - public void executeQueryAsync(int maxRows, boolean scrollable, ArrayList pageKeys, - AsyncHandler> handler) { - execute(0, false, null, handler, pageKeys); + public int getFetchSize() { + return statement.getFetchSize(); } @Override - public int executeUpdate() { - return ((Integer) execute(0, true, null, null, null)).intValue(); + public void setFetchSize(int fetchSize) { + statement.setFetchSize(fetchSize); } @Override - public void executeUpdateAsync(AsyncHandler> handler) { - execute(0, true, handler, null, null); + public Result getMetaData() { + return statement.getMetaData(); } @Override - public int executeUpdate(String replicationName, CommandUpdateResult commandUpdateResult) { - int updateCount = executeUpdate(); - if (commandUpdateResult != null) { - commandUpdateResult.setUpdateCount(updateCount); - commandUpdateResult.addResult(this, session.getLastRowKey()); - } - return updateCount; + public int getType() { + return statement.getType(); } @Override - public void replicationCommit(long validKey, boolean autoCommit) { - session.replicationCommit(validKey, autoCommit); + public boolean needRecompile() { + return statement.needRecompile(); } @Override - public void replicationRollback() { - session.rollback(); + public boolean equals(Object obj) { + return statement.equals(obj); + } + + @Override + public void setParameterList(ArrayList parameters) { + statement.setParameterList(parameters); + } + + @Override + public ArrayList getParameters() { + return statement.getParameters(); + } + + @Override + public boolean isQuery() { + return statement.isQuery(); + } + + @Override + public Result query(int maxRows) { + return statement.query(maxRows); + } + + @Override + public Result query(int maxRows, boolean scrollable) { + return statement.query(maxRows, scrollable); + } + + @Override + public int update() { + return statement.update(); + } + + @Override + public int update(String replicationName) { + return statement.update(replicationName); + } + + @Override + public void setSQL(String sql) { + statement.setSQL(sql); + } + + @Override + public String getSQL() { + return statement.getSQL(); + } + + @Override + public String getPlanSQL() { + return statement.getPlanSQL(); + } + + @Override + public void setObjectId(int i) { + statement.setObjectId(i); + } + + @Override + public void setSession(ServerSession currentSession) { + statement.setSession(currentSession); + } + + @Override + public void setPrepareAlways(boolean prepareAlways) { + statement.setPrepareAlways(prepareAlways); + } + + @Override + public int getCurrentRowNumber() { + return statement.getCurrentRowNumber(); + } + + @Override + public boolean isCacheable() { + return statement.isCacheable(); + } + + @Override + public ServerSession getSession() { + return statement.getSession(); + } + + @Override + public boolean canReuse() { + return statement.canReuse(); + } + + @Override + public void reuse() { + statement.reuse(); + } + + @Override + public void close() { + statement.close(); + } + + @Override + public double getCost() { + return statement.getCost(); + } + + @Override + public int getPriority() { + return statement.getPriority(); + } + + @Override + public void setPriority(int priority) { + statement.setPriority(priority); + } + + @Override + public boolean isDDL() { + return statement.isDDL(); + } + + @Override + public boolean isDatabaseStatement() { + return statement.isDatabaseStatement(); + } + + @Override + public boolean isReplicationStatement() { + return statement.isReplicationStatement(); + } + + @Override + public TableFilter getTableFilter() { + return statement.getTableFilter(); + } + + @Override + public Map> getEndpointToPageKeyMap() { + return statement.getEndpointToPageKeyMap(); + } + + @Override + public String getPlanSQL(boolean isDistributed) { + return statement.getPlanSQL(isDistributed); } } diff --git a/lealone-sql/src/main/java/org/lealone/sql/dml/Query.java b/lealone-sql/src/main/java/org/lealone/sql/dml/Query.java index 8726e33c7..d769082bd 100644 --- a/lealone-sql/src/main/java/org/lealone/sql/dml/Query.java +++ b/lealone-sql/src/main/java/org/lealone/sql/dml/Query.java @@ -32,7 +32,6 @@ import org.lealone.sql.expression.ValueExpression; import org.lealone.sql.optimizer.ColumnResolver; import org.lealone.sql.optimizer.TableFilter; -import org.lealone.storage.PageKey; /** * Represents a SELECT statement (simple, or union). @@ -561,17 +560,6 @@ public final long getMaxDataModificationId() { public abstract List getTopFilters(); - protected List pageKeys; - - @Override - public Result executeQuery(int maxRows, boolean scrollable, List pageKeys) { - this.pageKeys = pageKeys; - for (TableFilter tf : getTopFilters()) { - tf.setPageKeys(pageKeys); - } - return query(maxRows, scrollable); - } - @Override public boolean isDeterministic() { return isEverything(ExpressionVisitor.DETERMINISTIC_VISITOR); diff --git a/lealone-sql/src/main/java/org/lealone/sql/router/SQLRouter.java b/lealone-sql/src/main/java/org/lealone/sql/router/SQLRouter.java index 7933869a7..bad7ac91d 100644 --- a/lealone-sql/src/main/java/org/lealone/sql/router/SQLRouter.java +++ b/lealone-sql/src/main/java/org/lealone/sql/router/SQLRouter.java @@ -240,13 +240,13 @@ private static int maybeExecuteDistributedUpdate(StatementBase statement, int i = 0; for (Entry> e : endpointToPageKeyMap.entrySet()) { String hostId = e.getKey(); - // List pageKeys = e.getValue(); + List pageKeys = e.getValue(); sessions[i] = currentSession.getNestedSession(hostId, !NetEndpoint.getLocalTcpEndpoint().equals(NetEndpoint.createTCP(hostId))); commands[i] = sessions[i].createCommand(sql, Integer.MAX_VALUE); Command c = commands[i]; callables.add(() -> { - return c.executeUpdate(); + return c.executeUpdate(pageKeys); }); i++; }