Permalink
Browse files

Merge pull request #1 from tellapart/dns-resolve

Late-bind DNS resolutions
2 parents 953c322 + dc7d88b commit cd5b8f60f90ac121f5f5bfac7c56746974e179b8 Kevin Ballard committed Oct 23, 2014
Showing with 321 additions and 248 deletions.
  1. +1 −1 pom.xml
  2. +10 −13 src/main/java/net/spy/memcached/AddrUtil.java
  3. +13 −7 src/main/java/net/spy/memcached/BinaryConnectionFactory.java
  4. +2 −5 src/main/java/net/spy/memcached/ConnectionFactory.java
  5. +3 −5 src/main/java/net/spy/memcached/ConnectionObserver.java
  6. +10 −9 src/main/java/net/spy/memcached/DefaultConnectionFactory.java
  7. +79 −0 src/main/java/net/spy/memcached/HostPort.java
  8. +28 −31 src/main/java/net/spy/memcached/MemcachedClient.java
  9. +5 −6 src/main/java/net/spy/memcached/MemcachedClientIF.java
  10. +18 −14 src/main/java/net/spy/memcached/MemcachedConnection.java
  11. +2 −2 src/main/java/net/spy/memcached/MemcachedNode.java
  12. +2 −3 src/main/java/net/spy/memcached/MemcachedNodeROImpl.java
  13. +3 −4 src/main/java/net/spy/memcached/TapClient.java
  14. +10 −12 src/main/java/net/spy/memcached/TapConnectionProvider.java
  15. +5 −5 src/main/java/net/spy/memcached/auth/AuthThread.java
  16. +1 −1 src/main/java/net/spy/memcached/internal/CheckedOperationTimeoutException.java
  17. +22 −13 src/main/java/net/spy/memcached/protocol/TCPMemcachedNodeImpl.java
  18. +13 −6 src/main/java/net/spy/memcached/protocol/ascii/AsciiMemcachedNodeImpl.java
  19. +14 −7 src/main/java/net/spy/memcached/protocol/binary/BinaryMemcachedNodeImpl.java
  20. +2 −1 src/main/java/net/spy/memcached/spring/MemcachedClientFactoryBean.java
  21. +3 −3 src/main/java/net/spy/memcached/util/DefaultKetamaNodeLocatorConfiguration.java
  22. +10 −11 src/test/java/net/spy/memcached/AddrUtilTest.java
  23. +1 −2 src/test/java/net/spy/memcached/ClientBaseCase.java
  24. +5 −9 src/test/java/net/spy/memcached/ConnectionFactoryBuilderTest.java
  25. +5 −6 src/test/java/net/spy/memcached/ConsistentHashingTest.java
  26. +7 −10 src/test/java/net/spy/memcached/KetamaNodeLocatorTest.java
  27. +1 −2 src/test/java/net/spy/memcached/LongClientTest.java
  28. +7 −10 src/test/java/net/spy/memcached/MemcachedClientConstructorTest.java
  29. +5 −6 src/test/java/net/spy/memcached/MemcachedConnectionTest.java
  30. +4 −6 src/test/java/net/spy/memcached/MemcachedNodeROImplTest.java
  31. +8 −10 src/test/java/net/spy/memcached/MockMemcachedNode.java
  32. +6 −7 src/test/java/net/spy/memcached/ObserverTest.java
  33. +8 −12 src/test/java/net/spy/memcached/ProtocolBaseCase.java
  34. +4 −4 src/test/java/net/spy/memcached/QueueOverflowTest.java
  35. +2 −3 src/test/java/net/spy/memcached/WokenUpOnIdleTest.java
  36. +2 −2 src/test/java/net/spy/memcached/internal/CheckedOperationTimeoutExceptionTest.java
View
@@ -14,7 +14,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>net.spy</groupId>
<artifactId>spymemcached</artifactId>
- <version>2.999.999-SNAPSHOT</version> <!-- not used -->
+ <version>2.11.4-tellapart</version> <!-- not used -->
<name>Spymemcached</name>
<description>A client library for memcached.</description>
@@ -23,7 +23,6 @@
package net.spy.memcached;
-import java.net.InetSocketAddress;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
@@ -45,14 +44,14 @@ private AddrUtil() {
*
* Note that colon-delimited IPv6 is also supported. For example: ::1:11211
*/
- public static List<InetSocketAddress> getAddresses(String s) {
+ public static List<HostPort> getAddresses(String s) {
if (s == null) {
throw new NullPointerException("Null host list");
}
if (s.trim().equals("")) {
throw new IllegalArgumentException("No hosts in list: ``" + s + "''");
}
- ArrayList<InetSocketAddress> addrs = new ArrayList<InetSocketAddress>();
+ ArrayList<HostPort> addrs = new ArrayList<HostPort>();
for (String hoststuff : s.split("(?:\\s|,)+")) {
if (hoststuff.equals("")) {
@@ -67,15 +66,14 @@ private AddrUtil() {
String hostPart = hoststuff.substring(0, finalColon);
String portNum = hoststuff.substring(finalColon + 1);
- addrs.add(new InetSocketAddress(hostPart, Integer.parseInt(portNum)));
+ addrs.add(new HostPort(hostPart, Integer.parseInt(portNum)));
}
assert !addrs.isEmpty() : "No addrs found";
return addrs;
}
- public static List<InetSocketAddress> getAddresses(List<String> servers) {
- ArrayList<InetSocketAddress> addrs =
- new ArrayList<InetSocketAddress>(servers.size());
+ public static List<HostPort> getAddresses(List<String> servers) {
+ ArrayList<HostPort> addrs = new ArrayList<HostPort>(servers.size());
for (String server : servers) {
int finalColon = server.lastIndexOf(':');
if (finalColon < 1) {
@@ -85,7 +83,7 @@ private AddrUtil() {
String hostPart = server.substring(0, finalColon);
String portNum = server.substring(finalColon + 1);
- addrs.add(new InetSocketAddress(hostPart, Integer.parseInt(portNum)));
+ addrs.add(new HostPort(hostPart, Integer.parseInt(portNum)));
}
if (addrs.isEmpty()) {
// servers was passed in empty, and shouldn't have been
@@ -94,13 +92,12 @@ private AddrUtil() {
return addrs;
}
- public static List<InetSocketAddress>
- getAddressesFromURL(List<URL> servers) {
- ArrayList<InetSocketAddress> addrs =
- new ArrayList<InetSocketAddress>(servers.size());
+ public static List<HostPort> getAddressesFromURL(List<URL> servers) {
+ ArrayList<HostPort> addrs = new ArrayList<HostPort>(servers.size());
for (URL server : servers) {
- addrs.add(new InetSocketAddress(server.getHost(), server.getPort()));
+ addrs.add(new HostPort(server.getHost(), server.getPort()));
}
return addrs;
}
+
}
@@ -23,7 +23,6 @@
package net.spy.memcached;
-import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import net.spy.memcached.protocol.binary.BinaryMemcachedNodeImpl;
@@ -61,16 +60,23 @@ public BinaryConnectionFactory(int len, int bufSize, HashAlgorithm hash) {
}
@Override
- public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
- int bufSize) {
+ public MemcachedNode createMemcachedNode(HostPort hp, SocketChannel c, int bufSize) {
boolean doAuth = false;
if (getAuthDescriptor() != null) {
doAuth = true;
}
- return new BinaryMemcachedNodeImpl(sa, c, bufSize,
- createReadOperationQueue(), createWriteOperationQueue(),
- createOperationQueue(), getOpQueueMaxBlockTime(), doAuth,
- getOperationTimeout(), getAuthWaitTime(), this);
+ return new BinaryMemcachedNodeImpl(
+ hp,
+ c,
+ bufSize,
+ createReadOperationQueue(),
+ createWriteOperationQueue(),
+ createOperationQueue(),
+ getOpQueueMaxBlockTime(),
+ doAuth,
+ getOperationTimeout(),
+ getAuthWaitTime(),
+ this);
}
@Override
@@ -24,8 +24,6 @@
package net.spy.memcached;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.List;
@@ -51,14 +49,13 @@
* @return a new MemcachedConnection connected to those addresses
* @throws IOException for problems initializing the memcached connections
*/
- MemcachedConnection createConnection(List<InetSocketAddress> addrs)
+ MemcachedConnection createConnection(List<HostPort> addrs)
throws IOException;
/**
* Create a new memcached node.
*/
- MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
- int bufSize);
+ MemcachedNode createMemcachedNode(HostPort hp, SocketChannel c,int bufSize);
/**
* Create a BlockingQueue for operations for a connection.
@@ -22,8 +22,6 @@
package net.spy.memcached;
-import java.net.SocketAddress;
-
/**
* Users of this interface will be notified when changes to the state of
* connections take place.
@@ -33,16 +31,16 @@
/**
* A connection has just successfully been established on the given socket.
*
- * @param sa the address of the node whose connection was established
+ * @param hp the address of the node whose connection was established
* @param reconnectCount the number of attempts before the connection was
* established
*/
- void connectionEstablished(SocketAddress sa, int reconnectCount);
+ void connectionEstablished(HostPort hp, int reconnectCount);
/**
* A connection was just lost on the given socket.
*
* @param sa the address of the node whose connection was lost
*/
- void connectionLost(SocketAddress sa);
+ void connectionLost(HostPort sa);
}
@@ -24,18 +24,14 @@
package net.spy.memcached;
import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -166,12 +162,14 @@ public DefaultConnectionFactory() {
this(DEFAULT_OP_QUEUE_LEN, DEFAULT_READ_BUFFER_SIZE);
}
- public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
- int bufSize) {
+ public MemcachedNode createMemcachedNode(HostPort hp, SocketChannel c, int bufSize) {
OperationFactory of = getOperationFactory();
if (of instanceof AsciiOperationFactory) {
- return new AsciiMemcachedNodeImpl(sa, c, bufSize,
+ return new AsciiMemcachedNodeImpl(
+ hp,
+ c,
+ bufSize,
createReadOperationQueue(),
createWriteOperationQueue(),
createOperationQueue(),
@@ -184,7 +182,10 @@ public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
if (getAuthDescriptor() != null) {
doAuth = true;
}
- return new BinaryMemcachedNodeImpl(sa, c, bufSize,
+ return new BinaryMemcachedNodeImpl(
+ hp,
+ c,
+ bufSize,
createReadOperationQueue(),
createWriteOperationQueue(),
createOperationQueue(),
@@ -203,7 +204,7 @@ public MemcachedNode createMemcachedNode(SocketAddress sa, SocketChannel c,
*
* @see net.spy.memcached.ConnectionFactory#createConnection(java.util.List)
*/
- public MemcachedConnection createConnection(List<InetSocketAddress> addrs)
+ public MemcachedConnection createConnection(List<HostPort> addrs)
throws IOException {
return new MemcachedConnection(getReadBufSize(), this, addrs,
getInitialObservers(), getFailureMode(), getOperationFactory());
@@ -0,0 +1,79 @@
+package net.spy.memcached;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Container for the hostname and port of a Node, and resolve them into an
+ * InetSocketAddress.
+ */
+public class HostPort {
+
+ private final String host;
+ private final int port;
+
+ private volatile InetSocketAddress address;
+
+ public HostPort(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ public String getHostName() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ /**
+ * Get the InetSocketAddress. If it has not already been resolved then
+ * resolve it.
+ */
+ public InetSocketAddress getAddress() {
+ if (address == null) {
+ resolveAddress();
+ }
+ return address;
+ }
+
+ /**
+ * Resolve and return the InetSocketAddress. If it has already been resolved,
+ * it will be re-resolved.
+ */
+ public InetSocketAddress resolveAddress() {
+ address = new InetSocketAddress(host, port);
+ return address;
+ }
+
+ @Override
+ public String toString() {
+ if (address == null) {
+ return InetSocketAddress.createUnresolved(host, port).toString();
+ }
+ else {
+ return address.toString();
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ HostPort hostPort = (HostPort) o;
+
+ if (port != hostPort.port) return false;
+ if (host != null ? !host.equals(hostPort.host) : hostPort.host != null) return false;
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = host != null ? host.hashCode() : 0;
+ result = 31 * result + port;
+ return result;
+ }
+
+}
Oops, something went wrong.

0 comments on commit cd5b8f6

Please sign in to comment.