Skip to content

Commit

Permalink
sql语句可以按优化级来执行
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Oct 6, 2016
1 parent 28b1498 commit 98da10b
Show file tree
Hide file tree
Showing 13 changed files with 319 additions and 5 deletions.
Expand Up @@ -21,6 +21,10 @@

public interface PreparedStatement extends SQLStatement {

public final static int MIN_PRIORITY = 1;
public final static int NORM_PRIORITY = 5;
public final static int MAX_PRIORITY = 10;

void setFetchSize(int fetchSize);

int getFetchSize();
Expand All @@ -45,6 +49,10 @@ public interface PreparedStatement extends SQLStatement {

double getCost();

int getPriority();

void setPriority(int priority);

boolean isTransactional();

Result query(int maxRows);
Expand Down
Expand Up @@ -21,6 +21,8 @@ public interface SQLStatementExecutor {

void executeNextStatement();

void executeNextStatementIfNeeded(PreparedStatement current);

void ready();

}
7 changes: 6 additions & 1 deletion lealone-db/src/main/java/org/lealone/db/Database.java
Expand Up @@ -72,6 +72,7 @@
import org.lealone.storage.StorageEngine;
import org.lealone.storage.fs.FileStorage;
import org.lealone.storage.fs.FileUtils;
import org.lealone.storage.memory.MemoryStorageEngine;
import org.lealone.transaction.TransactionEngine;
import org.lealone.transaction.TransactionEngineManager;

Expand Down Expand Up @@ -2155,7 +2156,11 @@ public synchronized Storage getStorage(StorageEngine storageEngine) {
if (storage != null)
return storage;

storage = getStorageBuilder(storageEngine).openStorage();
if (storageEngine instanceof MemoryStorageEngine) {
storage = storageEngine.getStorageBuilder().openStorage();
} else {
storage = getStorageBuilder(storageEngine).openStorage();
}
storages.put(storageEngine.getName(), storage);
if (persistent && lobStorage == null)
setLobStorage(storageEngine.getLobStorage(this, storage));
Expand Down
69 changes: 65 additions & 4 deletions lealone-net/src/main/java/org/lealone/net/CommandHandler.java
Expand Up @@ -26,13 +26,14 @@

import org.lealone.db.SessionStatus;
import org.lealone.net.AsyncConnection.SessionInfo;
import org.lealone.sql.PreparedStatement;
import org.lealone.sql.SQLEngineManager;
import org.lealone.sql.SQLStatementExecutor;

public class CommandHandler extends Thread implements SQLStatementExecutor {

private static final LinkedList<AsyncConnection> connections = new LinkedList<>();
private static final int commandHandlersCount = 2; // Runtime.getRuntime().availableProcessors();
private static final int commandHandlersCount = 1; // Runtime.getRuntime().availableProcessors();
private static final CommandHandler[] commandHandlers = new CommandHandler[commandHandlersCount];
private static final AtomicInteger index = new AtomicInteger(0);

Expand Down Expand Up @@ -76,6 +77,7 @@ public static void removeConnection(AsyncConnection c) {
private final ConcurrentHashMap<Integer, SessionInfo> sessionInfoMap = new ConcurrentHashMap<>();
private final Semaphore haveWork = new Semaphore(1);
private boolean stop;
private int nested;

void addSession(Integer sessionId, SessionInfo sessionInfo) {
sessionInfoMap.put(sessionId, sessionInfo);
Expand Down Expand Up @@ -133,7 +135,7 @@ private PreparedCommand getNextBestCommand() {
return null;

ConcurrentLinkedQueue<PreparedCommand> bestPreparedCommandQueue = null;
double cost = 0.0;
int priority = PreparedStatement.MIN_PRIORITY;

for (SessionInfo sessionInfo : sessionInfoMap.values()) {
ConcurrentLinkedQueue<PreparedCommand> preparedCommandQueue = sessionInfo.preparedCommandQueue;
Expand All @@ -149,9 +151,9 @@ private PreparedCommand getNextBestCommand() {
continue;
}

if (bestPreparedCommandQueue == null || pc.stmt.getCost() < cost) {
if (bestPreparedCommandQueue == null || pc.stmt.getPriority() > priority) {
bestPreparedCommandQueue = preparedCommandQueue;
cost = pc.stmt.getCost();
priority = pc.stmt.getPriority();
}
}

Expand All @@ -161,4 +163,63 @@ private PreparedCommand getNextBestCommand() {
return bestPreparedCommandQueue.poll();
}

@Override
public void executeNextStatementIfNeeded(PreparedStatement current) {
// 如果出来各高优化级的命令,最多只抢占3次,避免堆栈溢出
if (nested >= 3)
return;
nested++;
int priority = current.getPriority();
boolean hasHigherPriorityCommand = false;
while (true) {
PreparedCommand c = getNextBestCommand(priority);
if (c == null) {
break;
}

hasHigherPriorityCommand = true;
try {
c.run();
} catch (Throwable e) {
c.transfer.getAsyncConnection().sendError(c.transfer, c.id, e);
}
}

if (hasHigherPriorityCommand) {
current.setPriority(priority + 1);
}
nested--;
}

private PreparedCommand getNextBestCommand(int priority) {
if (sessionInfoMap.isEmpty())
return null;

ConcurrentLinkedQueue<PreparedCommand> bestPreparedCommandQueue = null;

for (SessionInfo sessionInfo : sessionInfoMap.values()) {
ConcurrentLinkedQueue<PreparedCommand> preparedCommandQueue = sessionInfo.preparedCommandQueue;
PreparedCommand pc = preparedCommandQueue.peek();
if (pc == null)
continue;

// SessionStatus sessionStatus = pc.session.getStatus();
// if (sessionStatus == SessionStatus.TRANSACTION_NOT_COMMIT) {
// bestPreparedCommandQueue = preparedCommandQueue;
// break;
// } else if (sessionStatus == SessionStatus.COMMITTING_TRANSACTION) {
// continue;
// }

if (pc.stmt.getPriority() > priority) {
bestPreparedCommandQueue = preparedCommandQueue;
priority = pc.stmt.getPriority();
}
}

if (bestPreparedCommandQueue == null)
return null;

return bestPreparedCommandQueue.poll();
}
}
18 changes: 18 additions & 0 deletions lealone-sql/src/main/java/org/lealone/sql/StatementBase.java
Expand Up @@ -377,6 +377,12 @@ public void setPrepareAlways(boolean prepareAlways) {
protected void setCurrentRowNumber(int rowNumber) {
if ((++rowScanCount & 127) == 0) {
checkCanceled();

Thread t = Thread.currentThread();
if (t instanceof SQLStatementExecutor) {
SQLStatementExecutor sqlStatementExecutor = (SQLStatementExecutor) t;
sqlStatementExecutor.executeNextStatementIfNeeded(this);
}
}
this.currentRowNumber = rowNumber;
setProgress();
Expand Down Expand Up @@ -549,6 +555,18 @@ public double getCost() {
return cost;
}

protected int priority = NORM_PRIORITY;

@Override
public int getPriority() {
return priority;
}

@Override
public void setPriority(int priority) {
this.priority = priority;
}

@Override
public Result executeQuery(int maxRows) {
return query(maxRows);
Expand Down
Expand Up @@ -44,4 +44,9 @@ public boolean isTransactional() {
return transactional;
}

@Override
public int getPriority() {
priority = MIN_PRIORITY;
return priority;
}
}
8 changes: 8 additions & 0 deletions lealone-sql/src/main/java/org/lealone/sql/dml/Delete.java
Expand Up @@ -151,4 +151,12 @@ public String getPlanSQL() {
return buff.toString();
}

@Override
public int getPriority() {
if (getCurrentRowNumber() > 0)
return priority;

priority = NORM_PRIORITY - 1;
return priority;
}
}
12 changes: 12 additions & 0 deletions lealone-sql/src/main/java/org/lealone/sql/dml/Insert.java
Expand Up @@ -250,4 +250,16 @@ public String getPlanSQL() {
public double getCost() {
return query != null ? query.getCost() : list.size();
}

@Override
public int getPriority() {
if (rowNumber > 0)
return priority;

if (query != null || list.size() > 10)
priority = NORM_PRIORITY - 1;
else
priority = MAX_PRIORITY;
return priority;
}
}
9 changes: 9 additions & 0 deletions lealone-sql/src/main/java/org/lealone/sql/dml/Merge.java
Expand Up @@ -305,4 +305,13 @@ public String getPlanSQL() {
public double getCost() {
return query != null ? query.getCost() : list.size();
}

@Override
public int getPriority() {
if (getCurrentRowNumber() > 0)
return priority;

priority = NORM_PRIORITY - 1;
return priority;
}
}
9 changes: 9 additions & 0 deletions lealone-sql/src/main/java/org/lealone/sql/dml/Select.java
Expand Up @@ -1513,4 +1513,13 @@ public boolean isBatchForInsert() {
public void addGlobalCondition(CommandParameter param, int columnId, int comparisonType) {
this.addGlobalCondition((Parameter) param, columnId, comparisonType);
}

@Override
public int getPriority() {
if (getCurrentRowNumber() > 127)
return priority;

priority = MIN_PRIORITY;
return priority;
}
}
Expand Up @@ -469,4 +469,9 @@ public boolean isBatchForInsert() {
public void addGlobalCondition(CommandParameter param, int columnId, int comparisonType) {
this.addGlobalCondition((Parameter) param, columnId, comparisonType);
}

@Override
public int getPriority() {
return Math.min(left.getPriority(), left.getPriority());
}
}
8 changes: 8 additions & 0 deletions lealone-sql/src/main/java/org/lealone/sql/dml/Update.java
Expand Up @@ -223,4 +223,12 @@ public String getPlanSQL() {
return buff.toString();
}

@Override
public int getPriority() {
if (getCurrentRowNumber() > 0)
return priority;

priority = NORM_PRIORITY - 1;
return priority;
}
}

0 comments on commit 98da10b

Please sign in to comment.