Skip to content

Commit

Permalink
AsyncConnection接收到的每个包都应知道是属于哪个Session的
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 18, 2016
1 parent 5ee4fcd commit 6b6c85c
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 51 deletions.
Expand Up @@ -447,7 +447,7 @@ public Object executePut(String replicationName, String mapName, ByteBuffer key,
session.traceOperation("COMMAND_STORAGE_PUT", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_PUT);
}
transfer.writeString(mapName).writeByteBuffer(key).writeByteBuffer(value);
transfer.writeInt(session.getSessionId()).writeString(mapName).writeByteBuffer(key).writeByteBuffer(value);
if (replicationName != null)
transfer.writeString(replicationName);
transfer.flush();
Expand Down Expand Up @@ -476,7 +476,7 @@ public Object executeGet(String mapName, ByteBuffer key) {
session.traceOperation("COMMAND_STORAGE_GET", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_GET);
}
transfer.writeString(mapName).writeByteBuffer(key);
transfer.writeInt(session.getSessionId()).writeString(mapName).writeByteBuffer(key);
transfer.flush();

if (isDistributedUpdate)
Expand All @@ -496,6 +496,7 @@ public void moveLeafPage(String mapName, ByteBuffer splitKey, ByteBuffer page) {
try {
session.traceOperation("COMMAND_STORAGE_MOVE_LEAF_PAGE", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_MOVE_LEAF_PAGE);
transfer.writeInt(session.getSessionId());
transfer.writeString(mapName).writeByteBuffer(splitKey).writeByteBuffer(page);
transfer.flush();
} catch (Exception e) {
Expand All @@ -510,7 +511,7 @@ public void removeLeafPage(String mapName, ByteBuffer key) {
try {
session.traceOperation("COMMAND_STORAGE_REMOVE_LEAF_PAGE", id);
transfer.writeRequestHeader(id, Session.COMMAND_STORAGE_REMOVE_LEAF_PAGE);
transfer.writeString(mapName).writeByteBuffer(key);
transfer.writeInt(session.getSessionId()).writeString(mapName).writeByteBuffer(key);
transfer.flush();
} catch (Exception e) {
session.handleException(e);
Expand Down
Expand Up @@ -464,6 +464,7 @@ public synchronized int readLob(long lobId, byte[] hmac, long offset, byte[] buf
int id = getNextId();
traceOperation("LOB_READ", (int) lobId);
transfer.writeRequestHeader(id, Session.COMMAND_READ_LOB);
transfer.writeInt(sessionId);
transfer.writeLong(lobId);
transfer.writeBytes(hmac);
transfer.writeLong(offset);
Expand All @@ -487,7 +488,7 @@ public synchronized void commitTransaction(String allLocalTransactionNames) {
try {
int id = getNextId();
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_COMMIT);
transfer.writeString(allLocalTransactionNames).flush();
transfer.writeInt(sessionId).writeString(allLocalTransactionNames).flush();
} catch (IOException e) {
handleException(e);
}
Expand All @@ -499,7 +500,7 @@ public synchronized void rollbackTransaction() {
try {
int id = getNextId();
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_ROLLBACK);
transfer.flush();
transfer.writeInt(sessionId).flush();
} catch (IOException e) {
handleException(e);
}
Expand All @@ -511,7 +512,7 @@ public synchronized void addSavepoint(String name) {
try {
int id = getNextId();
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_ADD_SAVEPOINT);
transfer.writeString(name);
transfer.writeInt(sessionId).writeString(name);
transfer.flush();
} catch (IOException e) {
handleException(e);
Expand All @@ -524,7 +525,7 @@ public synchronized void rollbackToSavepoint(String name) {
try {
int id = getNextId();
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_ROLLBACK_SAVEPOINT);
transfer.writeString(name);
transfer.writeInt(sessionId).writeString(name);
transfer.flush();
} catch (IOException e) {
handleException(e);
Expand All @@ -537,7 +538,7 @@ public synchronized boolean validateTransaction(String localTransactionName) {
try {
int id = getNextId();
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_VALIDATE);
transfer.writeString(localTransactionName).flush();
transfer.writeInt(sessionId).writeString(localTransactionName).flush();
return transfer.readBoolean();
} catch (Exception e) {
handleException(e);
Expand Down
100 changes: 57 additions & 43 deletions lealone-net/src/main/java/org/lealone/net/AsyncConnection.java
Expand Up @@ -65,7 +65,6 @@ public class AsyncConnection implements Comparable<AsyncConnection>, Handler<Buf
private Transfer transfer;
private final NetSocket socket;

private Session session;
private boolean stop;

private final ConcurrentHashMap<Integer, AsyncCallback<?>> callbackMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -173,8 +172,6 @@ private void readInitPacket(int sessionId) {
String userName = transfer.readString();
userName = StringUtils.toUpperEnglish(userName);
Session session = createSession(originalURL, dbName, userName);
if (this.session == null)
this.session = session;
sessions.put(sessionId, session);
transfer.setSession(session);
transfer.writeResponseHeader(sessionId, Session.STATUS_OK);
Expand Down Expand Up @@ -332,15 +329,15 @@ private void writeRow(Result result, int count) throws IOException {
}
}

private int getState(int oldModificationId) {
private int getState(Session session, int oldModificationId) {
if (session.getModificationId() == oldModificationId) {
return Session.STATUS_OK;
}
return Session.STATUS_OK_STATE_CHANGED;
}

private void writeBatchResult(int id, int[] result, int oldModificationId) throws IOException {
writeResponseHeader(id, oldModificationId);
private void writeBatchResult(Session session, int id, int[] result, int oldModificationId) throws IOException {
writeResponseHeader(session, id, oldModificationId);
for (int i = 0; i < result.length; i++)
transfer.writeInt(result[i]);

Expand All @@ -360,7 +357,7 @@ public Object call() throws Exception {
Callable<Object> callable = new Callable<Object>() {
@Override
public Object call() throws Exception {
transfer.writeResponseHeader(id, getState(oldModificationId));
transfer.writeResponseHeader(id, getState(session, oldModificationId));

if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY
|| operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY)
Expand Down Expand Up @@ -415,7 +412,7 @@ public Object call() throws Exception {
if (session.isClosed()) {
status = Session.STATUS_CLOSED;
} else {
status = getState(oldModificationId);
status = getState(session, oldModificationId);
}
transfer.writeResponseHeader(id, status);
if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE
Expand Down Expand Up @@ -512,15 +509,11 @@ private void writeResponseHeader(Session session, int id, int oldModificationId)
if (session != null && session.isClosed()) {
status = Session.STATUS_CLOSED;
} else {
status = getState(oldModificationId);
status = getState(session, oldModificationId);
}
transfer.writeResponseHeader(id, status);
}

private void writeResponseHeader(int id, int oldModificationId) throws IOException {
writeResponseHeader(null, id, oldModificationId);
}

private void processRequest(int id) throws IOException {
int operation = transfer.readInt();
switch (operation) {
Expand Down Expand Up @@ -549,28 +542,25 @@ private void processRequest(int id) throws IOException {
transfer.flush();
break;
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY: {
session.setAutoCommit(false);
session.setRoot(false);
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY:
case Session.COMMAND_QUERY: {
int sessionId = transfer.readInt();
String sql = transfer.readString();
int objectId = transfer.readInt();
int maxRows = transfer.readInt();
int fetchSize = transfer.readInt();
Session session = getSession(sessionId);
if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_QUERY) {
session.setAutoCommit(false);
session.setRoot(false);
}
int old = session.getModificationId();
PreparedStatement command = session.prepareStatement(sql, fetchSize);
cache.addObject(id, command);
executeQuery(session, id, command, operation, objectId, maxRows, fetchSize, old);

break;
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY: {
session.setAutoCommit(false);
session.setRoot(false);
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY:
case Session.COMMAND_PREPARED_QUERY: {
int sessionId = transfer.readInt();
int objectId = transfer.readInt();
Expand All @@ -580,15 +570,15 @@ private void processRequest(int id) throws IOException {
command.setFetchSize(fetchSize);
setParameters(command);
Session session = getSession(sessionId);
if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_QUERY) {
session.setAutoCommit(false);
session.setRoot(false);
}
int old = session.getModificationId();
executeQuery(session, id, command, operation, objectId, maxRows, fetchSize, old);

break;
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE: {
session.setAutoCommit(false);
session.setRoot(false);
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE:
case Session.COMMAND_UPDATE:
case Session.COMMAND_REPLICATION_UPDATE: {
int sessionId = transfer.readInt();
Expand All @@ -597,37 +587,44 @@ private void processRequest(int id) throws IOException {
int old = session.getModificationId();
if (operation == Session.COMMAND_REPLICATION_UPDATE)
session.setReplicationName(transfer.readString());

if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_UPDATE) {
session.setAutoCommit(false);
session.setRoot(false);
}
PreparedStatement command = session.prepareStatement(sql, -1);
cache.addObject(id, command);
executeUpdate(session, id, command, operation, old);
break;
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE: {
session.setAutoCommit(false);
session.setRoot(false);
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE:
case Session.COMMAND_PREPARED_UPDATE:
case Session.COMMAND_REPLICATION_PREPARED_UPDATE: {
int sessionId = transfer.readInt();
Session session = getSession(sessionId);
if (operation == Session.COMMAND_REPLICATION_PREPARED_UPDATE)
session.setReplicationName(transfer.readString());
PreparedStatement command = (PreparedStatement) cache.getObject(id, false);
setParameters(command);
Session session = getSession(sessionId);
if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_PREPARED_UPDATE) {
session.setAutoCommit(false);
session.setRoot(false);
}
int old = session.getModificationId();
executeUpdate(session, id, command, operation, old);
break;
}
case Session.COMMAND_STORAGE_DISTRIBUTED_PUT: {
session.setAutoCommit(false);
session.setRoot(false);
}
case Session.COMMAND_STORAGE_DISTRIBUTED_PUT:
case Session.COMMAND_STORAGE_PUT:
case Session.COMMAND_STORAGE_REPLICATION_PUT: {
int sessionId = transfer.readInt();
String mapName = transfer.readString();
byte[] key = transfer.readBytes();
byte[] value = transfer.readBytes();
Session session = getSession(sessionId);
if (operation == Session.COMMAND_STORAGE_DISTRIBUTED_PUT) {
session.setAutoCommit(false);
session.setRoot(false);
}
int old = session.getModificationId();
if (operation == Session.COMMAND_STORAGE_REPLICATION_PUT)
session.setReplicationName(transfer.readString());
Expand All @@ -651,13 +648,16 @@ private void processRequest(int id) throws IOException {
transfer.flush();
break;
}
case Session.COMMAND_STORAGE_DISTRIBUTED_GET: {
session.setAutoCommit(false);
session.setRoot(false);
}
case Session.COMMAND_STORAGE_DISTRIBUTED_GET:
case Session.COMMAND_STORAGE_GET: {
int sessionId = transfer.readInt();
String mapName = transfer.readString();
byte[] key = transfer.readBytes();
Session session = getSession(sessionId);
if (operation == Session.COMMAND_STORAGE_DISTRIBUTED_GET) {
session.setAutoCommit(false);
session.setRoot(false);
}
int old = session.getModificationId();

StorageMap<Object, Object> map = session.getStorageMap(mapName);
Expand All @@ -679,9 +679,11 @@ private void processRequest(int id) throws IOException {
break;
}
case Session.COMMAND_STORAGE_MOVE_LEAF_PAGE: {
int sessionId = transfer.readInt();
String mapName = transfer.readString();
ByteBuffer splitKey = transfer.readByteBuffer();
ByteBuffer page = transfer.readByteBuffer();
Session session = getSession(sessionId);
int old = session.getModificationId();
StorageMap<Object, Object> map = session.getStorageMap(mapName);

Expand All @@ -694,8 +696,10 @@ private void processRequest(int id) throws IOException {
break;
}
case Session.COMMAND_STORAGE_REMOVE_LEAF_PAGE: {
int sessionId = transfer.readInt();
String mapName = transfer.readString();
ByteBuffer key = transfer.readByteBuffer();
Session session = getSession(sessionId);
int old = session.getModificationId();
StorageMap<Object, Object> map = session.getStorageMap(mapName);

Expand All @@ -722,13 +726,17 @@ private void processRequest(int id) throws IOException {
break;
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_COMMIT: {
int sessionId = transfer.readInt();
Session session = getSession(sessionId);
int old = session.getModificationId();
session.commit(false, transfer.readString());
writeResponseHeader(session, id, old);
transfer.flush();
break;
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_ROLLBACK: {
int sessionId = transfer.readInt();
Session session = getSession(sessionId);
int old = session.getModificationId();
session.rollback();
writeResponseHeader(session, id, old);
Expand All @@ -737,6 +745,8 @@ private void processRequest(int id) throws IOException {
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_ADD_SAVEPOINT:
case Session.COMMAND_DISTRIBUTED_TRANSACTION_ROLLBACK_SAVEPOINT: {
int sessionId = transfer.readInt();
Session session = getSession(sessionId);
int old = session.getModificationId();
String name = transfer.readString();
if (operation == Session.COMMAND_DISTRIBUTED_TRANSACTION_ADD_SAVEPOINT)
Expand All @@ -748,6 +758,8 @@ private void processRequest(int id) throws IOException {
break;
}
case Session.COMMAND_DISTRIBUTED_TRANSACTION_VALIDATE: {
int sessionId = transfer.readInt();
Session session = getSession(sessionId);
int old = session.getModificationId();
boolean isValid = session.validateTransaction(transfer.readString());
writeResponseHeader(session, id, old);
Expand All @@ -770,7 +782,7 @@ private void processRequest(int id) throws IOException {
result[i] = Statement.EXECUTE_FAILED;
}
}
writeBatchResult(id, result, old);
writeBatchResult(session, id, result, old);
break;
}
case Session.COMMAND_BATCH_STATEMENT_PREPARED_UPDATE: {
Expand All @@ -793,7 +805,7 @@ private void processRequest(int id) throws IOException {
result[i] = Statement.EXECUTE_FAILED;
}
}
writeBatchResult(id, result, old);
writeBatchResult(session, id, result, old);
break;
}
case Session.COMMAND_CLOSE: {
Expand Down Expand Up @@ -856,6 +868,8 @@ private void processRequest(int id) throws IOException {
break;
}
case Session.COMMAND_READ_LOB: {
int sessionId = transfer.readInt();
Session session = getSession(sessionId);
if (lobs == null) {
lobs = SmallLRUCache.newInstance(Math.max(SysProperties.SERVER_CACHED_OBJECTS,
SysProperties.SERVER_RESULT_SET_FETCH_SIZE * 5));
Expand Down

0 comments on commit 6b6c85c

Please sign in to comment.