Skip to content

Commit

Permalink
删除CommandBase类,在QUERY/UPDATE协议包中携带可选的pageKeys,优化executeQuery/executeU…
Browse files Browse the repository at this point in the history
…pdate方法的实现
  • Loading branch information
codefollower committed Oct 1, 2018
1 parent 57fd067 commit 8b4d925
Show file tree
Hide file tree
Showing 13 changed files with 425 additions and 507 deletions.
Expand Up @@ -24,7 +24,6 @@
import org.lealone.common.exceptions.DbException; import org.lealone.common.exceptions.DbException;
import org.lealone.common.trace.Trace; import org.lealone.common.trace.Trace;
import org.lealone.db.Command; import org.lealone.db.Command;
import org.lealone.db.CommandBase;
import org.lealone.db.CommandParameter; import org.lealone.db.CommandParameter;
import org.lealone.db.CommandUpdateResult; import org.lealone.db.CommandUpdateResult;
import org.lealone.db.Session; import org.lealone.db.Session;
Expand All @@ -33,7 +32,7 @@
import org.lealone.net.AsyncCallback; import org.lealone.net.AsyncCallback;
import org.lealone.net.Transfer; import org.lealone.net.Transfer;


public class ClientBatchCommand extends CommandBase { public class ClientBatchCommand implements Command {
private ClientSession session; private ClientSession session;
private Transfer transfer; private Transfer transfer;
private final Trace trace; private final Trace trace;
Expand Down
155 changes: 61 additions & 94 deletions lealone-client/src/main/java/org/lealone/client/ClientCommand.java
Expand Up @@ -20,7 +20,6 @@
import org.lealone.common.trace.Trace; import org.lealone.common.trace.Trace;
import org.lealone.common.util.Utils; import org.lealone.common.util.Utils;
import org.lealone.db.Command; import org.lealone.db.Command;
import org.lealone.db.CommandBase;
import org.lealone.db.CommandParameter; import org.lealone.db.CommandParameter;
import org.lealone.db.CommandUpdateResult; import org.lealone.db.CommandUpdateResult;
import org.lealone.db.Session; import org.lealone.db.Session;
Expand All @@ -44,7 +43,7 @@
* @author H2 Group * @author H2 Group
* @author zhh * @author zhh
*/ */
public class ClientCommand extends CommandBase implements StorageCommand { public class ClientCommand implements StorageCommand {


private final Transfer transfer; private final Transfer transfer;
private final ArrayList<CommandParameter> parameters; private final ArrayList<CommandParameter> parameters;
Expand All @@ -65,6 +64,11 @@ public ClientCommand(ClientSession session, Transfer transfer, String sql, int f
this.session = session; this.session = session;
} }


@Override
public int getType() {
return CLIENT_COMMAND;
}

@Override @Override
public Command prepare() { public Command prepare() {
prepare(session, true); prepare(session, true);
Expand Down Expand Up @@ -166,27 +170,32 @@ public void runInternal() {


@Override @Override
public Result executeQuery(int maxRows) { public Result executeQuery(int maxRows) {
return executeQuery(maxRows, false, null, false); return query(maxRows, false, null, null);
} }


@Override @Override
public Result executeQuery(int maxRows, boolean scrollable) { public Result executeQuery(int maxRows, boolean scrollable) {
return executeQuery(maxRows, scrollable, null, false); return query(maxRows, scrollable, null, null);
}

@Override
public Result executeQuery(int maxRows, boolean scrollable, List<PageKey> pageKeys) {
return query(maxRows, scrollable, pageKeys, null);
} }


@Override @Override
public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler<AsyncResult<Result>> handler) { public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler<AsyncResult<Result>> handler) {
executeQuery(maxRows, scrollable, handler, true); query(maxRows, scrollable, null, handler);
} }


@Override @Override
public void executeQueryAsync(int maxRows, boolean scrollable, ArrayList<PageKey> pageKeys, public void executeQueryAsync(int maxRows, boolean scrollable, List<PageKey> pageKeys,
AsyncHandler<AsyncResult<Result>> handler) { AsyncHandler<AsyncResult<Result>> handler) {
executeQuery(maxRows, scrollable, handler, true); query(maxRows, scrollable, pageKeys, handler);
} }


private Result executeQuery(int maxRows, boolean scrollable, AsyncHandler<AsyncResult<Result>> handler, private Result query(int maxRows, boolean scrollable, List<PageKey> pageKeys,
boolean async) { AsyncHandler<AsyncResult<Result>> handler) {
if (prepared) { if (prepared) {
checkParameters(); checkParameters();
prepareIfRequired(); prepareIfRequired();
Expand All @@ -207,7 +216,6 @@ private Result executeQuery(int maxRows, boolean scrollable, AsyncHandler<AsyncR
session.traceOperation("COMMAND_PREPARED_QUERY", id); session.traceOperation("COMMAND_PREPARED_QUERY", id);
transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_QUERY); transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_QUERY);
} }
transfer.writeInt(resultId).writeInt(maxRows);
} else { } else {
if (isDistributedQuery) { if (isDistributedQuery) {
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_QUERY", id); session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_QUERY", id);
Expand All @@ -216,26 +224,43 @@ private Result executeQuery(int maxRows, boolean scrollable, AsyncHandler<AsyncR
session.traceOperation("COMMAND_QUERY", id); session.traceOperation("COMMAND_QUERY", id);
transfer.writeRequestHeader(id, Session.COMMAND_QUERY); transfer.writeRequestHeader(id, Session.COMMAND_QUERY);
} }
transfer.writeString(sql).writeInt(resultId).writeInt(maxRows);
} }
int fetch; int fetch;
if (scrollable) { if (scrollable) {
fetch = Integer.MAX_VALUE; fetch = Integer.MAX_VALUE;
} else { } else {
fetch = fetchSize; fetch = fetchSize;
} }
transfer.writeInt(fetch); transfer.writeInt(resultId).writeInt(maxRows).writeInt(fetch).writeBoolean(scrollable);
if (prepared) if (prepared)
sendParameters(transfer); sendParameters(transfer);
result = getQueryResult(isDistributedQuery, fetch, resultId, handler, async); else
transfer.writeString(sql);
writePageKeys(pageKeys);

result = getQueryResult(isDistributedQuery, fetch, resultId, handler);
} catch (Exception e) { } catch (Exception e) {
session.handleException(e); session.handleException(e);
} }
return result; return result;
} }


private void writePageKeys(List<PageKey> pageKeys) throws IOException {
if (pageKeys == null) {
transfer.writeInt(0);
} else {
int size = pageKeys.size();
transfer.writeInt(size);
for (int i = 0; i < size; i++) {
PageKey pk = pageKeys.get(i);
transfer.writeValue((Value) pk.key);
transfer.writeBoolean(pk.first);
}
}
}

private Result getQueryResult(boolean isDistributedQuery, int fetch, int resultId, private Result getQueryResult(boolean isDistributedQuery, int fetch, int resultId,
AsyncHandler<AsyncResult<Result>> handler, boolean async) throws IOException { AsyncHandler<AsyncResult<Result>> handler) throws IOException {
isQuery = true; isQuery = true;
AsyncCallback<ClientResult> ac = new AsyncCallback<ClientResult>() { AsyncCallback<ClientResult> ac = new AsyncCallback<ClientResult>() {
@Override @Override
Expand Down Expand Up @@ -263,12 +288,12 @@ public void runInternal() {
} }
} }
}; };
if (async) if (handler != null)
ac.setAsyncHandler(handler); ac.setAsyncHandler(handler);
transfer.addAsyncCallback(id, ac); transfer.addAsyncCallback(id, ac);
transfer.flush(); transfer.flush();


if (async) { if (handler != null) {
return null; return null;
} else { } else {
Result result = ac.getResult(); Result result = ac.getResult();
Expand All @@ -278,21 +303,26 @@ public void runInternal() {


@Override @Override
public int executeUpdate() { public int executeUpdate() {
return executeUpdate(null, null, false, null); return update(null, null, null, null);
}

@Override
public int executeUpdate(List<PageKey> pageKeys) {
return update(null, null, pageKeys, null);
} }


@Override @Override
public int executeUpdate(String replicationName, CommandUpdateResult commandUpdateResult) { public int executeUpdate(String replicationName, CommandUpdateResult commandUpdateResult) {
return executeUpdate(replicationName, null, false, commandUpdateResult); return update(replicationName, commandUpdateResult, null, null);
} }


@Override @Override
public void executeUpdateAsync(AsyncHandler<AsyncResult<Integer>> handler) { public void executeUpdateAsync(AsyncHandler<AsyncResult<Integer>> handler) {
executeUpdate(null, handler, true, null); update(null, null, null, handler);
} }


private int executeUpdate(String replicationName, AsyncHandler<AsyncResult<Integer>> handler, boolean async, private int update(String replicationName, CommandUpdateResult commandUpdateResult, List<PageKey> pageKeys,
CommandUpdateResult commandUpdateResult) { AsyncHandler<AsyncResult<Integer>> handler) {
if (prepared) { if (prepared) {
checkParameters(); checkParameters();
prepareIfRequired(); prepareIfRequired();
Expand Down Expand Up @@ -327,23 +357,26 @@ private int executeUpdate(String replicationName, AsyncHandler<AsyncResult<Integ
transfer.writeRequestHeader(id, Session.COMMAND_UPDATE); transfer.writeRequestHeader(id, Session.COMMAND_UPDATE);
} }
} }
if (!prepared)
transfer.writeString(sql);
if (replicationName != null) if (replicationName != null)
transfer.writeString(replicationName); transfer.writeString(replicationName);


if (prepared) if (prepared)
sendParameters(transfer); sendParameters(transfer);
else
transfer.writeString(sql);

writePageKeys(pageKeys);


updateCount = getUpdateCount(isDistributedUpdate, id, handler, async, commandUpdateResult); updateCount = getUpdateCount(isDistributedUpdate, id, commandUpdateResult, handler);
} catch (Exception e) { } catch (Exception e) {
session.handleException(e); session.handleException(e);
} }
return updateCount; return updateCount;
} }


private int getUpdateCount(boolean isDistributedUpdate, int id, AsyncHandler<AsyncResult<Integer>> handler, private int getUpdateCount(boolean isDistributedUpdate, int id, CommandUpdateResult commandUpdateResult,
boolean async, CommandUpdateResult commandUpdateResult) throws IOException { AsyncHandler<AsyncResult<Integer>> handler) throws IOException {
isQuery = false; isQuery = false;
AsyncCallback<Integer> ac = new AsyncCallback<Integer>() { AsyncCallback<Integer> ac = new AsyncCallback<Integer>() {
@Override @Override
Expand All @@ -369,13 +402,13 @@ public void runInternal() {
} }
} }
}; };
if (async) if (handler != null)
ac.setAsyncHandler(handler); ac.setAsyncHandler(handler);
transfer.addAsyncCallback(id, ac); transfer.addAsyncCallback(id, ac);
transfer.flush(); transfer.flush();


int updateCount; int updateCount;
if (async) { if (handler != null) {
updateCount = -1; updateCount = -1;
} else { } else {
updateCount = ac.getResult(); updateCount = ac.getResult();
Expand Down Expand Up @@ -436,11 +469,6 @@ public String toString() {
return sql + Trace.formatParams(getParameters()); return sql + Trace.formatParams(getParameters());
} }


@Override
public int getType() {
return CLIENT_COMMAND;
}

int getId() { int getId() {
return id; return id;
} }
Expand Down Expand Up @@ -770,65 +798,4 @@ public void runInternal() {
} }
return null; return null;
} }

@Override
public Result executeQuery(int maxRows, boolean scrollable, List<PageKey> pageKeys) {
if (prepared) {
checkParameters();
prepareIfRequired();
} else {
id = session.getNextId();
}
int resultId = session.getNextId();
Result result = null;
try {
boolean isDistributedQuery = session.getParentTransaction() != null
&& !session.getParentTransaction().isAutoCommit();

if (prepared) {
if (isDistributedQuery) {
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY_WITH_PAGE_KEYS", id);
transfer.writeRequestHeader(id,
Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY_WITH_PAGE_KEYS);
} else {
session.traceOperation("COMMAND_PREPARED_QUERY_WITH_PAGE_KEYS", id);
transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_QUERY_WITH_PAGE_KEYS);
}
transfer.writeInt(resultId).writeInt(maxRows);
} else {
if (isDistributedQuery) {
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_QUERY_WITH_PAGE_KEYS", id);
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY_WITH_PAGE_KEYS);
} else {
session.traceOperation("COMMAND_QUERY_WITH_PAGE_KEYS", id);
transfer.writeRequestHeader(id, Session.COMMAND_QUERY_WITH_PAGE_KEYS);
}
transfer.writeString(sql).writeInt(resultId).writeInt(maxRows);
}
int fetch;
if (scrollable) {
fetch = Integer.MAX_VALUE;
} else {
fetch = fetchSize;
}
transfer.writeInt(fetch);
if (prepared)
sendParameters(transfer);
if (pageKeys == null) {
transfer.writeInt(0);
} else {
int size = pageKeys.size();
transfer.writeInt(size);
for (int i = 0; i < size; i++) {
PageKey pk = pageKeys.get(i);
transfer.writeValue((Value) pk.key);
transfer.writeBoolean(pk.first);
}
}
result = getQueryResult(isDistributedQuery, fetch, resultId, null, false);
} catch (Exception e) {
session.handleException(e);
}
return result;
}
} }

0 comments on commit 8b4d925

Please sign in to comment.