Skip to content

Commit

Permalink
- 'retention-frequency' to control frequency for running DataCleanupJob
Browse files Browse the repository at this point in the history
- Use same jmxid as the factory across the board
- add server config to control socket backlog
- add count() jmx call to obtain number of k-v pairs from store
  • Loading branch information
vinothchandar committed Aug 30, 2012
1 parent 5e7ef39 commit 9bafcc8
Show file tree
Hide file tree
Showing 26 changed files with 510 additions and 65 deletions.
21 changes: 9 additions & 12 deletions src/java/voldemort/client/AbstractStoreClientFactory.java
Expand Up @@ -91,7 +91,7 @@ public abstract class AbstractStoreClientFactory implements StoreClientFactory {
private final SerializerFactory serializerFactory;
private final boolean isJmxEnabled;
private final RequestFormatType requestFormatType;
private final int jmxId;
protected final int jmxId;
protected volatile FailureDetector failureDetector;
private final int maxBootstrapRetries;
private final StoreStats stats;
Expand Down Expand Up @@ -120,10 +120,11 @@ public AbstractStoreClientFactory(ClientConfig config) {
JmxUtils.registerMbean(threadPool,
JmxUtils.createObjectName(JmxUtils.getPackageName(threadPool.getClass()),
JmxUtils.getClassName(threadPool.getClass())
+ jmxId()));
+ JmxUtils.getJmxId(jmxId)));
JmxUtils.registerMbean(new StoreStatsJmx(stats),
JmxUtils.createObjectName("voldemort.store.stats.aggregate",
"aggregate-perf" + jmxId()));
"aggregate-perf"
+ JmxUtils.getJmxId(jmxId)));
}
}

Expand Down Expand Up @@ -215,15 +216,17 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
repairReads,
clientZoneId,
getFailureDetector(),
isJmxEnabled);
isJmxEnabled,
jmxId);
store = new LoggingStore(store);

if(isJmxEnabled) {
StatTrackingStore statStore = new StatTrackingStore(store, this.stats);
store = statStore;
JmxUtils.registerMbean(new StoreStatsJmx(statStore.getStats()),
JmxUtils.createObjectName(JmxUtils.getPackageName(store.getClass()),
store.getName() + jmxId()));
store.getName()
+ JmxUtils.getJmxId(jmxId)));
}

if(storeDef.getKeySerializer().hasCompression()
Expand Down Expand Up @@ -279,7 +282,7 @@ public FailureDetector getFailureDetector() {
JmxUtils.registerMbean(failureDetector,
JmxUtils.createObjectName(JmxUtils.getPackageName(failureDetector.getClass()),
JmxUtils.getClassName(failureDetector.getClass())
+ jmxId()));
+ JmxUtils.getJmxId(jmxId)));
}
}
}
Expand Down Expand Up @@ -403,10 +406,4 @@ public void close() {
if(failureDetector != null)
failureDetector.destroy();
}

/* Give a unique id to avoid jmx clashes */
public String jmxId() {
return jmxId == 0 ? "" : Integer.toString(jmxId);
}

}
5 changes: 3 additions & 2 deletions src/java/voldemort/client/SocketStoreClientFactory.java
Expand Up @@ -67,7 +67,8 @@ public SocketStoreClientFactory(ClientConfig config) {
config.getSocketTimeout(TimeUnit.MILLISECONDS),
config.getSocketBufferSize(),
config.getSocketKeepAlive(),
config.isJmxEnabled());
config.isJmxEnabled(),
jmxId);
}

@Override
Expand Down Expand Up @@ -149,7 +150,7 @@ protected Store<ByteArray, byte[], byte[]> getStoreInternal(Node node) {
FailureDetectorConfig failureDetectorConfig = new FailureDetectorConfig(config).setNodes(nodes)
.setStoreVerifier(storeVerifier);

return create(failureDetectorConfig, true, failureDetectorListener);
return create(failureDetectorConfig, false, failureDetectorListener);
}

@Override
Expand Down
11 changes: 11 additions & 0 deletions src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -113,6 +113,7 @@ public class VoldemortConfig implements Serializable {
private boolean useNioConnector;
private int nioConnectorSelectors;
private int nioAdminConnectorSelectors;
private int nioAcceptorBacklog;

private int clientSelectors;
private int clientRoutingTimeoutMs;
Expand Down Expand Up @@ -277,6 +278,8 @@ public VoldemortConfig(Props props) {
this.nioAdminConnectorSelectors = props.getInt("nio.admin.connector.selectors",
Math.max(8, Runtime.getRuntime()
.availableProcessors()));
// a value <= 0 forces the default to be used
this.nioAcceptorBacklog = props.getInt("nio.acceptor.backlog", -1);

this.clientSelectors = props.getInt("client.selectors", 4);
this.clientMaxConnectionsPerNode = props.getInt("client.max.connections.per.node", 50);
Expand Down Expand Up @@ -1288,6 +1291,14 @@ public void setNioAdminConnectorSelectors(int nioAdminConnectorSelectors) {
this.nioAdminConnectorSelectors = nioAdminConnectorSelectors;
}

public int getNioAcceptorBacklog() {
return nioAcceptorBacklog;
}

public void setNioAcceptorBacklog(int nioAcceptorBacklog) {
this.nioAcceptorBacklog = nioAcceptorBacklog;
}

public int getAdminSocketBufferSize() {
return adminStreamBufferSize;
}
Expand Down
6 changes: 4 additions & 2 deletions src/java/voldemort/server/VoldemortServer.java
Expand Up @@ -183,7 +183,8 @@ private List<VoldemortService> createServices() {
voldemortConfig.getSocketBufferSize(),
voldemortConfig.getNioConnectorSelectors(),
"nio-socket-server",
voldemortConfig.isJmxEnabled()));
voldemortConfig.isJmxEnabled(),
voldemortConfig.getNioAcceptorBacklog()));
} else {
logger.info("Using BIO Connector.");
services.add(new SocketService(socketRequestHandlerFactory,
Expand Down Expand Up @@ -222,7 +223,8 @@ private List<VoldemortService> createServices() {
voldemortConfig.getAdminSocketBufferSize(),
voldemortConfig.getNioAdminConnectorSelectors(),
"admin-server",
voldemortConfig.isJmxEnabled()));
voldemortConfig.isJmxEnabled(),
voldemortConfig.getNioAcceptorBacklog()));
} else {
logger.info("Using BIO Connector for Admin Service.");
services.add(new SocketService(adminRequestHandlerFactory,
Expand Down
8 changes: 6 additions & 2 deletions src/java/voldemort/server/niosocket/NioSocketService.java
Expand Up @@ -72,6 +72,8 @@ public class NioSocketService extends AbstractSocketService {

private final int socketBufferSize;

private final int acceptorBacklog;

private final StatusManager statusManager;

private final Thread acceptorThread;
Expand All @@ -83,10 +85,12 @@ public NioSocketService(RequestHandlerFactory requestHandlerFactory,
int socketBufferSize,
int selectors,
String serviceName,
boolean enableJmx) {
boolean enableJmx,
int acceptorBacklog) {
super(ServiceType.SOCKET, port, serviceName, enableJmx);
this.requestHandlerFactory = requestHandlerFactory;
this.socketBufferSize = socketBufferSize;
this.acceptorBacklog = acceptorBacklog;

try {
this.serverSocketChannel = ServerSocketChannel.open();
Expand Down Expand Up @@ -122,7 +126,7 @@ protected void startInner() {
selectorManagerThreadPool.execute(selectorManagers[i]);
}

serverSocketChannel.socket().bind(endpoint);
serverSocketChannel.socket().bind(endpoint, acceptorBacklog);
serverSocketChannel.socket().setReceiveBufferSize(socketBufferSize);
serverSocketChannel.socket().setReuseAddress(true);

Expand Down
8 changes: 5 additions & 3 deletions src/java/voldemort/server/storage/StorageService.java
Expand Up @@ -239,6 +239,7 @@ protected void startInner() {
null,
null,
null,
null,
0);
SlopStorageEngine slopEngine = new SlopStorageEngine(config.getStore(slopStoreDefinition),
metadata.getCluster());
Expand Down Expand Up @@ -631,12 +632,13 @@ private void scheduleCleanupJob(StoreDefinition storeDef,
JmxUtils.registerMbean("DataCleanupJob-" + engine.getName(), cleanupJob);
}

long retentionFreqHours = storeDef.hasRetentionFrequencyDays() ? (storeDef.getRetentionFrequencyDays() * Time.HOURS_PER_DAY)
: voldemortConfig.getRetentionCleanupScheduledPeriodInHour();

this.scheduler.schedule("cleanup-" + storeDef.getName(),
cleanupJob,
startTime,
voldemortConfig.getRetentionCleanupScheduledPeriodInHour()
* Time.MS_PER_HOUR);

retentionFreqHours * Time.MS_PER_HOUR);
}

@Override
Expand Down
11 changes: 11 additions & 0 deletions src/java/voldemort/store/StoreDefinition.java
Expand Up @@ -50,6 +50,7 @@ public class StoreDefinition implements Serializable {
private final int requiredReads;
private final Integer retentionPeriodDays;
private final Integer retentionScanThrottleRate;
private final Integer retentionFrequencyDays;
private final String routingStrategyType;
private final String viewOf;
private final HashMap<Integer, Integer> zoneReplicationFactor;
Expand Down Expand Up @@ -82,6 +83,7 @@ public StoreDefinition(String name,
Integer zoneCountWrites,
Integer retentionDays,
Integer retentionThrottleRate,
Integer retentionFrequencyDays,
String factory,
HintedHandoffStrategyType hintedHandoffStrategyType,
Integer hintPrefListSize,
Expand All @@ -101,6 +103,7 @@ public StoreDefinition(String name,
this.transformsSerializer = transformsSerializer;
this.retentionPeriodDays = retentionDays;
this.retentionScanThrottleRate = retentionThrottleRate;
this.retentionFrequencyDays = retentionFrequencyDays;
this.memoryFootprintMB = memoryFootprintMB;
this.routingStrategyType = routingStrategyType;
this.viewOf = viewOfStore;
Expand Down Expand Up @@ -282,6 +285,14 @@ public Integer getRetentionScanThrottleRate() {
return this.retentionScanThrottleRate;
}

public boolean hasRetentionFrequencyDays() {
return this.retentionFrequencyDays != null;
}

public Integer getRetentionFrequencyDays() {
return this.retentionFrequencyDays;
}

public boolean isView() {
return this.viewOf != null;
}
Expand Down
11 changes: 11 additions & 0 deletions src/java/voldemort/store/StoreDefinitionBuilder.java
Expand Up @@ -30,6 +30,7 @@ public class StoreDefinitionBuilder {
private int requiredReads = -1;
private Integer retentionPeriodDays = null;
private Integer retentionScanThrottleRate = null;
private Integer retentionFrequencyDays = null;
private String routingStrategyType = null;
private String viewOf = null;
private HashMap<Integer, Integer> zoneReplicationFactor = null;
Expand Down Expand Up @@ -180,6 +181,15 @@ public StoreDefinitionBuilder setRetentionScanThrottleRate(Integer retentionScan
return this;
}

public Integer getRetentionFrequencyDays() {
return this.retentionFrequencyDays;
}

public StoreDefinitionBuilder setRetentionFrequencyDays(Integer retentionFreqDays) {
this.retentionFrequencyDays = retentionFreqDays;
return this;
}

public String getRoutingStrategyType() {
return routingStrategyType;
}
Expand Down Expand Up @@ -304,6 +314,7 @@ public StoreDefinition build() {
this.getZoneCountWrites(),
this.getRetentionPeriodDays(),
this.getRetentionScanThrottleRate(),
this.getRetentionFrequencyDays(),
this.getSerializerFactory(),
this.getHintedHandoffStrategy(),
this.getHintPrefListSize(),
Expand Down
16 changes: 16 additions & 0 deletions src/java/voldemort/store/bdb/BdbStorageConfiguration.java
Expand Up @@ -294,6 +294,22 @@ public void cleanLogs() {
}
}

@JmxOperation(description = "Obtain the number of k-v entries in the store")
public long getEntryCount(String storeName) throws Exception {
Environment storeEnv = environments.get(storeName);
if(storeEnv != null) {
Database storeDb = null;
try {
storeDb = storeEnv.openDatabase(null, storeName, databaseConfig);
return storeDb.count();
} finally {
if(storeDb != null)
storeDb.close();
}
}
return 0;
}

public void close() {
synchronized(lock) {
try {
Expand Down
17 changes: 5 additions & 12 deletions src/java/voldemort/store/routed/PipelineRoutedStore.java
Expand Up @@ -21,7 +21,6 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import voldemort.VoldemortException;
import voldemort.client.TimeoutConfig;
Expand Down Expand Up @@ -70,16 +69,13 @@
*/
public class PipelineRoutedStore extends RoutedStore {

private static AtomicInteger jmxIdCounter = new AtomicInteger(0);

private final Map<Integer, NonblockingStore> nonblockingStores;
private final Map<Integer, Store<ByteArray, Slop, byte[]>> slopStores;
private final Map<Integer, NonblockingStore> nonblockingSlopStores;
private final HintedHandoffStrategy handoffStrategy;
private Zone clientZone;
private boolean zoneRoutingEnabled;
private PipelineRoutedStats stats;
private final int jmxId;

/**
* Create a PipelineRoutedStore
Expand All @@ -94,6 +90,8 @@ public class PipelineRoutedStore extends RoutedStore {
* @param clientZoneId Zone the client is in
* @param timeoutMs Routing timeout
* @param failureDetector Failure detector object
* @param jmxEnabled is monitoring enabled
* @param jmxId unique ID for the factory instance
*/
public PipelineRoutedStore(String name,
Map<Integer, Store<ByteArray, byte[], byte[]>> innerStores,
Expand All @@ -106,7 +104,8 @@ public PipelineRoutedStore(String name,
int clientZoneId,
TimeoutConfig timeoutConfig,
FailureDetector failureDetector,
boolean jmxEnabled) {
boolean jmxEnabled,
int jmxId) {
super(name,
innerStores,
cluster,
Expand All @@ -122,7 +121,6 @@ public PipelineRoutedStore(String name,
} else {
zoneRoutingEnabled = false;
}
this.jmxId = jmxIdCounter.getAndIncrement();
this.nonblockingStores = new ConcurrentHashMap<Integer, NonblockingStore>(nonblockingStores);
this.slopStores = slopStores;
if(storeDef.hasHintedHandoffStrategyType()) {
Expand All @@ -137,7 +135,7 @@ public PipelineRoutedStore(String name,
stats = new PipelineRoutedStats();
JmxUtils.registerMbean(stats,
JmxUtils.createObjectName(JmxUtils.getPackageName(stats.getClass()),
getName() + jmxId()));
getName() + JmxUtils.getJmxId(jmxId)));
}
}

Expand Down Expand Up @@ -727,9 +725,4 @@ public void close() {

super.close();
}

/* Give a unique id to avoid jmx clashes */
private String jmxId() {
return jmxId == 0 ? "" : Integer.toString(jmxId);
}
}
9 changes: 6 additions & 3 deletions src/java/voldemort/store/routed/RoutedStoreFactory.java
Expand Up @@ -68,7 +68,8 @@ public RoutedStore create(Cluster cluster,
repairReads,
clientZoneId,
failureDetector,
false);
false,
0);
}

public RoutedStore create(Cluster cluster,
Expand All @@ -80,7 +81,8 @@ public RoutedStore create(Cluster cluster,
boolean repairReads,
int clientZoneId,
FailureDetector failureDetector,
boolean jmxEnabled) {
boolean jmxEnabled,
int jmxId) {
if(isPipelineRoutedStoreEnabled) {
return new PipelineRoutedStore(storeDefinition.getName(),
nodeStores,
Expand All @@ -93,7 +95,8 @@ public RoutedStore create(Cluster cluster,
clientZoneId,
timeoutConfig,
failureDetector,
jmxEnabled);
jmxEnabled,
jmxId);
} else {
if(storeDefinition.getRoutingStrategyType()
.compareTo(RoutingStrategyType.ZONE_STRATEGY) == 0) {
Expand Down

0 comments on commit 9bafcc8

Please sign in to comment.