Skip to content

Commit

Permalink
First working version of Coordinator service. Includes REST request a…
Browse files Browse the repository at this point in the history
…nd response handling, Error handling, automatic checking of metadata changes, fat client config management
  • Loading branch information
Chinmay Soman committed Mar 26, 2013
1 parent 4cd607d commit 572f87b
Show file tree
Hide file tree
Showing 13 changed files with 583 additions and 287 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
classes
config
dist
*~
*.iml
Expand Down
Expand Up @@ -18,25 +18,22 @@

import static org.jboss.netty.channel.Channels.pipeline;

import java.util.Map;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpContentCompressor;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;

/**
* @author <a href="http://www.jboss.org/netty/">The Netty Project</a>
* @author Andy Taylor (andy.taylor@jboss.org)
* @author <a href="http://gleamynode.net/">Trustin Lee</a>
*
* @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
*/
public class HttpServerPipelineFactory implements ChannelPipelineFactory {
public class CoordinatorPipelineFactory implements ChannelPipelineFactory {

boolean noop = false;
private boolean noop = false;
private Map<String, FatClientWrapper> fatClientMap;

public HttpServerPipelineFactory(boolean noop) {
public CoordinatorPipelineFactory(Map<String, FatClientWrapper> fatClientMap, boolean noop) {
this.fatClientMap = fatClientMap;
this.noop = noop;
}

Expand All @@ -61,7 +58,7 @@ public ChannelPipeline getPipeline() throws Exception {
if(this.noop) {
pipeline.addLast("handler", new NoopHttpRequestHandler());
} else {
pipeline.addLast("handler", new HttpRequestHandler());
pipeline.addLast("handler", new VoldemortHttpRequestHandler(this.fatClientMap));
}
return pipeline;
}
Expand Down
193 changes: 193 additions & 0 deletions src/java/voldemort/coordinator/CoordinatorService.java
@@ -0,0 +1,193 @@
package voldemort.coordinator;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.util.Utf8;
import org.apache.commons.io.IOUtils;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

import voldemort.client.ClientConfig;
import voldemort.client.SocketStoreClientFactory;
import voldemort.client.SystemStoreRepository;
import voldemort.client.scheduler.AsyncMetadataVersionManager;
import voldemort.common.service.SchedulerService;
import voldemort.store.StoreDefinition;
import voldemort.store.metadata.MetadataStore;
import voldemort.utils.SystemTime;
import voldemort.xml.StoreDefinitionsMapper;

import com.google.common.base.Joiner;

public class CoordinatorService {

private static boolean noop = false;
private static SocketStoreClientFactory storeClientFactory = null;
private static String[] bootstrapURLs;
private static AsyncMetadataVersionManager asyncMetadataManager = null;
private static SchedulerService schedulerService = null;
private static final Logger logger = Logger.getLogger(CoordinatorService.class);
private static Map<String, FatClientWrapper> fatClientMap = null;
private static long asyncMetadataCheckIntervalInMs = 5000;
public final static Schema CLIENT_CONFIGS_AVRO_SCHEMA = Schema.parse("{ \"name\": \"clientConfigs\", \"type\":\"array\","
+ "\"items\": { \"name\": \"clientConfig\", \"type\": \"map\", \"values\":\"string\" }}}");
private static final String STORE_NAME_KEY = "store_name";

private static void initializeFatClients() {
StoreDefinitionsMapper storeMapper = new StoreDefinitionsMapper();
String storesXml = storeClientFactory.bootstrapMetadataWithRetries(MetadataStore.STORES_KEY);
List<StoreDefinition> storeDefList = storeMapper.readStoreList(new StringReader(storesXml),
false);
Map<String, ClientConfig> fatClientConfigMap = readClientConfig("/home/csoman/Downloads/clientConfigs.avro",
bootstrapURLs);
// For now Simply create the map of store definition to
// FatClientWrappers
// TODO: After the fat client improvements is done, modify this to
// - Fetch cluster.xml and stores.xml
// - Pass these on to each FatClientWrapper
// - Set up AsyncMetadataVersionManager
fatClientMap = new HashMap<String, FatClientWrapper>();
for(StoreDefinition storeDef: storeDefList) {
String storeName = storeDef.getName();
logger.info("Creating a Fat client wrapper for store: " + storeName);
logger.info("Using config: " + fatClientConfigMap.get(storeName));
fatClientMap.put(storeName, new FatClientWrapper(storeName,
bootstrapURLs,
fatClientConfigMap.get(storeName)));

}

}

public static void main(String[] args) {

if(args.length < 1) {
System.err.println("Missing argument: <Bootstrap URL>");
System.exit(-1);
}

if(args.length == 2) {
if(args[1].equals("noop")) {
noop = true;
}
}

// Initialize the Voldemort Metadata
bootstrapURLs = new String[1];
bootstrapURLs[0] = args[0];
ClientConfig clientConfig = new ClientConfig();
clientConfig.setBootstrapUrls(bootstrapURLs);
storeClientFactory = new SocketStoreClientFactory(clientConfig);
initializeFatClients();

// Setup the Async Metadata checker
SystemStoreRepository sysRepository = new SystemStoreRepository();
String clusterXml = storeClientFactory.bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY);

sysRepository.createSystemStores(clientConfig,
clusterXml,
storeClientFactory.getFailureDetector());
// Create a callback for re-bootstrapping the client
Callable<Void> rebootstrapCallback = new Callable<Void>() {

public Void call() throws Exception {
initializeFatClients();
return null;
}

};

asyncMetadataManager = new AsyncMetadataVersionManager(sysRepository,
rebootstrapCallback,
null);

schedulerService = new SchedulerService(1, SystemTime.INSTANCE, true);
schedulerService.schedule(asyncMetadataManager.getClass().getName(),
asyncMetadataManager,
new Date(),
asyncMetadataCheckIntervalInMs);

// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setOption("backlog", 1000);

// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new CoordinatorPipelineFactory(fatClientMap, noop));

// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8080));
}

@SuppressWarnings("unchecked")
private static Map<String, ClientConfig> readClientConfig(String configFilePath,
String[] bootstrapURLs) {
String line;
Map<String, ClientConfig> storeNameConfigMap = new HashMap<String, ClientConfig>();
try {
line = Joiner.on(" ")
.join(IOUtils.readLines(new FileReader(new File(configFilePath))))
.trim();

JsonDecoder decoder = new JsonDecoder(CLIENT_CONFIGS_AVRO_SCHEMA, line);
GenericDatumReader<Object> datumReader = new GenericDatumReader<Object>(CLIENT_CONFIGS_AVRO_SCHEMA);
GenericData.Array<Map<Utf8, Utf8>> flowMaps = (GenericData.Array<Map<Utf8, Utf8>>) datumReader.read(null,
decoder);

// Flows to return back
if(flowMaps != null && flowMaps.size() > 0) {
for(Map<Utf8, Utf8> flowMap: flowMaps) {
Properties props = new Properties();
for(Utf8 key: flowMap.keySet()) {
props.put(key.toString(), flowMap.get(key).toString());
}

String storeName = flowMap.get(new Utf8(STORE_NAME_KEY)).toString();

storeName = props.getProperty(STORE_NAME_KEY);
if(storeName == null || storeName.length() == 0) {
throw new Exception("Illegal Store Name !!!");
}

ClientConfig config = new ClientConfig(props);
config.setBootstrapUrls(bootstrapURLs)
.setEnableCompressionLayer(false)
.setEnableSerializationLayer(false)
.enableDefaultClient(true)
.setEnableLazy(false);

storeNameConfigMap.put(storeName, config);

}
}
} catch(FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch(IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch(Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

return storeNameConfigMap;
}
}

0 comments on commit 572f87b

Please sign in to comment.