Skip to content
This repository has been archived by the owner on Sep 26, 2019. It is now read-only.

Commit

Permalink
BookKeeper Client: Allow using a decider to dynamically disable durab…
Browse files Browse the repository at this point in the history
…ility enforcement in region aware placement policy

- Introduce Features that are dynamic configuration options
- Allow specifying Features as configuration parameters
- Use a configuration parameter so that the client can specify the Feature to be used to determine if the region aware ensemble placement policy should enforce durability
- Make each call to newEnsemble or replaceBookie independently enforce durability based on the dynamic feature regardless of how the region aware ensemble placement policy was configured

RB_ID=588200
  • Loading branch information
dhamanka committed Feb 24, 2015
1 parent 8fe74ba commit f9762d1
Show file tree
Hide file tree
Showing 11 changed files with 182 additions and 34 deletions.
Expand Up @@ -29,9 +29,9 @@
import com.google.common.base.Optional;

import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.configuration.Configuration;

/**
* Default Ensemble Placement Policy, which picks bookies randomly
Expand Down Expand Up @@ -113,7 +113,7 @@ public List<Integer> reorderReadLACSequence(ArrayList<InetSocketAddress> ensembl
* @param statsLogger
*/
@Override
public EnsemblePlacementPolicy initialize(Configuration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, StatsLogger statsLogger) {
public EnsemblePlacementPolicy initialize(ClientConfiguration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, StatsLogger statsLogger) {
return this;
}

Expand Down
Expand Up @@ -27,6 +27,8 @@
import com.google.common.base.Optional;

import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.AbstractConfiguration;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.configuration.Configuration;
Expand All @@ -43,7 +45,7 @@ public interface EnsemblePlacementPolicy {
*
* @param statsLogger
*/
public EnsemblePlacementPolicy initialize(Configuration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, StatsLogger statsLogger);
public EnsemblePlacementPolicy initialize(ClientConfiguration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, StatsLogger statsLogger);

/**
* Uninitialize the policy
Expand Down
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.Configurable;
import org.apache.bookkeeper.net.CachedDNSToSwitchMapping;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
Expand Down Expand Up @@ -133,7 +134,7 @@ protected RackawareEnsemblePlacementPolicy initialize(DNSToSwitchMapping dnsReso
}

@Override
public RackawareEnsemblePlacementPolicy initialize(Configuration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, StatsLogger statsLogger) {
public RackawareEnsemblePlacementPolicy initialize(ClientConfiguration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, StatsLogger statsLogger) {
DNSToSwitchMapping dnsResolver;
if (optionalDnsResolver.isPresent()) {
dnsResolver = optionalDnsResolver.get();
Expand Down
Expand Up @@ -32,6 +32,9 @@
import com.google.common.base.Optional;


import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FixedValueFeature;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.net.Node;
Expand All @@ -48,6 +51,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
public static final String REPP_REGIONS_TO_WRITE = "reppRegionsToWrite";
public static final String REPP_MINIMUM_REGIONS_FOR_DURABILITY = "reppMinimumRegionsForDurability";
public static final String REPP_ENABLE_DURABILITY_ENFORCEMENT_IN_REPLACE = "reppEnableDurabilityEnforcementInReplace";
public static final String REPP_DISABLE_DURABILITY_ENFORCEMENT_FEATURE = "reppDisableDurabilityEnforcementFeature";
public static final String REPP_ENABLE_VALIDATION = "reppEnableValidation";
public static final String REGION_AWARE_ANOMALOUS_ENSEMBLE = "region_aware_anomalous_ensemble";
static final int MINIMUM_REGIONS_FOR_DURABILITY_DEFAULT = 2;
Expand All @@ -61,6 +65,7 @@ public class RegionAwareEnsemblePlacementPolicy extends RackawareEnsemblePlaceme
protected int minRegionsForDurability = 0;
protected boolean enableValidation = true;
protected boolean enforceDurabilityInReplace = false;
protected Feature disableDurabilityFeature;

RegionAwareEnsemblePlacementPolicy() {
super();
Expand Down Expand Up @@ -141,7 +146,7 @@ protected void handleBookiesThatJoined(Set<InetSocketAddress> joinedBookies) {
}

@Override
public RegionAwareEnsemblePlacementPolicy initialize(Configuration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, StatsLogger statsLogger) {
public RegionAwareEnsemblePlacementPolicy initialize(ClientConfiguration conf, Optional<DNSToSwitchMapping> optionalDnsResolver, StatsLogger statsLogger) {
super.initialize(conf, optionalDnsResolver, statsLogger);
myRegion = getLocalRegion(localNode);
enableValidation = conf.getBoolean(REPP_ENABLE_VALIDATION, true);
Expand All @@ -166,27 +171,30 @@ public RegionAwareEnsemblePlacementPolicy initialize(Configuration conf, Optiona
throw new IllegalArgumentException("Regions provided are insufficient to meet the durability constraints");
}
}

disableDurabilityFeature = conf.getFeature(REPP_DISABLE_DURABILITY_ENFORCEMENT_FEATURE, new FixedValueFeature(false));
return this;
}

@Override
public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuorumSize, int ackQuorumSize,
Set<InetSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {

int effectiveMinRegionsForDurability = disableDurabilityFeature.isAvailable() ? 1 : minRegionsForDurability;

// All of these conditions indicate bad configuration
if (ackQuorumSize < minRegionsForDurability) {
if (ackQuorumSize < effectiveMinRegionsForDurability) {
throw new IllegalArgumentException("Ack Quorum size provided are insufficient to meet the durability constraints");
} else if (ensembleSize < writeQuorumSize) {
throw new IllegalArgumentException("write quorum (" + writeQuorumSize + ") cannot exceed ensemble size (" + ensembleSize + ")");
} else if (writeQuorumSize < ackQuorumSize) {
throw new IllegalArgumentException("ack quorum (" + ackQuorumSize + ") cannot exceed write quorum size (" + writeQuorumSize + ")");
} else if (minRegionsForDurability > 0) {
// We must survive the failure of numRegions - minRegionsForDurability. When these
} else if (effectiveMinRegionsForDurability > 0) {
// We must survive the failure of numRegions - effectiveMinRegionsForDurability. When these
// regions have failed we would spread the replicas over the remaining
// minRegionsForDurability regions; we have to make sure that the ack quorum is large
// effectiveMinRegionsForDurability regions; we have to make sure that the ack quorum is large
// enough such that there is a configuration for spreading the replicas across
// minRegionsForDurability - 1 regions
if (ackQuorumSize <= (writeQuorumSize - (writeQuorumSize / minRegionsForDurability))) {
// effectiveMinRegionsForDurability - 1 regions
if (ackQuorumSize <= (writeQuorumSize - (writeQuorumSize / effectiveMinRegionsForDurability))) {
throw new IllegalArgumentException("ack quorum (" + ackQuorumSize + ") " +
"violates the requirement to satisfy durability constraints when running in degraded mode");
}
Expand All @@ -213,8 +221,8 @@ public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuoru
writeQuorumSize,
ackQuorumSize,
REGIONID_DISTANCE_FROM_LEAVES,
minRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null,
minRegionsForDurability);
effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null,
effectiveMinRegionsForDurability);
return perRegionPlacement.values().iterator().next().newEnsembleInternal(ensembleSize, writeQuorumSize, excludeBookies, ensemble);
}

Expand All @@ -241,8 +249,8 @@ public ArrayList<InetSocketAddress> newEnsemble(int ensembleSize, int writeQuoru
writeQuorumSize,
ackQuorumSize,
REGIONID_DISTANCE_FROM_LEAVES,
minRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null,
minRegionsForDurability);
effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null,
effectiveMinRegionsForDurability);
remainingEnsembleBeforeIteration = remainingEnsemble;
for (String region: regionsWiseAllocation.keySet()) {
final Pair<Integer, Integer> currentAllocation = regionsWiseAllocation.get(region);
Expand Down Expand Up @@ -331,14 +339,15 @@ public InetSocketAddress replaceBookie(int ensembleSize, int writeQuorumSize, in
Set<InetSocketAddress> excludeBookies) throws BKException.BKNotEnoughBookiesException {
rwLock.readLock().lock();
try {
boolean enforceDurability = enforceDurabilityInReplace;
boolean enforceDurability = enforceDurabilityInReplace && !disableDurabilityFeature.isAvailable();
int effectiveMinRegionsForDurability = enforceDurability ? minRegionsForDurability : 1;
Set<Node> excludeNodes = convertBookiesToNodes(excludeBookies);
RRTopologyAwareCoverageEnsemble ensemble = new RRTopologyAwareCoverageEnsemble(ensembleSize,
writeQuorumSize,
ackQuorumSize,
REGIONID_DISTANCE_FROM_LEAVES,
minRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null,
minRegionsForDurability);
effectiveMinRegionsForDurability > 0 ? new HashSet<String>(perRegionPlacement.keySet()) : null,
effectiveMinRegionsForDurability);

BookieNode bookieNodeToReplace = knownBookies.get(bookieToReplace);
if (null == bookieNodeToReplace) {
Expand Down
Expand Up @@ -19,6 +19,7 @@

import java.net.URL;

import org.apache.bookkeeper.feature.Feature;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
Expand Down Expand Up @@ -235,4 +236,16 @@ public void setAsyncProcessLedgersConcurrency(int concurrency) {
public int getAsyncProcessLedgersConcurrency() {
return getInt(ASYNC_PROCESS_LEDGERS_CONCURRENCY, 1);
}

public void setFeature(String configProperty, Feature feature) {
setProperty(configProperty, feature);
}

public Feature getFeature(String configProperty, Feature defaultValue) {
if (null == getProperty(configProperty)) {
return defaultValue;
} else {
return (Feature)getProperty(configProperty);
}
}
}
@@ -0,0 +1,30 @@
package org.apache.bookkeeper.feature;

/**
* This interface represents a feature.
*/
public interface Feature {
public static int FEATURE_AVAILABILITY_MAX_VALUE = 100;

/**
* Returns a textual representation of the feature.
*
* @return name of the feature.
*/
String name();

/**
* Returns the availability of this feature, an integer between 0 and 100.
*
* @return the availability of this feature.
*/
int availability();

/**
* Whether this feature is available or not.
*
* @return true if this feature is available, otherwise false.
*/
boolean isAvailable();
}

@@ -0,0 +1,14 @@
package org.apache.bookkeeper.feature;

/**
* Provider to provide features.
*/
public interface FeatureProvider {
/**
* Return the feature with given name.
*
* @param name feature name
* @return feature instance
*/
Feature getFeature(String name);
}
@@ -0,0 +1,28 @@
package org.apache.bookkeeper.feature;

public class FixedValueFeature implements Feature {
protected int availability;

public FixedValueFeature(int availability) {
this.availability = availability;
}

public FixedValueFeature(boolean isAvailabile) {
this.availability = isAvailabile ? FEATURE_AVAILABILITY_MAX_VALUE : 0;
}

@Override
public String name() {
return null;
}

@Override
public int availability() {
return availability;
}

@Override
public boolean isAvailable() {
return availability() > 0;
}
}
@@ -0,0 +1,20 @@
package org.apache.bookkeeper.feature;

public class SettableFeature extends FixedValueFeature {
public SettableFeature(int initialAvailability) {
super(initialAvailability);
}

public SettableFeature(boolean isAvailabile) {
super(isAvailabile);
}

public void set(int availability) {
this.availability = availability;
}

public void set(boolean isAvailabile) {
this.availability = isAvailabile ? FEATURE_AVAILABILITY_MAX_VALUE : 0;
}

}
Expand Up @@ -32,11 +32,10 @@
import junit.framework.TestCase;

import org.apache.bookkeeper.client.BKException.BKNotEnoughBookiesException;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.bookkeeper.net.NetworkTopology;
import org.apache.bookkeeper.util.StaticDNSResolver;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -48,7 +47,7 @@ public class TestRackawareEnsemblePlacementPolicy extends TestCase {
RackawareEnsemblePlacementPolicy repp;
final ArrayList<InetSocketAddress> ensemble = new ArrayList<InetSocketAddress>();
final List<Integer> writeSet = new ArrayList<Integer>();
Configuration conf = new CompositeConfiguration();
ClientConfiguration conf = new ClientConfiguration();
InetSocketAddress addr1, addr2, addr3, addr4;

@Override
Expand Down

0 comments on commit f9762d1

Please sign in to comment.