Skip to content

Commit

Permalink
新增TransactionalRouter
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Jan 1, 2015
1 parent ec1a116 commit e77a2e8
Show file tree
Hide file tree
Showing 9 changed files with 224 additions and 8 deletions.
Expand Up @@ -57,15 +57,15 @@


import com.google.common.collect.Iterables; import com.google.common.collect.Iterables;


public class DefaultRouter implements Router { public class P2PRouter implements Router {
private static final Random random = new Random(System.currentTimeMillis()); private static final Random random = new Random(System.currentTimeMillis());
private static final DefaultRouter INSTANCE = new DefaultRouter(); private static final P2PRouter INSTANCE = new P2PRouter();


public static DefaultRouter getInstance() { public static P2PRouter getInstance() {
return INSTANCE; return INSTANCE;
} }


private DefaultRouter() { private P2PRouter() {
} }


@Override @Override
Expand Down
Expand Up @@ -4,9 +4,10 @@


import org.lealone.cluster.config.Config; import org.lealone.cluster.config.Config;
import org.lealone.cluster.config.DatabaseDescriptor; import org.lealone.cluster.config.DatabaseDescriptor;
import org.lealone.cluster.router.DefaultRouter; import org.lealone.cluster.router.P2PRouter;
import org.lealone.engine.Session; import org.lealone.engine.Session;
import org.lealone.server.TcpServer; import org.lealone.server.TcpServer;
import org.lealone.transaction.TransactionalRouter;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -20,8 +21,7 @@ public static void main(String[] args) {


try { try {
if (DatabaseDescriptor.loadConfig().isClusterMode()) { if (DatabaseDescriptor.loadConfig().isClusterMode()) {
Session.setClusterMode(true); Session.setRouter(new TransactionalRouter(P2PRouter.getInstance()));
Session.setRouter(DefaultRouter.getInstance());
StorageService.instance.initServer(); StorageService.instance.initServer();
} }


Expand Down
2 changes: 1 addition & 1 deletion lealone-server/pom.xml
Expand Up @@ -14,7 +14,7 @@
<dependencies> <dependencies>
<dependency> <dependency>
<groupId>org.lealone</groupId> <groupId>org.lealone</groupId>
<artifactId>lealone-sql</artifactId> <artifactId>lealone-transaction</artifactId>
</dependency> </dependency>
</dependencies> </dependencies>
</project> </project>
4 changes: 4 additions & 0 deletions lealone-sql/src/main/java/org/lealone/command/dml/Insert.java
Expand Up @@ -416,4 +416,8 @@ public void setInsertFromSelect(boolean value) {
public boolean isCacheable() { public boolean isCacheable() {
return true; return true;
} }

public boolean isBatch() {
return query != null || list.size() > 1; // || table.doesSecondaryIndexExist();
}
} }
1 change: 1 addition & 0 deletions lealone-sql/src/main/java/org/lealone/engine/Session.java
Expand Up @@ -1358,6 +1358,7 @@ public static Router getRouter() {


public static void setRouter(Router r) { public static void setRouter(Router r) {
router = r; router = r;
setClusterMode(r != null);
} }


private static boolean isClusterMode; private static boolean isClusterMode;
Expand Down
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.hbase.command.router;

import org.lealone.command.ddl.DefineCommand;
import org.lealone.command.dml.Delete;
import org.lealone.command.dml.Insert;
import org.lealone.command.dml.Select;
import org.lealone.command.dml.Update;
import org.lealone.command.router.Router;
import org.lealone.result.ResultInterface;

public class MasterSlaveRouter implements Router {
private static final MasterSlaveRouter INSTANCE = new MasterSlaveRouter();

public static MasterSlaveRouter getInstance() {
return INSTANCE;
}

private MasterSlaveRouter() {
}

@Override
public int executeDefineCommand(DefineCommand defineCommand) {
// TODO Auto-generated method stub
return 0;
}

@Override
public int executeInsert(Insert insert) {
// TODO Auto-generated method stub
return 0;
}

@Override
public int executeDelete(Delete delete) {
// TODO Auto-generated method stub
return 0;
}

@Override
public int executeUpdate(Update update) {
// TODO Auto-generated method stub
return 0;
}

@Override
public ResultInterface executeSelect(Select select, int maxRows, boolean scrollable) {
// TODO Auto-generated method stub
return null;
}

}
Expand Up @@ -23,8 +23,11 @@
import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver; import org.apache.hadoop.hbase.coprocessor.BaseMasterObserver;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.HMaster;
import org.lealone.engine.Session;
import org.lealone.hbase.command.router.MasterSlaveRouter;
import org.lealone.hbase.server.HBaseTcpServer; import org.lealone.hbase.server.HBaseTcpServer;
import org.lealone.hbase.transaction.TimestampService; import org.lealone.hbase.transaction.TimestampService;
import org.lealone.transaction.TransactionalRouter;


/** /**
* *
Expand All @@ -47,6 +50,8 @@ public static TimestampService getTimestampService() {


@Override @Override
public synchronized void start(CoprocessorEnvironment env) throws IOException { public synchronized void start(CoprocessorEnvironment env) throws IOException {
Session.setRouter(new TransactionalRouter(MasterSlaveRouter.getInstance()));

if (server == null) { if (server == null) {
HMaster m = (HMaster) ((MasterCoprocessorEnvironment) env).getMasterServices(); HMaster m = (HMaster) ((MasterCoprocessorEnvironment) env).getMasterServices();
hostAndPort = m.getServerName().getHostAndPort(); hostAndPort = m.getServerName().getHostAndPort();
Expand Down
Expand Up @@ -21,9 +21,12 @@


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.lealone.engine.Session;
import org.lealone.hbase.command.router.MasterSlaveRouter;
import org.lealone.hbase.server.HBasePgServer; import org.lealone.hbase.server.HBasePgServer;
import org.lealone.hbase.server.HBaseTcpServer; import org.lealone.hbase.server.HBaseTcpServer;
import org.lealone.hbase.transaction.TimestampService; import org.lealone.hbase.transaction.TimestampService;
import org.lealone.transaction.TransactionalRouter;


/** /**
* *
Expand Down Expand Up @@ -56,6 +59,8 @@ public TimestampService getTimestampService() {


@Override @Override
public void run() { public void run() {
Session.setRouter(new TransactionalRouter(MasterSlaveRouter.getInstance()));

HBaseTcpServer server = new HBaseTcpServer(this); HBaseTcpServer server = new HBaseTcpServer(this);
server.start(); server.start();


Expand Down
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.lealone.transaction;

import org.lealone.command.ddl.DefineCommand;
import org.lealone.command.dml.Delete;
import org.lealone.command.dml.Insert;
import org.lealone.command.dml.Select;
import org.lealone.command.dml.TransactionCommand;
import org.lealone.command.dml.Update;
import org.lealone.command.router.Router;
import org.lealone.engine.Session;
import org.lealone.message.DbException;
import org.lealone.result.ResultInterface;

public class TransactionalRouter implements Router {
private final Router nestedRouter;

public TransactionalRouter(Router nestedRouter) {
this.nestedRouter = nestedRouter;
}

@Override
public int executeDefineCommand(DefineCommand defineCommand) {
return nestedRouter.executeDefineCommand(defineCommand);
}

@Override
public int executeInsert(Insert insert) {
boolean isTopTransaction = false;
boolean isNestedTransaction = false;
Session session = insert.getSession();
try {
if (insert.isBatch()) {
if (session.getAutoCommit()) {
session.setAutoCommit(false);
isTopTransaction = true;
} else {
isNestedTransaction = true;
session.addSavepoint(TransactionCommand.INTERNAL_SAVEPOINT);
}
}
int updateCount = nestedRouter.executeInsert(insert);
if (isTopTransaction)
session.commit(false);
return updateCount;
} catch (Exception e) {
if (isTopTransaction)
session.rollback();

//嵌套事务出错时提前rollback
if (isNestedTransaction)
session.rollbackToSavepoint(TransactionCommand.INTERNAL_SAVEPOINT);

throw DbException.convert(e);
} finally {
if (isTopTransaction)
session.setAutoCommit(true);
}
}

@Override
public int executeDelete(Delete delete) {
return executeUpdateOrDelete(null, delete);
}

@Override
public int executeUpdate(Update update) {
return executeUpdateOrDelete(update, null);
}

private int executeUpdateOrDelete(Update update, Delete delete) {
boolean isTopTransaction = false;
boolean isNestedTransaction = false;

Session session;
if (update != null)
session = update.getSession();
else
session = delete.getSession();
try {
if (session.getAutoCommit()) {
session.setAutoCommit(false);
isTopTransaction = true;
} else {
isNestedTransaction = true;
session.addSavepoint(TransactionCommand.INTERNAL_SAVEPOINT);
}
int updateCount;
if (update != null)
updateCount = nestedRouter.executeUpdate(update);
else
updateCount = nestedRouter.executeDelete(delete);
if (isTopTransaction)
session.commit(false);
return updateCount;
} catch (Exception e) {
if (isTopTransaction)
session.rollback();

//嵌套事务出错时提前rollback
if (isNestedTransaction)
session.rollbackToSavepoint(TransactionCommand.INTERNAL_SAVEPOINT);

throw DbException.convert(e);
} finally {
if (isTopTransaction)
session.setAutoCommit(true);
}

}

@Override
public ResultInterface executeSelect(Select select, int maxRows, boolean scrollable) {
return nestedRouter.executeSelect(select, maxRows, scrollable);
}

}

0 comments on commit e77a2e8

Please sign in to comment.