Skip to content

Commit

Permalink
Bug fix in AsyncMetadataVersionManager. Client config parameters adde…
Browse files Browse the repository at this point in the history
…d for System store. Changed AdminClient to use a Timestamp instead of a counter for metadata version
  • Loading branch information
Chinmay Soman committed Sep 14, 2012
1 parent 07be427 commit 91ab1d8
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 101 deletions.
4 changes: 4 additions & 0 deletions src/java/voldemort/VoldemortAdminTool.java
Expand Up @@ -583,6 +583,10 @@ private static void synchronizeMetadataVersion(AdminClient adminClient, int base
Properties props = new Properties();
try {
props.load(new ByteArrayInputStream(valueObject.getBytes()));
if(props.size() == 0) {
System.err.println("The specified node does not have any versions metadata ! Exiting ...");
System.exit(-1);
}
adminClient.setMetadataversion(props);
System.out.println("Metadata versions synchronized successfully.");
} catch(IOException e) {
Expand Down
6 changes: 3 additions & 3 deletions src/java/voldemort/client/AbstractStoreClientFactory.java
Expand Up @@ -193,10 +193,10 @@ public <K, V, T> Store<K, V, T> getRawStore(String storeName,
String clusterXmlString,
FailureDetector fd) {

if(logger.isDebugEnabled()) {
logger.debug("Client zone-id [" + clientZoneId
+ "] Attempting to obtain metadata for store [" + storeName + "] ");
logger.info("Client zone-id [" + clientZoneId
+ "] Attempting to obtain metadata for store [" + storeName + "] ");

if(logger.isDebugEnabled()) {
for(URI uri: bootstrapUrls) {
logger.debug("Client Bootstrap url [" + uri + "]");
}
Expand Down
104 changes: 103 additions & 1 deletion src/java/voldemort/client/ClientConfig.java
Expand Up @@ -89,6 +89,14 @@ public class ClientConfig {
private volatile int clientRegistryRefreshInterval = 3600 * 12;
private volatile int asyncJobThreadPoolSize = 2;

/* SystemStore client config */
private volatile int sysMaxConnectionsPerNode = 2;
private volatile int sysRoutingTimeout = 5000;
private volatile int sysSocketTimeout = 5000;
private volatile int sysConnectionTimeout = 1500;
private volatile boolean sysEnableJmx = false;
private volatile boolean sysEnablePipelineRoutedStore = true;

public ClientConfig() {}

/* Propery names for propery-based configuration */
Expand Down Expand Up @@ -130,9 +138,15 @@ public ClientConfig() {}
public static final String MAX_BOOTSTRAP_RETRIES = "max_bootstrap_retries";
public static final String CLIENT_CONTEXT_NAME = "voldemort_client_context";
public static final String ASYNC_CHECK_METADATA_INTERVAL = "check_metadata_interval";
private static final String USE_DEFAULT_CLIENT = "use_default_client";
public static final String USE_DEFAULT_CLIENT = "use_default_client";
public static final String CLIENT_REGISTRY_REFRESH_INTERVAL = "client_registry_refresh_interval";
public static final String ASYNC_JOB_THREAD_POOL_SIZE = "async_job_thread_pool_size";
public static final String SYS_MAX_CONNECTIONS_PER_NODE = "sys_max_connections_per_node";
public static final String SYS_ROUTING_TIMEOUT_MS = "sys_routing_timeout_ms";
public static final String SYS_CONNECTION_TIMEOUT_MS = "sys_connection_timeout_ms";
public static final String SYS_SOCKET_TIMEOUT_MS = "sys_socket_timeout_ms";
public static final String SYS_ENABLE_JMX = "sys_enable_jmx";
public static final String SYS_ENABLE_PIPELINE_ROUTED_STORE = "sys_enable_pipeline_routed_store";

/**
* Instantiate the client config using a properties file
Expand Down Expand Up @@ -310,6 +324,94 @@ private void setProperties(Properties properties) {
if(props.containsKey(ASYNC_JOB_THREAD_POOL_SIZE)) {
this.setAsyncJobThreadPoolSize(props.getInt(ASYNC_JOB_THREAD_POOL_SIZE));
}

/* Check for system store paramaters if any */
if(props.containsKey(SYS_MAX_CONNECTIONS_PER_NODE)) {
this.setSysMaxConnectionsPerNode(props.getInt(SYS_MAX_CONNECTIONS_PER_NODE));
}

if(props.containsKey(SYS_ROUTING_TIMEOUT_MS)) {
this.setSysRoutingTimeout(props.getInt(SYS_ROUTING_TIMEOUT_MS));
}

if(props.containsKey(SYS_SOCKET_TIMEOUT_MS)) {
this.setSysSocketTimeout(props.getInt(SYS_SOCKET_TIMEOUT_MS));
}

if(props.containsKey(SYS_CONNECTION_TIMEOUT_MS)) {
this.setSysConnectionTimeout(props.getInt(SYS_CONNECTION_TIMEOUT_MS));
}

if(props.containsKey(SYS_ENABLE_JMX)) {
this.setSysEnableJmx(props.getBoolean(SYS_ENABLE_JMX));
}

if(props.containsKey(SYS_ENABLE_PIPELINE_ROUTED_STORE)) {
this.setSysEnablePipelineRoutedStore(props.getBoolean(SYS_ENABLE_PIPELINE_ROUTED_STORE));
}

}

private ClientConfig setSysMaxConnectionsPerNode(int maxConnectionsPerNode) {
if(maxConnectionsPerNode <= 0)
throw new IllegalArgumentException("Value must be greater than zero.");
this.sysMaxConnectionsPerNode = maxConnectionsPerNode;
return this;
}

public int getSysMaxConnectionsPerNode() {
return this.sysMaxConnectionsPerNode;
}

private ClientConfig setSysRoutingTimeout(int sysRoutingTimeout) {
if(sysRoutingTimeout <= 0)
throw new IllegalArgumentException("Value must be greater than zero.");
this.sysRoutingTimeout = sysRoutingTimeout;
return this;
}

public int getSysRoutingTimeout() {
return this.sysRoutingTimeout;
}

private ClientConfig setSysSocketTimeout(int sysSocketTimeout) {
if(sysSocketTimeout <= 0)
throw new IllegalArgumentException("Value must be greater than zero.");
this.sysSocketTimeout = sysSocketTimeout;
return this;
}

public int getSysSocketTimeout() {
return this.sysSocketTimeout;
}

private ClientConfig setSysConnectionTimeout(int sysConnectionTimeout) {
if(sysConnectionTimeout <= 0)
throw new IllegalArgumentException("Value must be greater than zero.");
this.sysConnectionTimeout = sysConnectionTimeout;
return this;
}

public int getSysConnectionTimeout() {
return this.sysConnectionTimeout;
}

public boolean getSysEnableJmx() {
return this.sysEnableJmx;
}

public ClientConfig setSysEnableJmx(boolean sysEnableJmx) {
this.sysEnableJmx = sysEnableJmx;
return this;
}

public boolean getSysEnablePipelineRoutedStore() {
return this.sysEnablePipelineRoutedStore;
}

public ClientConfig setSysEnablePipelineRoutedStore(boolean sysEnablePipelineRoutedStore) {
this.sysEnablePipelineRoutedStore = sysEnablePipelineRoutedStore;
return this;
}

public int getMaxConnectionsPerNode() {
Expand Down
19 changes: 19 additions & 0 deletions src/java/voldemort/client/ClientInfo.java
Expand Up @@ -48,6 +48,7 @@ public class ClientInfo implements Serializable {
private long updateTime;
private String releaseVersion;
private ClientConfig config;
private long clusterMetadataVersion;

public ClientInfo(String storeName,
String clientContext,
Expand All @@ -64,6 +65,7 @@ public ClientInfo(String storeName,
this.updateTime = bootstrapTime;
this.releaseVersion = version;
this.config = config;
this.clusterMetadataVersion = 0;

if(logger.isDebugEnabled()) {
logger.debug(this.toString());
Expand Down Expand Up @@ -161,6 +163,10 @@ public synchronized ClientConfig getClientConfig() {
return this.config;
}

public synchronized void setClusterMetadataVersion(long newVersion) {
this.clusterMetadataVersion = newVersion;
}

/**
* At the moment we're not checking if the Config objects are similar. TODO:
* reevaluate in the future.
Expand Down Expand Up @@ -195,6 +201,7 @@ public synchronized String toString() {
builder.append("storeName=").append(storeName).append("\n");
builder.append("updateTime=").append(updateTime).append("\n");
builder.append("releaseVersion=").append(releaseVersion).append("\n");
builder.append("clusterMetadataVersion=").append(clusterMetadataVersion).append("\n");

/**
* Append the Client Config information. Right now we only track the
Expand Down Expand Up @@ -222,6 +229,18 @@ public synchronized String toString() {
builder.append("failuredetector_implementation=")
.append(this.config.getFailureDetectorImplementation())
.append("\n");
builder.append("failuredetector_threshold=")
.append(this.config.getFailureDetectorThreshold())
.append("\n");
builder.append("failuredetector_threshold_count_minimum=")
.append(this.config.getFailureDetectorThresholdCountMinimum())
.append("\n");
builder.append("failuredetector_threshold_interval=")
.append(this.config.getFailureDetectorThresholdInterval())
.append("\n");
builder.append("failuredetector_threshold_async_recovery_interval=")
.append(this.config.getFailureDetectorAsyncRecoveryInterval())
.append("\n");

return builder.toString();
}
Expand Down
89 changes: 79 additions & 10 deletions src/java/voldemort/client/SystemStore.java
Expand Up @@ -14,40 +14,109 @@
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;

/*
* A client interface for interacting with System stores (managed by the
* cluster). The naming convention is kept consistent with SocketStore (which is
* also a client interface).
*/
public class SystemStore<K, V> {

private final Logger logger = Logger.getLogger(SystemStore.class);
private final SocketStoreClientFactory systemStoreFactory;
private final SocketStoreClientFactory socketStoreFactory;
private final String storeName;
private volatile Store<K, V, Object> sysStore;

/**
* Wrapper for the actual SystemStore constructor. Used when we dont have
* custom Cluster XML, failure detector or a base Voldemort Client config to
* be used with this system store client.
*
* @param storeName Name of the system store
* @param bootstrapUrls Bootstrap URLs used to connect to
* @param clientZoneID Primary zone ID for this system store client
* (determines routing strategy)
*/
public SystemStore(String storeName, String[] bootstrapUrls, int clientZoneID) {
this(storeName, bootstrapUrls, clientZoneID, null, null);
this(storeName, bootstrapUrls, clientZoneID, null, null, new ClientConfig());
}

/**
* Wrapper for the actual SystemStore constructor. Used when we dont have
* custom Cluster XML or failure detector to be used with this system store
* client.
*
* @param storeName Name of the system store
* @param bootstrapUrls Bootstrap URLs used to connect to
* @param clientZoneID Primary zone ID for this system store client
* (determines routing strategy)
* @param baseConfig Base Voldemort Client config which specifies properties
* for this system store client
*/
public SystemStore(String storeName,
String[] bootstrapUrls,
int clientZoneID,
ClientConfig baseConfig) {
this(storeName, bootstrapUrls, clientZoneID, null, null, baseConfig);
}

/**
* SystemStore Constructor wrapper for the actual constructor. Used when we
* dont want to specify a base Voldemort Client Config.
*
* @param storeName Name of the system store
* @param bootstrapUrls Bootstrap URLs used to connect to
* @param clientZoneID Primary zone ID for this system store client
* (determines routing strategy)
* @param clusterXml Custom ClusterXml to be used for this system store
* client
* @param fd Failure Detector to be used with this system store client
*/
public SystemStore(String storeName,
String[] bootstrapUrls,
int clientZoneID,
String clusterXml,
FailureDetector fd) {
this(storeName, bootstrapUrls, clientZoneID, clusterXml, fd, new ClientConfig());
}

/**
* SystemStore Constructor that creates a system store client which can be
* used to interact with the system stores managed by the cluster
*
* @param storeName Name of the system store
* @param bootstrapUrls Bootstrap URLs used to connect to
* @param clientZoneID Primary zone ID for this system store client
* (determines routing strategy)
* @param clusterXml Custom ClusterXml to be used for this system store
* client
* @param fd Failure Detector to be used with this system store client
* @param baseConfig Base Voldemort Client config which specifies properties
* for this system store client
*/
public SystemStore(String storeName,
String[] bootstrapUrls,
int clientZoneID,
String clusterXml,
FailureDetector fd,
ClientConfig baseConfig) {
String prefix = storeName.substring(0, SystemStoreConstants.NAME_PREFIX.length());
if(!SystemStoreConstants.NAME_PREFIX.equals(prefix))
throw new VoldemortException("Illegal system store : " + storeName);

ClientConfig config = new ClientConfig();
config.setSelectors(1)
.setBootstrapUrls(bootstrapUrls)
.setMaxConnectionsPerNode(2)
.setConnectionTimeout(1500, TimeUnit.MILLISECONDS)
.setSocketTimeout(5000, TimeUnit.MILLISECONDS)
.setRoutingTimeout(5000, TimeUnit.MILLISECONDS)
.setEnableJmx(false)
.setEnablePipelineRoutedStore(true)
.setMaxConnectionsPerNode(baseConfig.getSysMaxConnectionsPerNode())
.setConnectionTimeout(baseConfig.getSysConnectionTimeout(), TimeUnit.MILLISECONDS)
.setSocketTimeout(baseConfig.getSysSocketTimeout(), TimeUnit.MILLISECONDS)
.setRoutingTimeout(baseConfig.getSysRoutingTimeout(), TimeUnit.MILLISECONDS)
.setEnableJmx(baseConfig.getSysEnableJmx())
.setEnablePipelineRoutedStore(baseConfig.getSysEnablePipelineRoutedStore())
.setClientZoneId(clientZoneID);
this.systemStoreFactory = new SystemStoreClientFactory(config);
this.socketStoreFactory = new SocketStoreClientFactory(config);
this.storeName = storeName;
try {
this.sysStore = this.systemStoreFactory.getSystemStore(this.storeName, clusterXml, fd);
this.sysStore = this.socketStoreFactory.getSystemStore(this.storeName, clusterXml, fd);
} catch(Exception e) {
logger.debug("Error while creating a system store client for store : " + this.storeName);
}
Expand Down
14 changes: 0 additions & 14 deletions src/java/voldemort/client/SystemStoreClientFactory.java

This file was deleted.

3 changes: 2 additions & 1 deletion src/java/voldemort/client/SystemStoreRepository.java
Expand Up @@ -29,7 +29,8 @@ public void createSystemStores(ClientConfig config, String clusterXml, FailureDe
config.getBootstrapUrls(),
config.getClientZoneId(),
clusterXml,
fd);
fd,
config);
this.sysStoreMap.put(storeName.name(), sysStore);
}
}
Expand Down

0 comments on commit 91ab1d8

Please sign in to comment.