Skip to content

Commit

Permalink
Implementing partition scans
Browse files Browse the repository at this point in the history
  • Loading branch information
vinothchandar committed Oct 19, 2012
1 parent 07e509d commit 4c1064e
Show file tree
Hide file tree
Showing 53 changed files with 1,889 additions and 302 deletions.
Expand Up @@ -52,7 +52,7 @@ public static void main(String[] args) throws FileNotFoundException, IOException
String storeName = args[1];
String jsonDataFile = args[2];

final Store<ByteArray, byte[], byte[]> store = new BdbStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName));
final Store<ByteArray, byte[], byte[]> store = new BdbStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName), TestUtils.makeSingleNodeRoutingStrategy());

final AtomicInteger obsoletes = new AtomicInteger(0);

Expand Down
Expand Up @@ -52,7 +52,7 @@ public static void main(String[] args) throws FileNotFoundException, IOException
String storeName = args[1];
String jsonDataFile = args[2];

final Store<ByteArray, byte[], byte[]> store = new MysqlStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName));
final Store<ByteArray, byte[], byte[]> store = new MysqlStorageConfiguration(new VoldemortConfig(new Props(new File(serverPropsFile)))).getStore(TestUtils.makeStoreDefinition(storeName), TestUtils.makeSingleNodeRoutingStrategy());

final AtomicInteger obsoletes = new AtomicInteger(0);

Expand Down
Expand Up @@ -8,6 +8,7 @@
import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.routing.RoutingStrategy;
import voldemort.server.VoldemortConfig;
import voldemort.store.StorageConfiguration;
import voldemort.store.StorageEngine;
Expand Down Expand Up @@ -45,7 +46,8 @@ public KratiStorageConfiguration(VoldemortConfig config) {

public void close() {}

public StorageEngine<ByteArray, byte[], byte[]> getStore(StoreDefinition storeDef) {
public StorageEngine<ByteArray, byte[], byte[]> getStore(StoreDefinition storeDef,
RoutingStrategy strategy) {
synchronized(lock) {
File storeDir = new File(dataDirectory, storeDef.getName());
if(!storeDir.exists()) {
Expand Down
Expand Up @@ -147,6 +147,14 @@ public ClosableIterator<ByteArray> keys() {
return StoreUtils.keys(entries());
}

public ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entries(int partition) {
throw new UnsupportedOperationException("Partition based entries scan not supported for this storage type");
}

public ClosableIterator<ByteArray> keys(int partition) {
throw new UnsupportedOperationException("Partition based key scan not supported for this storage type");
}

public boolean delete(ByteArray key, Version maxVersion) throws VoldemortException {
StoreUtils.assertValidKey(key);

Expand Down Expand Up @@ -312,4 +320,8 @@ public void remove() {
public boolean isPartitionAware() {
return false;
}

public boolean isPartitionScanSupported() {
return false;
}
}
119 changes: 71 additions & 48 deletions src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -90,6 +90,7 @@ public class VoldemortConfig implements Serializable {
private boolean bdbCleanerLazyMigration;
private boolean bdbCacheModeEvictLN;
private boolean bdbMinimizeScanImpact;
private boolean bdbPrefixKeysWithPartitionId;

private String mysqlUsername;
private String mysqlPassword;
Expand Down Expand Up @@ -239,6 +240,8 @@ public VoldemortConfig(Props props) {
this.bdbCleanerLazyMigration = props.getBoolean("bdb.cleaner.lazy.migration", true);
this.bdbCacheModeEvictLN = props.getBoolean("bdb.cache.evictln", false);
this.bdbMinimizeScanImpact = props.getBoolean("bdb.minimize.scan.impact", false);
this.bdbPrefixKeysWithPartitionId = props.getBoolean("bdb.prefix.keys.with.partitionid",
true);

this.readOnlyBackups = props.getInt("readonly.backups", 1);
this.readOnlySearchStrategy = props.getString("readonly.search.strategy",
Expand Down Expand Up @@ -883,6 +886,74 @@ public void setBdbMinimizeScanImpact(boolean bdbMinimizeScanImpact) {
this.bdbMinimizeScanImpact = bdbMinimizeScanImpact;
}

public boolean isBdbWriteTransactionsEnabled() {
return bdbWriteTransactions;
}

public void setBdbWriteTransactions(boolean bdbWriteTransactions) {
this.bdbWriteTransactions = bdbWriteTransactions;
}

public void setBdbOneEnvPerStore(boolean bdbOneEnvPerStore) {
this.bdbOneEnvPerStore = bdbOneEnvPerStore;
}

public boolean isBdbOneEnvPerStore() {
return bdbOneEnvPerStore;
}

/**
* If true, keys will be prefixed by the partition Id on disk. This can
* dramatically speed up rebalancing, restore operations, at the cost of 2
* bytes of extra storage per key
*
* <ul>
* <li>Property : "bdb.prefix.keys.with.partitionid"</li>
* <li>Default : true</li>
* </ul>
*
* @return
*/
public boolean getBdbPrefixKeysWithPartitionId() {
return bdbPrefixKeysWithPartitionId;
}

public void setBdbPrefixKeysWithPartitionId(boolean bdbPrefixKeysWithPartitionId) {
this.bdbPrefixKeysWithPartitionId = bdbPrefixKeysWithPartitionId;
}

public long getBdbCheckpointBytes() {
return this.bdbCheckpointBytes;
}

public void setBdbCheckpointBytes(long bdbCheckpointBytes) {
this.bdbCheckpointBytes = bdbCheckpointBytes;
}

public long getBdbCheckpointMs() {
return this.bdbCheckpointMs;
}

public void setBdbCheckpointMs(long bdbCheckpointMs) {
this.bdbCheckpointMs = bdbCheckpointMs;
}

public long getBdbStatsCacheTtlMs() {
return this.bdbStatsCacheTtlMs;
}

public void setBdbStatsCacheTtlMs(long statsCacheTtlMs) {
this.bdbStatsCacheTtlMs = statsCacheTtlMs;
}

public long getBdbMinimumSharedCache() {
return this.bdbMinimumSharedCache;
}

public void setBdbMinimumSharedCache(long minimumSharedCache) {
this.bdbMinimumSharedCache = minimumSharedCache;
}

/**
* The comfortable number of threads the threadpool will attempt to
* maintain. Specified by "core.threads" default: max(1, floor(0.5 *
Expand Down Expand Up @@ -1240,38 +1311,6 @@ public void setEnableMetadataChecking(boolean enableMetadataChecking) {
this.enableMetadataChecking = enableMetadataChecking;
}

public long getBdbCheckpointBytes() {
return this.bdbCheckpointBytes;
}

public void setBdbCheckpointBytes(long bdbCheckpointBytes) {
this.bdbCheckpointBytes = bdbCheckpointBytes;
}

public long getBdbCheckpointMs() {
return this.bdbCheckpointMs;
}

public void setBdbCheckpointMs(long bdbCheckpointMs) {
this.bdbCheckpointMs = bdbCheckpointMs;
}

public long getBdbStatsCacheTtlMs() {
return this.bdbStatsCacheTtlMs;
}

public void setBdbStatsCacheTtlMs(long statsCacheTtlMs) {
this.bdbStatsCacheTtlMs = statsCacheTtlMs;
}

public long getBdbMinimumSharedCache() {
return this.bdbMinimumSharedCache;
}

public void setBdbMinimumSharedCache(long minimumSharedCache) {
this.bdbMinimumSharedCache = minimumSharedCache;
}

public int getSchedulerThreads() {
return schedulerThreads;
}
Expand Down Expand Up @@ -1319,22 +1358,6 @@ public void setReadOnlyDeleteBackupMs(int readOnlyDeleteBackupTimeMs) {
this.readOnlyDeleteBackupTimeMs = readOnlyDeleteBackupTimeMs;
}

public boolean isBdbWriteTransactionsEnabled() {
return bdbWriteTransactions;
}

public void setBdbWriteTransactions(boolean bdbWriteTransactions) {
this.bdbWriteTransactions = bdbWriteTransactions;
}

public void setBdbOneEnvPerStore(boolean bdbOneEnvPerStore) {
this.bdbOneEnvPerStore = bdbOneEnvPerStore;
}

public boolean isBdbOneEnvPerStore() {
return bdbOneEnvPerStore;
}

public int getSocketBufferSize() {
return socketBufferSize;
}
Expand Down
Expand Up @@ -217,10 +217,10 @@ public StreamRequestHandler handleRequest(final DataInputStream inputStream,
ProtoUtils.writeMessage(outputStream, handleDeleteStore(request.getDeleteStore()));
break;
case FETCH_STORE:
ProtoUtils.writeMessage(outputStream, handleFetchStore(request.getFetchStore()));
ProtoUtils.writeMessage(outputStream, handleFetchROStore(request.getFetchStore()));
break;
case SWAP_STORE:
ProtoUtils.writeMessage(outputStream, handleSwapStore(request.getSwapStore()));
ProtoUtils.writeMessage(outputStream, handleSwapROStore(request.getSwapStore()));
break;
case ROLLBACK_STORE:
ProtoUtils.writeMessage(outputStream,
Expand All @@ -239,12 +239,12 @@ public StreamRequestHandler handleRequest(final DataInputStream inputStream,
handleGetROStorageFormat(request.getGetRoStorageFormat()));
break;
case FETCH_PARTITION_FILES:
return handleFetchPartitionFiles(request.getFetchPartitionFiles());
return handleFetchROPartitionFiles(request.getFetchPartitionFiles());
case UPDATE_SLOP_ENTRIES:
return handleUpdateSlopEntries(request.getUpdateSlopEntries());
case FAILED_FETCH_STORE:
ProtoUtils.writeMessage(outputStream,
handleFailedFetch(request.getFailedFetchStore()));
handleFailedROFetch(request.getFailedFetchStore()));
break;
case REBALANCE_STATE_CHANGE:
ProtoUtils.writeMessage(outputStream,
Expand Down Expand Up @@ -493,7 +493,7 @@ public VAdminProto.GetROStorageFormatResponse handleGetROStorageFormat(VAdminPro
return response.build();
}

public VAdminProto.FailedFetchStoreResponse handleFailedFetch(VAdminProto.FailedFetchStoreRequest request) {
public VAdminProto.FailedFetchStoreResponse handleFailedROFetch(VAdminProto.FailedFetchStoreRequest request) {
final String storeDir = request.getStoreDir();
final String storeName = request.getStoreName();
VAdminProto.FailedFetchStoreResponse.Builder response = VAdminProto.FailedFetchStoreResponse.newBuilder();
Expand Down Expand Up @@ -528,7 +528,7 @@ public VAdminProto.FailedFetchStoreResponse handleFailedFetch(VAdminProto.Failed
return response.build();
}

public StreamRequestHandler handleFetchPartitionFiles(VAdminProto.FetchPartitionFilesRequest request) {
public StreamRequestHandler handleFetchROPartitionFiles(VAdminProto.FetchPartitionFilesRequest request) {
return new FetchPartitionFileStreamRequestHandler(request,
metadataStore,
voldemortConfig,
Expand All @@ -542,23 +542,44 @@ public StreamRequestHandler handleUpdateSlopEntries(VAdminProto.UpdateSlopEntrie

public StreamRequestHandler handleFetchPartitionEntries(VAdminProto.FetchPartitionEntriesRequest request) {
boolean fetchValues = request.hasFetchValues() && request.getFetchValues();
StorageEngine<ByteArray, byte[], byte[]> storageEngine = AdminServiceRequestHandler.getStorageEngine(storeRepository,
request.getStore());

if(fetchValues) {
return new FetchEntriesStreamRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats);
} else
return new FetchKeysStreamRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats);
if(storageEngine.isPartitionScanSupported())
return new FetchPartitionEntriesStreamRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats);
else
return new FetchEntriesStreamRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats);
} else {
if(storageEngine.isPartitionScanSupported())
return new FetchPartitionKeysStreamRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats);
else
return new FetchKeysStreamRequestHandler(request,
metadataStore,
errorCodeMapper,
voldemortConfig,
storeRepository,
networkClassLoader,
stats);
}
}

public StreamRequestHandler handleUpdatePartitionEntries(VAdminProto.UpdatePartitionEntriesRequest request) {
Expand Down Expand Up @@ -687,7 +708,7 @@ private String swapStore(String storeName, String directory) throws VoldemortExc
return currentDirPath;
}

public VAdminProto.SwapStoreResponse handleSwapStore(VAdminProto.SwapStoreRequest request) {
public VAdminProto.SwapStoreResponse handleSwapROStore(VAdminProto.SwapStoreRequest request) {
final String dir = request.getStoreDir();
final String storeName = request.getStoreName();
VAdminProto.SwapStoreResponse.Builder response = VAdminProto.SwapStoreResponse.newBuilder();
Expand All @@ -713,7 +734,7 @@ public VAdminProto.SwapStoreResponse handleSwapStore(VAdminProto.SwapStoreReques
}
}

public VAdminProto.AsyncOperationStatusResponse handleFetchStore(VAdminProto.FetchStoreRequest request) {
public VAdminProto.AsyncOperationStatusResponse handleFetchROStore(VAdminProto.FetchStoreRequest request) {
final String fetchUrl = request.getStoreDir();
final String storeName = request.getStoreName();

Expand Down
Expand Up @@ -15,6 +15,7 @@
import voldemort.store.stats.StreamStats;
import voldemort.store.stats.StreamStats.Operation;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.NetworkClassLoader;
import voldemort.utils.RebalanceUtils;
import voldemort.versioning.Versioned;
Expand All @@ -31,6 +32,8 @@

public class FetchEntriesStreamRequestHandler extends FetchStreamRequestHandler {

protected final ClosableIterator<ByteArray> keyIterator;

public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
MetadataStore metadataStore,
ErrorCodeMapper errorCodeMapper,
Expand All @@ -46,6 +49,7 @@ public FetchEntriesStreamRequestHandler(FetchPartitionEntriesRequest request,
networkClassLoader,
stats,
Operation.FETCH_ENTRIES);
this.keyIterator = storageEngine.keys();
logger.info("Starting fetch entries for store '" + storageEngine.getName()
+ "' with replica to partition mapping " + replicaToPartitionList);
}
Expand All @@ -65,7 +69,7 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
initialCluster,
storeDef)

&& counter % skipRecords == 0) {
&& counter % skipRecords == 0) {
List<Versioned<byte[]>> values = storageEngine.get(key, null);
stats.recordDiskTime(handle, System.nanoTime() - startNs);
for(Versioned<byte[]> value: values) {
Expand Down Expand Up @@ -114,4 +118,10 @@ public StreamRequestHandlerState handleRequest(DataInputStream inputStream,
}
}

@Override
public final void close(DataOutputStream outputStream) throws IOException {
if(null != keyIterator)
keyIterator.close();
super.close(outputStream);
}
}

0 comments on commit 4c1064e

Please sign in to comment.