Skip to content

Commit

Permalink
重构HBase Update/Delete
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Jan 1, 2015
1 parent e77a2e8 commit 20d1a6a
Show file tree
Hide file tree
Showing 24 changed files with 125 additions and 186 deletions.
20 changes: 0 additions & 20 deletions lealone-sql/src/main/java/org/lealone/command/Prepared.java
Expand Up @@ -427,26 +427,6 @@ public Command getCommand() {
return command; return command;
} }


protected String[] localRegionNames;

public String[] getLocalRegionNames() {
return localRegionNames;
}

public void setLocalRegionNames(String[] localRegionNames) {
this.localRegionNames = localRegionNames;
}

private boolean executeDirec;

public boolean isExecuteDirec() {
return executeDirec;
}

public void setExecuteDirec(boolean executeDirec) {
this.executeDirec = executeDirec;
}

private boolean local = true; private boolean local = true;


public boolean isLocal() { public boolean isLocal() {
Expand Down
Expand Up @@ -1123,7 +1123,7 @@ public boolean supportsAlterColumnWithCopyData() {
return true; return true;
} }


public boolean isDistributed() { public boolean supportsSharding() {
return false; return false;
} }


Expand Down
Expand Up @@ -511,14 +511,14 @@ public String getTableName() {
} }


@Override @Override
public boolean isDistributed() { public boolean supportsSharding() {
for (Table t : tables) { for (Table t : tables) {
if (t instanceof TableView) { if (t instanceof TableView) {
return ((TableView) t).isDistributed(); return ((TableView) t).supportsSharding();
} else { } else {
return t.isDistributed(); return t.supportsSharding();
} }
} }
return super.isDistributed(); return super.supportsSharding();
} }
} }
3 changes: 2 additions & 1 deletion lealone-sql/src/main/java/org/lealone/engine/MetaRecord.java
Expand Up @@ -58,7 +58,7 @@ public void execute(Database db, Session systemSession, DatabaseEventListener li
Prepared command = systemSession.prepare(sql); Prepared command = systemSession.prepare(sql);
//System.out.println("execute id: " + id + ", sql=" + sql); //System.out.println("execute id: " + id + ", sql=" + sql);
command.setObjectId(id); command.setObjectId(id);
command.setExecuteDirec(true); command.setLocal(true);
command.update(); command.update();
} catch (DbException e) { } catch (DbException e) {
e = e.addSQL(sql); e = e.addSQL(sql);
Expand Down Expand Up @@ -91,6 +91,7 @@ public String getSQL() {
* @param other the other record * @param other the other record
* @return -1, 0, or 1 * @return -1, 0, or 1
*/ */
@Override
public int compareTo(MetaRecord other) { public int compareTo(MetaRecord other) {
int c1 = getCreateOrder(); int c1 = getCreateOrder();
int c2 = other.getCreateOrder(); int c2 = other.getCreateOrder();
Expand Down
Expand Up @@ -110,7 +110,7 @@ public long getRowCountApproximation() {
} }


@Override @Override
public boolean isDistributed() { public boolean supportsSharding() {
return true; return true;
} }


Expand Down
Expand Up @@ -129,7 +129,7 @@ public long getRowCountApproximation() {
} }


@Override @Override
public boolean isDistributed() { public boolean supportsSharding() {
return true; return true;
} }


Expand Down
Expand Up @@ -17,6 +17,13 @@
*/ */
package org.lealone.hbase.command; package org.lealone.hbase.command;


import static org.lealone.hbase.engine.HBaseConstants.COMMAND_PARALLEL_CORE_POOL_SIZE;
import static org.lealone.hbase.engine.HBaseConstants.COMMAND_PARALLEL_KEEP_ALIVE_TIME;
import static org.lealone.hbase.engine.HBaseConstants.COMMAND_PARALLEL_MAX_POOL_SIZE;
import static org.lealone.hbase.engine.HBaseConstants.DEFAULT_COMMAND_PARALLEL_CORE_POOL_SIZE;
import static org.lealone.hbase.engine.HBaseConstants.DEFAULT_COMMAND_PARALLEL_KEEP_ALIVE_TIME;
import static org.lealone.hbase.engine.HBaseConstants.DEFAULT_COMMAND_PARALLEL_MAX_POOL_SIZE;

import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
Expand All @@ -42,8 +49,6 @@
import org.lealone.result.ResultInterface; import org.lealone.result.ResultInterface;
import org.lealone.util.New; import org.lealone.util.New;


import static org.lealone.hbase.engine.HBaseConstants.*;

public class CommandParallel { public class CommandParallel {
private final static ThreadPoolExecutor pool = initPool(); private final static ThreadPoolExecutor pool = initPool();


Expand Down Expand Up @@ -76,10 +81,10 @@ public static ResultInterface executeQuery(Session session, SQLRoutingInfo sqlRo
if (sqlRoutingInfo.localRegions != null) { if (sqlRoutingInfo.localRegions != null) {
for (String regionName : sqlRoutingInfo.localRegions) { for (String regionName : sqlRoutingInfo.localRegions) {
Prepared p = session.prepare(HBaseUtils.getPlanSQL(select), true); Prepared p = session.prepare(HBaseUtils.getPlanSQL(select), true);
p.setExecuteDirec(true); p.setLocal(true);
p.setFetchSize(select.getFetchSize()); p.setFetchSize(select.getFetchSize());
if (p instanceof WithWhereClause) { if (p instanceof WithWhereClause) {
((WithWhereClause) p).getWhereClauseSupport().setRegionName(regionName); ((WithWhereClause) p).getWhereClauseSupport().setCurrentRegionName(regionName);
} }
commands.add(new CommandWrapper(p)); commands.add(new CommandWrapper(p));
} }
Expand All @@ -96,6 +101,7 @@ public static ResultInterface executeQuery(Session session, SQLRoutingInfo sqlRo
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
final CommandInterface c = commands.get(i); final CommandInterface c = commands.get(i);
futures.add(pool.submit(new Callable<ResultInterface>() { futures.add(pool.submit(new Callable<ResultInterface>() {
@Override
public ResultInterface call() throws Exception { public ResultInterface call() throws Exception {
return c.executeQuery(maxRows, scrollable); return c.executeQuery(maxRows, scrollable);
} }
Expand All @@ -114,7 +120,7 @@ public ResultInterface call() throws Exception {


String newSQL = select.getPlanSQL(true); String newSQL = select.getPlanSQL(true);
Select newSelect = (Select) session.prepare(newSQL, true); Select newSelect = (Select) session.prepare(newSQL, true);
newSelect.setExecuteDirec(true); newSelect.setLocal(true);


return new HBaseMergedResult(results, newSelect, select); return new HBaseMergedResult(results, newSelect, select);
} }
Expand All @@ -130,6 +136,7 @@ public static int executeUpdate(List<CommandInterface> commands) {
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
final CommandInterface c = commands.get(i); final CommandInterface c = commands.get(i);
futures.add(pool.submit(new Callable<Integer>() { futures.add(pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception { public Integer call() throws Exception {
return c.executeUpdate(); return c.executeUpdate();
} }
Expand All @@ -154,6 +161,7 @@ public static int executeUpdate(SQLRoutingInfo sqlRoutingInfo, Callable<Integer>
for (int i = 0; i < size - 1; i++) { for (int i = 0; i < size - 1; i++) {
final CommandInterface c = commands.get(i); final CommandInterface c = commands.get(i);
futures.add(pool.submit(new Callable<Integer>() { futures.add(pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception { public Integer call() throws Exception {
return c.executeUpdate(); return c.executeUpdate();
} }
Expand Down
Expand Up @@ -47,6 +47,7 @@
import org.lealone.hbase.command.dml.HBaseMerge; import org.lealone.hbase.command.dml.HBaseMerge;
import org.lealone.hbase.command.dml.HBaseSelect; import org.lealone.hbase.command.dml.HBaseSelect;
import org.lealone.hbase.command.dml.HBaseUpdate; import org.lealone.hbase.command.dml.HBaseUpdate;
import org.lealone.hbase.command.dml.WithWhereClause;
import org.lealone.hbase.engine.HBaseConstants; import org.lealone.hbase.engine.HBaseConstants;
import org.lealone.hbase.engine.HBaseDatabase; import org.lealone.hbase.engine.HBaseDatabase;
import org.lealone.hbase.engine.HBaseSession; import org.lealone.hbase.engine.HBaseSession;
Expand Down Expand Up @@ -75,7 +76,7 @@ protected Prepared parsePrepared(char first) {
private Prepared parseInTheRegion() { private Prepared parseInTheRegion() {
String[] regionNames = parseRegionNames(); String[] regionNames = parseRegionNames();
Prepared p = parsePrepared(); Prepared p = parsePrepared();
p.setLocalRegionNames(regionNames); ((WithWhereClause) p).setLocalRegionNames(regionNames);
return p; return p;
} }


Expand Down
Expand Up @@ -23,11 +23,11 @@
import org.lealone.command.Command; import org.lealone.command.Command;
import org.lealone.command.FrontendCommand; import org.lealone.command.FrontendCommand;
import org.lealone.command.ddl.DefineCommand; import org.lealone.command.ddl.DefineCommand;
import org.lealone.engine.Session;
import org.lealone.engine.FrontendSession; import org.lealone.engine.FrontendSession;
import org.lealone.engine.Session;
import org.lealone.expression.Parameter; import org.lealone.expression.Parameter;
import org.lealone.hbase.engine.HBaseSession;
import org.lealone.hbase.engine.FrontendSessionPool; import org.lealone.hbase.engine.FrontendSessionPool;
import org.lealone.hbase.engine.HBaseSession;
import org.lealone.hbase.util.HBaseUtils; import org.lealone.hbase.util.HBaseUtils;
import org.lealone.message.DbException; import org.lealone.message.DbException;
import org.lealone.result.ResultInterface; import org.lealone.result.ResultInterface;
Expand All @@ -47,7 +47,7 @@ public DefineCommandWrapper(Session session, DefineCommand dc, String sql) {


@Override @Override
public int update() { public int update() {
if (isExecuteDirec()) { if (isLocal()) {
return dc.update(); return dc.update();
} else if (session.isMaster()) { } else if (session.isMaster()) {
try { try {
Expand Down
Expand Up @@ -20,39 +20,25 @@
import org.lealone.command.dml.Delete; import org.lealone.command.dml.Delete;
import org.lealone.engine.Session; import org.lealone.engine.Session;


public class HBaseDelete extends Delete implements UpdateOrDelete { public class HBaseDelete extends Delete implements WithWhereClause {
private final UpdateOrDeleteSupport updateOrDeleteSupport; private final WhereClauseSupport whereClauseSupport = new WhereClauseSupport();


public HBaseDelete(Session session) { public HBaseDelete(Session session) {
super(session); super(session);
updateOrDeleteSupport = new UpdateOrDeleteSupport(session, this);
} }


@Override @Override
public void prepare() { public WhereClauseSupport getWhereClauseSupport() {
super.prepare(); return whereClauseSupport;
if (tableFilter.getTable().isDistributed())
updateOrDeleteSupport.postPrepare(tableFilter);
else
setExecuteDirec(true);
}

@Override
public int update() {
if (isExecuteDirec())
return super.update();
else
return updateOrDeleteSupport.update();
} }


@Override @Override
public WhereClauseSupport getWhereClauseSupport() { public String[] getLocalRegionNames() {
return updateOrDeleteSupport.getWhereClauseSupport(); return whereClauseSupport.getLocalRegionNames();
} }


@Override @Override
public int internalUpdate() { public void setLocalRegionNames(String[] localRegionNames) {
return super.update(); whereClauseSupport.setLocalRegionNames(localRegionNames);
} }

} }
Expand Up @@ -39,15 +39,15 @@ public void setSortedInsertMode(boolean sortedInsertMode) {
@Override @Override
public void prepare() { public void prepare() {
super.prepare(); super.prepare();
if (table.isDistributed()) if (table.supportsSharding())
insertOrMergeSupport.postPrepare(table, query, list, columns, null); insertOrMergeSupport.postPrepare(table, query, list, columns, null);
else else
setExecuteDirec(true); setLocal(true);
} }


@Override @Override
public int update() { public int update() {
if (isExecuteDirec()) if (isLocal())
return super.update(); return super.update();
else else
return insertOrMergeSupport.update(insertFromSelect, sortedInsertMode, this); return insertOrMergeSupport.update(insertFromSelect, sortedInsertMode, this);
Expand Down
Expand Up @@ -34,15 +34,15 @@ public HBaseMerge(Session session) {
@Override @Override
public void prepare() { public void prepare() {
super.prepare(); super.prepare();
if (table.isDistributed()) if (table.supportsSharding())
insertOrMergeSupport.postPrepare(table, query, list, columns, keys); insertOrMergeSupport.postPrepare(table, query, list, columns, keys);
else else
setExecuteDirec(true); setLocal(true);
} }


@Override @Override
public int update() { public int update() {
if (isExecuteDirec()) if (isLocal())
return super.update(); return super.update();
else else
return insertOrMergeSupport.update(false, false, this); return insertOrMergeSupport.update(false, false, this);
Expand Down
Expand Up @@ -39,23 +39,24 @@ public HBaseSelect(Session session) {
@Override @Override
public void prepare() { public void prepare() {
super.prepare(); super.prepare();
if (topTableFilter.getTable().isDistributed()) if (topTableFilter.getTable().supportsSharding())
whereClauseSupport.setTableFilter(topTableFilter); whereClauseSupport.setTableFilter(topTableFilter);
else else
setExecuteDirec(true); setLocal(true);
} }


@Override @Override
public ResultInterface query(int limit, ResultTarget target) { public ResultInterface query(int limit, ResultTarget target) {
boolean addRowToResultTarget = true; boolean addRowToResultTarget = true;
ResultInterface result; ResultInterface result;


if (isExecuteDirec()) { String[] localRegionNames = whereClauseSupport.getLocalRegionNames();
if (isLocal()) {
result = super.query(limit, target); result = super.query(limit, target);
addRowToResultTarget = false; addRowToResultTarget = false;
} else if (localRegionNames != null && localRegionNames.length != 0) { } else if (localRegionNames != null && localRegionNames.length != 0) {
if (localRegionNames.length == 1) { if (localRegionNames.length == 1) {
whereClauseSupport.setRegionName(localRegionNames[0]); whereClauseSupport.setCurrentRegionName(localRegionNames[0]);
result = super.query(limit, target); result = super.query(limit, target);
addRowToResultTarget = false; addRowToResultTarget = false;
} else { } else {
Expand All @@ -71,7 +72,7 @@ public ResultInterface query(int limit, ResultTarget target) {
} }


if (sqlRoutingInfo.localRegion != null) { if (sqlRoutingInfo.localRegion != null) {
whereClauseSupport.setRegionName(sqlRoutingInfo.localRegion); whereClauseSupport.setCurrentRegionName(sqlRoutingInfo.localRegion);
result = super.query(limit, target); result = super.query(limit, target);
addRowToResultTarget = false; addRowToResultTarget = false;
} else if (sqlRoutingInfo.remoteCommand != null) { } else if (sqlRoutingInfo.remoteCommand != null) {
Expand All @@ -95,4 +96,13 @@ public WhereClauseSupport getWhereClauseSupport() {
return whereClauseSupport; return whereClauseSupport;
} }


@Override
public String[] getLocalRegionNames() {
return whereClauseSupport.getLocalRegionNames();
}

@Override
public void setLocalRegionNames(String[] localRegionNames) {
whereClauseSupport.setLocalRegionNames(localRegionNames);
}
} }
Expand Up @@ -20,39 +20,25 @@
import org.lealone.command.dml.Update; import org.lealone.command.dml.Update;
import org.lealone.engine.Session; import org.lealone.engine.Session;


public class HBaseUpdate extends Update implements UpdateOrDelete { public class HBaseUpdate extends Update implements WithWhereClause {
private final UpdateOrDeleteSupport updateOrDeleteSupport; private final WhereClauseSupport whereClauseSupport = new WhereClauseSupport();


public HBaseUpdate(Session session) { public HBaseUpdate(Session session) {
super(session); super(session);
updateOrDeleteSupport = new UpdateOrDeleteSupport(session, this);
} }


@Override @Override
public void prepare() { public WhereClauseSupport getWhereClauseSupport() {
super.prepare(); return whereClauseSupport;
if (tableFilter.getTable().isDistributed())
updateOrDeleteSupport.postPrepare(tableFilter);
else
setExecuteDirec(true);
}

@Override
public int update() {
if (isExecuteDirec())
return super.update();
else
return updateOrDeleteSupport.update();
} }


@Override @Override
public WhereClauseSupport getWhereClauseSupport() { public String[] getLocalRegionNames() {
return updateOrDeleteSupport.getWhereClauseSupport(); return whereClauseSupport.getLocalRegionNames();
} }


@Override @Override
public int internalUpdate() { public void setLocalRegionNames(String[] localRegionNames) {
return super.update(); whereClauseSupport.setLocalRegionNames(localRegionNames);
} }

} }

0 comments on commit 20d1a6a

Please sign in to comment.