diff --git a/lealone-cluster/src/main/java/org/lealone/cluster/router/DefaultRouter.java b/lealone-cluster/src/main/java/org/lealone/cluster/router/P2PRouter.java similarity index 98% rename from lealone-cluster/src/main/java/org/lealone/cluster/router/DefaultRouter.java rename to lealone-cluster/src/main/java/org/lealone/cluster/router/P2PRouter.java index 44068d0b5..39b44837c 100644 --- a/lealone-cluster/src/main/java/org/lealone/cluster/router/DefaultRouter.java +++ b/lealone-cluster/src/main/java/org/lealone/cluster/router/P2PRouter.java @@ -57,15 +57,15 @@ import com.google.common.collect.Iterables; -public class DefaultRouter implements Router { +public class P2PRouter implements Router { private static final Random random = new Random(System.currentTimeMillis()); - private static final DefaultRouter INSTANCE = new DefaultRouter(); + private static final P2PRouter INSTANCE = new P2PRouter(); - public static DefaultRouter getInstance() { + public static P2PRouter getInstance() { return INSTANCE; } - private DefaultRouter() { + private P2PRouter() { } @Override diff --git a/lealone-cluster/src/main/java/org/lealone/cluster/service/LealoneDaemon.java b/lealone-cluster/src/main/java/org/lealone/cluster/service/LealoneDaemon.java index 79430d651..f073dd7be 100644 --- a/lealone-cluster/src/main/java/org/lealone/cluster/service/LealoneDaemon.java +++ b/lealone-cluster/src/main/java/org/lealone/cluster/service/LealoneDaemon.java @@ -4,9 +4,10 @@ import org.lealone.cluster.config.Config; import org.lealone.cluster.config.DatabaseDescriptor; -import org.lealone.cluster.router.DefaultRouter; +import org.lealone.cluster.router.P2PRouter; import org.lealone.engine.Session; import org.lealone.server.TcpServer; +import org.lealone.transaction.TransactionalRouter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,8 +21,7 @@ public static void main(String[] args) { try { if (DatabaseDescriptor.loadConfig().isClusterMode()) { - Session.setClusterMode(true); - Session.setRouter(DefaultRouter.getInstance()); + Session.setRouter(new TransactionalRouter(P2PRouter.getInstance())); StorageService.instance.initServer(); } diff --git a/lealone-server/pom.xml b/lealone-server/pom.xml index 4c45410ec..ad1b4a703 100644 --- a/lealone-server/pom.xml +++ b/lealone-server/pom.xml @@ -14,7 +14,7 @@ org.lealone - lealone-sql + lealone-transaction diff --git a/lealone-sql/src/main/java/org/lealone/command/dml/Insert.java b/lealone-sql/src/main/java/org/lealone/command/dml/Insert.java index 063fa733f..2129f6a2f 100644 --- a/lealone-sql/src/main/java/org/lealone/command/dml/Insert.java +++ b/lealone-sql/src/main/java/org/lealone/command/dml/Insert.java @@ -416,4 +416,8 @@ public void setInsertFromSelect(boolean value) { public boolean isCacheable() { return true; } + + public boolean isBatch() { + return query != null || list.size() > 1; // || table.doesSecondaryIndexExist(); + } } diff --git a/lealone-sql/src/main/java/org/lealone/engine/Session.java b/lealone-sql/src/main/java/org/lealone/engine/Session.java index 54a0178b8..d3cd09391 100644 --- a/lealone-sql/src/main/java/org/lealone/engine/Session.java +++ b/lealone-sql/src/main/java/org/lealone/engine/Session.java @@ -1358,6 +1358,7 @@ public static Router getRouter() { public static void setRouter(Router r) { router = r; + setClusterMode(r != null); } private static boolean isClusterMode; diff --git a/lealone-storage/hbase/src/main/java/org/lealone/hbase/command/router/MasterSlaveRouter.java b/lealone-storage/hbase/src/main/java/org/lealone/hbase/command/router/MasterSlaveRouter.java new file mode 100644 index 000000000..dea0bcc86 --- /dev/null +++ b/lealone-storage/hbase/src/main/java/org/lealone/hbase/command/router/MasterSlaveRouter.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lealone.hbase.command.router; + +import org.lealone.command.ddl.DefineCommand; +import org.lealone.command.dml.Delete; +import org.lealone.command.dml.Insert; +import org.lealone.command.dml.Select; +import org.lealone.command.dml.Update; +import org.lealone.command.router.Router; +import org.lealone.result.ResultInterface; + +public class MasterSlaveRouter implements Router { + private static final MasterSlaveRouter INSTANCE = new MasterSlaveRouter(); + + public static MasterSlaveRouter getInstance() { + return INSTANCE; + } + + private MasterSlaveRouter() { + } + + @Override + public int executeDefineCommand(DefineCommand defineCommand) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int executeInsert(Insert insert) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int executeDelete(Delete delete) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public int executeUpdate(Update update) { + // TODO Auto-generated method stub + return 0; + } + + @Override + public ResultInterface executeSelect(Select select, int maxRows, boolean scrollable) { + // TODO Auto-generated method stub + return null; + } + +} diff --git a/lealone-storage/hbase/src/main/java/org/lealone/hbase/engine/HBaseMasterObserver.java b/lealone-storage/hbase/src/main/java/org/lealone/hbase/engine/HBaseMasterObserver.java index 98310a0cd..167ab3d74 100644 --- a/lealone-storage/hbase/src/main/java/org/lealone/hbase/engine/HBaseMasterObserver.java +++ b/lealone-storage/hbase/src/main/java/org/lealone/hbase/engine/HBaseMasterObserver.java @@ -23,8 +23,11 @@ import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.master.HMaster; +import org.lealone.engine.Session; +import org.lealone.hbase.command.router.MasterSlaveRouter; import org.lealone.hbase.server.HBaseTcpServer; import org.lealone.hbase.transaction.TimestampService; +import org.lealone.transaction.TransactionalRouter; /** * @@ -47,6 +50,8 @@ public static TimestampService getTimestampService() { @Override public synchronized void start(CoprocessorEnvironment env) throws IOException { + Session.setRouter(new TransactionalRouter(MasterSlaveRouter.getInstance())); + if (server == null) { HMaster m = (HMaster) ((MasterCoprocessorEnvironment) env).getMasterServices(); hostAndPort = m.getServerName().getHostAndPort(); diff --git a/lealone-storage/hbase/src/main/java/org/lealone/hbase/engine/HBaseRegionServer.java b/lealone-storage/hbase/src/main/java/org/lealone/hbase/engine/HBaseRegionServer.java index 3acc61e4e..39be382e9 100644 --- a/lealone-storage/hbase/src/main/java/org/lealone/hbase/engine/HBaseRegionServer.java +++ b/lealone-storage/hbase/src/main/java/org/lealone/hbase/engine/HBaseRegionServer.java @@ -21,9 +21,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; +import org.lealone.engine.Session; +import org.lealone.hbase.command.router.MasterSlaveRouter; import org.lealone.hbase.server.HBasePgServer; import org.lealone.hbase.server.HBaseTcpServer; import org.lealone.hbase.transaction.TimestampService; +import org.lealone.transaction.TransactionalRouter; /** * @@ -56,6 +59,8 @@ public TimestampService getTimestampService() { @Override public void run() { + Session.setRouter(new TransactionalRouter(MasterSlaveRouter.getInstance())); + HBaseTcpServer server = new HBaseTcpServer(this); server.start(); diff --git a/lealone-transaction/src/main/java/org/lealone/transaction/TransactionalRouter.java b/lealone-transaction/src/main/java/org/lealone/transaction/TransactionalRouter.java new file mode 100644 index 000000000..2eb02b0e2 --- /dev/null +++ b/lealone-transaction/src/main/java/org/lealone/transaction/TransactionalRouter.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.lealone.transaction; + +import org.lealone.command.ddl.DefineCommand; +import org.lealone.command.dml.Delete; +import org.lealone.command.dml.Insert; +import org.lealone.command.dml.Select; +import org.lealone.command.dml.TransactionCommand; +import org.lealone.command.dml.Update; +import org.lealone.command.router.Router; +import org.lealone.engine.Session; +import org.lealone.message.DbException; +import org.lealone.result.ResultInterface; + +public class TransactionalRouter implements Router { + private final Router nestedRouter; + + public TransactionalRouter(Router nestedRouter) { + this.nestedRouter = nestedRouter; + } + + @Override + public int executeDefineCommand(DefineCommand defineCommand) { + return nestedRouter.executeDefineCommand(defineCommand); + } + + @Override + public int executeInsert(Insert insert) { + boolean isTopTransaction = false; + boolean isNestedTransaction = false; + Session session = insert.getSession(); + try { + if (insert.isBatch()) { + if (session.getAutoCommit()) { + session.setAutoCommit(false); + isTopTransaction = true; + } else { + isNestedTransaction = true; + session.addSavepoint(TransactionCommand.INTERNAL_SAVEPOINT); + } + } + int updateCount = nestedRouter.executeInsert(insert); + if (isTopTransaction) + session.commit(false); + return updateCount; + } catch (Exception e) { + if (isTopTransaction) + session.rollback(); + + //嵌套事务出错时提前rollback + if (isNestedTransaction) + session.rollbackToSavepoint(TransactionCommand.INTERNAL_SAVEPOINT); + + throw DbException.convert(e); + } finally { + if (isTopTransaction) + session.setAutoCommit(true); + } + } + + @Override + public int executeDelete(Delete delete) { + return executeUpdateOrDelete(null, delete); + } + + @Override + public int executeUpdate(Update update) { + return executeUpdateOrDelete(update, null); + } + + private int executeUpdateOrDelete(Update update, Delete delete) { + boolean isTopTransaction = false; + boolean isNestedTransaction = false; + + Session session; + if (update != null) + session = update.getSession(); + else + session = delete.getSession(); + try { + if (session.getAutoCommit()) { + session.setAutoCommit(false); + isTopTransaction = true; + } else { + isNestedTransaction = true; + session.addSavepoint(TransactionCommand.INTERNAL_SAVEPOINT); + } + int updateCount; + if (update != null) + updateCount = nestedRouter.executeUpdate(update); + else + updateCount = nestedRouter.executeDelete(delete); + if (isTopTransaction) + session.commit(false); + return updateCount; + } catch (Exception e) { + if (isTopTransaction) + session.rollback(); + + //嵌套事务出错时提前rollback + if (isNestedTransaction) + session.rollbackToSavepoint(TransactionCommand.INTERNAL_SAVEPOINT); + + throw DbException.convert(e); + } finally { + if (isTopTransaction) + session.setAutoCommit(true); + } + + } + + @Override + public ResultInterface executeSelect(Select select, int maxRows, boolean scrollable) { + return nestedRouter.executeSelect(select, maxRows, scrollable); + } + +}