Skip to content

Commit

Permalink
Moving SocketStoreClientFactory creation from SystemStore to SystemSt…
Browse files Browse the repository at this point in the history
…oreRepository. This prevents leaking of sockets during re-bootstrap
  • Loading branch information
Chinmay Soman committed Jul 16, 2013
1 parent f826906 commit 9142525
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 24 deletions.
41 changes: 24 additions & 17 deletions src/java/voldemort/client/SystemStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
public class SystemStore<K, V> {

private final Logger logger = Logger.getLogger(SystemStore.class);
private final SocketStoreClientFactory socketStoreFactory;
private final String storeName;
private volatile Store<K, V, Object> sysStore;

Expand All @@ -58,7 +57,7 @@ public class SystemStore<K, V> {
* (determines routing strategy)
*/
public SystemStore(String storeName, String[] bootstrapUrls, int clientZoneID) {
this(storeName, bootstrapUrls, clientZoneID, null, null, new ClientConfig());
this(storeName, bootstrapUrls, clientZoneID, null, null, new ClientConfig(), null);
}

/**
Expand All @@ -77,7 +76,7 @@ public SystemStore(String storeName,
String[] bootstrapUrls,
int clientZoneID,
ClientConfig baseConfig) {
this(storeName, bootstrapUrls, clientZoneID, null, null, baseConfig);
this(storeName, bootstrapUrls, clientZoneID, null, null, baseConfig, null);
}

/**
Expand All @@ -97,7 +96,7 @@ public SystemStore(String storeName,
int clientZoneID,
String clusterXml,
FailureDetector fd) {
this(storeName, bootstrapUrls, clientZoneID, clusterXml, fd, new ClientConfig());
this(storeName, bootstrapUrls, clientZoneID, clusterXml, fd, new ClientConfig(), null);
}

/**
Expand All @@ -119,25 +118,33 @@ public SystemStore(String storeName,
int clientZoneID,
String clusterXml,
FailureDetector fd,
ClientConfig baseConfig) {
ClientConfig baseConfig,
SocketStoreClientFactory factory) {
String prefix = storeName.substring(0, SystemStoreConstants.NAME_PREFIX.length());
if(!SystemStoreConstants.NAME_PREFIX.equals(prefix))
throw new VoldemortException("Illegal system store : " + storeName);

ClientConfig config = new ClientConfig();
config.setSelectors(1)
.setBootstrapUrls(bootstrapUrls)
.setMaxConnectionsPerNode(baseConfig.getSysMaxConnectionsPerNode())
.setConnectionTimeout(baseConfig.getSysConnectionTimeout(), TimeUnit.MILLISECONDS)
.setSocketTimeout(baseConfig.getSysSocketTimeout(), TimeUnit.MILLISECONDS)
.setRoutingTimeout(baseConfig.getSysRoutingTimeout(), TimeUnit.MILLISECONDS)
.setEnableJmx(baseConfig.getSysEnableJmx())
.setEnablePipelineRoutedStore(baseConfig.getSysEnablePipelineRoutedStore())
.setClientZoneId(clientZoneID);
this.socketStoreFactory = new SocketStoreClientFactory(config);
SocketStoreClientFactory socketStoreFactory = null;

if(factory == null) {
ClientConfig config = new ClientConfig();
config.setSelectors(1)
.setBootstrapUrls(bootstrapUrls)
.setMaxConnectionsPerNode(baseConfig.getSysMaxConnectionsPerNode())
.setConnectionTimeout(baseConfig.getSysConnectionTimeout(), TimeUnit.MILLISECONDS)
.setSocketTimeout(baseConfig.getSysSocketTimeout(), TimeUnit.MILLISECONDS)
.setRoutingTimeout(baseConfig.getSysRoutingTimeout(), TimeUnit.MILLISECONDS)
.setEnableJmx(baseConfig.getSysEnableJmx())
.setEnablePipelineRoutedStore(baseConfig.getSysEnablePipelineRoutedStore())
.setClientZoneId(clientZoneID);
socketStoreFactory = new SocketStoreClientFactory(config);
} else {
socketStoreFactory = factory;
}

this.storeName = storeName;
try {
this.sysStore = this.socketStoreFactory.getSystemStore(this.storeName, clusterXml, fd);
this.sysStore = socketStoreFactory.getSystemStore(this.storeName, clusterXml, fd);
} catch(Exception e) {
logger.debug("Error while creating a system store client for store : " + this.storeName);
}
Expand Down
21 changes: 19 additions & 2 deletions src/java/voldemort/client/SystemStoreRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package voldemort.client;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import voldemort.cluster.failuredetector.FailureDetector;
import voldemort.store.system.SystemStoreConstants;
Expand All @@ -30,9 +31,24 @@
public class SystemStoreRepository {

private ConcurrentHashMap<String, SystemStore> sysStoreMap;
private final SocketStoreClientFactory socketStoreFactory;

public SystemStoreRepository() {
public SystemStoreRepository(ClientConfig clientConfig) {
sysStoreMap = new ConcurrentHashMap<String, SystemStore>();
ClientConfig systemStoreConfig = new ClientConfig();
systemStoreConfig.setSelectors(1)
.setBootstrapUrls(clientConfig.getBootstrapUrls())
.setMaxConnectionsPerNode(clientConfig.getSysMaxConnectionsPerNode())
.setConnectionTimeout(clientConfig.getSysConnectionTimeout(),
TimeUnit.MILLISECONDS)
.setSocketTimeout(clientConfig.getSysSocketTimeout(),
TimeUnit.MILLISECONDS)
.setRoutingTimeout(clientConfig.getSysRoutingTimeout(),
TimeUnit.MILLISECONDS)
.setEnableJmx(clientConfig.getSysEnableJmx())
.setEnablePipelineRoutedStore(clientConfig.getSysEnablePipelineRoutedStore())
.setClientZoneId(clientConfig.getClientZoneId());
this.socketStoreFactory = new SocketStoreClientFactory(systemStoreConfig);
}

public void addSystemStore(SystemStore newSysStore, String storeName) {
Expand All @@ -46,7 +62,8 @@ public void createSystemStores(ClientConfig config, String clusterXml, FailureDe
config.getClientZoneId(),
clusterXml,
fd,
config);
config,
this.socketStoreFactory);
this.sysStoreMap.put(storeName.name(), sysStore);
}
}
Expand Down
4 changes: 2 additions & 2 deletions src/java/voldemort/client/ZenStoreClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public ZenStoreClient(String storeName,
config);
this.clientId = generateClientId(clientInfo);
this.config = config;
this.sysRepository = new SystemStoreRepository();
this.sysRepository = new SystemStoreRepository(config);
this.scheduler = scheduler;

// Registering self to be able to bootstrap client dynamically via JMX
Expand All @@ -114,7 +114,7 @@ public ZenStoreClient(String storeName,
clientRegistryRefresher = registerClient(clientId,
config.getClientRegistryUpdateIntervalInSecs());
}

logger.info("Voldemort client created: " + clientId + "\n" + clientInfo);
}

Expand Down
2 changes: 1 addition & 1 deletion src/java/voldemort/coordinator/CoordinatorService.java
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ protected void startInner() {
initializeFatClients();

// Setup the Async Metadata checker
SystemStoreRepository sysRepository = new SystemStoreRepository();
SystemStoreRepository sysRepository = new SystemStoreRepository(clientConfig);
String clusterXml = storeClientFactory.bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY);

sysRepository.createSystemStores(clientConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.junit.Test;

import voldemort.ServerTestUtils;
import voldemort.client.ClientConfig;
import voldemort.client.RoutingTier;
import voldemort.client.SystemStore;
import voldemort.client.SystemStoreRepository;
Expand Down Expand Up @@ -161,7 +162,9 @@ public void setUp() throws Exception {
sysVersionStore = new SystemStore<String, String>(SystemStoreConstants.SystemStoreName.voldsys$_metadata_version_persistence.name(),
bootStrapUrls,
this.CLIENT_ZONE_ID);
repository = new SystemStoreRepository();
ClientConfig clientConfig = new ClientConfig();
clientConfig.setBootstrapUrls(bootStrapUrls).setClientZoneId(this.CLIENT_ZONE_ID);
repository = new SystemStoreRepository(clientConfig);
repository.addSystemStore(sysVersionStore,
SystemStoreConstants.SystemStoreName.voldsys$_metadata_version_persistence.name());
this.scheduler = new SchedulerService(2, SystemTime.INSTANCE, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import voldemort.ClusterTestUtils;
import voldemort.ServerTestUtils;
import voldemort.TestUtils;
import voldemort.client.ClientConfig;
import voldemort.client.SystemStore;
import voldemort.client.SystemStoreRepository;
import voldemort.client.scheduler.AsyncMetadataVersionManager;
Expand Down Expand Up @@ -101,7 +102,10 @@ public void setUp() throws Exception {
sysVersionStore = new SystemStore<String, String>(SystemStoreConstants.SystemStoreName.voldsys$_metadata_version_persistence.name(),
bootStrapUrls,
this.CLIENT_ZONE_ID);
repository = new SystemStoreRepository();
ClientConfig clientConfig = new ClientConfig();
clientConfig.setBootstrapUrls(bootStrapUrls).setClientZoneId(this.CLIENT_ZONE_ID);
repository = new SystemStoreRepository(clientConfig);

repository.addSystemStore(sysVersionStore,
SystemStoreConstants.SystemStoreName.voldsys$_metadata_version_persistence.name());
this.scheduler = new SchedulerService(2, SystemTime.INSTANCE, true);
Expand Down

0 comments on commit 9142525

Please sign in to comment.