From 4aea8a4808419550da5567b4b8dac6760972e9d5 Mon Sep 17 00:00:00 2001 From: Honghua Zhu Date: Sun, 20 Mar 2016 23:57:34 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A9=E5=B1=95JDBC=EF=BC=8C=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E5=BC=82=E6=AD=A5API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../lealone/client/ClientBatchCommand.java | 14 + .../org/lealone/client/ClientCommand.java | 272 +++++++++--------- .../client/jdbc/JdbcPreparedStatement.java | 89 ++++-- .../lealone/client/jdbc/JdbcResultSet.java | 2 +- .../lealone/client/jdbc/JdbcStatement.java | 101 +++++-- .../java/org/lealone/async/AsyncHandler.java | 28 ++ .../java/org/lealone/async/AsyncResult.java | 30 ++ .../src/main/java/org/lealone/db/Command.java | 6 + .../replication/ReplicationCommand.java | 14 + .../java/org/lealone/db/ServerCommand.java | 14 + .../java/org/lealone/sql/StatementBase.java | 13 + .../org/lealone/sql/StatementWrapper.java | 12 + .../test/async/AsyncJdbcStatementTest.java | 57 ++++ .../async/AsyncPreparedStatementTest.java | 76 +++++ .../test/benchmark/AsyncBenchmark.java | 166 +++++++++++ .../lealone/test/benchmark/SyncBenchmark.java | 147 ++++++++++ .../org/lealone/test/misc/CRUDExample.java | 189 +----------- 17 files changed, 867 insertions(+), 363 deletions(-) create mode 100644 lealone-common/src/main/java/org/lealone/async/AsyncHandler.java create mode 100644 lealone-common/src/main/java/org/lealone/async/AsyncResult.java create mode 100644 lealone-test/src/test/java/org/lealone/test/async/AsyncJdbcStatementTest.java create mode 100644 lealone-test/src/test/java/org/lealone/test/async/AsyncPreparedStatementTest.java create mode 100644 lealone-test/src/test/java/org/lealone/test/benchmark/AsyncBenchmark.java create mode 100644 lealone-test/src/test/java/org/lealone/test/benchmark/SyncBenchmark.java diff --git a/lealone-client/src/main/java/org/lealone/client/ClientBatchCommand.java b/lealone-client/src/main/java/org/lealone/client/ClientBatchCommand.java index 8d5d16ae9..668491bc3 100644 --- a/lealone-client/src/main/java/org/lealone/client/ClientBatchCommand.java +++ b/lealone-client/src/main/java/org/lealone/client/ClientBatchCommand.java @@ -21,6 +21,8 @@ import java.util.ArrayList; import java.util.List; +import org.lealone.async.AsyncHandler; +import org.lealone.async.AsyncResult; import org.lealone.common.exceptions.DbException; import org.lealone.common.trace.Trace; import org.lealone.db.Command; @@ -191,4 +193,16 @@ public Command prepare() { return this; } + @Override + public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { + // TODO Auto-generated method stub + + } + + @Override + public void executeUpdateAsync(AsyncHandler> handler) { + // TODO Auto-generated method stub + + } + } diff --git a/lealone-client/src/main/java/org/lealone/client/ClientCommand.java b/lealone-client/src/main/java/org/lealone/client/ClientCommand.java index f1be38757..0f378eb80 100644 --- a/lealone-client/src/main/java/org/lealone/client/ClientCommand.java +++ b/lealone-client/src/main/java/org/lealone/client/ClientCommand.java @@ -11,6 +11,8 @@ import java.util.ArrayList; import org.lealone.api.ErrorCode; +import org.lealone.async.AsyncHandler; +import org.lealone.async.AsyncResult; import org.lealone.client.result.ClientResult; import org.lealone.client.result.RowCountDeterminedClientResult; import org.lealone.client.result.RowCountUndeterminedClientResult; @@ -158,31 +160,51 @@ public void runInternal() { @Override public Result executeQuery(int maxRows) { - return executeQuery(maxRows, false); + return executeQuery(maxRows, false, null, false); } @Override public Result executeQuery(int maxRows, boolean scrollable) { - if (prepared) - return executePreparedQuery(maxRows, scrollable); - else - return executeQueryDirectly(maxRows, scrollable); + return executeQuery(maxRows, scrollable, null, false); } - private Result executeQueryDirectly(int maxRows, boolean scrollable) { - id = session.getNextId(); - int objectId = session.getNextId(); - ClientResult result = null; + @Override + public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { + executeQuery(maxRows, scrollable, handler, true); + } + + private Result executeQuery(int maxRows, boolean scrollable, AsyncHandler> handler, + boolean async) { + if (prepared) { + checkParameters(); + prepareIfRequired(); + } else { + id = session.getNextId(); + } + int resultId = session.getNextId(); + Result result = null; try { boolean isDistributedQuery = session.getTransaction() != null && !session.getTransaction().isAutoCommit(); - if (isDistributedQuery) { - session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_QUERY", id); - transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY); + + if (prepared) { + if (isDistributedQuery) { + session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY", id); + transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY); + } else { + session.traceOperation("COMMAND_PREPARED_QUERY", id); + transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_QUERY); + } + transfer.writeInt(session.getSessionId()).writeInt(resultId).writeInt(maxRows); } else { - session.traceOperation("COMMAND_QUERY", id); - transfer.writeRequestHeader(id, Session.COMMAND_QUERY); + if (isDistributedQuery) { + session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_QUERY", id); + transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY); + } else { + session.traceOperation("COMMAND_QUERY", id); + transfer.writeRequestHeader(id, Session.COMMAND_QUERY); + } + transfer.writeInt(session.getSessionId()).writeString(sql).writeInt(resultId).writeInt(maxRows); } - transfer.writeInt(session.getSessionId()).writeString(sql).writeInt(objectId).writeInt(maxRows); int fetch; if (scrollable) { fetch = Integer.MAX_VALUE; @@ -190,127 +212,115 @@ private Result executeQueryDirectly(int maxRows, boolean scrollable) { fetch = fetchSize; } transfer.writeInt(fetch); - AsyncCallback ac = new AsyncCallback() { - @Override - public void runInternal() { - try { - if (isDistributedQuery) - session.getTransaction().addLocalTransactionNames(transfer.readString()); - - int columnCount = transfer.readInt(); - int rowCount = transfer.readInt(); - ClientResult result; - if (rowCount < 0) - result = new RowCountUndeterminedClientResult(session, transfer, objectId, columnCount, - fetch); - else - result = new RowCountDeterminedClientResult(session, transfer, objectId, columnCount, - rowCount, fetch); - setResult(result); - } catch (IOException e) { - throw DbException.convert(e); - } - } - }; - transfer.addAsyncCallback(id, ac); - transfer.flush(); - result = ac.getResult(); + if (prepared) + sendParameters(transfer); + result = getQueryResult(isDistributedQuery, fetch, resultId, handler, async); } catch (Exception e) { session.handleException(e); } session.readSessionState(); - isQuery = true; return result; } - private Result executePreparedQuery(int maxRows, boolean scrollable) { - checkParameters(); - int objectId = session.getNextId(); - ClientResult result = null; - prepareIfRequired(); - try { - boolean isDistributedQuery = session.getTransaction() != null && !session.getTransaction().isAutoCommit(); - if (isDistributedQuery) { - session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY", id); - transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY); - } else { - session.traceOperation("COMMAND_PREPARED_QUERY", id); - transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_QUERY); - } - transfer.writeInt(session.getSessionId()).writeInt(objectId).writeInt(maxRows); - int fetch; - if (scrollable) { - fetch = Integer.MAX_VALUE; - } else { - fetch = fetchSize; - } - transfer.writeInt(fetch); - sendParameters(transfer); - AsyncCallback ac = new AsyncCallback() { - @Override - public void runInternal() { - try { - if (isDistributedQuery) - session.getTransaction().addLocalTransactionNames(transfer.readString()); + private Result getQueryResult(boolean isDistributedQuery, int fetch, int resultId, + AsyncHandler> handler, boolean async) throws IOException { + isQuery = true; + AsyncCallback ac = new AsyncCallback() { + @Override + public void runInternal() { + try { + if (isDistributedQuery) + session.getTransaction().addLocalTransactionNames(transfer.readString()); - int columnCount = transfer.readInt(); - int rowCount = transfer.readInt(); - ClientResult result; - if (rowCount < 0) - result = new RowCountUndeterminedClientResult(session, transfer, objectId, columnCount, - fetch); - else - result = new RowCountDeterminedClientResult(session, transfer, objectId, columnCount, - rowCount, fetch); - setResult(result); - } catch (IOException e) { - throw DbException.convert(e); + int columnCount = transfer.readInt(); + int rowCount = transfer.readInt(); + ClientResult result; + if (rowCount < 0) + result = new RowCountUndeterminedClientResult(session, transfer, resultId, columnCount, fetch); + else + result = new RowCountDeterminedClientResult(session, transfer, resultId, columnCount, rowCount, + fetch); + + setResult(result); + if (handler != null) { + AsyncResult r = new AsyncResult<>(); + r.setResult(result); + handler.handle(r); } + // resultSet.setCommand(command); + } catch (IOException e) { + throw DbException.convert(e); } - }; - transfer.addAsyncCallback(id, ac); - transfer.flush(); - result = ac.getResult(); - } catch (Exception e) { - session.handleException(e); - } - session.readSessionState(); - return result; + } + }; + transfer.addAsyncCallback(id, ac); + transfer.flush(); + + if (async) + return null; + else + return ac.getResult(); } @Override public int executeUpdate() { - return executeUpdate(null); + return executeUpdate(null, null, false); } @Override public int executeUpdate(String replicationName) { - if (prepared) - return executePreparedUpdate(replicationName); - else - return executeUpdateDirectly(replicationName); + return executeUpdate(replicationName, null, false); + } + + @Override + public void executeUpdateAsync(AsyncHandler> handler) { + executeUpdate(null, handler, true); } - private int executeUpdateDirectly(String replicationName) { - id = session.getNextId(); + private int executeUpdate(String replicationName, AsyncHandler> handler, boolean async) { + if (prepared) { + checkParameters(); + prepareIfRequired(); + } else { + id = session.getNextId(); + } int updateCount = 0; try { boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit(); - if (isDistributedUpdate) { - session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_UPDATE", id); - transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE); - } else if (replicationName != null) { - session.traceOperation("COMMAND_REPLICATION_UPDATE", id); - transfer.writeRequestHeader(id, Session.COMMAND_REPLICATION_UPDATE); + + if (prepared) { + if (isDistributedUpdate) { + session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE", id); + transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE); + } else if (replicationName != null) { + session.traceOperation("COMMAND_REPLICATION_PREPARED_UPDATE", id); + transfer.writeRequestHeader(id, Session.COMMAND_REPLICATION_PREPARED_UPDATE); + } else { + session.traceOperation("COMMAND_PREPARED_UPDATE", id); + transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_UPDATE); + } } else { - session.traceOperation("COMMAND_UPDATE", id); - transfer.writeRequestHeader(id, Session.COMMAND_UPDATE); + if (isDistributedUpdate) { + session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_UPDATE", id); + transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE); + } else if (replicationName != null) { + session.traceOperation("COMMAND_REPLICATION_UPDATE", id); + transfer.writeRequestHeader(id, Session.COMMAND_REPLICATION_UPDATE); + } else { + session.traceOperation("COMMAND_UPDATE", id); + transfer.writeRequestHeader(id, Session.COMMAND_UPDATE); + } } - transfer.writeInt(session.getSessionId()).writeString(sql); + transfer.writeInt(session.getSessionId()); + if (!prepared) + transfer.writeString(sql); if (replicationName != null) transfer.writeString(replicationName); - updateCount = getUpdateCount(isDistributedUpdate, id); + if (prepared) + sendParameters(transfer); + + updateCount = getUpdateCount(isDistributedUpdate, id, handler, async); } catch (Exception e) { session.handleException(e); } @@ -318,7 +328,9 @@ private int executeUpdateDirectly(String replicationName) { return updateCount; } - private int getUpdateCount(boolean isDistributedUpdate, int id) throws IOException { + private int getUpdateCount(boolean isDistributedUpdate, int id, AsyncHandler> handler, + boolean async) throws IOException { + isQuery = false; IntAsyncCallback ac = new IntAsyncCallback() { @Override public void runInternal() { @@ -326,7 +338,13 @@ public void runInternal() { if (isDistributedUpdate) session.getTransaction().addLocalTransactionNames(transfer.readString()); - setResult(transfer.readInt()); + int updateCount = transfer.readInt(); + setResult(updateCount); + if (handler != null) { + AsyncResult r = new AsyncResult<>(); + r.setResult(updateCount); + handler.handle(r); + } } catch (IOException e) { throw DbException.convert(e); } @@ -334,35 +352,15 @@ public void runInternal() { }; transfer.addAsyncCallback(id, ac); transfer.flush(); - return ac.getResult(); - } - - private int executePreparedUpdate(String replicationName) { - checkParameters(); - int updateCount = 0; - prepareIfRequired(); - try { - boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit(); - if (isDistributedUpdate) { - session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE", id); - transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE); - } else if (replicationName != null) { - session.traceOperation("COMMAND_REPLICATION_PREPARED_UPDATE", id); - transfer.writeRequestHeader(id, Session.COMMAND_REPLICATION_PREPARED_UPDATE); - } else { - session.traceOperation("COMMAND_PREPARED_UPDATE", id); - transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_UPDATE); - } - transfer.writeInt(session.getSessionId()); - if (replicationName != null) - transfer.writeString(replicationName); - sendParameters(transfer); - updateCount = getUpdateCount(isDistributedUpdate, id); - } catch (Exception e) { - session.handleException(e); + int updateCount; + if (async) { + updateCount = -1; + } else { + ac.await(); + updateCount = ac.getResult(); } - session.readSessionState(); + return updateCount; } diff --git a/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcPreparedStatement.java b/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcPreparedStatement.java index 47030aa4a..1df4926db 100644 --- a/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcPreparedStatement.java +++ b/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcPreparedStatement.java @@ -29,6 +29,8 @@ import java.util.List; import org.lealone.api.ErrorCode; +import org.lealone.async.AsyncHandler; +import org.lealone.async.AsyncResult; import org.lealone.client.ClientBatchCommand; import org.lealone.client.ClientSession; import org.lealone.common.exceptions.DbException; @@ -92,6 +94,14 @@ void setCachedColumnLabelMap(HashMap cachedColumnLabelMap) { */ @Override public ResultSet executeQuery() throws SQLException { + return executeQuery(null, false); + } + + public void executeQueryAsync(AsyncHandler> handler) throws SQLException { + executeQuery(handler, true); + } + + private ResultSet executeQuery(AsyncHandler> handler, boolean async) throws SQLException { try { int id = getNextTraceId(TraceObject.RESULT_SET); if (isDebugEnabled()) { @@ -99,18 +109,39 @@ public ResultSet executeQuery() throws SQLException { } checkClosed(); closeOldResultSet(); - Result result; boolean scrollable = resultSetType != ResultSet.TYPE_FORWARD_ONLY; boolean updatable = resultSetConcurrency == ResultSet.CONCUR_UPDATABLE; - try { - setExecutingStatement(command); - result = command.executeQuery(maxRows, scrollable); - } finally { - setExecutingStatement(null); + setExecutingStatement(command); + if (async) { + AsyncHandler> h = new AsyncHandler>() { + @Override + public void handle(AsyncResult ar) { + Result r = ar.getResult(); + resultSet = new JdbcResultSet(conn, JdbcPreparedStatement.this, r, id, closedByResultSet, + scrollable, updatable, cachedColumnLabelMap); + setExecutingStatement(null); + resultSet.setCommand(command); + + if (handler != null) { + AsyncResult r2 = new AsyncResult<>(); + r2.setResult(resultSet); + handler.handle(r2); + } + } + }; + command.executeQueryAsync(maxRows, scrollable, h); + return null; + } else { + Result result; + try { + result = command.executeQuery(maxRows, scrollable); + } finally { + setExecutingStatement(null); + } + resultSet = new JdbcResultSet(conn, this, result, id, closedByResultSet, scrollable, updatable, + cachedColumnLabelMap); + return resultSet; } - resultSet = new JdbcResultSet(conn, this, result, id, closedByResultSet, scrollable, updatable, - cachedColumnLabelMap); - return resultSet; } catch (Exception e) { throw logAndConvert(e); } @@ -137,21 +168,43 @@ public int executeUpdate() throws SQLException { try { debugCodeCall("executeUpdate"); checkClosed(); - return executeUpdateInternal(); + return executeUpdateInternal(null, false); } catch (Exception e) { throw logAndConvert(e); } } - private int executeUpdateInternal() throws SQLException { - closeOldResultSet(); + public void executeUpdateAsync(AsyncHandler> handler) throws SQLException { try { - setExecutingStatement(command); - updateCount = command.executeUpdate(); - } finally { - setExecutingStatement(null); + debugCodeCall("executeUpdateAsync"); + executeUpdateInternal(handler, true); + } catch (Exception e) { + throw logAndConvert(e); + } + } + + private int executeUpdateInternal(AsyncHandler> handler, boolean async) throws SQLException { + closeOldResultSet(); + setExecutingStatement(command); + if (async) { + AsyncHandler> h = new AsyncHandler>() { + @Override + public void handle(AsyncResult ar) { + updateCount = ar.getResult(); + handler.handle(ar); + setExecutingStatement(null); + } + }; + command.executeUpdateAsync(h); + return -1; + } else { + try { + updateCount = command.executeUpdate(); + } finally { + setExecutingStatement(null); + } + return updateCount; } - return updateCount; } /** @@ -1117,7 +1170,7 @@ public int[] executeBatch() throws SQLException { param.setValue(value, false); } try { - result[i] = executeUpdateInternal(); + result[i] = executeUpdateInternal(null, false); } catch (Exception re) { SQLException e = logAndConvert(re); if (next == null) { diff --git a/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcResultSet.java b/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcResultSet.java index 904905cac..a7fa52035 100644 --- a/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcResultSet.java +++ b/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcResultSet.java @@ -91,7 +91,7 @@ public class JdbcResultSet extends TraceObject implements ResultSet { private JdbcPreparedStatement preparedStatement; private Command command; - JdbcResultSet(JdbcConnection conn, JdbcStatement stat, Result result, int id, boolean closeStatement, + public JdbcResultSet(JdbcConnection conn, JdbcStatement stat, Result result, int id, boolean closeStatement, boolean scrollable, boolean updatable) { setTrace(conn.getSession().getTrace(), TraceObject.RESULT_SET, id); this.conn = conn; diff --git a/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcStatement.java b/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcStatement.java index 6d880da58..1ae21138d 100644 --- a/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcStatement.java +++ b/lealone-client/src/main/java/org/lealone/client/jdbc/JdbcStatement.java @@ -14,6 +14,8 @@ import java.util.ArrayList; import org.lealone.api.ErrorCode; +import org.lealone.async.AsyncHandler; +import org.lealone.async.AsyncResult; import org.lealone.client.ClientBatchCommand; import org.lealone.client.ClientSession; import org.lealone.common.exceptions.DbException; @@ -65,28 +67,59 @@ public class JdbcStatement extends TraceObject implements Statement { */ @Override public ResultSet executeQuery(String sql) throws SQLException { + return executeQuery(sql, null, false); + } + + public void executeQueryAsync(String sql, AsyncHandler> handler) throws SQLException { + executeQuery(sql, handler, true); + } + + private ResultSet executeQuery(String sql, AsyncHandler> handler, boolean async) + throws SQLException { try { int id = getNextTraceId(TraceObject.RESULT_SET); if (isDebugEnabled()) { - debugCodeAssign("ResultSet", TraceObject.RESULT_SET, id, "executeQuery(" + quote(sql) + ")"); + debugCodeAssign("ResultSet", TraceObject.RESULT_SET, id, "executeQuery" + (async ? "Async" : "") + "(" + + quote(sql) + ")"); } checkClosed(); closeOldResultSet(); sql = JdbcConnection.translateSQL(sql, escapeProcessing); Command command = conn.createCommand(sql, fetchSize); - Result result; boolean scrollable = resultSetType != ResultSet.TYPE_FORWARD_ONLY; boolean updatable = resultSetConcurrency == ResultSet.CONCUR_UPDATABLE; setExecutingStatement(command); - try { - result = command.executeQuery(maxRows, scrollable); - } finally { - setExecutingStatement(null); + if (async) { + AsyncHandler> h = new AsyncHandler>() { + @Override + public void handle(AsyncResult ar) { + Result r = ar.getResult(); + resultSet = new JdbcResultSet(conn, JdbcStatement.this, r, id, closedByResultSet, scrollable, + updatable); + resultSet.setCommand(command); + setExecutingStatement(null); + + if (handler != null) { + AsyncResult r2 = new AsyncResult<>(); + r2.setResult(resultSet); + handler.handle(r2); + } + } + }; + command.executeQueryAsync(maxRows, scrollable, h); + return null; + } else { + Result result; + try { + result = command.executeQuery(maxRows, scrollable); + } finally { + setExecutingStatement(null); + } + // command.close(); //关闭结果集时再关闭 + resultSet = new JdbcResultSet(conn, this, result, id, closedByResultSet, scrollable, updatable); + resultSet.setCommand(command); + return resultSet; } - // command.close(); //关闭结果集时再关闭 - resultSet = new JdbcResultSet(conn, this, result, id, closedByResultSet, scrollable, updatable); - resultSet.setCommand(command); - return resultSet; } catch (Exception e) { throw logAndConvert(e); } @@ -114,25 +147,49 @@ public ResultSet executeQuery(String sql) throws SQLException { public int executeUpdate(String sql) throws SQLException { try { debugCodeCall("executeUpdate", sql); - return executeUpdateInternal(sql); + return executeUpdateInternal(sql, null, false); } catch (Exception e) { throw logAndConvert(e); } } - private int executeUpdateInternal(String sql) throws SQLException { + public void executeUpdateAsync(String sql, AsyncHandler> handler) throws SQLException { + try { + debugCodeCall("executeUpdateAsync", sql); + executeUpdateInternal(sql, handler, true); + } catch (Exception e) { + throw logAndConvert(e); + } + } + + private int executeUpdateInternal(String sql, AsyncHandler> handler, boolean async) + throws SQLException { checkClosed(); closeOldResultSet(); sql = JdbcConnection.translateSQL(sql, escapeProcessing); Command command = conn.createCommand(sql, fetchSize); setExecutingStatement(command); - try { - updateCount = command.executeUpdate(); - } finally { - setExecutingStatement(null); + if (async) { + AsyncHandler> h = new AsyncHandler>() { + @Override + public void handle(AsyncResult ar) { + updateCount = ar.getResult(); + handler.handle(ar); + setExecutingStatement(null); + command.close(); + } + }; + command.executeUpdateAsync(h); + return -1; + } else { + try { + updateCount = command.executeUpdate(); + } finally { + setExecutingStatement(null); + } + command.close(); + return updateCount; } - command.close(); - return updateCount; } /** @@ -678,7 +735,7 @@ public int[] executeBatch() throws SQLException { for (int i = 0; i < size; i++) { String sql = batchCommands.get(i); try { - result[i] = executeUpdateInternal(sql); + result[i] = executeUpdateInternal(sql, null, false); } catch (Exception re) { SQLException e = logAndConvert(re); if (next == null) { @@ -796,7 +853,7 @@ public int executeUpdate(String sql, int autoGeneratedKeys) throws SQLException if (isDebugEnabled()) { debugCode("executeUpdate(" + quote(sql) + ", " + autoGeneratedKeys + ");"); } - return executeUpdateInternal(sql); + return executeUpdateInternal(sql, null, false); } catch (Exception e) { throw logAndConvert(e); } @@ -821,7 +878,7 @@ public int executeUpdate(String sql, int[] columnIndexes) throws SQLException { if (isDebugEnabled()) { debugCode("executeUpdate(" + quote(sql) + ", " + quoteIntArray(columnIndexes) + ");"); } - return executeUpdateInternal(sql); + return executeUpdateInternal(sql, null, false); } catch (Exception e) { throw logAndConvert(e); } @@ -846,7 +903,7 @@ public int executeUpdate(String sql, String[] columnNames) throws SQLException { if (isDebugEnabled()) { debugCode("executeUpdate(" + quote(sql) + ", " + quoteArray(columnNames) + ");"); } - return executeUpdateInternal(sql); + return executeUpdateInternal(sql, null, false); } catch (Exception e) { throw logAndConvert(e); } diff --git a/lealone-common/src/main/java/org/lealone/async/AsyncHandler.java b/lealone-common/src/main/java/org/lealone/async/AsyncHandler.java new file mode 100644 index 000000000..38d10b8c4 --- /dev/null +++ b/lealone-common/src/main/java/org/lealone/async/AsyncHandler.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lealone.async; + +public interface AsyncHandler { + + /** + * Something has happened, so handle it. + * + * @param event the event to handle + */ + void handle(E event); +} \ No newline at end of file diff --git a/lealone-common/src/main/java/org/lealone/async/AsyncResult.java b/lealone-common/src/main/java/org/lealone/async/AsyncResult.java new file mode 100644 index 000000000..fa10e85cc --- /dev/null +++ b/lealone-common/src/main/java/org/lealone/async/AsyncResult.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lealone.async; + +public class AsyncResult { + protected T result; + + public T getResult() { + return result; + } + + public void setResult(T result) { + this.result = result; + } +} diff --git a/lealone-common/src/main/java/org/lealone/db/Command.java b/lealone-common/src/main/java/org/lealone/db/Command.java index 021ddb37e..d0df00636 100644 --- a/lealone-common/src/main/java/org/lealone/db/Command.java +++ b/lealone-common/src/main/java/org/lealone/db/Command.java @@ -7,6 +7,8 @@ import java.util.List; +import org.lealone.async.AsyncHandler; +import org.lealone.async.AsyncResult; import org.lealone.db.result.Result; /** @@ -66,6 +68,8 @@ public interface Command { */ Result executeQuery(int maxRows, boolean scrollable); + void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler); + /** * Execute the update command * @@ -73,6 +77,8 @@ public interface Command { */ int executeUpdate(); + void executeUpdateAsync(AsyncHandler> handler); + /** * Execute the update command * diff --git a/lealone-common/src/main/java/org/lealone/replication/ReplicationCommand.java b/lealone-common/src/main/java/org/lealone/replication/ReplicationCommand.java index 14af34b8d..a066f9ece 100644 --- a/lealone-common/src/main/java/org/lealone/replication/ReplicationCommand.java +++ b/lealone-common/src/main/java/org/lealone/replication/ReplicationCommand.java @@ -23,6 +23,8 @@ import java.util.List; import java.util.Random; +import org.lealone.async.AsyncHandler; +import org.lealone.async.AsyncResult; import org.lealone.common.util.New; import org.lealone.db.Command; import org.lealone.db.CommandParameter; @@ -337,4 +339,16 @@ public Command prepare() { return this; } + @Override + public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { + // TODO Auto-generated method stub + + } + + @Override + public void executeUpdateAsync(AsyncHandler> handler) { + // TODO Auto-generated method stub + + } + } diff --git a/lealone-db/src/main/java/org/lealone/db/ServerCommand.java b/lealone-db/src/main/java/org/lealone/db/ServerCommand.java index 12e245caa..1256e2389 100644 --- a/lealone-db/src/main/java/org/lealone/db/ServerCommand.java +++ b/lealone-db/src/main/java/org/lealone/db/ServerCommand.java @@ -20,6 +20,8 @@ import java.nio.ByteBuffer; import java.util.ArrayList; +import org.lealone.async.AsyncHandler; +import org.lealone.async.AsyncResult; import org.lealone.db.result.Result; import org.lealone.replication.Replication; import org.lealone.storage.StorageCommand; @@ -140,4 +142,16 @@ public Command prepare() { return this; } + @Override + public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { + // TODO Auto-generated method stub + + } + + @Override + public void executeUpdateAsync(AsyncHandler> handler) { + // TODO Auto-generated method stub + + } + } diff --git a/lealone-sql/src/main/java/org/lealone/sql/StatementBase.java b/lealone-sql/src/main/java/org/lealone/sql/StatementBase.java index 18890a6f9..77ab45b3c 100644 --- a/lealone-sql/src/main/java/org/lealone/sql/StatementBase.java +++ b/lealone-sql/src/main/java/org/lealone/sql/StatementBase.java @@ -9,6 +9,8 @@ import org.lealone.api.DatabaseEventListener; import org.lealone.api.ErrorCode; +import org.lealone.async.AsyncHandler; +import org.lealone.async.AsyncResult; import org.lealone.common.exceptions.DbException; import org.lealone.common.trace.Trace; import org.lealone.common.util.StatementBuilder; @@ -577,4 +579,15 @@ public int executeUpdateAsync() { return executeUpdate(); } + @Override + public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { + // TODO Auto-generated method stub + + } + + @Override + public void executeUpdateAsync(AsyncHandler> handler) { + // TODO Auto-generated method stub + + } } diff --git a/lealone-sql/src/main/java/org/lealone/sql/StatementWrapper.java b/lealone-sql/src/main/java/org/lealone/sql/StatementWrapper.java index 3c924b519..640c26022 100644 --- a/lealone-sql/src/main/java/org/lealone/sql/StatementWrapper.java +++ b/lealone-sql/src/main/java/org/lealone/sql/StatementWrapper.java @@ -10,6 +10,8 @@ import org.lealone.api.DatabaseEventListener; import org.lealone.api.ErrorCode; +import org.lealone.async.AsyncHandler; +import org.lealone.async.AsyncResult; import org.lealone.common.exceptions.DbException; import org.lealone.common.trace.Trace; import org.lealone.common.util.MathUtils; @@ -435,4 +437,14 @@ public int executeUpdateAsync() { return ((Integer) execute(0, true, true)).intValue(); } + @Override + public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler> handler) { + statement.executeQueryAsync(maxRows, scrollable, handler); + } + + @Override + public void executeUpdateAsync(AsyncHandler> handler) { + statement.executeUpdateAsync(handler); + } + } diff --git a/lealone-test/src/test/java/org/lealone/test/async/AsyncJdbcStatementTest.java b/lealone-test/src/test/java/org/lealone/test/async/AsyncJdbcStatementTest.java new file mode 100644 index 000000000..3dd9a1b4a --- /dev/null +++ b/lealone-test/src/test/java/org/lealone/test/async/AsyncJdbcStatementTest.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lealone.test.async; + +import java.sql.Connection; +import java.sql.ResultSet; + +import org.lealone.client.jdbc.JdbcStatement; +import org.lealone.test.TestBase; + +public class AsyncJdbcStatementTest { + + public static void main(String[] args) throws Exception { + Connection conn = new TestBase().getConnection(); + JdbcStatement stmt = (JdbcStatement) conn.createStatement(); + stmt.executeUpdate("DROP TABLE IF EXISTS test"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test (f1 int primary key, f2 long)"); + String sql = "INSERT INTO test(f1, f2) VALUES(1, 2)"; + stmt.executeUpdate(sql); + + stmt.executeUpdateAsync("INSERT INTO test(f1, f2) VALUES(2, 2)", res -> { + System.out.println("updateCount: " + res.getResult()); + + try { + stmt.executeQueryAsync("SELECT * FROM test where f2 = 2", res2 -> { + ResultSet rs = res2.getResult(); + try { + while (rs.next()) { + System.out.println("f1=" + rs.getInt(1) + " f2=" + rs.getLong(2)); + } + stmt.close(); + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } +} diff --git a/lealone-test/src/test/java/org/lealone/test/async/AsyncPreparedStatementTest.java b/lealone-test/src/test/java/org/lealone/test/async/AsyncPreparedStatementTest.java new file mode 100644 index 000000000..86baf0418 --- /dev/null +++ b/lealone-test/src/test/java/org/lealone/test/async/AsyncPreparedStatementTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lealone.test.async; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.util.concurrent.CountDownLatch; + +import org.lealone.client.jdbc.JdbcPreparedStatement; +import org.lealone.client.jdbc.JdbcStatement; +import org.lealone.test.TestBase; +import org.lealone.test.sql.SqlTestBase; + +public class AsyncPreparedStatementTest extends SqlTestBase { + + public static void main(String[] args) throws Exception { + Connection conn = new TestBase().getConnection(); + JdbcStatement stmt = (JdbcStatement) conn.createStatement(); + stmt.executeUpdate("DROP TABLE IF EXISTS test"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test (f1 int primary key, f2 long)"); + stmt.close(); + + String sql = "INSERT INTO test(f1, f2) VALUES(?, ?)"; + JdbcPreparedStatement ps = (JdbcPreparedStatement) conn.prepareStatement(sql); + ps.setInt(1, 1); + ps.setLong(2, 2); + ps.executeUpdate(); + + CountDownLatch latch = new CountDownLatch(1); + ps.setInt(1, 2); + ps.setLong(2, 2); + ps.executeUpdateAsync(res -> { + System.out.println("updateCount: " + res.getResult()); + latch.countDown(); + }); + latch.await(); + ps.close(); + + ps = (JdbcPreparedStatement) conn.prepareStatement("SELECT * FROM test where f2 = ?"); + ps.setLong(1, 2); + ResultSet rs = ps.executeQuery(); + while (rs.next()) { + System.out.println("f1=" + rs.getInt(1) + " f2=" + rs.getLong(2)); + } + + JdbcPreparedStatement ps2 = ps; + ps2.setLong(1, 2); + ps2.executeQueryAsync(res -> { + ResultSet rs2 = res.getResult(); + try { + while (rs2.next()) { + System.out.println("f1=" + rs2.getInt(1) + " f2=" + rs2.getLong(2)); + } + ps2.close(); + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + } + }); + } +} diff --git a/lealone-test/src/test/java/org/lealone/test/benchmark/AsyncBenchmark.java b/lealone-test/src/test/java/org/lealone/test/benchmark/AsyncBenchmark.java new file mode 100644 index 000000000..44af0e372 --- /dev/null +++ b/lealone-test/src/test/java/org/lealone/test/benchmark/AsyncBenchmark.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lealone.test.benchmark; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Random; +import java.util.concurrent.CountDownLatch; + +import org.lealone.async.AsyncHandler; +import org.lealone.async.AsyncResult; +import org.lealone.client.jdbc.JdbcStatement; +import org.lealone.test.TestBase; + +public class AsyncBenchmark { + + public static void main(String[] args) throws Exception { + run(); + } + + static Random random = new Random(); + + static class MyThread extends Thread { + JdbcStatement stmt; + Connection conn; + long read_time; + long random_read_time; + long write_time; + int start; + int end; + + MyThread(int start, int count) throws Exception { + super("MyThread-" + start); + conn = new TestBase().getConnection(); + stmt = (JdbcStatement) conn.createStatement(); + this.start = start; + this.end = start + count; + } + + void write() throws Exception { + CountDownLatch latch = new CountDownLatch(end - start); + long t1 = System.currentTimeMillis(); + for (int i = start; i < end; i++) { + String sql = "INSERT INTO test(f1, f2) VALUES(" + i + "," + i * 10 + ")"; + stmt.executeUpdateAsync(sql, res -> { + latch.countDown(); + }); + } + latch.await(); + + long t2 = System.currentTimeMillis(); + write_time = t2 - t1; + System.out.println(getName() + " write end, time=" + write_time + " ms"); + } + + void read(boolean random) throws Exception { + CountDownLatch latch = new CountDownLatch(end - start); + long t1 = System.currentTimeMillis(); + + AsyncHandler> handler = ac -> { + ResultSet rs = ac.getResult(); + try { + while (rs.next()) { + // System.out.println("f1=" + rs.getInt(1) + " f2=" + rs.getLong(2)); + } + latch.countDown(); + } catch (Exception e) { + e.printStackTrace(); + } + }; + for (int i = start; i < end; i++) { + if (!random) + stmt.executeQueryAsync("SELECT * FROM test where f1 = " + i, handler); + else + stmt.executeQueryAsync("SELECT * FROM test where f1 = " + AsyncBenchmark.random.nextInt(end), + handler); + } + latch.await(); + long t2 = System.currentTimeMillis(); + + if (random) + random_read_time = t2 - t1; + else + read_time = t2 - t1; + if (random) + System.out.println(getName() + " random read end, time=" + random_read_time + " ms"); + else + System.out.println(getName() + " read end, time=" + read_time + " ms"); + } + + @Override + public void run() { + try { + write(); + // read(false); + // read(true); + stmt.close(); + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + static void run() throws Exception { + Connection conn = new TestBase().getConnection(); + Statement stmt = conn.createStatement(); + stmt.executeUpdate("DROP TABLE IF EXISTS test"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test (f1 int primary key, f2 long)"); + stmt.close(); + + int threadsCount = 4; // Runtime.getRuntime().availableProcessors() * 4; + int loop = 1000; + + MyThread[] threads = new MyThread[threadsCount]; + for (int i = 0; i < threadsCount; i++) { + threads[i] = new MyThread(i * loop, loop); + } + + for (int i = 0; i < threadsCount; i++) { + threads[i].start(); + } + for (int i = 0; i < threadsCount; i++) { + threads[i].join(); + } + conn.close(); + + long write_sum = 0; + for (int i = 0; i < threadsCount; i++) { + write_sum += threads[i].write_time; + } + + long read_sum = 0; + for (int i = 0; i < threadsCount; i++) { + read_sum += threads[i].read_time; + } + long random_read_sum = 0; + for (int i = 0; i < threadsCount; i++) { + random_read_sum += threads[i].random_read_time; + } + + System.out.println(); + System.out.println("threads: " + threadsCount + ", loop: " + loop + ", rows: " + (threadsCount * loop)); + System.out.println("=========================================================="); + System.out.println("write_sum=" + write_sum + ", avg=" + (write_sum / threadsCount) + " ms"); + System.out.println("read_sum=" + read_sum + ", avg=" + (read_sum / threadsCount) + " ms"); + System.out.println("random_read_sum=" + random_read_sum + ", avg=" + (random_read_sum / threadsCount) + " ms"); + } + +} diff --git a/lealone-test/src/test/java/org/lealone/test/benchmark/SyncBenchmark.java b/lealone-test/src/test/java/org/lealone/test/benchmark/SyncBenchmark.java new file mode 100644 index 000000000..b2652bfa2 --- /dev/null +++ b/lealone-test/src/test/java/org/lealone/test/benchmark/SyncBenchmark.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lealone.test.benchmark; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.Random; + +import org.lealone.test.TestBase; + +public class SyncBenchmark { + + public static void main(String[] args) throws Exception { + run(); + } + + static Random random = new Random(); + + static class MyThread extends Thread { + Statement stmt; + Connection conn; + long read_time; + long random_read_time; + long write_time; + int start; + int end; + + MyThread(int start, int count) throws Exception { + super("MyThread-" + start); + conn = new TestBase().getConnection(); + stmt = conn.createStatement(); + this.start = start; + this.end = start + count; + } + + void write() throws Exception { + long t1 = System.currentTimeMillis(); + for (int i = start; i < end; i++) { + String sql = "INSERT INTO test(f1, f2) VALUES(" + i + "," + i * 10 + ")"; + stmt.executeUpdate(sql); + } + + long t2 = System.currentTimeMillis(); + write_time = t2 - t1; + System.out.println(getName() + " write end, time=" + write_time + " ms"); + } + + void read(boolean random) throws Exception { + long t1 = System.currentTimeMillis(); + for (int i = start; i < end; i++) { + ResultSet rs; + if (!random) + rs = stmt.executeQuery("SELECT * FROM test where f1 = " + i); + else + rs = stmt.executeQuery("SELECT * FROM test where f1 = " + SyncBenchmark.random.nextInt(end)); + while (rs.next()) { + // System.out.println("f1=" + rs.getInt(1) + " f2=" + rs.getLong(2)); + } + } + + long t2 = System.currentTimeMillis(); + + if (random) + random_read_time = t2 - t1; + else + read_time = t2 - t1; + if (random) + System.out.println(getName() + " random read end, time=" + random_read_time + " ms"); + else + System.out.println(getName() + " read end, time=" + read_time + " ms"); + } + + @Override + public void run() { + try { + write(); + read(false); + read(true); + stmt.close(); + conn.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } + } + + static void run() throws Exception { + Connection conn = new TestBase().getConnection(); + Statement stmt = conn.createStatement(); + stmt.executeUpdate("DROP TABLE IF EXISTS test"); + stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test (f1 int primary key, f2 long)"); + stmt.close(); + + int threadsCount = 1; // Runtime.getRuntime().availableProcessors() * 4; + int loop = 1000; + + MyThread[] threads = new MyThread[threadsCount]; + for (int i = 0; i < threadsCount; i++) { + threads[i] = new MyThread(i * loop, loop); + } + + for (int i = 0; i < threadsCount; i++) { + threads[i].start(); + } + for (int i = 0; i < threadsCount; i++) { + threads[i].join(); + } + conn.close(); + + long write_sum = 0; + for (int i = 0; i < threadsCount; i++) { + write_sum += threads[i].write_time; + } + + long read_sum = 0; + for (int i = 0; i < threadsCount; i++) { + read_sum += threads[i].read_time; + } + long random_read_sum = 0; + for (int i = 0; i < threadsCount; i++) { + random_read_sum += threads[i].random_read_time; + } + + System.out.println(); + System.out.println("threads: " + threadsCount + ", loop: " + loop + ", rows: " + (threadsCount * loop)); + System.out.println("=========================================================="); + System.out.println("write_sum=" + write_sum + ", avg=" + (write_sum / threadsCount) + " ms"); + System.out.println("read_sum=" + read_sum + ", avg=" + (read_sum / threadsCount) + " ms"); + System.out.println("random_read_sum=" + random_read_sum + ", avg=" + (random_read_sum / threadsCount) + " ms"); + } +} diff --git a/lealone-test/src/test/java/org/lealone/test/misc/CRUDExample.java b/lealone-test/src/test/java/org/lealone/test/misc/CRUDExample.java index 66092fc30..691a00fe9 100644 --- a/lealone-test/src/test/java/org/lealone/test/misc/CRUDExample.java +++ b/lealone-test/src/test/java/org/lealone/test/misc/CRUDExample.java @@ -20,25 +20,20 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; -import java.util.Random; -import java.util.concurrent.CountDownLatch; import org.junit.Assert; import org.lealone.test.TestBase; public class CRUDExample { - public static void main(String[] args) throws Exception { - // crud(new TestBase().enableTrace(TraceSystem.DEBUG).getConnection()); - // crud0(); - benchmark(); - // for (int i = 2; i < 500; i++) - // System.out.println("INSERT INTO test(f1, f2) VALUES(" + i + ", 1);"); + public static void main(String[] args) throws Exception { + Connection conn = new TestBase().getConnection(); + crud(conn); } public static void crud(Connection conn) throws Exception { Statement stmt = conn.createStatement(); - + stmt.executeUpdate("DROP TABLE IF EXISTS test"); stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test (f1 int primary key, f2 long)"); stmt.executeUpdate("INSERT INTO test(f1, f2) VALUES(1, 1)"); stmt.executeUpdate("UPDATE test SET f2 = 2 WHERE f1 = 1"); @@ -56,180 +51,4 @@ public static void crud(Connection conn) throws Exception { conn.close(); } - static void crud0() throws Exception { - Connection conn = new TestBase().getConnection(); - Statement stmt = conn.createStatement(); - ResultSet rs; - - stmt.executeUpdate("DROP TABLE IF EXISTS test"); - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test (f1 int primary key, f2 long)"); - stmt.executeUpdate("CREATE memory TABLE IF NOT EXISTS test2 (f1 int primary key, f2 long)"); - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test3 (f1 int primary key, f2 long)"); - - stmt.executeUpdate("CREATE INDEX IF NOT EXISTS test_f2 ON test(f2)"); - - // stmt.executeUpdate("DELETE FROM test"); - - for (int i = 1; i <= 10; i++) { - stmt.executeUpdate("INSERT INTO test(f1, f2) VALUES(" + i + "," + i * 10 + ")"); - } - - stmt.executeUpdate("UPDATE test SET f2 = 1 where f1 = 1"); - - // stmt.executeUpdate("UPDATE test SET f2 = 1 where f1 >= 2"); - - // rs = stmt.executeQuery("SELECT * FROM test where f1 <= 3"); - rs = stmt.executeQuery("SELECT * FROM test where f1 = 3"); - while (rs.next()) { - System.out.println("f1=" + rs.getInt(1) + " f2=" + rs.getLong(2)); - } - rs.close(); - stmt.executeUpdate("DELETE FROM test WHERE f1 = 1"); - - rs = stmt.executeQuery("SELECT count(*) FROM test"); - while (rs.next()) { - System.out.println("count=" + rs.getInt(1)); - } - - rs.close(); - - rs = stmt.executeQuery("SELECT count(*) FROM test where f2=20"); - while (rs.next()) { - System.out.println("count=" + rs.getInt(1)); - } - - rs.close(); - - Connection conn2 = new TestBase().getConnection(); - Statement stmt2 = conn2.createStatement(); - - rs = stmt2.executeQuery("SELECT count(*) FROM test"); - while (rs.next()) { - System.out.println("count=" + rs.getInt(1)); - } - - stmt2.close(); - conn2.close(); - - stmt.close(); - conn.close(); - } - - static Random random = new Random(); - static CountDownLatch latch; - - static class MyThread extends Thread { - Statement stmt; - Connection conn; - long read_time; - long randow_read_time; - long write_time; - int start; - int end; - - MyThread(int start, int count) throws Exception { - super("MyThread-" + start); - conn = new TestBase().getConnection(); - stmt = conn.createStatement(); - this.start = start; - this.end = start + count; - } - - void write() throws Exception { - long t1 = System.currentTimeMillis(); - for (int i = start; i < end; i++) { - String sql = "INSERT INTO test(f1, f2) VALUES(" + i + "," + i * 10 + ")"; - stmt.executeUpdate(sql); - } - - long t2 = System.currentTimeMillis(); - write_time = t2 - t1; - System.out.println(getName() + " write end, time=" + write_time + " ms"); - } - - void read(boolean randow) throws Exception { - long t1 = System.currentTimeMillis(); - for (int i = start; i < end; i++) { - ResultSet rs; - if (!randow) - rs = stmt.executeQuery("SELECT * FROM test where f1 = " + i); - else - rs = stmt.executeQuery("SELECT * FROM test where f1 = " + random.nextInt(end)); - while (rs.next()) { - // System.out.println("f1=" + rs.getInt(1) + " f2=" + rs.getLong(2)); - } - } - - long t2 = System.currentTimeMillis(); - - if (randow) - randow_read_time = t2 - t1; - else - read_time = t2 - t1; - if (randow) - System.out.println(getName() + " randow read end, time=" + randow_read_time + " ms"); - else - System.out.println(getName() + " read end, time=" + read_time + " ms"); - } - - @Override - public void run() { - try { - write(); - read(false); - read(true); - stmt.close(); - conn.close(); - latch.countDown(); - } catch (Exception e) { - e.printStackTrace(); - } - } - } - - static void benchmark() throws Exception { - Connection conn = new TestBase().getConnection(); - Statement stmt = conn.createStatement(); - stmt.executeUpdate("DROP TABLE IF EXISTS test"); - stmt.executeUpdate("CREATE TABLE IF NOT EXISTS test (f1 int primary key, f2 long)"); - stmt.close(); - - int threadsCount = 10;// Runtime.getRuntime().availableProcessors() * 4;// 10; - int loop = 500; - latch = new CountDownLatch(threadsCount); - - MyThread[] threads = new MyThread[threadsCount]; - for (int i = 0; i < threadsCount; i++) { - threads[i] = new MyThread(i * loop, loop); - } - - for (int i = 0; i < threadsCount; i++) { - threads[i].start(); - } - - latch.await(); - conn.close(); - - long write_sum = 0; - for (int i = 0; i < threadsCount; i++) { - write_sum += threads[i].write_time; - } - - long read_sum = 0; - for (int i = 0; i < threadsCount; i++) { - read_sum += threads[i].read_time; - } - long randow_read_sum = 0; - for (int i = 0; i < threadsCount; i++) { - randow_read_sum += threads[i].randow_read_time; - } - - System.out.println(); - System.out.println("threads: " + threadsCount + ", loop: " + loop + ", rows: " + (threadsCount * loop)); - System.out.println("=========================================================="); - System.out.println("write_sum=" + write_sum + ", avg=" + (write_sum / threadsCount) + " ms"); - System.out.println("read_sum=" + read_sum + ", avg=" + (read_sum / threadsCount) + " ms"); - System.out.println("randow_read_sum=" + randow_read_sum + ", avg=" + (randow_read_sum / threadsCount) + " ms"); - } - }