Skip to content

Commit

Permalink
扩展JDBC,增加异步API
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 20, 2016
1 parent 88e7d71 commit 4aea8a4
Show file tree
Hide file tree
Showing 17 changed files with 867 additions and 363 deletions.
Expand Up @@ -21,6 +21,8 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;


import org.lealone.async.AsyncHandler;
import org.lealone.async.AsyncResult;
import org.lealone.common.exceptions.DbException; import org.lealone.common.exceptions.DbException;
import org.lealone.common.trace.Trace; import org.lealone.common.trace.Trace;
import org.lealone.db.Command; import org.lealone.db.Command;
Expand Down Expand Up @@ -191,4 +193,16 @@ public Command prepare() {
return this; return this;
} }


@Override
public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler<AsyncResult<Result>> handler) {
// TODO Auto-generated method stub

}

@Override
public void executeUpdateAsync(AsyncHandler<AsyncResult<Integer>> handler) {
// TODO Auto-generated method stub

}

} }
272 changes: 135 additions & 137 deletions lealone-client/src/main/java/org/lealone/client/ClientCommand.java
Expand Up @@ -11,6 +11,8 @@
import java.util.ArrayList; import java.util.ArrayList;


import org.lealone.api.ErrorCode; import org.lealone.api.ErrorCode;
import org.lealone.async.AsyncHandler;
import org.lealone.async.AsyncResult;
import org.lealone.client.result.ClientResult; import org.lealone.client.result.ClientResult;
import org.lealone.client.result.RowCountDeterminedClientResult; import org.lealone.client.result.RowCountDeterminedClientResult;
import org.lealone.client.result.RowCountUndeterminedClientResult; import org.lealone.client.result.RowCountUndeterminedClientResult;
Expand Down Expand Up @@ -158,211 +160,207 @@ public void runInternal() {


@Override @Override
public Result executeQuery(int maxRows) { public Result executeQuery(int maxRows) {
return executeQuery(maxRows, false); return executeQuery(maxRows, false, null, false);
} }


@Override @Override
public Result executeQuery(int maxRows, boolean scrollable) { public Result executeQuery(int maxRows, boolean scrollable) {
if (prepared) return executeQuery(maxRows, scrollable, null, false);
return executePreparedQuery(maxRows, scrollable);
else
return executeQueryDirectly(maxRows, scrollable);
} }


private Result executeQueryDirectly(int maxRows, boolean scrollable) { @Override
id = session.getNextId(); public void executeQueryAsync(int maxRows, boolean scrollable, AsyncHandler<AsyncResult<Result>> handler) {
int objectId = session.getNextId(); executeQuery(maxRows, scrollable, handler, true);
ClientResult result = null; }

private Result executeQuery(int maxRows, boolean scrollable, AsyncHandler<AsyncResult<Result>> handler,
boolean async) {
if (prepared) {
checkParameters();
prepareIfRequired();
} else {
id = session.getNextId();
}
int resultId = session.getNextId();
Result result = null;
try { try {
boolean isDistributedQuery = session.getTransaction() != null && !session.getTransaction().isAutoCommit(); boolean isDistributedQuery = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedQuery) {
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_QUERY", id); if (prepared) {
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY); if (isDistributedQuery) {
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY", id);
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY);
} else {
session.traceOperation("COMMAND_PREPARED_QUERY", id);
transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_QUERY);
}
transfer.writeInt(session.getSessionId()).writeInt(resultId).writeInt(maxRows);
} else { } else {
session.traceOperation("COMMAND_QUERY", id); if (isDistributedQuery) {
transfer.writeRequestHeader(id, Session.COMMAND_QUERY); session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_QUERY", id);
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY);
} else {
session.traceOperation("COMMAND_QUERY", id);
transfer.writeRequestHeader(id, Session.COMMAND_QUERY);
}
transfer.writeInt(session.getSessionId()).writeString(sql).writeInt(resultId).writeInt(maxRows);
} }
transfer.writeInt(session.getSessionId()).writeString(sql).writeInt(objectId).writeInt(maxRows);
int fetch; int fetch;
if (scrollable) { if (scrollable) {
fetch = Integer.MAX_VALUE; fetch = Integer.MAX_VALUE;
} else { } else {
fetch = fetchSize; fetch = fetchSize;
} }
transfer.writeInt(fetch); transfer.writeInt(fetch);
AsyncCallback<ClientResult> ac = new AsyncCallback<ClientResult>() { if (prepared)
@Override sendParameters(transfer);
public void runInternal() { result = getQueryResult(isDistributedQuery, fetch, resultId, handler, async);
try {
if (isDistributedQuery)
session.getTransaction().addLocalTransactionNames(transfer.readString());

int columnCount = transfer.readInt();
int rowCount = transfer.readInt();
ClientResult result;
if (rowCount < 0)
result = new RowCountUndeterminedClientResult(session, transfer, objectId, columnCount,
fetch);
else
result = new RowCountDeterminedClientResult(session, transfer, objectId, columnCount,
rowCount, fetch);
setResult(result);
} catch (IOException e) {
throw DbException.convert(e);
}
}
};
transfer.addAsyncCallback(id, ac);
transfer.flush();
result = ac.getResult();
} catch (Exception e) { } catch (Exception e) {
session.handleException(e); session.handleException(e);
} }
session.readSessionState(); session.readSessionState();
isQuery = true;
return result; return result;
} }


private Result executePreparedQuery(int maxRows, boolean scrollable) { private Result getQueryResult(boolean isDistributedQuery, int fetch, int resultId,
checkParameters(); AsyncHandler<AsyncResult<Result>> handler, boolean async) throws IOException {
int objectId = session.getNextId(); isQuery = true;
ClientResult result = null; AsyncCallback<ClientResult> ac = new AsyncCallback<ClientResult>() {
prepareIfRequired(); @Override
try { public void runInternal() {
boolean isDistributedQuery = session.getTransaction() != null && !session.getTransaction().isAutoCommit(); try {
if (isDistributedQuery) { if (isDistributedQuery)
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY", id); session.getTransaction().addLocalTransactionNames(transfer.readString());
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY);
} else {
session.traceOperation("COMMAND_PREPARED_QUERY", id);
transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_QUERY);
}
transfer.writeInt(session.getSessionId()).writeInt(objectId).writeInt(maxRows);
int fetch;
if (scrollable) {
fetch = Integer.MAX_VALUE;
} else {
fetch = fetchSize;
}
transfer.writeInt(fetch);
sendParameters(transfer);
AsyncCallback<ClientResult> ac = new AsyncCallback<ClientResult>() {
@Override
public void runInternal() {
try {
if (isDistributedQuery)
session.getTransaction().addLocalTransactionNames(transfer.readString());


int columnCount = transfer.readInt(); int columnCount = transfer.readInt();
int rowCount = transfer.readInt(); int rowCount = transfer.readInt();
ClientResult result; ClientResult result;
if (rowCount < 0) if (rowCount < 0)
result = new RowCountUndeterminedClientResult(session, transfer, objectId, columnCount, result = new RowCountUndeterminedClientResult(session, transfer, resultId, columnCount, fetch);
fetch); else
else result = new RowCountDeterminedClientResult(session, transfer, resultId, columnCount, rowCount,
result = new RowCountDeterminedClientResult(session, transfer, objectId, columnCount, fetch);
rowCount, fetch);
setResult(result); setResult(result);
} catch (IOException e) { if (handler != null) {
throw DbException.convert(e); AsyncResult<Result> r = new AsyncResult<>();
r.setResult(result);
handler.handle(r);
} }
// resultSet.setCommand(command);
} catch (IOException e) {
throw DbException.convert(e);
} }
}; }
transfer.addAsyncCallback(id, ac); };
transfer.flush(); transfer.addAsyncCallback(id, ac);
result = ac.getResult(); transfer.flush();
} catch (Exception e) {
session.handleException(e); if (async)
} return null;
session.readSessionState(); else
return result; return ac.getResult();
} }


@Override @Override
public int executeUpdate() { public int executeUpdate() {
return executeUpdate(null); return executeUpdate(null, null, false);
} }


@Override @Override
public int executeUpdate(String replicationName) { public int executeUpdate(String replicationName) {
if (prepared) return executeUpdate(replicationName, null, false);
return executePreparedUpdate(replicationName); }
else
return executeUpdateDirectly(replicationName); @Override
public void executeUpdateAsync(AsyncHandler<AsyncResult<Integer>> handler) {
executeUpdate(null, handler, true);
} }


private int executeUpdateDirectly(String replicationName) { private int executeUpdate(String replicationName, AsyncHandler<AsyncResult<Integer>> handler, boolean async) {
id = session.getNextId(); if (prepared) {
checkParameters();
prepareIfRequired();
} else {
id = session.getNextId();
}
int updateCount = 0; int updateCount = 0;
try { try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit(); boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) {
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_UPDATE", id); if (prepared) {
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE); if (isDistributedUpdate) {
} else if (replicationName != null) { session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE", id);
session.traceOperation("COMMAND_REPLICATION_UPDATE", id); transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE);
transfer.writeRequestHeader(id, Session.COMMAND_REPLICATION_UPDATE); } else if (replicationName != null) {
session.traceOperation("COMMAND_REPLICATION_PREPARED_UPDATE", id);
transfer.writeRequestHeader(id, Session.COMMAND_REPLICATION_PREPARED_UPDATE);
} else {
session.traceOperation("COMMAND_PREPARED_UPDATE", id);
transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_UPDATE);
}
} else { } else {
session.traceOperation("COMMAND_UPDATE", id); if (isDistributedUpdate) {
transfer.writeRequestHeader(id, Session.COMMAND_UPDATE); session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_UPDATE", id);
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE);
} else if (replicationName != null) {
session.traceOperation("COMMAND_REPLICATION_UPDATE", id);
transfer.writeRequestHeader(id, Session.COMMAND_REPLICATION_UPDATE);
} else {
session.traceOperation("COMMAND_UPDATE", id);
transfer.writeRequestHeader(id, Session.COMMAND_UPDATE);
}
} }
transfer.writeInt(session.getSessionId()).writeString(sql); transfer.writeInt(session.getSessionId());
if (!prepared)
transfer.writeString(sql);
if (replicationName != null) if (replicationName != null)
transfer.writeString(replicationName); transfer.writeString(replicationName);


updateCount = getUpdateCount(isDistributedUpdate, id); if (prepared)
sendParameters(transfer);

updateCount = getUpdateCount(isDistributedUpdate, id, handler, async);
} catch (Exception e) { } catch (Exception e) {
session.handleException(e); session.handleException(e);
} }
session.readSessionState(); session.readSessionState();
return updateCount; return updateCount;
} }


private int getUpdateCount(boolean isDistributedUpdate, int id) throws IOException { private int getUpdateCount(boolean isDistributedUpdate, int id, AsyncHandler<AsyncResult<Integer>> handler,
boolean async) throws IOException {
isQuery = false;
IntAsyncCallback ac = new IntAsyncCallback() { IntAsyncCallback ac = new IntAsyncCallback() {
@Override @Override
public void runInternal() { public void runInternal() {
try { try {
if (isDistributedUpdate) if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString()); session.getTransaction().addLocalTransactionNames(transfer.readString());


setResult(transfer.readInt()); int updateCount = transfer.readInt();
setResult(updateCount);
if (handler != null) {
AsyncResult<Integer> r = new AsyncResult<>();
r.setResult(updateCount);
handler.handle(r);
}
} catch (IOException e) { } catch (IOException e) {
throw DbException.convert(e); throw DbException.convert(e);
} }
} }
}; };
transfer.addAsyncCallback(id, ac); transfer.addAsyncCallback(id, ac);
transfer.flush(); transfer.flush();
return ac.getResult();
}

private int executePreparedUpdate(String replicationName) {
checkParameters();
int updateCount = 0;
prepareIfRequired();
try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) {
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE", id);
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE);
} else if (replicationName != null) {
session.traceOperation("COMMAND_REPLICATION_PREPARED_UPDATE", id);
transfer.writeRequestHeader(id, Session.COMMAND_REPLICATION_PREPARED_UPDATE);
} else {
session.traceOperation("COMMAND_PREPARED_UPDATE", id);
transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_UPDATE);
}
transfer.writeInt(session.getSessionId());
if (replicationName != null)
transfer.writeString(replicationName);
sendParameters(transfer);


updateCount = getUpdateCount(isDistributedUpdate, id); int updateCount;
} catch (Exception e) { if (async) {
session.handleException(e); updateCount = -1;
} else {
ac.await();
updateCount = ac.getResult();
} }
session.readSessionState();
return updateCount; return updateCount;
} }


Expand Down

0 comments on commit 4aea8a4

Please sign in to comment.