Skip to content

Commit

Permalink
无效Session不需要向服务器端发送SESSION_CLOSE命令
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 24, 2017
1 parent 463c8be commit 722530f
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 46 deletions.
Expand Up @@ -125,7 +125,7 @@ public Session connect(boolean first) {
Session[] sessions = new ClientSession[size]; Session[] sessions = new ClientSession[size];
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
// 如果首次连接的节点就是复制节点之一,则复用它 // 如果首次连接的节点就是复制节点之一,则复用它
if (!isInvalid()) { if (isValid()) {
HostAndPort hostAndPort = new HostAndPort(servers[i]); HostAndPort hostAndPort = new HostAndPort(servers[i]);
if (hostAndPort.inetSocketAddress.equals(this.inetSocketAddress)) { if (hostAndPort.inetSocketAddress.equals(this.inetSocketAddress)) {
sessions[i] = this; sessions[i] = this;
Expand Down Expand Up @@ -259,7 +259,8 @@ private Transfer initTransfer(ConnectionInfo ci, HostAndPort hostAndPort) throws
sessionId = getNextId(); sessionId = getNextId();
transfer = asyncConnection.createTransfer(this); transfer = asyncConnection.createTransfer(this);
asyncConnection.writeInitPacket(this, sessionId, transfer, ci); asyncConnection.writeInitPacket(this, sessionId, transfer, ci);
asyncConnection.addSession(sessionId, this); if (isValid())
asyncConnection.addSession(sessionId, this);
return transfer; return transfer;
} }


Expand Down Expand Up @@ -367,8 +368,11 @@ public void close() {
synchronized (this) { synchronized (this) {
try { try {
traceOperation("SESSION_CLOSE", 0); traceOperation("SESSION_CLOSE", 0);
transfer.writeRequestHeader(sessionId, Session.SESSION_CLOSE).flush(); // 只有当前Session有效时服务器端才持有对应的session
asyncConnection.removeSession(sessionId); if (isValid()) {
transfer.writeRequestHeader(sessionId, Session.SESSION_CLOSE).flush();
asyncConnection.removeSession(sessionId);
}


synchronized (ClientSession.class) { synchronized (ClientSession.class) {
if (asyncConnection.isEmpty()) { if (asyncConnection.isEmpty()) {
Expand Down
2 changes: 2 additions & 0 deletions lealone-common/src/main/java/org/lealone/db/Session.java
Expand Up @@ -194,6 +194,8 @@ public interface Session extends Closeable, Transaction.Participant {


boolean isInvalid(); boolean isInvalid();


boolean isValid();

void setTargetEndpoints(String targetEndpoints); void setTargetEndpoints(String targetEndpoints);


String getTargetEndpoints(); String getTargetEndpoints();
Expand Down
5 changes: 5 additions & 0 deletions lealone-common/src/main/java/org/lealone/db/SessionBase.java
Expand Up @@ -102,6 +102,11 @@ public boolean isInvalid() {
return invalid; return invalid;
} }


@Override
public boolean isValid() {
return !invalid;
}

@Override @Override
public void setTargetEndpoints(String targetEndpoints) { public void setTargetEndpoints(String targetEndpoints) {
this.targetEndpoints = targetEndpoints; this.targetEndpoints = targetEndpoints;
Expand Down
Expand Up @@ -88,7 +88,7 @@ public ServerSession createSession(ConnectionInfo ci) {
// ignore // ignore
} }
} }
if (session.isInvalid()) { if (session.isInvalid()) { // 无效session,不需要进行后续的操作
return session; return session;
} }


Expand Down
56 changes: 18 additions & 38 deletions lealone-net/src/main/java/org/lealone/net/AsyncConnection.java
Expand Up @@ -64,47 +64,23 @@ static class SessionInfo {
final Session session; final Session session;
final CommandHandler commandHandler; final CommandHandler commandHandler;
final ConcurrentLinkedQueue<PreparedCommand> preparedCommandQueue; final ConcurrentLinkedQueue<PreparedCommand> preparedCommandQueue;
final long commandHandlerSequence;


SessionInfo(String hostAndPort, int sessionId, Session session, CommandHandler commandHandler) { SessionInfo(String hostAndPort, int sessionId, Session session, CommandHandler commandHandler) {
this.sessionId = sessionId; this.sessionId = sessionId;
this.hostAndPort = hostAndPort; this.hostAndPort = hostAndPort;
this.session = session; this.session = session;
this.commandHandler = commandHandler; this.commandHandler = commandHandler;
this.preparedCommandQueue = new ConcurrentLinkedQueue<>(); this.preparedCommandQueue = new ConcurrentLinkedQueue<>();

// SessionInfo的hostAndPort和sessionId字段不足以区别它自身,所以用commandHandlerSequence
commandHandlerSequence = commandHandler.getNextSequence();
} }


@Override @Override
public String toString() { public String toString() {
return "SessionInfo [hostAndPort=" + hostAndPort + ", sessionId=" + sessionId + "]"; return "SessionInfo [hostAndPort=" + hostAndPort + ", sessionId=" + sessionId + "]";
} }

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((hostAndPort == null) ? 0 : hostAndPort.hashCode());
result = prime * result + sessionId;
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
SessionInfo other = (SessionInfo) obj;
if (hostAndPort == null) {
if (other.hostAndPort != null)
return false;
} else if (!hostAndPort.equals(other.hostAndPort))
return false;
if (sessionId != other.sessionId)
return false;
return true;
}
} }


private static final Logger logger = LoggerFactory.getLogger(AsyncConnection.class); private static final Logger logger = LoggerFactory.getLogger(AsyncConnection.class);
Expand Down Expand Up @@ -221,7 +197,7 @@ private void readInitPacket(Transfer transfer, int sessionId) {
String userName = transfer.readString(); String userName = transfer.readString();
userName = StringUtils.toUpperEnglish(userName); userName = StringUtils.toUpperEnglish(userName);
Session session = createSession(transfer, originalURL, dbName, userName); Session session = createSession(transfer, originalURL, dbName, userName);
if (!session.isInvalid()) { if (session.isValid()) {
CommandHandler commandHandler = CommandHandler.getNextCommandHandler(); CommandHandler commandHandler = CommandHandler.getNextCommandHandler();
sessions.put(sessionId, session); sessions.put(sessionId, session);
SessionInfo sessionInfo = new SessionInfo(hostAndPort, sessionId, session, commandHandler); SessionInfo sessionInfo = new SessionInfo(hostAndPort, sessionId, session, commandHandler);
Expand Down Expand Up @@ -772,16 +748,16 @@ protected void processRequest(Transfer transfer, int id, int operation) throws I
int sessionId = transfer.readInt(); int sessionId = transfer.readInt();
Session session = getSession(sessionId); Session session = getSession(sessionId);
session.commit(transfer.readString()); session.commit(transfer.readString());
writeResponseHeader(transfer, session, id); // writeResponseHeader(transfer, session, id); //不需要发回响应
transfer.flush(); // transfer.flush();
break; break;
} }
case Session.COMMAND_DISTRIBUTED_TRANSACTION_ROLLBACK: { case Session.COMMAND_DISTRIBUTED_TRANSACTION_ROLLBACK: {
int sessionId = transfer.readInt(); int sessionId = transfer.readInt();
Session session = getSession(sessionId); Session session = getSession(sessionId);
session.rollback(); session.rollback();
writeResponseHeader(transfer, session, id); // writeResponseHeader(transfer, session, id); //不需要发回响应
transfer.flush(); // transfer.flush();
break; break;
} }
case Session.COMMAND_DISTRIBUTED_TRANSACTION_ADD_SAVEPOINT: case Session.COMMAND_DISTRIBUTED_TRANSACTION_ADD_SAVEPOINT:
Expand All @@ -793,8 +769,8 @@ protected void processRequest(Transfer transfer, int id, int operation) throws I
session.addSavepoint(name); session.addSavepoint(name);
else else
session.rollbackToSavepoint(name); session.rollbackToSavepoint(name);
writeResponseHeader(transfer, session, id); // writeResponseHeader(transfer, session, id); //不需要发回响应
transfer.flush(); // transfer.flush();
break; break;
} }
case Session.COMMAND_DISTRIBUTED_TRANSACTION_VALIDATE: { case Session.COMMAND_DISTRIBUTED_TRANSACTION_VALIDATE: {
Expand Down Expand Up @@ -892,9 +868,13 @@ protected void processRequest(Transfer transfer, int id, int operation) throws I
} }
case Session.SESSION_CLOSE: { case Session.SESSION_CLOSE: {
SessionInfo si = sessionInfoMap.remove(id); SessionInfo si = sessionInfoMap.remove(id);
si.commandHandler.removeSession(si); if (si != null) {
Session session = sessions.remove(id); si.commandHandler.removeSession(si);
closeSession(session); Session session = sessions.remove(id);
closeSession(session);
} else {
logger.warn("SessionInfo is null, may be a bug! sessionId = " + id);
}
break; break;
} }
case Session.SESSION_CANCEL_STATEMENT: { case Session.SESSION_CANCEL_STATEMENT: {
Expand Down
13 changes: 10 additions & 3 deletions lealone-net/src/main/java/org/lealone/net/CommandHandler.java
Expand Up @@ -23,6 +23,7 @@
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;


import org.lealone.db.SessionStatus; import org.lealone.db.SessionStatus;
import org.lealone.net.AsyncConnection.SessionInfo; import org.lealone.net.AsyncConnection.SessionInfo;
Expand Down Expand Up @@ -74,17 +75,23 @@ public static void removeConnection(AsyncConnection c) {
c.close(); c.close();
} }


private final ConcurrentHashMap<SessionInfo, SessionInfo> sessionInfoMap = new ConcurrentHashMap<>(); private final AtomicLong sequence = new AtomicLong(0);
private final ConcurrentHashMap<Long, SessionInfo> sessionInfoMap = new ConcurrentHashMap<>();
private final Semaphore haveWork = new Semaphore(1); private final Semaphore haveWork = new Semaphore(1);
private boolean stop; private boolean stop;
private int nested; private int nested;


long getNextSequence() {
return sequence.incrementAndGet();
}

void addSession(SessionInfo sessionInfo) { void addSession(SessionInfo sessionInfo) {
sessionInfoMap.put(sessionInfo, sessionInfo); // SessionInfo的hostAndPort和sessionId字段不足以区别它自身,所以用commandHandlerSequence
sessionInfoMap.put(sessionInfo.commandHandlerSequence, sessionInfo);
} }


void removeSession(SessionInfo sessionInfo) { void removeSession(SessionInfo sessionInfo) {
sessionInfoMap.remove(sessionInfo); sessionInfoMap.remove(sessionInfo.commandHandlerSequence);
} }


public CommandHandler(int id) { public CommandHandler(int id) {
Expand Down

0 comments on commit 722530f

Please sign in to comment.