diff --git a/.gitignore b/.gitignore
index cd6301beb2..0d38407a48 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,5 @@
classes
+config
dist
*~
*.iml
diff --git a/src/java/voldemort/coordinator/HttpServerPipelineFactory.java b/src/java/voldemort/coordinator/CoordinatorPipelineFactory.java
similarity index 81%
rename from src/java/voldemort/coordinator/HttpServerPipelineFactory.java
rename to src/java/voldemort/coordinator/CoordinatorPipelineFactory.java
index 242829a255..45bd7a58f8 100644
--- a/src/java/voldemort/coordinator/HttpServerPipelineFactory.java
+++ b/src/java/voldemort/coordinator/CoordinatorPipelineFactory.java
@@ -18,6 +18,8 @@
import static org.jboss.netty.channel.Channels.pipeline;
+import java.util.Map;
+
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
@@ -25,18 +27,13 @@
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-/**
- * @author The Netty Project
- * @author Andy Taylor (andy.taylor@jboss.org)
- * @author Trustin Lee
- *
- * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $
- */
-public class HttpServerPipelineFactory implements ChannelPipelineFactory {
+public class CoordinatorPipelineFactory implements ChannelPipelineFactory {
- boolean noop = false;
+ private boolean noop = false;
+ private Map fatClientMap;
- public HttpServerPipelineFactory(boolean noop) {
+ public CoordinatorPipelineFactory(Map fatClientMap, boolean noop) {
+ this.fatClientMap = fatClientMap;
this.noop = noop;
}
@@ -61,7 +58,7 @@ public ChannelPipeline getPipeline() throws Exception {
if(this.noop) {
pipeline.addLast("handler", new NoopHttpRequestHandler());
} else {
- pipeline.addLast("handler", new HttpRequestHandler());
+ pipeline.addLast("handler", new VoldemortHttpRequestHandler(this.fatClientMap));
}
return pipeline;
}
diff --git a/src/java/voldemort/coordinator/CoordinatorService.java b/src/java/voldemort/coordinator/CoordinatorService.java
new file mode 100644
index 0000000000..7ece75e880
--- /dev/null
+++ b/src/java/voldemort/coordinator/CoordinatorService.java
@@ -0,0 +1,193 @@
+package voldemort.coordinator;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.InetSocketAddress;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.JsonDecoder;
+import org.apache.avro.util.Utf8;
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+
+import voldemort.client.ClientConfig;
+import voldemort.client.SocketStoreClientFactory;
+import voldemort.client.SystemStoreRepository;
+import voldemort.client.scheduler.AsyncMetadataVersionManager;
+import voldemort.common.service.SchedulerService;
+import voldemort.store.StoreDefinition;
+import voldemort.store.metadata.MetadataStore;
+import voldemort.utils.SystemTime;
+import voldemort.xml.StoreDefinitionsMapper;
+
+import com.google.common.base.Joiner;
+
+public class CoordinatorService {
+
+ private static boolean noop = false;
+ private static SocketStoreClientFactory storeClientFactory = null;
+ private static String[] bootstrapURLs;
+ private static AsyncMetadataVersionManager asyncMetadataManager = null;
+ private static SchedulerService schedulerService = null;
+ private static final Logger logger = Logger.getLogger(CoordinatorService.class);
+ private static Map fatClientMap = null;
+ private static long asyncMetadataCheckIntervalInMs = 5000;
+ public final static Schema CLIENT_CONFIGS_AVRO_SCHEMA = Schema.parse("{ \"name\": \"clientConfigs\", \"type\":\"array\","
+ + "\"items\": { \"name\": \"clientConfig\", \"type\": \"map\", \"values\":\"string\" }}}");
+ private static final String STORE_NAME_KEY = "store_name";
+
+ private static void initializeFatClients() {
+ StoreDefinitionsMapper storeMapper = new StoreDefinitionsMapper();
+ String storesXml = storeClientFactory.bootstrapMetadataWithRetries(MetadataStore.STORES_KEY);
+ List storeDefList = storeMapper.readStoreList(new StringReader(storesXml),
+ false);
+ Map fatClientConfigMap = readClientConfig("/home/csoman/Downloads/clientConfigs.avro",
+ bootstrapURLs);
+ // For now Simply create the map of store definition to
+ // FatClientWrappers
+ // TODO: After the fat client improvements is done, modify this to
+ // - Fetch cluster.xml and stores.xml
+ // - Pass these on to each FatClientWrapper
+ // - Set up AsyncMetadataVersionManager
+ fatClientMap = new HashMap();
+ for(StoreDefinition storeDef: storeDefList) {
+ String storeName = storeDef.getName();
+ logger.info("Creating a Fat client wrapper for store: " + storeName);
+ logger.info("Using config: " + fatClientConfigMap.get(storeName));
+ fatClientMap.put(storeName, new FatClientWrapper(storeName,
+ bootstrapURLs,
+ fatClientConfigMap.get(storeName)));
+
+ }
+
+ }
+
+ public static void main(String[] args) {
+
+ if(args.length < 1) {
+ System.err.println("Missing argument: ");
+ System.exit(-1);
+ }
+
+ if(args.length == 2) {
+ if(args[1].equals("noop")) {
+ noop = true;
+ }
+ }
+
+ // Initialize the Voldemort Metadata
+ bootstrapURLs = new String[1];
+ bootstrapURLs[0] = args[0];
+ ClientConfig clientConfig = new ClientConfig();
+ clientConfig.setBootstrapUrls(bootstrapURLs);
+ storeClientFactory = new SocketStoreClientFactory(clientConfig);
+ initializeFatClients();
+
+ // Setup the Async Metadata checker
+ SystemStoreRepository sysRepository = new SystemStoreRepository();
+ String clusterXml = storeClientFactory.bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY);
+
+ sysRepository.createSystemStores(clientConfig,
+ clusterXml,
+ storeClientFactory.getFailureDetector());
+ // Create a callback for re-bootstrapping the client
+ Callable rebootstrapCallback = new Callable() {
+
+ public Void call() throws Exception {
+ initializeFatClients();
+ return null;
+ }
+
+ };
+
+ asyncMetadataManager = new AsyncMetadataVersionManager(sysRepository,
+ rebootstrapCallback,
+ null);
+
+ schedulerService = new SchedulerService(1, SystemTime.INSTANCE, true);
+ schedulerService.schedule(asyncMetadataManager.getClass().getName(),
+ asyncMetadataManager,
+ new Date(),
+ asyncMetadataCheckIntervalInMs);
+
+ // Configure the server.
+ ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool()));
+ bootstrap.setOption("backlog", 1000);
+
+ // Set up the event pipeline factory.
+ bootstrap.setPipelineFactory(new CoordinatorPipelineFactory(fatClientMap, noop));
+
+ // Bind and start to accept incoming connections.
+ bootstrap.bind(new InetSocketAddress(8080));
+ }
+
+ @SuppressWarnings("unchecked")
+ private static Map readClientConfig(String configFilePath,
+ String[] bootstrapURLs) {
+ String line;
+ Map storeNameConfigMap = new HashMap();
+ try {
+ line = Joiner.on(" ")
+ .join(IOUtils.readLines(new FileReader(new File(configFilePath))))
+ .trim();
+
+ JsonDecoder decoder = new JsonDecoder(CLIENT_CONFIGS_AVRO_SCHEMA, line);
+ GenericDatumReader