Skip to content
Browse files

Merge branch 'scanpermit-bug'

  • Loading branch information...
2 parents 056d93c + d59d391 commit dc19e82b62cbb15ae91ad8575702a12f93a32d5c Chinmay Soman committed May 14, 2012
View
10 src/java/voldemort/server/VoldemortConfig.java
@@ -119,6 +119,7 @@
private int clientMaxQueuedRequests;
private int schedulerThreads;
+ private boolean mayInterruptService;
private int numScanPermits;
@@ -307,6 +308,7 @@ public VoldemortConfig(Props props) {
this.slopZonesDownToTerminate = props.getInt("slop.zones.terminate", 0);
this.schedulerThreads = props.getInt("scheduler.threads", 6);
+ this.mayInterruptService = props.getBoolean("service.interruptible", true);
this.numScanPermits = props.getInt("num.scan.permits", 1);
@@ -1133,6 +1135,14 @@ public void setSchedulerThreads(int schedulerThreads) {
this.schedulerThreads = schedulerThreads;
}
+ public boolean canInterruptService() {
+ return mayInterruptService;
+ }
+
+ public void setInterruptible(boolean canInterrupt) {
+ this.mayInterruptService = canInterrupt;
+ }
+
public String getReadOnlyDataStorageDirectory() {
return this.readOnlyStorageDir;
}
View
3 src/java/voldemort/server/VoldemortServer.java
@@ -147,7 +147,8 @@ private void checkHostName() {
/* Services are given in the order they must be started */
List<VoldemortService> services = new ArrayList<VoldemortService>();
SchedulerService scheduler = new SchedulerService(voldemortConfig.getSchedulerThreads(),
- SystemTime.INSTANCE);
+ SystemTime.INSTANCE,
+ voldemortConfig.canInterruptService());
StorageService storageService = new StorageService(storeRepository,
metadata,
scheduler,
View
7 src/java/voldemort/server/scheduler/DataCleanupJob.java
@@ -16,10 +16,9 @@
package voldemort.server.scheduler;
-import java.util.concurrent.Semaphore;
-
import org.apache.log4j.Logger;
+import voldemort.server.storage.ScanPermitWrapper;
import voldemort.store.StorageEngine;
import voldemort.utils.ClosableIterator;
import voldemort.utils.EventThrottler;
@@ -39,13 +38,13 @@
private static final Logger logger = Logger.getLogger(DataCleanupJob.class);
private final StorageEngine<K, V, T> store;
- private final Semaphore cleanupPermits;
+ private final ScanPermitWrapper cleanupPermits;
private final long maxAgeMs;
private final Time time;
private final EventThrottler throttler;
public DataCleanupJob(StorageEngine<K, V, T> store,
- Semaphore cleanupPermits,
+ ScanPermitWrapper cleanupPermits,
long maxAgeMs,
Time time,
EventThrottler throttler) {
View
18 src/java/voldemort/server/scheduler/SchedulerService.java
@@ -48,6 +48,7 @@
public class SchedulerService extends AbstractService {
private static final Logger logger = Logger.getLogger(VoldemortService.class);
+ private boolean mayInterrupt;
private class ScheduledRunnable {
@@ -85,11 +86,16 @@ long getIntervalMs() {
private final ConcurrentHashMap<String, ScheduledRunnable> allJobs;
public SchedulerService(int schedulerThreads, Time time) {
+ this(schedulerThreads, time, true);
+ }
+
+ public SchedulerService(int schedulerThreads, Time time, boolean mayInterrupt) {
super(ServiceType.SCHEDULER);
this.time = time;
this.scheduler = new SchedulerThreadPool(schedulerThreads);
this.scheduledJobResults = new ConcurrentHashMap<String, ScheduledFuture>();
this.allJobs = new ConcurrentHashMap<String, ScheduledRunnable>();
+ this.mayInterrupt = mayInterrupt;
}
@Override
@@ -112,6 +118,18 @@ public void disable(String id) {
}
}
+ @JmxOperation(description = "Terminate a particular scheduled job", impact = MBeanOperationInfo.ACTION)
+ public void terminate(String id) {
+ if(allJobs.containsKey(id) && scheduledJobResults.containsKey(id)) {
+ ScheduledFuture<?> future = scheduledJobResults.get(id);
+ boolean cancelled = future.cancel(this.mayInterrupt);
+ if(cancelled == true) {
+ logger.info("Removed '" + id + "' from list of scheduled jobs");
+ scheduledJobResults.remove(id);
+ }
+ }
+ }
+
@JmxOperation(description = "Enable a particular scheduled job", impact = MBeanOperationInfo.ACTION)
public void enable(String id) {
if(allJobs.containsKey(id) && !scheduledJobResults.containsKey(id)) {
View
8 src/java/voldemort/server/scheduler/slop/BlockingSlopPusherJob.java
@@ -18,7 +18,6 @@
import java.util.Date;
import java.util.Map;
-import java.util.concurrent.Semaphore;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
@@ -28,13 +27,14 @@
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
+import voldemort.server.storage.ScanPermitWrapper;
import voldemort.store.StorageEngine;
import voldemort.store.Store;
import voldemort.store.UnreachableStoreException;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.slop.Slop;
-import voldemort.store.slop.SlopStorageEngine;
import voldemort.store.slop.Slop.Operation;
+import voldemort.store.slop.SlopStorageEngine;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.EventThrottler;
@@ -62,13 +62,13 @@
private final MetadataStore metadataStore;
private final FailureDetector failureDetector;
private final long maxWriteBytesPerSec;
- private final Semaphore repairPermits;
+ private final ScanPermitWrapper repairPermits;
public BlockingSlopPusherJob(StoreRepository storeRepo,
MetadataStore metadataStore,
FailureDetector failureDetector,
VoldemortConfig voldemortConfig,
- Semaphore repairPermits) {
+ ScanPermitWrapper repairPermits) {
this.storeRepo = storeRepo;
this.metadataStore = metadataStore;
this.repairPermits = Utils.notNull(repairPermits);
View
6 src/java/voldemort/server/scheduler/slop/StreamingSlopPusherJob.java
@@ -10,7 +10,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -27,6 +26,7 @@
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
+import voldemort.server.storage.ScanPermitWrapper;
import voldemort.store.StorageEngine;
import voldemort.store.UnreachableStoreException;
import voldemort.store.metadata.MetadataStore;
@@ -68,13 +68,13 @@
private final Map<Integer, Set<Integer>> zoneMapping;
private ConcurrentHashMap<Integer, Long> attemptedByNode;
private ConcurrentHashMap<Integer, Long> succeededByNode;
- private final Semaphore repairPermits;
+ private final ScanPermitWrapper repairPermits;
public StreamingSlopPusherJob(StoreRepository storeRepo,
MetadataStore metadataStore,
FailureDetector failureDetector,
VoldemortConfig voldemortConfig,
- Semaphore repairPermits) {
+ ScanPermitWrapper repairPermits) {
this.storeRepo = storeRepo;
this.metadataStore = metadataStore;
this.failureDetector = failureDetector;
View
9 src/java/voldemort/server/storage/RepairJob.java
@@ -4,7 +4,6 @@
import java.util.Date;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Semaphore;
import javax.management.MBeanOperationInfo;
@@ -33,22 +32,24 @@
public final static List<String> blackList = Arrays.asList("mysql", "krati", "read-only");
- private final Semaphore repairPermits;
+ private final ScanPermitWrapper repairPermits;
private final StoreRepository storeRepo;
private final MetadataStore metadataStore;
private final int deleteBatchSize;
public RepairJob(StoreRepository storeRepo,
MetadataStore metadataStore,
- Semaphore repairPermits,
+ ScanPermitWrapper repairPermits,
int deleteBatchSize) {
this.storeRepo = storeRepo;
this.metadataStore = metadataStore;
this.repairPermits = Utils.notNull(repairPermits);
this.deleteBatchSize = deleteBatchSize;
}
- public RepairJob(StoreRepository storeRepo, MetadataStore metadataStore, Semaphore repairPermits) {
+ public RepairJob(StoreRepository storeRepo,
+ MetadataStore metadataStore,
+ ScanPermitWrapper repairPermits) {
this(storeRepo, metadataStore, repairPermits, DELETE_BATCH_SIZE);
}
View
56 src/java/voldemort/server/storage/ScanPermitWrapper.java
@@ -0,0 +1,56 @@
+package voldemort.server.storage;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.Semaphore;
+
+public class ScanPermitWrapper {
+
+ private final Semaphore scanPermits;
+ private List<String> permitOwners;
+
+ public ScanPermitWrapper(final int numPermits) {
+ scanPermits = new Semaphore(numPermits);
+ permitOwners = Collections.synchronizedList(new ArrayList<String>());
+ }
+
+ public void acquire() throws InterruptedException {
+ this.scanPermits.acquire();
+ synchronized(permitOwners) {
+ permitOwners.add(Thread.currentThread().getStackTrace()[2].getClassName());
+ }
+ }
+
+ public void release() {
+ this.scanPermits.release();
+ synchronized(permitOwners) {
+ permitOwners.remove(Thread.currentThread().getStackTrace()[2].getClassName());
+ }
+ }
+
+ public List<String> getPermitOwners() {
+ List<String> ownerList = new ArrayList<String>();
+ synchronized(permitOwners) {
+ Iterator<String> i = this.permitOwners.iterator();
+ while(i.hasNext())
+ ownerList.add(i.next());
+ }
+ return ownerList;
+ }
+
+ public boolean tryAcquire() {
+ boolean gotPermit = this.scanPermits.tryAcquire();
+ if(gotPermit) {
+ synchronized(permitOwners) {
+ permitOwners.add(Thread.currentThread().getStackTrace()[2].getClassName());
+ }
+ }
+ return gotPermit;
+ }
+
+ public int availablePermits() {
+ return this.scanPermits.availablePermits();
+ }
+}
View
22 src/java/voldemort/server/storage/StorageService.java
@@ -30,7 +30,6 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import javax.management.MBeanOperationInfo;
@@ -40,6 +39,7 @@
import org.apache.log4j.Logger;
import voldemort.VoldemortException;
+import voldemort.annotations.jmx.JmxGetter;
import voldemort.annotations.jmx.JmxManaged;
import voldemort.annotations.jmx.JmxOperation;
import voldemort.client.ClientThreadPool;
@@ -117,7 +117,7 @@
private final DynamicThrottleLimit dynThrottleLimit;
// Common permit shared by all job which do a disk scan
- private final Semaphore scanPermits;
+ private final ScanPermitWrapper scanPermitWrapper;
private final SocketStoreFactory storeFactory;
private final ConcurrentMap<String, StorageConfiguration> storageConfigs;
private final ClientThreadPool clientThreadPool;
@@ -134,7 +134,7 @@ public StorageService(StoreRepository storeRepository,
this.scheduler = scheduler;
this.storeRepository = storeRepository;
this.metadata = metadata;
- this.scanPermits = new Semaphore(voldemortConfig.getNumScanPermits());
+ this.scanPermitWrapper = new ScanPermitWrapper(voldemortConfig.getNumScanPermits());
this.storageConfigs = new ConcurrentHashMap<String, StorageConfiguration>();
this.clientThreadPool = new ClientThreadPool(config.getClientMaxThreads(),
config.getClientThreadIdleMs(),
@@ -234,20 +234,20 @@ protected void startInner() {
metadata,
failureDetector,
voldemortConfig,
- scanPermits)
+ scanPermitWrapper)
: new StreamingSlopPusherJob(storeRepository,
metadata,
failureDetector,
voldemortConfig,
- scanPermits),
+ scanPermitWrapper),
nextRun,
voldemortConfig.getSlopFrequencyMs());
}
// Create a repair job object and register it with Store repository
if(voldemortConfig.isRepairEnabled()) {
logger.info("Initializing repair job.");
- RepairJob job = new RepairJob(storeRepository, metadata, scanPermits);
+ RepairJob job = new RepairJob(storeRepository, metadata, scanPermitWrapper);
JmxUtils.registerMbean(job, JmxUtils.createObjectName(job.getClass()));
storeRepository.registerRepairJob(job);
}
@@ -586,7 +586,7 @@ private void scheduleCleanupJob(StoreDefinition storeDef,
EventThrottler throttler = new EventThrottler(maxReadRate);
Runnable cleanupJob = new DataCleanupJob<ByteArray, byte[], byte[]>(engine,
- scanPermits,
+ scanPermitWrapper,
storeDef.getRetentionDays()
* Time.MS_PER_DAY,
SystemTime.INSTANCE,
@@ -713,10 +713,10 @@ public void forceCleanupOldDataThrottled(String storeName, int entryScanThrottle
if(storeDef.hasRetentionPeriod()) {
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
- if(scanPermits.availablePermits() >= 1) {
+ if(scanPermitWrapper.availablePermits() >= 1) {
executor.execute(new DataCleanupJob<ByteArray, byte[], byte[]>(engine,
- scanPermits,
+ scanPermitWrapper,
storeDef.getRetentionDays()
* Time.MS_PER_DAY,
SystemTime.INSTANCE,
@@ -815,4 +815,8 @@ public DynamicThrottleLimit getDynThrottleLimit() {
return dynThrottleLimit;
}
+ @JmxGetter(name = "getScanPermitOwners", description = "Returns class names of services holding the scan permit")
+ public List<String> getPermitOwners() {
+ return this.scanPermitWrapper.getPermitOwners();
+ }
}
View
6 test/unit/voldemort/scheduled/BlockingSlopPusherTest.java
@@ -21,7 +21,6 @@
import java.util.Arrays;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.Semaphore;
import junit.framework.TestCase;
import voldemort.ServerTestUtils;
@@ -32,12 +31,13 @@
import voldemort.server.StoreRepository;
import voldemort.server.VoldemortConfig;
import voldemort.server.scheduler.slop.BlockingSlopPusherJob;
+import voldemort.server.storage.ScanPermitWrapper;
import voldemort.store.FailingStore;
import voldemort.store.memory.InMemoryStorageEngine;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.slop.Slop;
-import voldemort.store.slop.SlopStorageEngine;
import voldemort.store.slop.Slop.Operation;
+import voldemort.store.slop.SlopStorageEngine;
import voldemort.utils.ByteArray;
import voldemort.utils.Props;
import voldemort.versioning.Versioned;
@@ -70,7 +70,7 @@ protected void setUp() throws Exception {
metadataStore,
new NoopFailureDetector(),
new VoldemortConfig(props),
- new Semaphore(1));
+ new ScanPermitWrapper(1));
}
private Cluster makeCluster(int numNodes) {
View
4 test/unit/voldemort/scheduled/DataCleanupJobTest.java
@@ -17,11 +17,11 @@
package voldemort.scheduled;
import java.util.List;
-import java.util.concurrent.Semaphore;
import junit.framework.TestCase;
import voldemort.MockTime;
import voldemort.server.scheduler.DataCleanupJob;
+import voldemort.server.storage.ScanPermitWrapper;
import voldemort.store.StorageEngine;
import voldemort.store.memory.InMemoryStorageEngine;
import voldemort.utils.EventThrottler;
@@ -52,7 +52,7 @@ public void testCleanupCleansUp() {
// now run cleanup
new DataCleanupJob<String, String, String>(engine,
- new Semaphore(1),
+ new ScanPermitWrapper(1),
Time.MS_PER_DAY,
time,
new EventThrottler(1)).run();
View
56 test/unit/voldemort/scheduled/StreamingSlopPusherTest.java
@@ -10,7 +10,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.Semaphore;
import org.junit.After;
import org.junit.Before;
@@ -26,6 +25,7 @@
import voldemort.server.VoldemortConfig;
import voldemort.server.VoldemortServer;
import voldemort.server.scheduler.slop.StreamingSlopPusherJob;
+import voldemort.server.storage.ScanPermitWrapper;
import voldemort.store.StorageEngine;
import voldemort.store.metadata.MetadataStore;
import voldemort.store.slop.Slop;
@@ -141,7 +141,7 @@ public void testFailedServer() throws IOException, InterruptedException {
metadataStore,
configs[0]))),
configs[0],
- new Semaphore(1));
+ new ScanPermitWrapper(1));
pusher.run();
@@ -156,8 +156,9 @@ public void testFailedServer() throws IOException, InterruptedException {
StorageEngine<ByteArray, byte[], byte[]> store = getVoldemortServer(2).getStoreRepository()
.getStorageEngine(nextSlop.getStoreName());
if(nextSlop.getOperation().equals(Slop.Operation.PUT)) {
- assertNotSame("entry should be present at store", 0, store.get(nextSlop.getKey(),
- null).size());
+ assertNotSame("entry should be present at store",
+ 0,
+ store.get(nextSlop.getKey(), null).size());
assertEquals("entry value should match",
new String(nextSlop.getValue()),
new String(store.get(nextSlop.getKey(), null).get(0).getValue()));
@@ -274,7 +275,7 @@ public void testOutOfOrder() throws InterruptedException, IOException {
metadataStore,
configs[0]))),
configs[0],
- new Semaphore(1));
+ new ScanPermitWrapper(1));
pusher.run();
@@ -325,7 +326,7 @@ public void testNormalPush() throws InterruptedException, IOException {
metadataStore,
configs[0]))),
configs[0],
- new Semaphore(1));
+ new ScanPermitWrapper(1));
pusher.run();
@@ -340,8 +341,9 @@ public void testNormalPush() throws InterruptedException, IOException {
StorageEngine<ByteArray, byte[], byte[]> store = getVoldemortServer(1).getStoreRepository()
.getStorageEngine(nextSlop.getStoreName());
if(nextSlop.getOperation().equals(Slop.Operation.PUT)) {
- assertNotSame("entry should be present at store", 0, store.get(nextSlop.getKey(),
- null).size());
+ assertNotSame("entry should be present at store",
+ 0,
+ store.get(nextSlop.getKey(), null).size());
assertEquals("entry value should match",
new String(nextSlop.getValue()),
new String(store.get(nextSlop.getKey(), null).get(0).getValue()));
@@ -440,14 +442,14 @@ public void testNormalPushBothWays() throws InterruptedException, IOException {
metadataStore,
configs[0]))),
configs[0],
- new Semaphore(1)), pusher1 = new StreamingSlopPusherJob(getVoldemortServer(1).getStoreRepository(),
- getVoldemortServer(1).getMetadataStore(),
- new BannagePeriodFailureDetector(new FailureDetectorConfig().setNodes(cluster.getNodes())
- .setStoreVerifier(new ServerStoreVerifier(socketStoreFactory,
- metadataStore,
- configs[1]))),
- configs[1],
- new Semaphore(1));
+ new ScanPermitWrapper(1)), pusher1 = new StreamingSlopPusherJob(getVoldemortServer(1).getStoreRepository(),
+ getVoldemortServer(1).getMetadataStore(),
+ new BannagePeriodFailureDetector(new FailureDetectorConfig().setNodes(cluster.getNodes())
+ .setStoreVerifier(new ServerStoreVerifier(socketStoreFactory,
+ metadataStore,
+ configs[1]))),
+ configs[1],
+ new ScanPermitWrapper(1));
pusher0.run();
pusher1.run();
@@ -463,8 +465,9 @@ public void testNormalPushBothWays() throws InterruptedException, IOException {
StorageEngine<ByteArray, byte[], byte[]> store = getVoldemortServer(1).getStoreRepository()
.getStorageEngine(nextSlop.getStoreName());
if(nextSlop.getOperation().equals(Slop.Operation.PUT)) {
- assertNotSame("entry should be present at store", 0, store.get(nextSlop.getKey(),
- null).size());
+ assertNotSame("entry should be present at store",
+ 0,
+ store.get(nextSlop.getKey(), null).size());
assertEquals("entry value should match",
new String(nextSlop.getValue()),
new String(store.get(nextSlop.getKey(), null).get(0).getValue()));
@@ -484,8 +487,9 @@ public void testNormalPushBothWays() throws InterruptedException, IOException {
StorageEngine<ByteArray, byte[], byte[]> store = getVoldemortServer(0).getStoreRepository()
.getStorageEngine(nextSlop.getStoreName());
if(nextSlop.getOperation().equals(Slop.Operation.PUT)) {
- assertNotSame("entry should be present at store", 0, store.get(nextSlop.getKey(),
- null).size());
+ assertNotSame("entry should be present at store",
+ 0,
+ store.get(nextSlop.getKey(), null).size());
assertEquals("entry value should match",
new String(nextSlop.getValue()),
new String(store.get(nextSlop.getKey(), null).get(0).getValue()));
@@ -547,7 +551,7 @@ public void testServerReplacementWithoutBounce() throws IOException, Interrupted
metadataStore,
configs[0]))),
configs[0],
- new Semaphore(1));
+ new ScanPermitWrapper(1));
pusher.run();
@@ -562,8 +566,9 @@ public void testServerReplacementWithoutBounce() throws IOException, Interrupted
StorageEngine<ByteArray, byte[], byte[]> store = getVoldemortServer(2).getStoreRepository()
.getStorageEngine(nextSlop.getStoreName());
if(nextSlop.getOperation().equals(Slop.Operation.PUT)) {
- assertNotSame("entry should be present at store", 0, store.get(nextSlop.getKey(),
- null).size());
+ assertNotSame("entry should be present at store",
+ 0,
+ store.get(nextSlop.getKey(), null).size());
assertEquals("entry value should match",
new String(nextSlop.getValue()),
new String(store.get(nextSlop.getKey(), null).get(0).getValue()));
@@ -619,8 +624,9 @@ public void testServerReplacementWithoutBounce() throws IOException, Interrupted
StorageEngine<ByteArray, byte[], byte[]> store = getVoldemortServer(1).getStoreRepository()
.getStorageEngine(nextSlop.getStoreName());
if(nextSlop.getOperation().equals(Slop.Operation.PUT)) {
- assertNotSame("entry should be present at store", 0, store.get(nextSlop.getKey(),
- null).size());
+ assertNotSame("entry should be present at store",
+ 0,
+ store.get(nextSlop.getKey(), null).size());
assertEquals("entry value should match",
new String(nextSlop.getValue()),
new String(store.get(nextSlop.getKey(), null).get(0).getValue()));
View
6 test/unit/voldemort/store/routed/HintedHandoffTest.java
@@ -17,7 +17,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.Semaphore;
import org.apache.log4j.Logger;
import org.junit.After;
@@ -27,7 +26,6 @@
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import voldemort.cluster.failuredetector.MutableStoreVerifier;
import voldemort.ServerTestUtils;
import voldemort.TestUtils;
import voldemort.VoldemortException;
@@ -39,12 +37,14 @@
import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.cluster.failuredetector.FailureDetectorConfig;
import voldemort.cluster.failuredetector.FailureDetectorUtils;
+import voldemort.cluster.failuredetector.MutableStoreVerifier;
import voldemort.routing.RoutingStrategy;
import voldemort.routing.RoutingStrategyFactory;
import voldemort.routing.RoutingStrategyType;
import voldemort.serialization.SerializerDefinition;
import voldemort.server.StoreRepository;
import voldemort.server.scheduler.slop.StreamingSlopPusherJob;
+import voldemort.server.storage.ScanPermitWrapper;
import voldemort.store.ForceFailStore;
import voldemort.store.StorageEngine;
import voldemort.store.Store;
@@ -200,7 +200,7 @@ public void setUp() throws Exception {
cluster,
Lists.newArrayList(storeDef),
new Properties()),
- new Semaphore(1));
+ new ScanPermitWrapper(1));
slopPusherJobs.add(pusher);
}

0 comments on commit dc19e82

Please sign in to comment.
Something went wrong with that request. Please try again.