Skip to content

Commit

Permalink
协议请求和响应包统一用包ID关联
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 18, 2016
1 parent c748db4 commit 47932bf
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 369 deletions.
97 changes: 51 additions & 46 deletions lealone-client/src/main/java/org/lealone/client/ClientCommand.java
Expand Up @@ -69,12 +69,12 @@ private void prepare(ClientSession s, boolean createParams) {
try {
if (createParams) {
s.traceOperation("COMMAND_PREPARE_READ_PARAMS", id);
transfer.writeRequestHeader(Session.COMMAND_PREPARE_READ_PARAMS);
transfer.writeRequestHeader(id, Session.COMMAND_PREPARE_READ_PARAMS);
} else {
s.traceOperation("COMMAND_PREPARE", id);
transfer.writeRequestHeader(Session.COMMAND_PREPARE);
transfer.writeRequestHeader(id, Session.COMMAND_PREPARE);
}
transfer.writeInt(id).writeInt(session.getSessionId()).writeString(sql);
transfer.writeInt(session.getSessionId()).writeString(sql);
VoidAsyncCallback ac = new VoidAsyncCallback() {
@Override
public void runInternal() {
Expand Down Expand Up @@ -130,7 +130,7 @@ public Result getMetaData() {
prepareIfRequired();
try {
session.traceOperation("COMMAND_GET_META_DATA", id);
transfer.writeRequestHeader(Session.COMMAND_GET_META_DATA).writeInt(id);
transfer.writeRequestHeader(id, Session.COMMAND_GET_META_DATA);
transfer.writeInt(session.getSessionId()).writeInt(objectId);
AsyncCallback<ClientResult> ac = new AsyncCallback<ClientResult>() {
@Override
Expand Down Expand Up @@ -177,13 +177,12 @@ private Result executeQueryDirectly(int maxRows, boolean scrollable) {
boolean isDistributedQuery = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedQuery) {
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_QUERY", id);
transfer.writeRequestHeader(Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY);
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY);
} else {
session.traceOperation("COMMAND_QUERY", id);
transfer.writeRequestHeader(Session.COMMAND_QUERY);
transfer.writeRequestHeader(id, Session.COMMAND_QUERY);
}
transfer.writeInt(id).writeInt(session.getSessionId()).writeString(sql).writeInt(objectId)
.writeInt(maxRows);
transfer.writeInt(session.getSessionId()).writeString(sql).writeInt(objectId).writeInt(maxRows);
int fetch;
if (scrollable) {
fetch = Integer.MAX_VALUE;
Expand Down Expand Up @@ -233,12 +232,12 @@ private Result executePreparedQuery(int maxRows, boolean scrollable) {
boolean isDistributedQuery = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedQuery) {
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY", id);
transfer.writeRequestHeader(Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY);
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY);
} else {
session.traceOperation("COMMAND_PREPARED_QUERY", id);
transfer.writeRequestHeader(Session.COMMAND_PREPARED_QUERY);
transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_QUERY);
}
transfer.writeInt(id).writeInt(session.getSessionId()).writeInt(objectId).writeInt(maxRows);
transfer.writeInt(session.getSessionId()).writeInt(objectId).writeInt(maxRows);
int fetch;
if (scrollable) {
fetch = Integer.MAX_VALUE;
Expand Down Expand Up @@ -299,34 +298,45 @@ private int executeUpdateDirectly(String replicationName) {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) {
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_UPDATE", id);
transfer.writeRequestHeader(Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE);
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE);
} else if (replicationName != null) {
session.traceOperation("COMMAND_REPLICATION_UPDATE", id);
transfer.writeRequestHeader(Session.COMMAND_REPLICATION_UPDATE);
transfer.writeRequestHeader(id, Session.COMMAND_REPLICATION_UPDATE);
} else {
session.traceOperation("COMMAND_UPDATE", id);
transfer.writeRequestHeader(Session.COMMAND_UPDATE);
transfer.writeRequestHeader(id, Session.COMMAND_UPDATE);
}
transfer.writeInt(id).writeInt(session.getSessionId()).writeString(sql);
transfer.writeInt(session.getSessionId()).writeString(sql);
if (replicationName != null)
transfer.writeString(replicationName);
IntAsyncCallback ac = new IntAsyncCallback();
transfer.addAsyncCallback(id, ac);
transfer.flush();

// if (isDistributedUpdate)
// session.getTransaction().addLocalTransactionNames(transfer.readString());
//
// updateCount = transfer.readInt();

updateCount = ac.getResult();
updateCount = getUpdateCount(isDistributedUpdate, id);
} catch (Exception e) {
session.handleException(e);
}
session.readSessionState();
return updateCount;
}

private int getUpdateCount(boolean isDistributedUpdate, int id) throws IOException {
IntAsyncCallback ac = new IntAsyncCallback() {
@Override
public void runInternal() {
try {
if (isDistributedUpdate)
session.getTransaction().addLocalTransactionNames(transfer.readString());

setResult(transfer.readInt());
} catch (IOException e) {
throw DbException.convert(e);
}
}
};
transfer.addAsyncCallback(id, ac);
transfer.flush();
return ac.getResult();
}

private int executePreparedUpdate(String replicationName) {
checkParameters();
int updateCount = 0;
Expand All @@ -335,29 +345,20 @@ private int executePreparedUpdate(String replicationName) {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) {
session.traceOperation("COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE", id);
transfer.writeRequestHeader(Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE);
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE);
} else if (replicationName != null) {
session.traceOperation("COMMAND_REPLICATION_PREPARED_UPDATE", id);
transfer.writeRequestHeader(Session.COMMAND_REPLICATION_PREPARED_UPDATE);
transfer.writeRequestHeader(id, Session.COMMAND_REPLICATION_PREPARED_UPDATE);
} else {
session.traceOperation("COMMAND_PREPARED_UPDATE", id);
transfer.writeRequestHeader(Session.COMMAND_PREPARED_UPDATE);
transfer.writeRequestHeader(id, Session.COMMAND_PREPARED_UPDATE);
}
transfer.writeInt(id).writeInt(session.getSessionId());
transfer.writeInt(session.getSessionId());
if (replicationName != null)
transfer.writeString(replicationName);
sendParameters(transfer);
IntAsyncCallback ac = new IntAsyncCallback();
transfer.addAsyncCallback(id, ac);
transfer.flush();

// if (isDistributedUpdate)
// session.getTransaction().addLocalTransactionNames(transfer.readString());
//
// updateCount = transfer.readInt();

updateCount = ac.getResult();

updateCount = getUpdateCount(isDistributedUpdate, id);
} catch (Exception e) {
session.handleException(e);
}
Expand Down Expand Up @@ -386,7 +387,7 @@ public void close() {
}
session.traceOperation("COMMAND_CLOSE", id);
try {
transfer.writeRequestHeader(Session.COMMAND_CLOSE).writeInt(id).flush();
transfer.writeRequestHeader(id, Session.COMMAND_CLOSE).flush();
} catch (IOException e) {
trace.error(e, "close");
}
Expand Down Expand Up @@ -433,17 +434,18 @@ String getSql() {
@Override
public Object executePut(String replicationName, String mapName, ByteBuffer key, ByteBuffer value) {
byte[] bytes = null;
int id = session.getNextId();
try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) {
session.traceOperation("COMMAND_STORAGE_DISTRIBUTED_PUT", id);
transfer.writeRequestHeader(Session.COMMAND_STORAGE_DISTRIBUTED_PUT);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_DISTRIBUTED_PUT);
} else if (replicationName != null) {
session.traceOperation("COMMAND_STORAGE_REPLICATION_PUT", id);
transfer.writeRequestHeader(Session.COMMAND_STORAGE_REPLICATION_PUT);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_REPLICATION_PUT);
} else {
session.traceOperation("COMMAND_STORAGE_PUT", id);
transfer.writeRequestHeader(Session.COMMAND_STORAGE_PUT);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_PUT);
}
transfer.writeString(mapName).writeByteBuffer(key).writeByteBuffer(value);
if (replicationName != null)
Expand All @@ -464,14 +466,15 @@ public Object executePut(String replicationName, String mapName, ByteBuffer key,
@Override
public Object executeGet(String mapName, ByteBuffer key) {
byte[] bytes = null;
int id = session.getNextId();
try {
boolean isDistributedUpdate = session.getTransaction() != null && !session.getTransaction().isAutoCommit();
if (isDistributedUpdate) {
session.traceOperation("COMMAND_STORAGE_DISTRIBUTED_GET", id);
transfer.writeRequestHeader(Session.COMMAND_STORAGE_DISTRIBUTED_GET);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_DISTRIBUTED_GET);
} else {
session.traceOperation("COMMAND_STORAGE_GET", id);
transfer.writeRequestHeader(Session.COMMAND_STORAGE_GET);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_GET);
}
transfer.writeString(mapName).writeByteBuffer(key);
transfer.flush();
Expand All @@ -489,9 +492,10 @@ public Object executeGet(String mapName, ByteBuffer key) {

@Override
public void moveLeafPage(String mapName, ByteBuffer splitKey, ByteBuffer page) {
int id = session.getNextId();
try {
session.traceOperation("COMMAND_STORAGE_MOVE_LEAF_PAGE", id);
transfer.writeRequestHeader(Session.COMMAND_STORAGE_MOVE_LEAF_PAGE);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_MOVE_LEAF_PAGE);
transfer.writeString(mapName).writeByteBuffer(splitKey).writeByteBuffer(page);
transfer.flush();
} catch (Exception e) {
Expand All @@ -502,9 +506,10 @@ public void moveLeafPage(String mapName, ByteBuffer splitKey, ByteBuffer page) {

@Override
public void removeLeafPage(String mapName, ByteBuffer key) {
int id = session.getNextId();
try {
session.traceOperation("COMMAND_STORAGE_REMOVE_LEAF_PAGE", id);
transfer.writeRequestHeader(Session.COMMAND_STORAGE_REMOVE_LEAF_PAGE);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_REMOVE_LEAF_PAGE);
transfer.writeString(mapName).writeByteBuffer(key);
transfer.flush();
} catch (Exception e) {
Expand Down
38 changes: 23 additions & 15 deletions lealone-client/src/main/java/org/lealone/client/ClientSession.java
Expand Up @@ -230,7 +230,7 @@ private Transfer initTransfer(ConnectionInfo ci, String server) throws Exception
}
}
sessionId = getNextId();
transfer = asyncConnection.getTransfer().copy();
transfer = asyncConnection.getTransfer().copy(this);
asyncConnection.writeInitPacket(this, sessionId, transfer, ci);
asyncConnection.addSession(sessionId, this);
return transfer;
Expand Down Expand Up @@ -265,7 +265,7 @@ public void cancel() {
*/
public void cancelStatement(int id) {
try {
transfer.writeRequestHeader(Session.SESSION_CANCEL_STATEMENT).writeInt(id).flush();
transfer.writeRequestHeader(id, Session.SESSION_CANCEL_STATEMENT).flush();
} catch (IOException e) {
trace.debug(e, "could not cancel statement");
}
Expand All @@ -288,8 +288,8 @@ private void setAutoCommitSend(boolean autoCommit) {
try {
int id = getNextId();
traceOperation("SESSION_SET_AUTOCOMMIT", autoCommit ? 1 : 0);
transfer.writeRequestHeader(Session.SESSION_SET_AUTO_COMMIT);
transfer.writeInt(id).writeInt(sessionId).writeBoolean(autoCommit);
transfer.writeRequestHeader(id, Session.SESSION_SET_AUTO_COMMIT);
transfer.writeInt(sessionId).writeBoolean(autoCommit);
OkAsyncCallback ac = new OkAsyncCallback();
transfer.addAsyncCallback(id, ac);
transfer.flush();
Expand All @@ -307,13 +307,13 @@ public void handleException(Exception e) {
@Override
public Command createCommand(String sql, int fetchSize) {
checkClosed();
return new ClientCommand(this, transfer.copy(), sql, fetchSize);
return new ClientCommand(this, transfer.copy(this), sql, fetchSize);
}

@Override
public StorageCommand createStorageCommand() {
checkClosed();
return new ClientCommand(this, transfer.copy(), null, -1);
return new ClientCommand(this, transfer.copy(this), null, -1);
}

@Override
Expand Down Expand Up @@ -345,7 +345,7 @@ public void close() {
synchronized (this) {
try {
traceOperation("SESSION_CLOSE", 0);
transfer.writeRequestHeader(Session.SESSION_CLOSE).writeInt(sessionId).flush();
transfer.writeRequestHeader(sessionId, Session.SESSION_CLOSE).flush();
asyncConnection.remove(sessionId);
} catch (RuntimeException e) {
trace.error(e, "close");
Expand Down Expand Up @@ -461,8 +461,9 @@ public Connection getLobConnection() {
@Override
public synchronized int readLob(long lobId, byte[] hmac, long offset, byte[] buff, int off, int length) {
try {
int id = getNextId();
traceOperation("LOB_READ", (int) lobId);
transfer.writeRequestHeader(Session.COMMAND_READ_LOB);
transfer.writeRequestHeader(id, Session.COMMAND_READ_LOB);
transfer.writeLong(lobId);
transfer.writeBytes(hmac);
transfer.writeLong(offset);
Expand All @@ -484,7 +485,8 @@ public synchronized int readLob(long lobId, byte[] hmac, long offset, byte[] buf
public synchronized void commitTransaction(String allLocalTransactionNames) {
checkClosed();
try {
transfer.writeRequestHeader(Session.COMMAND_DISTRIBUTED_TRANSACTION_COMMIT);
int id = getNextId();
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_COMMIT);
transfer.writeString(allLocalTransactionNames).flush();
} catch (IOException e) {
handleException(e);
Expand All @@ -495,7 +497,8 @@ public synchronized void commitTransaction(String allLocalTransactionNames) {
public synchronized void rollbackTransaction() {
checkClosed();
try {
transfer.writeRequestHeader(Session.COMMAND_DISTRIBUTED_TRANSACTION_ROLLBACK);
int id = getNextId();
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_ROLLBACK);
transfer.flush();
} catch (IOException e) {
handleException(e);
Expand All @@ -506,7 +509,9 @@ public synchronized void rollbackTransaction() {
public synchronized void addSavepoint(String name) {
checkClosed();
try {
transfer.writeRequestHeader(Session.COMMAND_DISTRIBUTED_TRANSACTION_ADD_SAVEPOINT).writeString(name);
int id = getNextId();
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_ADD_SAVEPOINT);
transfer.writeString(name);
transfer.flush();
} catch (IOException e) {
handleException(e);
Expand All @@ -517,7 +522,9 @@ public synchronized void addSavepoint(String name) {
public synchronized void rollbackToSavepoint(String name) {
checkClosed();
try {
transfer.writeRequestHeader(Session.COMMAND_DISTRIBUTED_TRANSACTION_ROLLBACK_SAVEPOINT).writeString(name);
int id = getNextId();
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_ROLLBACK_SAVEPOINT);
transfer.writeString(name);
transfer.flush();
} catch (IOException e) {
handleException(e);
Expand All @@ -528,7 +535,8 @@ public synchronized void rollbackToSavepoint(String name) {
public synchronized boolean validateTransaction(String localTransactionName) {
checkClosed();
try {
transfer.writeRequestHeader(Session.COMMAND_DISTRIBUTED_TRANSACTION_VALIDATE);
int id = getNextId();
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_VALIDATE);
transfer.writeString(localTransactionName).flush();
return transfer.readBoolean();
} catch (Exception e) {
Expand All @@ -550,13 +558,13 @@ public Transaction getTransaction() {

public synchronized ClientBatchCommand getClientBatchCommand(ArrayList<String> batchCommands) {
checkClosed();
return new ClientBatchCommand(this, transfer.copy(), batchCommands);
return new ClientBatchCommand(this, transfer.copy(this), batchCommands);
}

public synchronized ClientBatchCommand getClientBatchCommand(Command preparedCommand,
ArrayList<Value[]> batchParameters) {
checkClosed();
return new ClientBatchCommand(this, transfer.copy(), preparedCommand, batchParameters);
return new ClientBatchCommand(this, transfer.copy(this), preparedCommand, batchParameters);
}

@Override
Expand Down

0 comments on commit 47932bf

Please sign in to comment.