Skip to content

Commit

Permalink
TCPServer启动后设置HostAndPort,用于复制有效性验证
Browse files Browse the repository at this point in the history
  • Loading branch information
codefollower committed Mar 16, 2017
1 parent 0d390d7 commit 7e2d7d9
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 28 deletions.
44 changes: 36 additions & 8 deletions lealone-client/src/main/java/org/lealone/client/ClientSession.java
Expand Up @@ -13,6 +13,8 @@
import java.util.Random; import java.util.Random;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;


import org.lealone.api.ErrorCode; import org.lealone.api.ErrorCode;
import org.lealone.common.exceptions.DbException; import org.lealone.common.exceptions.DbException;
Expand Down Expand Up @@ -507,13 +509,25 @@ public synchronized int readLob(long lobId, byte[] hmac, long offset, byte[] buf
transfer.writeBytes(hmac); transfer.writeBytes(hmac);
transfer.writeLong(offset); transfer.writeLong(offset);
transfer.writeInt(length); transfer.writeInt(length);
AtomicInteger lengthAI = new AtomicInteger();
AsyncCallback<Void> ac = new AsyncCallback<Void>() {
@Override
public void runInternal() {
try {
int length = transfer.readInt();
if (length > 0) {
transfer.readBytes(buff, off, length);
}
lengthAI.set(length);
} catch (IOException e) {
throw DbException.convert(e);
}
}
};
transfer.addAsyncCallback(id, ac);
transfer.flush(); transfer.flush();
length = transfer.readInt(); ac.await();
if (length <= 0) { return lengthAI.get();
return length;
}
transfer.readBytes(buff, off, length);
return length;
} catch (IOException e) { } catch (IOException e) {
handleException(e); handleException(e);
} }
Expand Down Expand Up @@ -576,8 +590,22 @@ public synchronized boolean validateTransaction(String localTransactionName) {
try { try {
int id = getNextId(); int id = getNextId();
transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_VALIDATE); transfer.writeRequestHeader(id, Session.COMMAND_DISTRIBUTED_TRANSACTION_VALIDATE);
transfer.writeInt(sessionId).writeString(localTransactionName).flush(); transfer.writeInt(sessionId).writeString(localTransactionName);
return transfer.readBoolean(); AtomicBoolean isValid = new AtomicBoolean();
AsyncCallback<Void> ac = new AsyncCallback<Void>() {
@Override
public void runInternal() {
try {
isValid.set(transfer.readBoolean());
} catch (IOException e) {
throw DbException.convert(e);
}
}
};
transfer.addAsyncCallback(id, ac);
transfer.flush();
ac.await();
return isValid.get();
} catch (Exception e) { } catch (Exception e) {
handleException(e); handleException(e);
return false; return false;
Expand Down
36 changes: 22 additions & 14 deletions lealone-common/src/main/java/org/lealone/db/HostAndPort.java
Expand Up @@ -21,6 +21,16 @@


public class HostAndPort { public class HostAndPort {


private static HostAndPort local = new HostAndPort(Constants.DEFAULT_HOST, Constants.DEFAULT_TCP_PORT);

public static void setLocalHostAndPort(String host, int port) {
local = new HostAndPort(host, port);
}

public static HostAndPort getLocalHostAndPort() {
return local;
}

public final String host; public final String host;
public final int port; public final int port;


Expand All @@ -39,21 +49,22 @@ public HostAndPort(String str) {
port = Integer.decode(str.substring(idx + 1)); port = Integer.decode(str.substring(idx + 1));
str = str.substring(0, idx); str = str.substring(0, idx);
} }

this.host = str; this.host = str;
this.port = port; this.port = port;
this.value = host + ":" + port; this.value = host + ":" + port;
this.inetSocketAddress = new InetSocketAddress(this.host, this.port);
}


public HostAndPort(String host, int port) {
this.host = host;
this.port = port;
this.value = host + ":" + port;
this.inetSocketAddress = new InetSocketAddress(this.host, this.port); this.inetSocketAddress = new InetSocketAddress(this.host, this.port);
} }


@Override @Override
public int hashCode() { public int hashCode() {
final int prime = 31; return inetSocketAddress.hashCode();
int result = 1;
result = prime * result + ((host == null) ? 0 : host.hashCode());
result = prime * result + port;
return result;
} }


@Override @Override
Expand All @@ -65,14 +76,11 @@ public boolean equals(Object obj) {
if (getClass() != obj.getClass()) if (getClass() != obj.getClass())
return false; return false;
HostAndPort other = (HostAndPort) obj; HostAndPort other = (HostAndPort) obj;
if (host == null) { return this.inetSocketAddress.equals(other.inetSocketAddress);
if (other.host != null) }
return false;
} else if (!host.equals(other.host)) public boolean equals(String str) {
return false; return equals(new HostAndPort(str));
if (port != other.port)
return false;
return true;
} }


} }
8 changes: 4 additions & 4 deletions lealone-db/src/main/java/org/lealone/db/ServerSession.java
Expand Up @@ -1284,16 +1284,16 @@ public boolean validate(String localTransactionName) {


@Override @Override
public boolean validate(String hostAndPort, String localTransactionName) { public boolean validate(String hostAndPort, String localTransactionName) {
Session fs = null; Session s = null;
try { try {
String dbName = getDatabase().getShortName(); String dbName = getDatabase().getShortName();
String url = createURL(dbName, hostAndPort); String url = createURL(dbName, hostAndPort);
fs = SessionPool.getSession(this, url); s = SessionPool.getSession(this, url);
return fs.validateTransaction(localTransactionName); return s.validateTransaction(localTransactionName);
} catch (Exception e) { } catch (Exception e) {
throw DbException.convert(e); throw DbException.convert(e);
} finally { } finally {
SessionPool.release(fs); SessionPool.release(s);
} }
} }


Expand Down
3 changes: 2 additions & 1 deletion lealone-db/src/main/java/org/lealone/db/SessionPool.java
Expand Up @@ -68,7 +68,8 @@ public static Session getSession(ServerSession originalSession, String url, bool
if (usesClientSession) if (usesClientSession)
ci.setClient(true); ci.setClient(true);
try { try {
session = ci.getSessionFactory().createSession(ci).connectEmbeddedOrServer(); // 因为已经精确知道要连哪个节点了,connectEmbeddedOrServer不用考虑运行模式,所以用false
session = ci.getSessionFactory().createSession(ci).connectEmbeddedOrServer(false);
session.setLocal(true); session.setLocal(true);
} catch (SQLException e) { } catch (SQLException e) {
throw DbException.convert(e); throw DbException.convert(e);
Expand Down
Expand Up @@ -56,7 +56,7 @@ public MVCCTransaction(MVCCTransactionEngine engine, long tid) {
public MVCCTransaction(MVCCTransactionEngine engine, long tid, String hostAndPort) { public MVCCTransaction(MVCCTransactionEngine engine, long tid, String hostAndPort) {
transactionEngine = engine; transactionEngine = engine;
transactionId = tid; transactionId = tid;
transactionName = getTransactionName(null, tid); transactionName = getTransactionName(hostAndPort, tid);
status = Transaction.STATUS_OPEN; status = Transaction.STATUS_OPEN;
} }


Expand Down
Expand Up @@ -17,6 +17,7 @@
import org.lealone.common.logging.LoggerFactory; import org.lealone.common.logging.LoggerFactory;
import org.lealone.common.security.EncryptionOptions.ServerEncryptionOptions; import org.lealone.common.security.EncryptionOptions.ServerEncryptionOptions;
import org.lealone.db.Constants; import org.lealone.db.Constants;
import org.lealone.db.HostAndPort;
import org.lealone.net.AsyncConnection; import org.lealone.net.AsyncConnection;
import org.lealone.net.CommandHandler; import org.lealone.net.CommandHandler;
import org.lealone.net.NetFactory; import org.lealone.net.NetFactory;
Expand Down Expand Up @@ -68,6 +69,8 @@ public void init(Map<String, String> config) { // TODO 对于不支持的参数
vertx = NetFactory.getVertx(config); vertx = NetFactory.getVertx(config);
} }
} }

HostAndPort.setLocalHostAndPort(host, port);
} }


@Override @Override
Expand Down

0 comments on commit 7e2d7d9

Please sign in to comment.