Skip to content

Commit

Permalink
Implement a SRV ketama client
Browse files Browse the repository at this point in the history
  • Loading branch information
spkrka committed Feb 19, 2015
1 parent e1ae176 commit 6066a94
Show file tree
Hide file tree
Showing 5 changed files with 417 additions and 13 deletions.
2 changes: 1 addition & 1 deletion checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<property name="max" value="100"/>
</module>
<module name="MethodLength"/>
<module name="ParameterNumber"/>
<!--module name="ParameterNumber"/-->


<!-- Checks for whitespace -->
Expand Down
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@
<artifactId>metrics-core</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>com.spotify</groupId>
<artifactId>dns</artifactId>
<version>2.2.0</version>
</dependency>

<!-- Tests -->
<dependency>
Expand Down
129 changes: 117 additions & 12 deletions src/main/java/com/spotify/folsom/MemcacheClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.spotify.dns.DnsSrvResolver;
import com.spotify.dns.DnsSrvResolvers;
import com.spotify.folsom.client.NoopMetrics;
import com.spotify.folsom.client.ascii.DefaultAsciiMemcacheClient;
import com.spotify.folsom.client.binary.DefaultBinaryMemcacheClient;
Expand All @@ -35,7 +38,10 @@
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
Expand All @@ -59,6 +65,28 @@ private static class DefaultExecutor {
new UncaughtExceptionHandler(), true);
}

/**
* Lazily instantiated singleton default srvResolver.
*/
private static class DefaultDnsResolver {
private static final DnsSrvResolver INSTANCE = DnsSrvResolvers.newBuilder()
.cachingLookups(true)
.retainingDataOnFailures(true)
.build();
}

/**
* Lazily instantiated singleton default scheduled executor.
*/
private static class DefaultScheduledExecutor {
private static final ScheduledExecutorService INSTANCE =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("folsom-default-scheduled-executor")
.build());
}

private List<HostAndPort> addresses = ImmutableList.of(
HostAndPort.fromParts(DEFAULT_HOSTNAME, DEFAULT_PORT));
private int maxOutstandingRequests = DEFAULT_MAX_OUTSTANDING;
Expand All @@ -72,6 +100,11 @@ private static class DefaultExecutor {
private boolean retry = true;
private Executor executor;

private DnsSrvResolver srvResolver;
private String srvRecord;
private long dnsRefreshPeriod = 60 * 1000L;
private long shutdownDelay = 60 * 1000L;

private long timeoutMillis = 3000;

/**
Expand Down Expand Up @@ -143,6 +176,51 @@ public MemcacheClientBuilder<V> withAddresses(final List<HostAndPort> addresses)
return this;
}

/**
* Use SRV to lookup nodes instead of a fixed set of addresses.
* This means that the set of nodes can change dynamically over time.
* @param srvRecord the SRV record to use.
* @return itself
*/
public MemcacheClientBuilder<V> withSRVRecord(final String srvRecord) {
this.srvRecord = checkNotNull(srvRecord);
return this;
}

/**
* This is only used for the SRV based ketama client. This is how often
* DNS should be queried for updates.
* @param periodMillis time in milliseonds. The default is 60 seconds.
* @return itself
*/
public MemcacheClientBuilder<V> withSRVRefreshPeriod(final long periodMillis) {
this.dnsRefreshPeriod = periodMillis;
return this;
}

/**
* This is only used for the SRV based ketama client. When the SRV record has changed,
* the old client will be shutdown after this much time has passed, in order to complete
* pending requests.
* @param shutdownDelay time in milliseconds. The default is 60 seconds.
* @return itself
*/
public MemcacheClientBuilder<V> withSRVShutdownDelay(final long shutdownDelay) {
this.shutdownDelay = shutdownDelay;
return this;
}

/**
* Use a specific SRV resolver.
* @param srvResolver the resolver to use. Default is a caching resolver from
* {@link com.spotify.dns.DnsSrvResolvers}
* @return itself
*/
public MemcacheClientBuilder<V> withSrvResolver(final DnsSrvResolver srvResolver) {
this.srvResolver = checkNotNull(srvResolver, "srvResolver");
return this;
}

/**
* Specify how to collect metrics.
* @param metrics Default is NoopMetrics - which doesn't collect anything.
Expand Down Expand Up @@ -250,21 +328,27 @@ public AsciiMemcacheClient<V> connectAscii() {
* @return A raw memcached client.
*/
protected RawMemcacheClient connectRaw(boolean binary) {
final List<RawMemcacheClient> clients = createClients(binary);

RawMemcacheClient client;
if (addresses.size() > 1) {
checkState(clients.size() == addresses.size());

final List<AddressAndClient> aac = Lists.newArrayListWithCapacity(clients.size());
for (int i = 0; i < clients.size(); i++) {
final HostAndPort address = addresses.get(i);
aac.add(new AddressAndClient(address, clients.get(i)));
if (srvRecord != null) {
if (!addresses.isEmpty()) {
throw new IllegalStateException("You may not specify both srvRecord and addresses");
}

client = new KetamaMemcacheClient(aac);
client = createSRVClient(binary);
} else {
client = clients.get(0);
final List<RawMemcacheClient> clients = createClients(binary);
if (addresses.size() > 1) {
checkState(clients.size() == addresses.size());

final List<AddressAndClient> aac = Lists.newArrayListWithCapacity(clients.size());
for (int i = 0; i < clients.size(); i++) {
final HostAndPort address = addresses.get(i);
aac.add(new AddressAndClient(address, clients.get(i)));
}

client = new KetamaMemcacheClient(aac);
} else {
client = clients.get(0);
}
}

if (retry) {
Expand All @@ -283,6 +367,27 @@ private List<RawMemcacheClient> createClients(boolean binary) {
return clients;
}

private RawMemcacheClient createSRVClient(final boolean binary) {
DnsSrvResolver resolver = srvResolver;
if (resolver == null) {
resolver = DefaultDnsResolver.INSTANCE;
}

SrvKetamaClient client = new SrvKetamaClient(srvRecord, resolver,
DefaultScheduledExecutor.INSTANCE,
dnsRefreshPeriod, TimeUnit.MILLISECONDS,
new SrvKetamaClient.Connector() {
@Override
public RawMemcacheClient connect(HostAndPort input) {
return createClient(input, binary);
}
},
shutdownDelay, TimeUnit.MILLISECONDS);

client.start();
return client;
}

private RawMemcacheClient createClient(final HostAndPort address, boolean binary) {
if (connections == 1) {
return createReconnectingClient(address, binary);
Expand Down

0 comments on commit 6066a94

Please sign in to comment.