Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
Allow configuring feature provider for bookkeeper client
Browse files Browse the repository at this point in the history
* make the feature provider scope-able
* passing the feature provider through client builder
* provide cacheable and settable feature provider and by default disable all features in the default provider
* deprecate #getFeature and #setFeature in configuration

RB_ID=599328
  • Loading branch information
Sijie Guo committed Mar 11, 2015
1 parent 5c198b6 commit c2a092a
Show file tree
Hide file tree
Showing 14 changed files with 201 additions and 51 deletions.
Expand Up @@ -29,6 +29,8 @@
import org.apache.bookkeeper.client.AsyncCallback.IsClosedCallback;
import org.apache.bookkeeper.client.BKException.Code;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.bookkeeper.meta.CleanupLedgerManager;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerManagerFactory;
Expand Down Expand Up @@ -98,6 +100,7 @@ public class BookKeeper {
final ScheduledExecutorService scheduler;
final HashedWheelTimer requestTimer;
final boolean ownTimer;
final FeatureProvider featureProvider;

// Ledger manager responsible for how to store ledger meta data
final LedgerManagerFactory ledgerManagerFactory;
Expand Down Expand Up @@ -128,7 +131,7 @@ public static class Builder {
StatsLogger statsLogger = null;
DNSToSwitchMapping dnsResolver = null;
HashedWheelTimer requestTimer = null;

FeatureProvider featureProvider = null;

public Builder config(ClientConfiguration conf) {
this.conf = conf;
Expand Down Expand Up @@ -160,10 +163,14 @@ public Builder requestTimer(HashedWheelTimer requestTimer) {
return this;
}

public Builder featureProvider(FeatureProvider featureProvider) {
this.featureProvider = featureProvider;
return this;
}

public BookKeeper build() throws IOException, InterruptedException, KeeperException {
return new BookKeeper(conf, zk, channelFactory,
statsLogger, dnsResolver, requestTimer);
statsLogger, dnsResolver, requestTimer, featureProvider);
}

}
Expand Down Expand Up @@ -199,12 +206,12 @@ public BookKeeper(String servers) throws IOException, InterruptedException,
*/
public BookKeeper(final ClientConfiguration conf)
throws IOException, InterruptedException, KeeperException {
this(conf, null, null, NullStatsLogger.INSTANCE, null, null);
this(conf, null, null, NullStatsLogger.INSTANCE, null, null, null);
}

public BookKeeper(final ClientConfiguration conf, StatsLogger statsLogger)
throws IOException, InterruptedException, KeeperException {
this(conf, null, null, statsLogger, null, null);
this(conf, null, null, statsLogger, null, null, null);
}

/**
Expand All @@ -223,17 +230,17 @@ public BookKeeper(final ClientConfiguration conf, StatsLogger statsLogger)
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk)
throws IOException, InterruptedException, KeeperException {
this(conf, zk, null, NullStatsLogger.INSTANCE, null, null);
this(conf, zk, null, NullStatsLogger.INSTANCE, null, null, null);
}

public BookKeeper(ClientConfiguration conf, ZooKeeper zk, StatsLogger statsLogger)
throws IOException, InterruptedException, KeeperException {
this(conf, zk, null, statsLogger, null, null);
this(conf, zk, null, statsLogger, null, null, null);
}

public BookKeeper(ClientConfiguration conf, ZooKeeper zk, StatsLogger statsLogger, DNSToSwitchMapping dnsToSwitchMapping)
throws IOException, InterruptedException, KeeperException {
this(conf, zk, null, statsLogger, dnsToSwitchMapping, null);
this(conf, zk, null, statsLogger, dnsToSwitchMapping, null, null);
}

/**
Expand All @@ -255,24 +262,25 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, StatsLogger statsLogge
*/
public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory)
throws IOException, InterruptedException, KeeperException {
this(conf, zk, channelFactory, NullStatsLogger.INSTANCE, null, null);
this(conf, zk, channelFactory, NullStatsLogger.INSTANCE, null, null, null);
}

public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory,
StatsLogger statsLogger)
throws IOException, InterruptedException, KeeperException {
this(conf, zk, channelFactory, statsLogger, null, null);
this(conf, zk, channelFactory, statsLogger, null, null, null);
}

public BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory,
StatsLogger statsLogger, DNSToSwitchMapping dnsResolver)
throws IOException, InterruptedException, KeeperException {
this(conf, zk, channelFactory, statsLogger, dnsResolver, null);
this(conf, zk, channelFactory, statsLogger, dnsResolver, null, null);

}

private BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFactory channelFactory,
StatsLogger statsLogger, DNSToSwitchMapping dnsResolver, HashedWheelTimer requestTimer)
StatsLogger statsLogger, DNSToSwitchMapping dnsResolver, HashedWheelTimer requestTimer,
FeatureProvider featureProvider)
throws IOException, InterruptedException, KeeperException {
this.conf = conf;

Expand Down Expand Up @@ -316,12 +324,22 @@ private BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFa
this.ownTimer = false;
}

if (null == featureProvider) {
this.featureProvider = SettableFeatureProvider.DISABLE_ALL;
} else {
this.featureProvider = featureProvider;
}

this.scheduler = Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder().setNameFormat("bkc-scheduler-%d").build());
this.statsLogger = ClientStatsProvider.createBookKeeperClientStatsLogger(statsLogger);
this.alertStatsLogger = new AlertStatsLogger(statsLogger, "bk_alert");
// initialize the ensemble placement
this.placementPolicy = initializeEnsemblePlacementPolicy(dnsResolver, requestTimer, statsLogger.scope("bookkeeper_client"));
this.placementPolicy = initializeEnsemblePlacementPolicy(
dnsResolver,
requestTimer,
this.featureProvider,
statsLogger.scope("bookkeeper_client"));

if (conf.getFirstSpeculativeReadTimeout() > 0) {
this.readSpeculativeRequestPolicy =
Expand Down Expand Up @@ -364,11 +382,13 @@ private BookKeeper(ClientConfiguration conf, ZooKeeper zk, ClientSocketChannelFa
private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(
DNSToSwitchMapping dnsResolver,
HashedWheelTimer timer,
FeatureProvider featureProvider,
StatsLogger statsLogger)
throws IOException {
try {
Class<? extends EnsemblePlacementPolicy> policyCls = conf.getEnsemblePlacementPolicy();
return ReflectionUtils.newInstance(policyCls).initialize(conf, Optional.fromNullable(dnsResolver), timer, statsLogger, alertStatsLogger);
return ReflectionUtils.newInstance(policyCls).initialize(conf, Optional.fromNullable(dnsResolver), timer,
featureProvider, statsLogger, alertStatsLogger);
} catch (ConfigurationException e) {
throw new IOException("Failed to initialize ensemble placement policy : ", e);
}
Expand Down
Expand Up @@ -30,6 +30,7 @@

import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
Expand Down Expand Up @@ -112,6 +113,7 @@ public List<Integer> reorderReadLACSequence(ArrayList<InetSocketAddress> ensembl
public EnsemblePlacementPolicy initialize(ClientConfiguration conf,
Optional<DNSToSwitchMapping> optionalDnsResolver,
HashedWheelTimer timer,
FeatureProvider featureProvider,
StatsLogger statsLogger,
AlertStatsLogger alertStatsLogger) {
return this;
Expand Down
Expand Up @@ -28,6 +28,7 @@

import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
Expand All @@ -45,12 +46,14 @@ public interface EnsemblePlacementPolicy {
* @param conf client configuration
* @param optionalDnsResolver dns resolver
* @param hashedWheelTimer timer
* @param featureProvider feature provider
* @param statsLogger stats logger
* @param alertStatsLogger stats logger for alerts
*/
public EnsemblePlacementPolicy initialize(ClientConfiguration conf,
Optional<DNSToSwitchMapping> optionalDnsResolver,
HashedWheelTimer hashedWheelTimer,
FeatureProvider featureProvider,
StatsLogger statsLogger,
AlertStatsLogger alertStatsLogger);

Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.Configurable;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
Expand Down Expand Up @@ -158,6 +159,7 @@ protected RackawareEnsemblePlacementPolicyImpl initialize(DNSToSwitchMapping dns
public RackawareEnsemblePlacementPolicyImpl initialize(ClientConfiguration conf,
Optional<DNSToSwitchMapping> optionalDnsResolver,
HashedWheelTimer timer,
FeatureProvider featureProvider,
StatsLogger statsLogger,
AlertStatsLogger alertStatsLogger) {
DNSToSwitchMapping dnsResolver;
Expand Down
Expand Up @@ -34,7 +34,7 @@

import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FixedValueFeature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.net.Node;
Expand All @@ -46,6 +46,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.bookkeeper.util.BookKeeperConstants.*;

public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlacementPolicy {
static final Logger LOG = LoggerFactory.getLogger(RegionAwareEnsemblePlacementPolicy.class);

Expand Down Expand Up @@ -151,9 +153,10 @@ public void handleBookiesThatJoined(Set<InetSocketAddress> joinedBookies) {
public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf,
Optional<DNSToSwitchMapping> optionalDnsResolver,
HashedWheelTimer timer,
FeatureProvider featureProvider,
StatsLogger statsLogger,
AlertStatsLogger alertStatsLogger) {
super.initialize(conf, optionalDnsResolver, timer, statsLogger, alertStatsLogger);
super.initialize(conf, optionalDnsResolver, timer, featureProvider, statsLogger, alertStatsLogger);
myRegion = getLocalRegion(localNode);
enableValidation = conf.getBoolean(REPP_ENABLE_VALIDATION, true);

Expand All @@ -178,7 +181,11 @@ public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf,
throw new IllegalArgumentException("Regions provided are insufficient to meet the durability constraints");
}
}
disableDurabilityFeature = conf.getFeature(REPP_DISABLE_DURABILITY_ENFORCEMENT_FEATURE, new FixedValueFeature(false));
disableDurabilityFeature = conf.getFeature(REPP_DISABLE_DURABILITY_ENFORCEMENT_FEATURE, null);
if (null == disableDurabilityFeature) {
disableDurabilityFeature =
featureProvider.getFeature(FEATURE_REPP_DISABLE_DURABILITY_ENFORCEMENT);
}
return this;
}

Expand Down
Expand Up @@ -237,10 +237,12 @@ public int getAsyncProcessLedgersConcurrency() {
return getInt(ASYNC_PROCESS_LEDGERS_CONCURRENCY, 1);
}

@Deprecated
public void setFeature(String configProperty, Feature feature) {
setProperty(configProperty, feature);
}

@Deprecated
public Feature getFeature(String configProperty, Feature defaultValue) {
if (null == getProperty(configProperty)) {
return defaultValue;
Expand Down
@@ -0,0 +1,64 @@
package org.apache.bookkeeper.feature;

import org.apache.commons.lang.StringUtils;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
* Cacheable Feature Provider
*/
public abstract class CacheableFeatureProvider<T extends Feature> implements FeatureProvider {

protected final String scope;
protected final ConcurrentMap<String, FeatureProvider> scopes =
new ConcurrentHashMap<String, FeatureProvider>();
protected final ConcurrentMap<String, T> features =
new ConcurrentHashMap<String, T>();

protected CacheableFeatureProvider(String scope) {
this.scope = scope;
}

protected String makeName(String name) {
if (StringUtils.isBlank(scope)) {
return name;
} else {
return scope + "." + name;
}
}

@Override
public T getFeature(String name) {
T feature = features.get(name);
if (null == feature) {
T newFeature = makeFeature(makeName(name));
T oldFeature = features.putIfAbsent(name, newFeature);
if (null == oldFeature) {
feature = newFeature;
} else {
feature = oldFeature;
}
}
return feature;
}

protected abstract T makeFeature(String featureName);

@Override
public FeatureProvider scope(String name) {
FeatureProvider provider = scopes.get(name);
if (null == provider) {
FeatureProvider newProvider = makeProvider(makeName(name));
FeatureProvider oldProvider = scopes.putIfAbsent(name, newProvider);
if (null == oldProvider) {
provider = newProvider;
} else {
provider = oldProvider;
}
}
return provider;
}

protected abstract FeatureProvider makeProvider(String fullScopeName);
}
Expand Up @@ -11,4 +11,13 @@ public interface FeatureProvider {
* @return feature instance
*/
Feature getFeature(String name);

/**
* Provide the feature provider under scope <i>name</i>.
*
* @param name
* scope name.
* @return feature provider under scope <i>name</i>
*/
FeatureProvider scope(String name);
}
@@ -1,14 +1,17 @@
package org.apache.bookkeeper.feature;

public class FixedValueFeature implements Feature {
protected final String name;
protected int availability;

public FixedValueFeature(int availability) {
public FixedValueFeature(String name, int availability) {
this.name = name;
this.availability = availability;
}

public FixedValueFeature(boolean isAvailabile) {
this.availability = isAvailabile ? FEATURE_AVAILABILITY_MAX_VALUE : 0;
public FixedValueFeature(String name, boolean available) {
this.name = name;
this.availability = available ? FEATURE_AVAILABILITY_MAX_VALUE : 0;
}

@Override
Expand Down
@@ -1,12 +1,12 @@
package org.apache.bookkeeper.feature;

public class SettableFeature extends FixedValueFeature {
public SettableFeature(int initialAvailability) {
super(initialAvailability);
public SettableFeature(String name, int initialAvailability) {
super(name, initialAvailability);
}

public SettableFeature(boolean isAvailabile) {
super(isAvailabile);
public SettableFeature(String name, boolean isAvailabile) {
super(name, isAvailabile);
}

public void set(int availability) {
Expand Down

0 comments on commit c2a092a

Please sign in to comment.