Skip to content

Commit

Permalink
删除不必要的AsyncCallback子类,并在AsyncResult中处理异常情况
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 27, 2016
1 parent 809cf4c commit 1d7302d
Show file tree
Hide file tree
Showing 12 changed files with 130 additions and 108 deletions.
Expand Up @@ -30,8 +30,8 @@
import org.lealone.db.Session; import org.lealone.db.Session;
import org.lealone.db.result.Result; import org.lealone.db.result.Result;
import org.lealone.db.value.Value; import org.lealone.db.value.Value;
import org.lealone.net.AsyncCallback;
import org.lealone.net.Transfer; import org.lealone.net.Transfer;
import org.lealone.net.VoidAsyncCallback;


public class ClientBatchCommand implements Command { public class ClientBatchCommand implements Command {
private ClientSession session; private ClientSession session;
Expand Down Expand Up @@ -132,7 +132,7 @@ public int executeUpdate() {
} }


private void getResultAsync() throws IOException { private void getResultAsync() throws IOException {
VoidAsyncCallback ac = new VoidAsyncCallback() { AsyncCallback<Void> ac = new AsyncCallback<Void>() {
@Override @Override
public void runInternal() { public void runInternal() {
try { try {
Expand Down
Expand Up @@ -26,9 +26,7 @@
import org.lealone.db.result.Result; import org.lealone.db.result.Result;
import org.lealone.db.value.Value; import org.lealone.db.value.Value;
import org.lealone.net.AsyncCallback; import org.lealone.net.AsyncCallback;
import org.lealone.net.IntAsyncCallback;
import org.lealone.net.Transfer; import org.lealone.net.Transfer;
import org.lealone.net.VoidAsyncCallback;
import org.lealone.storage.StorageCommand; import org.lealone.storage.StorageCommand;


/** /**
Expand Down Expand Up @@ -77,7 +75,7 @@ private void prepare(ClientSession s, boolean createParams) {
transfer.writeRequestHeader(id, Session.COMMAND_PREPARE); transfer.writeRequestHeader(id, Session.COMMAND_PREPARE);
} }
transfer.writeInt(session.getSessionId()).writeString(sql); transfer.writeInt(session.getSessionId()).writeString(sql);
VoidAsyncCallback ac = new VoidAsyncCallback() { AsyncCallback<Void> ac = new AsyncCallback<Void>() {
@Override @Override
public void runInternal() { public void runInternal() {
try { try {
Expand Down Expand Up @@ -250,6 +248,8 @@ public void runInternal() {
} }
} }
}; };
if (async)
ac.setAsyncHandler(handler);
transfer.addAsyncCallback(id, ac); transfer.addAsyncCallback(id, ac);
transfer.flush(); transfer.flush();


Expand Down Expand Up @@ -329,7 +329,7 @@ private int executeUpdate(String replicationName, AsyncHandler<AsyncResult<Integ
private int getUpdateCount(boolean isDistributedUpdate, int id, AsyncHandler<AsyncResult<Integer>> handler, private int getUpdateCount(boolean isDistributedUpdate, int id, AsyncHandler<AsyncResult<Integer>> handler,
boolean async) throws IOException { boolean async) throws IOException {
isQuery = false; isQuery = false;
IntAsyncCallback ac = new IntAsyncCallback() { AsyncCallback<Integer> ac = new AsyncCallback<Integer>() {
@Override @Override
public void runInternal() { public void runInternal() {
try { try {
Expand All @@ -348,14 +348,15 @@ public void runInternal() {
} }
} }
}; };
if (async)
ac.setAsyncHandler(handler);
transfer.addAsyncCallback(id, ac); transfer.addAsyncCallback(id, ac);
transfer.flush(); transfer.flush();


int updateCount; int updateCount;
if (async) { if (async) {
updateCount = -1; updateCount = -1;
} else { } else {
ac.await();
updateCount = ac.getResult(); updateCount = ac.getResult();
} }


Expand Down
Expand Up @@ -37,8 +37,8 @@
import org.lealone.db.SetTypes; import org.lealone.db.SetTypes;
import org.lealone.db.SysProperties; import org.lealone.db.SysProperties;
import org.lealone.db.value.Value; import org.lealone.db.value.Value;
import org.lealone.net.AsyncCallback;
import org.lealone.net.AsyncConnection; import org.lealone.net.AsyncConnection;
import org.lealone.net.OkAsyncCallback;
import org.lealone.net.Transfer; import org.lealone.net.Transfer;
import org.lealone.replication.ReplicationSession; import org.lealone.replication.ReplicationSession;
import org.lealone.sql.ParsedStatement; import org.lealone.sql.ParsedStatement;
Expand Down Expand Up @@ -290,7 +290,7 @@ private void setAutoCommitSend(boolean autoCommit) {
traceOperation("SESSION_SET_AUTOCOMMIT", autoCommit ? 1 : 0); traceOperation("SESSION_SET_AUTOCOMMIT", autoCommit ? 1 : 0);
transfer.writeRequestHeader(id, Session.SESSION_SET_AUTO_COMMIT); transfer.writeRequestHeader(id, Session.SESSION_SET_AUTO_COMMIT);
transfer.writeInt(sessionId).writeBoolean(autoCommit); transfer.writeInt(sessionId).writeBoolean(autoCommit);
OkAsyncCallback ac = new OkAsyncCallback(); AsyncCallback<Void> ac = new AsyncCallback<>();
transfer.addAsyncCallback(id, ac); transfer.addAsyncCallback(id, ac);
transfer.flush(); transfer.flush();
ac.await(); ac.await();
Expand Down
Expand Up @@ -93,15 +93,20 @@ private ResultSet executeQuery(String sql, AsyncHandler<AsyncResult<ResultSet>>
AsyncHandler<AsyncResult<Result>> h = new AsyncHandler<AsyncResult<Result>>() { AsyncHandler<AsyncResult<Result>> h = new AsyncHandler<AsyncResult<Result>>() {
@Override @Override
public void handle(AsyncResult<Result> ar) { public void handle(AsyncResult<Result> ar) {
Result r = ar.getResult(); if (ar.isSucceeded()) {
resultSet = new JdbcResultSet(conn, JdbcStatement.this, r, id, closedByResultSet, scrollable, Result r = ar.getResult();
updatable); resultSet = new JdbcResultSet(conn, JdbcStatement.this, r, id, closedByResultSet,
resultSet.setCommand(command); scrollable, updatable);
resultSet.setCommand(command);
}
setExecutingStatement(null); setExecutingStatement(null);


if (handler != null) { if (handler != null) {
AsyncResult<ResultSet> r2 = new AsyncResult<>(); AsyncResult<ResultSet> r2 = new AsyncResult<>();
r2.setResult(resultSet); if (ar.isSucceeded())
r2.setResult(resultSet);
else
r2.setCause(ar.getCause());
handler.handle(r2); handler.handle(r2);
} }
} }
Expand Down Expand Up @@ -173,7 +178,8 @@ private int executeUpdateInternal(String sql, AsyncHandler<AsyncResult<Integer>>
AsyncHandler<AsyncResult<Integer>> h = new AsyncHandler<AsyncResult<Integer>>() { AsyncHandler<AsyncResult<Integer>> h = new AsyncHandler<AsyncResult<Integer>>() {
@Override @Override
public void handle(AsyncResult<Integer> ar) { public void handle(AsyncResult<Integer> ar) {
updateCount = ar.getResult(); if (ar.isSucceeded())
updateCount = ar.getResult();
// 设置完后再调用handle,否则有可能当前语句提前关闭了 // 设置完后再调用handle,否则有可能当前语句提前关闭了
setExecutingStatement(null); setExecutingStatement(null);
command.close(); command.close();
Expand Down
Expand Up @@ -16,8 +16,8 @@
import org.lealone.db.SysProperties; import org.lealone.db.SysProperties;
import org.lealone.db.result.Result; import org.lealone.db.result.Result;
import org.lealone.db.value.Value; import org.lealone.db.value.Value;
import org.lealone.net.AsyncCallback;
import org.lealone.net.Transfer; import org.lealone.net.Transfer;
import org.lealone.net.VoidAsyncCallback;


/** /**
* The client side part of a result set that is kept on the server. * The client side part of a result set that is kept on the server.
Expand Down Expand Up @@ -168,7 +168,7 @@ protected void sendFetch(int fetchSize) throws IOException {
session.traceOperation("RESULT_FETCH_ROWS", id); session.traceOperation("RESULT_FETCH_ROWS", id);
transfer.writeRequestHeader(id, Session.RESULT_FETCH_ROWS).writeInt(fetchSize); transfer.writeRequestHeader(id, Session.RESULT_FETCH_ROWS).writeInt(fetchSize);


VoidAsyncCallback ac = new VoidAsyncCallback(); AsyncCallback<Void> ac = new AsyncCallback<>();
transfer.addAsyncCallback(id, ac); transfer.addAsyncCallback(id, ac);
transfer.flush(); transfer.flush();
ac.await(); ac.await();
Expand Down
33 changes: 33 additions & 0 deletions lealone-common/src/main/java/org/lealone/async/AsyncResult.java
Expand Up @@ -18,13 +18,46 @@
package org.lealone.async; package org.lealone.async;


public class AsyncResult<T> { public class AsyncResult<T> {

protected T result; protected T result;
protected Throwable cause;
protected boolean succeeded;
protected boolean failed;


public T getResult() { public T getResult() {
return result; return result;
} }


public void setResult(T result) { public void setResult(T result) {
this.result = result; this.result = result;
failed = false;
succeeded = true;
}

public Throwable getCause() {
return cause;
}

public void setCause(Throwable cause) {
this.cause = cause;
failed = true;
succeeded = false;
}

public boolean isSucceeded() {
return succeeded;
} }

public void setSucceeded(boolean succeeded) {
this.succeeded = succeeded;
}

public boolean isFailed() {
return failed;
}

public void setFailed(boolean failed) {
this.failed = failed;
}

} }
39 changes: 31 additions & 8 deletions lealone-net/src/main/java/org/lealone/net/AsyncCallback.java
Expand Up @@ -19,20 +19,46 @@


import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;


import org.lealone.async.AsyncHandler;
import org.lealone.async.AsyncResult;
import org.lealone.common.exceptions.DbException; import org.lealone.common.exceptions.DbException;


@SuppressWarnings("rawtypes")
public class AsyncCallback<T> { public class AsyncCallback<T> {


protected Transfer transfer; protected Transfer transfer;
protected T result; protected T result;
protected DbException e; protected DbException e;
protected CountDownLatch latch = new CountDownLatch(1); protected CountDownLatch latch = new CountDownLatch(1);


protected AsyncHandler ah;

public AsyncCallback() {
}

public AsyncCallback(AsyncHandler ah) {
this.ah = ah;
}

public void setAsyncHandler(AsyncHandler ah) {
this.ah = ah;
}

@SuppressWarnings("unchecked")
private void handle(DbException e) {
if (ah == null)
throw e;

AsyncResult r = new AsyncResult();
r.setCause(e);
ah.handle(r);
}

public T getResult() { public T getResult() {
try { try {
latch.await(); latch.await();
if (e != null) if (e != null)
throw e; handle(e);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw DbException.convert(e); throw DbException.convert(e);
} }
Expand All @@ -46,21 +72,18 @@ public void setTransfer(Transfer transfer) {
public void setDbException(DbException e) { public void setDbException(DbException e) {
this.e = e; this.e = e;
latch.countDown(); latch.countDown();
handle(e);
} }


public void setResult(T result) { public void setResult(T result) {
this.result = result; this.result = result;
latch.countDown(); latch.countDown();
} }


public void run() {
runInternal();
latch.countDown();
}

public void run(Transfer transfer) { public void run(Transfer transfer) {
this.transfer.setDataInputStream(transfer.getDataInputStream()); this.transfer.setDataInputStream(transfer.getDataInputStream());
run(); runInternal();
latch.countDown();
} }


protected void runInternal() { protected void runInternal() {
Expand All @@ -70,7 +93,7 @@ public void await() {
try { try {
latch.await(); latch.await();
if (e != null) if (e != null)
throw e; handle(e);
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw DbException.convert(e); throw DbException.convert(e);
} }
Expand Down
16 changes: 8 additions & 8 deletions lealone-net/src/main/java/org/lealone/net/AsyncConnection.java
Expand Up @@ -141,7 +141,7 @@ public void writeInitPacket(Session session, int sessionId, Transfer transfer, C
for (String key : keys) { for (String key : keys) {
transfer.writeString(key).writeString(ci.getProperty(key)); transfer.writeString(key).writeString(ci.getProperty(key));
} }
VoidAsyncCallback ac = new VoidAsyncCallback() { AsyncCallback<Void> ac = new AsyncCallback<Void>() {
@Override @Override
public void runInternal() { public void runInternal() {
try { try {
Expand Down Expand Up @@ -350,7 +350,7 @@ private static void writeBatchResult(Transfer transfer, Session session, int id,
transfer.flush(); transfer.flush();
} }


private void executeQuery(Transfer transfer, Session session, int sessionId, int id, PreparedStatement command, private void executeQueryAsync(Transfer transfer, Session session, int sessionId, int id, PreparedStatement command,
int operation, int objectId, int maxRows, int fetchSize) throws IOException { int operation, int objectId, int maxRows, int fetchSize) throws IOException {
PreparedCommand pc = new PreparedCommand(id, command, transfer, session, new Callable<Object>() { PreparedCommand pc = new PreparedCommand(id, command, transfer, session, new Callable<Object>() {
@Override @Override
Expand Down Expand Up @@ -402,7 +402,7 @@ private void prepareCommit(Session session, PreparedStatement command) throws Ex
} }
} }


private void executeUpdate(Transfer transfer, Session session, int sessionId, int id, PreparedStatement command, private void executeUpdateAsync(Transfer transfer, Session session, int sessionId, int id, PreparedStatement command,
int operation) throws IOException { int operation) throws IOException {
PreparedCommand pc = new PreparedCommand(id, command, transfer, session, new Callable<Object>() { PreparedCommand pc = new PreparedCommand(id, command, transfer, session, new Callable<Object>() {
@Override @Override
Expand Down Expand Up @@ -469,7 +469,7 @@ void sendError(Transfer transfer, int id, Throwable t) {
} }


if (isServer) { if (isServer) {
message = "[Server]" + message; message = "[Server] " + message;
} }


transfer.reset(); // 为什么要reset? 见reset中的注释 transfer.reset(); // 为什么要reset? 见reset中的注释
Expand Down Expand Up @@ -573,7 +573,7 @@ private void processRequest(Transfer transfer, int id) throws IOException {
} }
PreparedStatement command = session.prepareStatement(sql, fetchSize); PreparedStatement command = session.prepareStatement(sql, fetchSize);
cache.addObject(id, command); cache.addObject(id, command);
executeQuery(transfer, session, sessionId, id, command, operation, objectId, maxRows, fetchSize); executeQueryAsync(transfer, session, sessionId, id, command, operation, objectId, maxRows, fetchSize);
break; break;
} }
case Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY: case Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY:
Expand All @@ -590,7 +590,7 @@ private void processRequest(Transfer transfer, int id) throws IOException {
session.setAutoCommit(false); session.setAutoCommit(false);
session.setRoot(false); session.setRoot(false);
} }
executeQuery(transfer, session, sessionId, id, command, operation, objectId, maxRows, fetchSize); executeQueryAsync(transfer, session, sessionId, id, command, operation, objectId, maxRows, fetchSize);
break; break;
} }
case Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE: case Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE:
Expand All @@ -607,7 +607,7 @@ private void processRequest(Transfer transfer, int id) throws IOException {
} }
PreparedStatement command = session.prepareStatement(sql, -1); PreparedStatement command = session.prepareStatement(sql, -1);
cache.addObject(id, command); cache.addObject(id, command);
executeUpdate(transfer, session, sessionId, id, command, operation); executeUpdateAsync(transfer, session, sessionId, id, command, operation);
break; break;
} }
case Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE: case Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE:
Expand All @@ -623,7 +623,7 @@ private void processRequest(Transfer transfer, int id) throws IOException {
session.setAutoCommit(false); session.setAutoCommit(false);
session.setRoot(false); session.setRoot(false);
} }
executeUpdate(transfer, session, sessionId, id, command, operation); executeUpdateAsync(transfer, session, sessionId, id, command, operation);
break; break;
} }
case Session.COMMAND_STORAGE_DISTRIBUTED_PUT: case Session.COMMAND_STORAGE_DISTRIBUTED_PUT:
Expand Down
22 changes: 0 additions & 22 deletions lealone-net/src/main/java/org/lealone/net/IntAsyncCallback.java

This file was deleted.

22 changes: 0 additions & 22 deletions lealone-net/src/main/java/org/lealone/net/OkAsyncCallback.java

This file was deleted.

0 comments on commit 1d7302d

Please sign in to comment.