Skip to content

Commit

Permalink
改进Select在P2P+DHT环境下的路由实现
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Jan 1, 2015
1 parent ed22a32 commit ec1a116
Show file tree
Hide file tree
Showing 12 changed files with 218 additions and 314 deletions.
Expand Up @@ -17,7 +17,6 @@
*/
package org.lealone.cluster.concurrent;

import static org.lealone.cluster.config.DatabaseDescriptor.getConcurrentCounterWriters;
import static org.lealone.cluster.config.DatabaseDescriptor.getConcurrentReaders;
import static org.lealone.cluster.config.DatabaseDescriptor.getConcurrentWriters;

Expand Down Expand Up @@ -49,17 +48,17 @@ public class StageManager {

static {
stages.put(Stage.MUTATION, multiThreadedLowSignalStage(Stage.MUTATION, getConcurrentWriters()));
stages.put(Stage.COUNTER_MUTATION, multiThreadedLowSignalStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
//stages.put(Stage.COUNTER_MUTATION, multiThreadedLowSignalStage(Stage.COUNTER_MUTATION, getConcurrentCounterWriters()));
stages.put(Stage.READ, multiThreadedLowSignalStage(Stage.READ, getConcurrentReaders()));
stages.put(Stage.REQUEST_RESPONSE,
multiThreadedLowSignalStage(Stage.REQUEST_RESPONSE, FBUtilities.getAvailableProcessors()));
stages.put(Stage.INTERNAL_RESPONSE, multiThreadedStage(Stage.INTERNAL_RESPONSE, FBUtilities.getAvailableProcessors()));
// the rest are all single-threaded
stages.put(Stage.GOSSIP, new MetricsEnabledThreadPoolExecutor(Stage.GOSSIP));
stages.put(Stage.ANTI_ENTROPY, new MetricsEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
//stages.put(Stage.ANTI_ENTROPY, new MetricsEnabledThreadPoolExecutor(Stage.ANTI_ENTROPY));
stages.put(Stage.MIGRATION, new MetricsEnabledThreadPoolExecutor(Stage.MIGRATION));
stages.put(Stage.MISC, new MetricsEnabledThreadPoolExecutor(Stage.MISC));
stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, FBUtilities.getAvailableProcessors()));
//stages.put(Stage.READ_REPAIR, multiThreadedStage(Stage.READ_REPAIR, FBUtilities.getAvailableProcessors()));
stages.put(Stage.TRACING, tracingExecutor());
}

Expand Down

Large diffs are not rendered by default.

Expand Up @@ -54,7 +54,7 @@ private static void startTcpServer() throws Exception {

server.init(list.toArray(new String[list.size()]));
server.start();
logger.info("Lealone daemon started, listening tcp port: {}", server.getPort());
logger.info("Lealone TcpServer started, listening address: {}, port: {}", config.listen_address, server.getPort());
server.listen();
}

Expand Down
Expand Up @@ -6,6 +6,8 @@
*/
package org.lealone.command.ddl;

import java.util.concurrent.Callable;

import org.lealone.command.Prepared;
import org.lealone.engine.Session;
import org.lealone.result.ResultInterface;
Expand All @@ -14,7 +16,7 @@
* This class represents a non-transaction statement, for example a CREATE or
* DROP.
*/
public abstract class DefineCommand extends Prepared {
public abstract class DefineCommand extends Prepared implements Callable<Integer> {

/**
* The transactional behavior. The default is disabled, meaning the command
Expand All @@ -31,10 +33,12 @@ protected DefineCommand(Session session) {
super(session);
}

@Override
public boolean isReadOnly() {
return false;
}

@Override
public ResultInterface queryMeta() {
return null;
}
Expand All @@ -43,8 +47,13 @@ public void setTransactional(boolean transactional) {
this.transactional = transactional;
}

@Override
public boolean isTransactional() {
return transactional;
}

@Override
public Integer call() {
return Integer.valueOf(update());
}
}
20 changes: 1 addition & 19 deletions lealone-sql/src/main/java/org/lealone/command/dml/Query.java
Expand Up @@ -9,7 +9,6 @@
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;

import org.lealone.api.ErrorCode;
import org.lealone.command.Prepared;
Expand Down Expand Up @@ -37,7 +36,7 @@
/**
* Represents a SELECT statement (simple, or union).
*/
public abstract class Query extends Prepared implements Callable<ResultInterface> {
public abstract class Query extends Prepared {

/**
* The limit expression as specified in the LIMIT or TOP clause.
Expand Down Expand Up @@ -288,13 +287,6 @@ public ResultInterface query(int maxrows) {
return query(maxrows, null);
}

private int queryLimit;

@Override
public ResultInterface call() {
return query0(queryLimit, null);
}

/**
* Execute the query, writing the result to the target result.
*
Expand All @@ -303,14 +295,6 @@ public ResultInterface call() {
* @return the result set (if the target is not set).
*/
public ResultInterface query(int limit, ResultTarget target) {
queryLimit = limit;
if (isLocal())
return query0(limit, null);
else
return Session.getRouter().executeQuery(this, limit, false);
}

private ResultInterface query0(int limit, ResultTarget target) {
fireBeforeSelectTriggers();
if (noCache || !session.getDatabase().getOptimizeReuseResults()) {
return queryWithoutCache(limit, target);
Expand Down Expand Up @@ -535,6 +519,4 @@ public final long getMaxDataModificationId() {
}

public abstract List<TableFilter> getTopFilters();

public abstract TableFilter getTableFilter();
}
26 changes: 20 additions & 6 deletions lealone-sql/src/main/java/org/lealone/command/dml/Select.java
Expand Up @@ -11,6 +11,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.Callable;

import org.lealone.api.ErrorCode;
import org.lealone.api.Trigger;
Expand Down Expand Up @@ -63,7 +64,7 @@
* @author Thomas Mueller
* @author Joel Turkel (Group sorted query)
*/
public class Select extends Query {
public class Select extends Query implements Callable<ResultInterface> {
protected TableFilter topTableFilter;
protected final ArrayList<TableFilter> filters = New.arrayList();
protected final ArrayList<TableFilter> topFilters = New.arrayList();
Expand Down Expand Up @@ -139,11 +140,6 @@ public ArrayList<TableFilter> getTopFilters() {
return topFilters;
}

@Override
public TableFilter getTableFilter() {
return getTopTableFilter();
}

public void setExpressions(ArrayList<Expression> expressions) {
this.expressions = expressions;
}
Expand Down Expand Up @@ -1464,4 +1460,22 @@ public int getLimitRows() {
} else
return -1;
}

private int queryLimit;
private ResultTarget resultTarget;

@Override
public ResultInterface query(int limit, ResultTarget target) {
queryLimit = limit;
resultTarget = target;
if (isLocal())
return super.query(limit, target);
else
return Session.getRouter().executeSelect(this, limit, false);
}

@Override
public ResultInterface call() {
return super.query(queryLimit, resultTarget);
}
}
Expand Up @@ -448,9 +448,4 @@ public List<TableFilter> getTopFilters() {
filters.addAll(right.getTopFilters());
return filters;
}

@Override
public TableFilter getTableFilter() {
return getTopFilters().get(0);
}
}
Expand Up @@ -17,7 +17,6 @@
*/
package org.lealone.command.router;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
Expand All @@ -29,9 +28,7 @@
import java.util.concurrent.atomic.AtomicInteger;

import org.lealone.command.CommandInterface;
import org.lealone.command.FrontendCommand;
import org.lealone.command.dml.Select;
import org.lealone.engine.Session;
import org.lealone.message.DbException;
import org.lealone.result.ResultInterface;
import org.lealone.util.New;
Expand Down Expand Up @@ -94,60 +91,6 @@ public static String getPlanSQL(Select select) {
return select.getSQL();
}

public static ResultInterface executeQuery(Session session, SQLRoutingInfo sqlRoutingInfo, Select select, final int maxRows,
final boolean scrollable) {

List<CommandInterface> commands = new ArrayList<CommandInterface>();
if (sqlRoutingInfo.remoteCommands != null) {
commands.addAll(sqlRoutingInfo.remoteCommands);
}
// if (sqlRoutingInfo.localRegions != null) {
// for (String regionName : sqlRoutingInfo.localRegions) {
// Prepared p = session.prepare(getPlanSQL(select), true);
// p.setExecuteDirec(true);
// p.setFetchSize(select.getFetchSize());
// if (p instanceof WithWhereClause) {
// ((WithWhereClause) p).getWhereClauseSupport().setRegionName(regionName);
// }
// commands.add(new CommandWrapper(p));
// }
// }
//originalSelect.isGroupQuery()如果是false,那么按org.apache.hadoop.hbase.client.ClientScanner的功能来实现。
//只要Select语句中出现聚合函数、groupBy、Having三者之一都被认为是GroupQuery,
//对于GroupQuery需要把Select语句同时发给相关的RegionServer,得到结果后再合并。
if (!select.isGroupQuery() && select.getSortOrder() == null)
return new SerializedResult(commands, maxRows, scrollable, select);

int size = commands.size();
List<Future<ResultInterface>> futures = New.arrayList(size);
List<ResultInterface> results = New.arrayList(size);
for (int i = 0; i < size; i++) {
final CommandInterface c = commands.get(i);
futures.add(pool.submit(new Callable<ResultInterface>() {
@Override
public ResultInterface call() throws Exception {
return c.executeQuery(maxRows, scrollable);
}
}));
}
try {
for (int i = 0; i < size; i++) {
results.add(futures.get(i).get());
}
} catch (Exception e) {
throwException(e);
}

if (!select.isGroupQuery() && select.getSortOrder() != null)
return new SortedResult(maxRows, session, select, results);

String newSQL = select.getPlanSQL(true);
Select newSelect = (Select) session.prepare(newSQL, true);
newSelect.setExecuteDirec(true);

return new MergedResult(results, newSelect, select);
}

public static int executeUpdate(List<CommandInterface> commands) {
int size = commands.size();
if (size == 0)
Expand Down Expand Up @@ -177,21 +120,13 @@ public Integer call() throws Exception {
return updateCount;
}

public static int executeUpdate(SQLRoutingInfo sqlRoutingInfo, Callable<Integer> call) {
int updateCount = 0;
List<FrontendCommand> commands = sqlRoutingInfo.remoteCommands;
int size = commands.size() + 1;
public static int executeUpdateCallable(List<Callable<Integer>> commands) {
int size = commands.size();
List<Future<Integer>> futures = New.arrayList(size);
futures.add(pool.submit(call));
for (int i = 0; i < size - 1; i++) {
final CommandInterface c = commands.get(i);
futures.add(pool.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return c.executeUpdate();
}
}));
for (int i = 0; i < size; i++) {
futures.add(pool.submit(commands.get(i)));
}
int updateCount = 0;
try {
for (int i = 0; i < size; i++) {
updateCount += futures.get(i).get();
Expand All @@ -202,21 +137,21 @@ public Integer call() throws Exception {
return updateCount;
}

public static int executeUpdateCallable(List<Callable<Integer>> calls) {
int size = calls.size();
List<Future<Integer>> futures = New.arrayList(size);
public static List<ResultInterface> executeSelectCallable(List<Callable<ResultInterface>> commands) {
int size = commands.size();
List<Future<ResultInterface>> futures = New.arrayList(size);
List<ResultInterface> results = New.arrayList(size);
for (int i = 0; i < size; i++) {
futures.add(pool.submit(calls.get(i)));
futures.add(pool.submit(commands.get(i)));
}
int updateCount = 0;
try {
for (int i = 0; i < size; i++) {
updateCount += futures.get(i).get();
results.add(futures.get(i).get());
}
} catch (Exception e) {
throwException(e);
}
return updateCount;
return results;
}

public static <T> void execute(List<Callable<T>> calls) {
Expand Down
Expand Up @@ -24,10 +24,10 @@
import org.lealone.command.Prepared;
import org.lealone.result.ResultInterface;

class CommandWrapper implements CommandInterface {
public class CommandWrapper implements CommandInterface {
private Prepared p;

CommandWrapper(Prepared p) {
public CommandWrapper(Prepared p) {
this.p = p;
}

Expand Down
Expand Up @@ -36,6 +36,7 @@ public class MergedIndex extends BaseIndex {
public MergedIndex(ResultInterface result, Table table, int id, IndexColumn[] columns, IndexType indexType) {
super();
this.result = result;
initBaseIndex(table, id, table.getName() + "_DATA", columns, indexType);
}

@Override
Expand Down
Expand Up @@ -20,18 +20,19 @@
import org.lealone.command.ddl.DefineCommand;
import org.lealone.command.dml.Delete;
import org.lealone.command.dml.Insert;
import org.lealone.command.dml.Query;
import org.lealone.command.dml.Select;
import org.lealone.command.dml.Update;
import org.lealone.result.ResultInterface;

public interface Router {

int executeDefineCommand(DefineCommand defineCommand);

int executeInsert(Insert insert);

int executeDelete(Delete delete);

int executeUpdate(Update update);

ResultInterface executeQuery(Query query, int maxRows, boolean scrollable);

int executeDefineCommand(DefineCommand defineCommand);
ResultInterface executeSelect(Select select, int maxRows, boolean scrollable);
}

0 comments on commit ec1a116

Please sign in to comment.