Skip to content

Commit

Permalink
A working implementation of the Coordinator and thin client. Includes…
Browse files Browse the repository at this point in the history
… following things:

- Creating AbstractStore and AbstractStorageEngine to refactor
  the corresponding Store and StorageEngine interfaces.
- Refactored the fat client to accomodate dynamic per call timeout.
- Isolated Fat client wrapper to safeguard multitenancy
- Autobootstrap mechanism added to the Coordinator service
- Basic HTTP request/response parsing and Error handling
  • Loading branch information
Chinmay Soman committed Mar 26, 2013
1 parent 791ae59 commit 9785e68
Show file tree
Hide file tree
Showing 53 changed files with 1,698 additions and 1,072 deletions.
Expand Up @@ -20,25 +20,21 @@
import org.apache.log4j.Logger;

import voldemort.VoldemortException;
import voldemort.store.NoSuchCapabilityException;
import voldemort.store.StorageEngine;
import voldemort.store.StoreCapabilityType;
import voldemort.store.AbstractStorageEngine;
import voldemort.store.StoreUtils;
import voldemort.utils.ByteArray;
import voldemort.utils.ClosableIterator;
import voldemort.utils.Pair;
import voldemort.utils.StripedLock;
import voldemort.utils.Utils;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.Occurred;
import voldemort.versioning.VectorClock;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;

public class KratiStorageEngine implements StorageEngine<ByteArray, byte[], byte[]> {
public class KratiStorageEngine extends AbstractStorageEngine<ByteArray, byte[], byte[]> {

private static final Logger logger = Logger.getLogger(KratiStorageEngine.class);
private final String name;
private final DynamicDataStore datastore;
private final StripedLock locks;

Expand All @@ -49,7 +45,7 @@ public KratiStorageEngine(String name,
double hashLoadFactor,
int initLevel,
File dataDirectory) {
this.name = Utils.notNull(name);
super(name);
try {
this.datastore = new DynamicDataStore(dataDirectory,
initLevel,
Expand All @@ -64,36 +60,30 @@ public KratiStorageEngine(String name,

}

public Object getCapability(StoreCapabilityType capability) {
throw new NoSuchCapabilityException(capability, getName());
}

public String getName() {
return this.name;
}

public void close() throws VoldemortException {}

@Override
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys,
Map<ByteArray, byte[]> transforms)
throws VoldemortException {
StoreUtils.assertValidKeys(keys);
return StoreUtils.getAll(this, keys, null);
}

@Override
public List<Version> getVersions(ByteArray key) {
return StoreUtils.getVersions(get(key, null));
}

@Override
public void truncate() {
try {
datastore.clear();
} catch(Exception e) {
logger.error("Failed to truncate store '" + name + "': ", e);
throw new VoldemortException("Failed to truncate store '" + name + "'.");
logger.error("Failed to truncate store '" + getName() + "': ", e);
throw new VoldemortException("Failed to truncate store '" + getName() + "'.");
}
}

@Override
public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws VoldemortException {
StoreUtils.assertValidKey(key);
try {
Expand All @@ -104,6 +94,7 @@ public List<Versioned<byte[]>> get(ByteArray key, byte[] transforms) throws Vold
}
}

@Override
public ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entries() {
List<Pair<ByteArray, Versioned<byte[]>>> returnedList = new ArrayList<Pair<ByteArray, Versioned<byte[]>>>();
DataArray array = datastore.getDataArray();
Expand Down Expand Up @@ -143,18 +134,22 @@ public ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entries() {
return new KratiClosableIterator(returnedList);
}

@Override
public ClosableIterator<ByteArray> keys() {
return StoreUtils.keys(entries());
}

@Override
public ClosableIterator<Pair<ByteArray, Versioned<byte[]>>> entries(int partition) {
throw new UnsupportedOperationException("Partition based entries scan not supported for this storage type");
}

@Override
public ClosableIterator<ByteArray> keys(int partition) {
throw new UnsupportedOperationException("Partition based key scan not supported for this storage type");
}

@Override
public boolean delete(ByteArray key, Version maxVersion) throws VoldemortException {
StoreUtils.assertValidKey(key);

Expand Down Expand Up @@ -197,6 +192,7 @@ public boolean delete(ByteArray key, Version maxVersion) throws VoldemortExcepti
}
}

@Override
public void put(ByteArray key, Versioned<byte[]> value, byte[] transforms)
throws VoldemortException {
StoreUtils.assertValidKey(key);
Expand Down Expand Up @@ -298,40 +294,26 @@ public KratiClosableIterator(List<Pair<ByteArray, Versioned<byte[]>>> list) {
iter = list.iterator();
}

@Override
public void close() {
// Nothing to close here
}

@Override
public boolean hasNext() {
return iter.hasNext();
}

@Override
public Pair<ByteArray, Versioned<byte[]>> next() {
return iter.next();
}

@Override
public void remove() {
Pair<ByteArray, Versioned<byte[]>> currentPair = next();
delete(currentPair.getFirst(), currentPair.getSecond().getVersion());
}

}

public boolean isPartitionAware() {
return false;
}

public boolean isPartitionScanSupported() {
return false;
}

@Override
public boolean beginBatchModifications() {
return false;
}

@Override
public boolean endBatchModifications() {
return false;
}
}
9 changes: 9 additions & 0 deletions src/java/voldemort/client/ClientConfig.java
Expand Up @@ -1104,4 +1104,13 @@ public ClientConfig setEnableInconsistencyResolvingLayer(boolean enableInconsist
this.enableInconsistencyResolvingLayer = enableInconsistencyResolvingLayer;
return this;
}

public String toString() {
StringBuilder clientConfigInfo = new StringBuilder();
clientConfigInfo.append("Max connections per node: " + this.maxConnectionsPerNode + "\n");
clientConfigInfo.append("Connection timeout : " + this.connectionTimeoutMs + "\n");
clientConfigInfo.append("Socket timeout : " + this.socketTimeoutMs + "\n");
clientConfigInfo.append("Routing timeout : " + this.routingTimeoutMs + "\n");
return clientConfigInfo.toString();
}
}
24 changes: 11 additions & 13 deletions src/java/voldemort/coordinator/CoordinatorPipelineFactory.java
@@ -1,11 +1,9 @@
package voldemort.coordinator;

/*
* Copyright 2009 Red Hat, Inc.
* Copyright 2008-2013 LinkedIn, Inc
*
* Red Hat licenses this file to you under the Apache License, version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
Expand All @@ -16,6 +14,8 @@
* the License.
*/

package voldemort.coordinator;

import static org.jboss.netty.channel.Channels.pipeline;

import java.util.Map;
Expand All @@ -27,6 +27,11 @@
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;

/**
* A PipelineFactory implementation to setup the Netty Pipeline in the
* Coordinator
*
*/
public class CoordinatorPipelineFactory implements ChannelPipelineFactory {

private boolean noop = false;
Expand All @@ -42,14 +47,7 @@ public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = pipeline();

// Uncomment the following line if you want HTTPS
// SSLEngine engine =
// SecureChatSslContextFactory.getServerContext().createSSLEngine();
// engine.setUseClientMode(false);
// pipeline.addLast("ssl", new SslHandler(engine));

pipeline.addLast("decoder", new HttpRequestDecoder());
// Uncomment the following line if you don't want to handle HttpChunks.
pipeline.addLast("aggregator", new HttpChunkAggregator(1048576));
pipeline.addLast("encoder", new HttpResponseEncoder());
// Remove the following line if you don't want automatic content
Expand Down
62 changes: 54 additions & 8 deletions src/java/voldemort/coordinator/CoordinatorService.java
@@ -1,3 +1,19 @@
/*
* Copyright 2008-2013 LinkedIn, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package voldemort.coordinator;

import java.io.File;
Expand Down Expand Up @@ -36,6 +52,11 @@

import com.google.common.base.Joiner;

/**
* A Netty based HTTP service that accepts REST requests from the Voldemort thin
* clients and invokes the corresponding Fat client API.
*
*/
public class CoordinatorService {

private static boolean noop = false;
Expand All @@ -49,13 +70,24 @@ public class CoordinatorService {
public final static Schema CLIENT_CONFIGS_AVRO_SCHEMA = Schema.parse("{ \"name\": \"clientConfigs\", \"type\":\"array\","
+ "\"items\": { \"name\": \"clientConfig\", \"type\": \"map\", \"values\":\"string\" }}}");
private static final String STORE_NAME_KEY = "store_name";

private static String CLIENT_CONFIG_AVRO_FILE_PATH = "";

/**
* Initializes all the Fat clients (1 per store) for the cluster that this
* Coordinator talks to. This is invoked once during startup and then every
* time the Metadata manager detects changes to the cluster and stores
* metadata.
*/
private static void initializeFatClients() {
StoreDefinitionsMapper storeMapper = new StoreDefinitionsMapper();

// Fetch the state once and use this to initialize all the Fat clients
String storesXml = storeClientFactory.bootstrapMetadataWithRetries(MetadataStore.STORES_KEY);
String clusterXml = storeClientFactory.bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY);

List<StoreDefinition> storeDefList = storeMapper.readStoreList(new StringReader(storesXml),
false);
Map<String, ClientConfig> fatClientConfigMap = readClientConfig("/home/csoman/Downloads/clientConfigs.avro",
Map<String, ClientConfig> fatClientConfigMap = readClientConfig(CLIENT_CONFIG_AVRO_FILE_PATH,
bootstrapURLs);
// For now Simply create the map of store definition to
// FatClientWrappers
Expand All @@ -70,28 +102,33 @@ private static void initializeFatClients() {
logger.info("Using config: " + fatClientConfigMap.get(storeName));
fatClientMap.put(storeName, new FatClientWrapper(storeName,
bootstrapURLs,
fatClientConfigMap.get(storeName)));
fatClientConfigMap.get(storeName),
storesXml,
clusterXml));

}

}

public static void main(String[] args) {

if(args.length < 1) {
System.err.println("Missing argument: <Bootstrap URL>");
if(args.length < 2) {
System.err.println("Missing argument: <Bootstrap URL> <fat client config file path>");
System.exit(-1);
}

if(args.length == 2) {
if(args[1].equals("noop")) {
if(args.length == 3) {
if(args[2].equals("noop")) {
noop = true;
}
}

// Initialize the Voldemort Metadata
// Initialize Config
bootstrapURLs = new String[1];
bootstrapURLs[0] = args[0];
CLIENT_CONFIG_AVRO_FILE_PATH = args[1];

// Initialize the Voldemort Metadata
ClientConfig clientConfig = new ClientConfig();
clientConfig.setBootstrapUrls(bootstrapURLs);
storeClientFactory = new SocketStoreClientFactory(clientConfig);
Expand All @@ -107,6 +144,7 @@ public static void main(String[] args) {
// Create a callback for re-bootstrapping the client
Callable<Void> rebootstrapCallback = new Callable<Void>() {

@Override
public Void call() throws Exception {
initializeFatClients();
return null;
Expand Down Expand Up @@ -136,6 +174,14 @@ public Void call() throws Exception {
bootstrap.bind(new InetSocketAddress(8080));
}

/**
* A function to parse the specified Avro file in order to obtain the config
* for each fat client managed by this coordinator.
*
* @param configFilePath Path of the Avro file containing fat client configs
* @param bootstrapURLs The server URLs used during bootstrap
* @return Map of store name to the corresponding fat client config
*/
@SuppressWarnings("unchecked")
private static Map<String, ClientConfig> readClientConfig(String configFilePath,
String[] bootstrapURLs) {
Expand Down

0 comments on commit 9785e68

Please sign in to comment.