Skip to content

Commit

Permalink
ISPN-1995 Add more info to Hot Rod client "new topology..." message
Browse files Browse the repository at this point in the history
  • Loading branch information
galderz authored and maniksurtani committed May 11, 2012
1 parent ef46696 commit b7c77ed
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 43 deletions.
Expand Up @@ -176,9 +176,31 @@ protected void readNewTopologyAndHash(Transport transport, AtomicInteger topolog
int hashSpace = transport.readVInt();
int clusterSize = transport.readVInt();

localLog.tracef("Topology change request: newTopologyId=%d, numKeyOwners=%d, " +
Map<SocketAddress, Set<Integer>> servers2Hash = computeNewHashes(
transport, localLog, newTopologyId, numKeyOwners,
hashFunctionVersion, hashSpace, clusterSize);

if (localLog.isInfoEnabled()) {
localLog.newTopology(transport.getRemoteSocketAddress(), newTopologyId,
servers2Hash.keySet());
}
transport.getTransportFactory().updateServers(servers2Hash.keySet());
if (hashFunctionVersion == 0) {
localLog.trace("Not using a consistent hash function (hash function version == 0).");
} else {
transport.getTransportFactory().updateHashFunction(
servers2Hash, numKeyOwners, hashFunctionVersion, hashSpace);
}
}

protected Map<SocketAddress, Set<Integer>> computeNewHashes(Transport transport,
Log localLog, int newTopologyId, int numKeyOwners,
short hashFunctionVersion, int hashSpace, int clusterSize) {
if (localLog.isTraceEnabled()) {
localLog.tracef("Topology change request: newTopologyId=%d, numKeyOwners=%d, " +
"hashFunctionVersion=%d, hashSpaceSize=%d, clusterSize=%d",
newTopologyId, numKeyOwners, hashFunctionVersion, hashSpace, clusterSize);
}

Map<SocketAddress, Set<Integer>> servers2Hash = new LinkedHashMap<SocketAddress, Set<Integer>>();

Expand All @@ -196,16 +218,7 @@ protected void readNewTopologyAndHash(Transport transport, AtomicInteger topolog
hashes.add(hashCode);
localLog.tracef("Hash code is: %d", hashCode);
}

if (localLog.isInfoEnabled()) {
localLog.newTopology(servers2Hash.keySet());
}
transport.getTransportFactory().updateServers(servers2Hash.keySet());
if (hashFunctionVersion == 0) {
localLog.trace("Not using a consistent hash function (hash function version == 0).");
} else {
transport.getTransportFactory().updateHashFunction(servers2Hash, numKeyOwners, hashFunctionVersion, hashSpace);
}
return servers2Hash;
}

}
Expand Up @@ -48,55 +48,43 @@ public HeaderParams writeHeader(Transport transport, HeaderParams params) {
}

@Override
protected void readNewTopologyAndHash(Transport transport, AtomicInteger topologyId) {
final Log localLog = getLog();
int newTopologyId = transport.readVInt();
topologyId.set(newTopologyId);
int numKeyOwners = transport.readUnsignedShort();
short hashFctVersion = transport.readByte();
ConsistentHash ch = null;
if (hashFctVersion != 0)
ch = transport.getTransportFactory().getConsistentHashFactory()
.newConsistentHash(hashFctVersion);
else
localLog.trace("Not using a consistent hash function (hash function version == 0)");

int hashSpace = transport.readVInt();
int clusterSize = transport.readVInt();
protected Map<SocketAddress, Set<Integer>> computeNewHashes(Transport transport,
Log localLog, int newTopologyId, int numKeyOwners,
short hashFunctionVersion, int hashSpace, int clusterSize) {
// New in 1.1
int numVirtualNodes = transport.readVInt();

localLog.tracef("Topology change request: newTopologyId=%d, numKeyOwners=%d, " +
"hashFunctionVersion=%d, hashSpaceSize=%d, clusterSize=%d, numVirtualNodes=%d",
newTopologyId, numKeyOwners, hashFctVersion, hashSpace, clusterSize,
numVirtualNodes);
if (localLog.isTraceEnabled()) {
localLog.tracef("Topology change request: newTopologyId=%d, numKeyOwners=%d, " +
"hashFunctionVersion=%d, hashSpaceSize=%d, clusterSize=%d, numVirtualNodes=%d",
newTopologyId, numKeyOwners, hashFunctionVersion, hashSpace, clusterSize,
numVirtualNodes);
}

Map<SocketAddress, Set<Integer>> servers2Hash =
new LinkedHashMap<SocketAddress, Set<Integer>>();

ConsistentHash ch = null;
if (hashFunctionVersion != 0)
ch = transport.getTransportFactory().getConsistentHashFactory()
.newConsistentHash(hashFunctionVersion);
else
localLog.trace("Not using a consistent hash function (hash function version == 0)");

for (int i = 0; i < clusterSize; i++) {
String host = transport.readString();
int port = transport.readUnsignedShort();
// TODO: Performance improvement, since hash positions are fixed, we could maybe only calculate for those nodes that the client is not aware of?
int baseHashCode = transport.read4ByteInt();
int normalizedHashCode = getNormalizedHash(baseHashCode, ch);
localLog.tracef("Server(%s:%d) read with base hash code %d, and normalized hash code %d",
host, port, baseHashCode, normalizedHashCode);
host, port, baseHashCode, normalizedHashCode);
cacheHashCode(servers2Hash, host, port, normalizedHashCode);
if (numVirtualNodes > 1)
calcVirtualHashCodes(baseHashCode, numVirtualNodes, servers2Hash, host, port, ch);
}

if (localLog.isInfoEnabled()) {
localLog.newTopology(servers2Hash.keySet());
}
transport.getTransportFactory().updateServers(servers2Hash.keySet());
if (hashFctVersion == 0) {
localLog.trace("Not using a consistent hash function (hash function version == 0)");
} else {
transport.getTransportFactory().updateHashFunction(
servers2Hash, numKeyOwners, hashFctVersion, hashSpace);
}
return servers2Hash;
}

@Override
Expand Down
Expand Up @@ -22,6 +22,9 @@
*/
package org.infinispan.client.hotrod.impl.transport;

import java.net.InetAddress;
import java.net.SocketAddress;

/**
* Transport abstraction.
*
Expand Down Expand Up @@ -70,4 +73,15 @@ public interface Transport {
void writeString(String string);

byte[] dumpStream();

/**
* Returns the address of the endpoint this transport is connected to, or
* <code>null</code> if it is unconnected.
*
* @return a <code>SocketAddress</code> reprensenting the remote endpoint
* of this transport, or <code>null</code> if it is not connected
* yet.
*/
SocketAddress getRemoteSocketAddress();

}
Expand Up @@ -32,8 +32,10 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.concurrent.atomic.AtomicLong;

Expand Down Expand Up @@ -323,4 +325,10 @@ public byte[] dumpStream() {
}
return os.toByteArray();
}

@Override
public SocketAddress getRemoteSocketAddress() {
return socket.getRemoteSocketAddress();
}

}
Expand Up @@ -68,8 +68,8 @@ public interface Log extends BasicLogger {
void errorFromServer(String message);

@LogMessage(level = INFO)
@Message(value = "New topology: %s", id = 4006)
void newTopology(Set<SocketAddress> topology);
@Message(value = "%s sent new topology view (id=%d): %s", id = 4006)
void newTopology(SocketAddress address, int viewId, Set<SocketAddress> topology);

@LogMessage(level = ERROR)
@Message(value = "Exception encountered. Retry %d out of %d", id = 4007)
Expand Down

0 comments on commit b7c77ed

Please sign in to comment.