From 572f87b66190168284e8d540afea6ccd8ca19562 Mon Sep 17 00:00:00 2001 From: Chinmay Soman Date: Sat, 2 Mar 2013 15:31:41 -0800 Subject: [PATCH] First working version of Coordinator service. Includes REST request and response handling, Error handling, automatic checking of metadata changes, fat client config management --- .gitignore | 1 + ...y.java => CoordinatorPipelineFactory.java} | 19 +- .../coordinator/CoordinatorService.java | 193 ++++++++++++++++++ .../coordinator/FatClientWrapper.java | 99 +-------- .../coordinator/GetRequestExecutor.java | 117 +++++++++++ .../voldemort/coordinator/HttpServer.java | 63 ------ .../coordinator/NoopHttpRequestHandler.java | 27 +-- .../voldemort/coordinator/R2StoreWrapper.java | 69 ++++--- .../coordinator/RESTErrorHandler.java | 70 +++++++ .../coordinator/SampleRESTClient.java | 3 +- .../coordinator/VectorClockWrapper.java | 38 ++++ ....java => VoldemortHttpRequestHandler.java} | 151 +++++++------- src/java/voldemort/versioning/ClockEntry.java | 20 +- 13 files changed, 583 insertions(+), 287 deletions(-) rename src/java/voldemort/coordinator/{HttpServerPipelineFactory.java => CoordinatorPipelineFactory.java} (81%) create mode 100644 src/java/voldemort/coordinator/CoordinatorService.java create mode 100644 src/java/voldemort/coordinator/GetRequestExecutor.java delete mode 100644 src/java/voldemort/coordinator/HttpServer.java create mode 100644 src/java/voldemort/coordinator/RESTErrorHandler.java create mode 100644 src/java/voldemort/coordinator/VectorClockWrapper.java rename src/java/voldemort/coordinator/{HttpRequestHandler.java => VoldemortHttpRequestHandler.java} (63%) diff --git a/.gitignore b/.gitignore index cd6301beb2..0d38407a48 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ classes +config dist *~ *.iml diff --git a/src/java/voldemort/coordinator/HttpServerPipelineFactory.java b/src/java/voldemort/coordinator/CoordinatorPipelineFactory.java similarity index 81% rename from src/java/voldemort/coordinator/HttpServerPipelineFactory.java rename to src/java/voldemort/coordinator/CoordinatorPipelineFactory.java index 242829a255..45bd7a58f8 100644 --- a/src/java/voldemort/coordinator/HttpServerPipelineFactory.java +++ b/src/java/voldemort/coordinator/CoordinatorPipelineFactory.java @@ -18,6 +18,8 @@ import static org.jboss.netty.channel.Channels.pipeline; +import java.util.Map; + import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.handler.codec.http.HttpChunkAggregator; @@ -25,18 +27,13 @@ import org.jboss.netty.handler.codec.http.HttpRequestDecoder; import org.jboss.netty.handler.codec.http.HttpResponseEncoder; -/** - * @author The Netty Project - * @author Andy Taylor (andy.taylor@jboss.org) - * @author Trustin Lee - * - * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $ - */ -public class HttpServerPipelineFactory implements ChannelPipelineFactory { +public class CoordinatorPipelineFactory implements ChannelPipelineFactory { - boolean noop = false; + private boolean noop = false; + private Map fatClientMap; - public HttpServerPipelineFactory(boolean noop) { + public CoordinatorPipelineFactory(Map fatClientMap, boolean noop) { + this.fatClientMap = fatClientMap; this.noop = noop; } @@ -61,7 +58,7 @@ public ChannelPipeline getPipeline() throws Exception { if(this.noop) { pipeline.addLast("handler", new NoopHttpRequestHandler()); } else { - pipeline.addLast("handler", new HttpRequestHandler()); + pipeline.addLast("handler", new VoldemortHttpRequestHandler(this.fatClientMap)); } return pipeline; } diff --git a/src/java/voldemort/coordinator/CoordinatorService.java b/src/java/voldemort/coordinator/CoordinatorService.java new file mode 100644 index 0000000000..7ece75e880 --- /dev/null +++ b/src/java/voldemort/coordinator/CoordinatorService.java @@ -0,0 +1,193 @@ +package voldemort.coordinator; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.io.StringReader; +import java.net.InetSocketAddress; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.io.JsonDecoder; +import org.apache.avro.util.Utf8; +import org.apache.commons.io.IOUtils; +import org.apache.log4j.Logger; +import org.jboss.netty.bootstrap.ServerBootstrap; +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; + +import voldemort.client.ClientConfig; +import voldemort.client.SocketStoreClientFactory; +import voldemort.client.SystemStoreRepository; +import voldemort.client.scheduler.AsyncMetadataVersionManager; +import voldemort.common.service.SchedulerService; +import voldemort.store.StoreDefinition; +import voldemort.store.metadata.MetadataStore; +import voldemort.utils.SystemTime; +import voldemort.xml.StoreDefinitionsMapper; + +import com.google.common.base.Joiner; + +public class CoordinatorService { + + private static boolean noop = false; + private static SocketStoreClientFactory storeClientFactory = null; + private static String[] bootstrapURLs; + private static AsyncMetadataVersionManager asyncMetadataManager = null; + private static SchedulerService schedulerService = null; + private static final Logger logger = Logger.getLogger(CoordinatorService.class); + private static Map fatClientMap = null; + private static long asyncMetadataCheckIntervalInMs = 5000; + public final static Schema CLIENT_CONFIGS_AVRO_SCHEMA = Schema.parse("{ \"name\": \"clientConfigs\", \"type\":\"array\"," + + "\"items\": { \"name\": \"clientConfig\", \"type\": \"map\", \"values\":\"string\" }}}"); + private static final String STORE_NAME_KEY = "store_name"; + + private static void initializeFatClients() { + StoreDefinitionsMapper storeMapper = new StoreDefinitionsMapper(); + String storesXml = storeClientFactory.bootstrapMetadataWithRetries(MetadataStore.STORES_KEY); + List storeDefList = storeMapper.readStoreList(new StringReader(storesXml), + false); + Map fatClientConfigMap = readClientConfig("/home/csoman/Downloads/clientConfigs.avro", + bootstrapURLs); + // For now Simply create the map of store definition to + // FatClientWrappers + // TODO: After the fat client improvements is done, modify this to + // - Fetch cluster.xml and stores.xml + // - Pass these on to each FatClientWrapper + // - Set up AsyncMetadataVersionManager + fatClientMap = new HashMap(); + for(StoreDefinition storeDef: storeDefList) { + String storeName = storeDef.getName(); + logger.info("Creating a Fat client wrapper for store: " + storeName); + logger.info("Using config: " + fatClientConfigMap.get(storeName)); + fatClientMap.put(storeName, new FatClientWrapper(storeName, + bootstrapURLs, + fatClientConfigMap.get(storeName))); + + } + + } + + public static void main(String[] args) { + + if(args.length < 1) { + System.err.println("Missing argument: "); + System.exit(-1); + } + + if(args.length == 2) { + if(args[1].equals("noop")) { + noop = true; + } + } + + // Initialize the Voldemort Metadata + bootstrapURLs = new String[1]; + bootstrapURLs[0] = args[0]; + ClientConfig clientConfig = new ClientConfig(); + clientConfig.setBootstrapUrls(bootstrapURLs); + storeClientFactory = new SocketStoreClientFactory(clientConfig); + initializeFatClients(); + + // Setup the Async Metadata checker + SystemStoreRepository sysRepository = new SystemStoreRepository(); + String clusterXml = storeClientFactory.bootstrapMetadataWithRetries(MetadataStore.CLUSTER_KEY); + + sysRepository.createSystemStores(clientConfig, + clusterXml, + storeClientFactory.getFailureDetector()); + // Create a callback for re-bootstrapping the client + Callable rebootstrapCallback = new Callable() { + + public Void call() throws Exception { + initializeFatClients(); + return null; + } + + }; + + asyncMetadataManager = new AsyncMetadataVersionManager(sysRepository, + rebootstrapCallback, + null); + + schedulerService = new SchedulerService(1, SystemTime.INSTANCE, true); + schedulerService.schedule(asyncMetadataManager.getClass().getName(), + asyncMetadataManager, + new Date(), + asyncMetadataCheckIntervalInMs); + + // Configure the server. + ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), + Executors.newCachedThreadPool())); + bootstrap.setOption("backlog", 1000); + + // Set up the event pipeline factory. + bootstrap.setPipelineFactory(new CoordinatorPipelineFactory(fatClientMap, noop)); + + // Bind and start to accept incoming connections. + bootstrap.bind(new InetSocketAddress(8080)); + } + + @SuppressWarnings("unchecked") + private static Map readClientConfig(String configFilePath, + String[] bootstrapURLs) { + String line; + Map storeNameConfigMap = new HashMap(); + try { + line = Joiner.on(" ") + .join(IOUtils.readLines(new FileReader(new File(configFilePath)))) + .trim(); + + JsonDecoder decoder = new JsonDecoder(CLIENT_CONFIGS_AVRO_SCHEMA, line); + GenericDatumReader datumReader = new GenericDatumReader(CLIENT_CONFIGS_AVRO_SCHEMA); + GenericData.Array> flowMaps = (GenericData.Array>) datumReader.read(null, + decoder); + + // Flows to return back + if(flowMaps != null && flowMaps.size() > 0) { + for(Map flowMap: flowMaps) { + Properties props = new Properties(); + for(Utf8 key: flowMap.keySet()) { + props.put(key.toString(), flowMap.get(key).toString()); + } + + String storeName = flowMap.get(new Utf8(STORE_NAME_KEY)).toString(); + + storeName = props.getProperty(STORE_NAME_KEY); + if(storeName == null || storeName.length() == 0) { + throw new Exception("Illegal Store Name !!!"); + } + + ClientConfig config = new ClientConfig(props); + config.setBootstrapUrls(bootstrapURLs) + .setEnableCompressionLayer(false) + .setEnableSerializationLayer(false) + .enableDefaultClient(true) + .setEnableLazy(false); + + storeNameConfigMap.put(storeName, config); + + } + } + } catch(FileNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch(IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch(Exception e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + return storeNameConfigMap; + } +} diff --git a/src/java/voldemort/coordinator/FatClientWrapper.java b/src/java/voldemort/coordinator/FatClientWrapper.java index bbbb2a1c3e..215aa02bed 100644 --- a/src/java/voldemort/coordinator/FatClientWrapper.java +++ b/src/java/voldemort/coordinator/FatClientWrapper.java @@ -5,10 +5,7 @@ import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.SynchronousQueue; @@ -16,6 +13,7 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.log4j.Logger; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelFuture; @@ -29,15 +27,13 @@ import voldemort.client.SocketStoreClientFactory; import voldemort.utils.ByteArray; import voldemort.versioning.ObsoleteVersionException; -import voldemort.versioning.VectorClock; -import voldemort.versioning.Versioned; public class FatClientWrapper { private ExecutorService fatClientExecutor; - private SynchronousQueue fatClientRequestQueue; private SocketStoreClientFactory storeClientFactory; private DefaultStoreClient storeClient; + private final Logger logger = Logger.getLogger(FatClientWrapper.class); /** * A Wrapper class to provide asynchronous API for calling the fat client @@ -47,7 +43,7 @@ public class FatClientWrapper { * @param storeName: Store to connect to via this fat client * @param bootstrapURLs: Bootstrap URLs for the intended cluster */ - public FatClientWrapper(String storeName, String[] bootstrapURLs) { + public FatClientWrapper(String storeName, String[] bootstrapURLs, ClientConfig clientConfig) { this.fatClientExecutor = new ThreadPoolExecutor(20, // Core pool size 20, // Max pool size 60, // Keepalive @@ -80,15 +76,7 @@ public void rejectedExecution(Runnable r, } }); - this.fatClientRequestQueue = new SynchronousQueue(); - - // Setup the Voldemort client - ClientConfig clientConfig = new ClientConfig().setBootstrapUrls(bootstrapURLs) - .setEnableLazy(false) - .setEnableCompressionLayer(false) - .setEnableSerializationLayer(false) - .setEnableInconsistencyResolvingLayer(false) - .enableDefaultClient(true); + // this.fatClientRequestQueue = new SynchronousQueue(); this.storeClientFactory = new SocketStoreClientFactory(clientConfig); this.storeClient = (DefaultStoreClient) this.storeClientFactory.getStoreClient(storeName); @@ -105,7 +93,10 @@ public void rejectedExecution(Runnable r, void submitGetRequest(final ByteArray key, final MessageEvent getRequest) { try { - Future f = this.fatClientExecutor.submit(new GetRequestExecutor(key, null, getRequest)); + this.fatClientExecutor.submit(new GetRequestExecutor(key, + null, + getRequest, + this.storeClient)); // Keep track of this request for monitoring // this.fatClientRequestQueue.add(f); @@ -124,7 +115,7 @@ void submitGetRequest(final ByteArray key, final MessageEvent getRequest) { void submitPutRequest(final ByteArray key, final byte[] value, final MessageEvent putRequest) { try { - Future f = this.fatClientExecutor.submit(new PutRequestExecutor(key, value, putRequest)); + this.fatClientExecutor.submit(new PutRequestExecutor(key, value, putRequest)); // Keep track of this request for monitoring // this.fatClientRequestQueue.add(f); @@ -138,74 +129,6 @@ private void handleRejectedException(MessageEvent getRequest) { // client } - private class GetRequestExecutor implements Runnable { - - private ByteArray key; - private Versioned defaultValue; - private MessageEvent getRequest; - private ChannelBuffer responseContent; - - public GetRequestExecutor(ByteArray key, - Versioned defaultValue, - MessageEvent request) { - this.key = key; - this.defaultValue = defaultValue; - this.getRequest = request; - } - - private void writeResponse(List> values) { - responseContent.writeInt(values.size()); - for(Versioned v: values) { - byte[] clock = ((VectorClock) v.getVersion()).toBytes(); - byte[] value = (byte[]) v.getValue(); - responseContent.writeInt(clock.length + value.length); - responseContent.writeBytes(clock); - responseContent.writeBytes(value); - } - - // 1. Create the Response object - HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); - - // 2. Set the right headers - // response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - response.setHeader(CONTENT_TYPE, "application/pdf"); - // response.setChunked(true); - - // 3. Copy the data into the payload - response.setContent(responseContent); - response.setHeader(CONTENT_LENGTH, response.getContent().readableBytes()); - - // Write the response to the Netty Channel - ChannelFuture future = this.getRequest.getChannel().write(response); - - // Close the non-keep-alive connection after the write operation is - // done. - future.addListener(ChannelFutureListener.CLOSE); - - } - - @Override - public void run() { - List> results = new ArrayList>(); - Versioned responseVersioned = storeClient.get(this.key); - System.out.println("Get successful !"); - if(responseVersioned == null) { - if(this.defaultValue != null) { - responseVersioned = this.defaultValue; - } else { - byte[] nullByteArray = new byte[1]; - nullByteArray[0] = 0; - responseVersioned = new Versioned(nullByteArray); - } - } - results.add(responseVersioned); - byte[] responseValue = (byte[]) responseVersioned.getValue(); - this.responseContent = ChannelBuffers.dynamicBuffer(responseValue.length); - writeResponse(results); - } - - } - private class PutRequestExecutor implements Runnable { private ByteArray key; @@ -225,7 +148,7 @@ private void writeResponse() { // 2. Set the right headers // response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); - response.setHeader(CONTENT_TYPE, "application/pdf"); + response.setHeader(CONTENT_TYPE, "application/json"); // response.setChunked(true); // 3. Copy the data into the payload @@ -245,7 +168,7 @@ public void run() { try { storeClient.put(key, value); - System.out.println("Put successful !"); + logger.info("Put successful !"); } catch(ObsoleteVersionException oe) { // Ideally propagate the exception ! } diff --git a/src/java/voldemort/coordinator/GetRequestExecutor.java b/src/java/voldemort/coordinator/GetRequestExecutor.java new file mode 100644 index 0000000000..a9d1933b09 --- /dev/null +++ b/src/java/voldemort/coordinator/GetRequestExecutor.java @@ -0,0 +1,117 @@ +package voldemort.coordinator; + +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TRANSFER_ENCODING; +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.ETAG; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +import java.io.IOException; + +import org.apache.log4j.Logger; +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponse; + +import voldemort.client.DefaultStoreClient; +import voldemort.utils.ByteArray; +import voldemort.versioning.VectorClock; +import voldemort.versioning.Versioned; + +public class GetRequestExecutor implements Runnable { + + private ByteArray key; + private Versioned defaultValue; + private MessageEvent getRequestMessageEvent; + private ChannelBuffer responseContent; + DefaultStoreClient storeClient; + private final Logger logger = Logger.getLogger(GetRequestExecutor.class); + + public GetRequestExecutor(ByteArray key, + Versioned defaultValue, + MessageEvent requestEvent, + DefaultStoreClient storeClient) { + this.key = key; + this.defaultValue = defaultValue; + this.getRequestMessageEvent = requestEvent; + this.storeClient = storeClient; + } + + public void writeResponse(Versioned responseVersioned) { + + byte[] value = (byte[]) responseVersioned.getValue(); + + // Set the value as the HTTP response payload + byte[] responseValue = (byte[]) responseVersioned.getValue(); + this.responseContent = ChannelBuffers.dynamicBuffer(responseValue.length); + this.responseContent.writeBytes(value); + + VectorClock vc = (VectorClock) responseVersioned.getVersion(); + VectorClockWrapper vcWrapper = new VectorClockWrapper(vc); + ObjectMapper mapper = new ObjectMapper(); + String eTag = ""; + try { + eTag = mapper.writeValueAsString(vcWrapper); + } catch(JsonGenerationException e) { + e.printStackTrace(); + } catch(JsonMappingException e) { + e.printStackTrace(); + } catch(IOException e) { + e.printStackTrace(); + } + + logger.info("ETAG : " + eTag); + + // 1. Create the Response object + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK); + + // 2. Set the right headers + // response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.setHeader(CONTENT_TYPE, "application/json"); + response.setHeader(CONTENT_TRANSFER_ENCODING, "binary"); + response.setHeader(ETAG, eTag); + + // 3. Copy the data into the payload + response.setContent(responseContent); + response.setHeader(CONTENT_LENGTH, response.getContent().readableBytes()); + + logger.info("Response = " + response); + + // Write the response to the Netty Channel + ChannelFuture future = this.getRequestMessageEvent.getChannel().write(response); + + // Close the non-keep-alive connection after the write operation is + // done. + future.addListener(ChannelFutureListener.CLOSE); + + } + + public void setResponseContent(Versioned responseVersioned) {} + + @Override + public void run() { + Versioned responseVersioned = storeClient.get(this.key); + logger.info("Get successful !"); + if(responseVersioned == null) { + if(this.defaultValue != null) { + responseVersioned = this.defaultValue; + } else { + RESTErrorHandler.handleError(NOT_FOUND, + this.getRequestMessageEvent, + false, + "Requested Key does not exist"); + } + } + writeResponse(responseVersioned); + } + +} \ No newline at end of file diff --git a/src/java/voldemort/coordinator/HttpServer.java b/src/java/voldemort/coordinator/HttpServer.java deleted file mode 100644 index eec172ab82..0000000000 --- a/src/java/voldemort/coordinator/HttpServer.java +++ /dev/null @@ -1,63 +0,0 @@ -package voldemort.coordinator; - -/* - * Copyright 2009 Red Hat, Inc. - * - * Red Hat licenses this file to you under the Apache License, version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -import java.net.InetSocketAddress; -import java.util.concurrent.Executors; - -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; - -/** - * An HTTP server that sends back the content of the received HTTP request in a - * pretty plaintext form. - * - * @author The Netty Project - * @author Andy Taylor (andy.taylor@jboss.org) - * @author Trustin Lee - * - * @version $Rev: 2080 $, $Date: 2010-01-26 18:04:19 +0900 (Tue, 26 Jan 2010) $ - */ -public class HttpServer { - - private static boolean noop = false; - - public static void main(String[] args) { - - if(args.length < 1) { - System.err.println("Missing argument: "); - System.exit(-1); - } - - if(args.length == 2) { - if(args[1].equals("noop")) { - noop = true; - } - } - - // Configure the server. - ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), - Executors.newCachedThreadPool())); - bootstrap.setOption("backlog", 1000); - - // Set up the event pipeline factory. - bootstrap.setPipelineFactory(new HttpServerPipelineFactory(noop)); - - // Bind and start to accept incoming connections. - bootstrap.bind(new InetSocketAddress(8080)); - } -} diff --git a/src/java/voldemort/coordinator/NoopHttpRequestHandler.java b/src/java/voldemort/coordinator/NoopHttpRequestHandler.java index 716813d146..d29de914ab 100644 --- a/src/java/voldemort/coordinator/NoopHttpRequestHandler.java +++ b/src/java/voldemort/coordinator/NoopHttpRequestHandler.java @@ -1,40 +1,30 @@ package voldemort.coordinator; -import java.util.ArrayList; -import java.util.List; - import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.handler.codec.http.HttpRequest; -import org.jboss.netty.handler.codec.http.QueryStringDecoder; import voldemort.versioning.Versioned; -public class NoopHttpRequestHandler extends HttpRequestHandler { +public class NoopHttpRequestHandler extends VoldemortHttpRequestHandler { public NoopHttpRequestHandler() {} @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { - List> results = new ArrayList>(); - - HttpRequest request = this.request = (HttpRequest) e.getMessage(); - QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.getUri()); - - // Decode the operation type - OP_TYPE operation = getOperationType(queryStringDecoder.getPath()); + OP_TYPE operation = getOperationType(this.request.getMethod()); switch(operation) { case GET: + GetRequestExecutor getExecutor = new GetRequestExecutor(null, null, e, null); + Versioned responseVersioned = null; byte[] nullByteArray = new byte[1]; nullByteArray[0] = 0; responseVersioned = new Versioned(nullByteArray); - results.add(responseVersioned); - byte[] responseValue = (byte[]) responseVersioned.getValue(); - this.responseContent = ChannelBuffers.dynamicBuffer(responseValue.length); - writeResults(results); + + getExecutor.setResponseContent(responseVersioned); + getExecutor.writeResponse(responseVersioned); break; case PUT: this.responseContent = ChannelBuffers.EMPTY_BUFFER; @@ -44,8 +34,5 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex this.responseContent = ChannelBuffers.copiedBuffer("Illegal operation.".getBytes()); return; } - - writeResponse(e); - } } diff --git a/src/java/voldemort/coordinator/R2StoreWrapper.java b/src/java/voldemort/coordinator/R2StoreWrapper.java index 095d8dd64e..d2726c06e7 100644 --- a/src/java/voldemort/coordinator/R2StoreWrapper.java +++ b/src/java/voldemort/coordinator/R2StoreWrapper.java @@ -1,7 +1,6 @@ package voldemort.coordinator; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.net.HttpURLConnection; @@ -14,12 +13,14 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; +import org.apache.commons.codec.binary.Base64; +import org.codehaus.jackson.map.ObjectMapper; + import voldemort.VoldemortException; import voldemort.store.Store; import voldemort.store.StoreCapabilityType; import voldemort.store.StoreUtils; import voldemort.utils.ByteArray; -import voldemort.utils.ByteUtils; import voldemort.versioning.VectorClock; import voldemort.versioning.Version; import voldemort.versioning.Versioned; @@ -37,6 +38,8 @@ public class R2StoreWrapper implements Store { + private static final String GET = "GET"; + private static final String PUT = "PUT"; private URL url = null; HttpURLConnection conn = null; private HttpClientFactory _clientFactory; @@ -85,12 +88,15 @@ public List> get(ByteArray key, byte[] transforms) throws Vold DataOutputStream outputStream = new DataOutputStream(outputBytes); writeGetRequest(outputStream, key); - // Create the REST request with this byte array - RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL - + "/get?store_name=test")); - rb.setMethod("POST"); + String base64Key = new String(Base64.encodeBase64(key.get())); + RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/test/" + + base64Key)); + // RestRequestBuilder rb = new RestRequestBuilder(new + // URI(this.baseURL + "/" + base64Key)); + + rb.setMethod(GET); rb.setEntity(outputBytes.toByteArray()); - rb.setHeader("CONTENT_TYPE", "application/pdf"); + rb.setHeader("Accept", "application/json"); RestRequest request = rb.build(); Future f = client.restRequest(request); @@ -98,10 +104,10 @@ public List> get(ByteArray key, byte[] transforms) throws Vold // This will block RestResponse response = f.get(); final ByteString entity = response.getEntity(); + String eTag = response.getHeader("ETag"); + String lastModified = response.getHeader("Last-Modified"); if(entity != null) { - // System.out.println(entity.asString("UTF-8")); - DataInputStream inputStream = new DataInputStream(entity.asInputStream()); - resultList = readResults(inputStream); + resultList = readResults(entity, eTag, lastModified); } else { System.out.println("NOTHING!"); } @@ -122,14 +128,22 @@ public void put(ByteArray key, Versioned value, byte[] transform) // Create the byte[] array ByteArrayOutputStream outputBytes = new ByteArrayOutputStream(); DataOutputStream outputStream = new DataOutputStream(outputBytes); - writePutRequest(outputStream, key, value.getValue()); + + // Write the value in the payload + byte[] payload = value.getValue(); + outputStream.write(payload); // Create the REST request with this byte array - RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL - + "/put?store_name=test")); - rb.setMethod("POST"); + String base64Key = new String(Base64.encodeBase64(key.get())); + RestRequestBuilder rb = new RestRequestBuilder(new URI(this.baseURL + "/test/" + + base64Key)); + // RestRequestBuilder rb = new RestRequestBuilder(new + // URI(this.baseURL + "/" + base64Key)); + + rb.setMethod(PUT); rb.setEntity(outputBytes.toByteArray()); - rb.setHeader("CONTENT_TYPE", "application/pdf"); + rb.setHeader("Content-Type", "application/json"); + rb.setHeader("Content-Length", "" + payload.length); RestRequest request = rb.build(); Future f = client.restRequest(request); @@ -155,18 +169,19 @@ private void writeGetRequest(DataOutputStream outputStream, ByteArray key) throw outputStream.write(key.get()); } - private List> readResults(DataInputStream inputStream) throws IOException { - int resultSize = inputStream.readInt(); - List> results = new ArrayList>(resultSize); - for(int i = 0; i < resultSize; i++) { - int valueSize = inputStream.readInt(); - byte[] bytes = new byte[valueSize]; - ByteUtils.read(inputStream, bytes); - VectorClock clock = new VectorClock(bytes); - results.add(new Versioned(ByteUtils.copy(bytes, - clock.sizeInBytes(), - bytes.length), clock)); - } + private List> readResults(ByteString entity, String eTag, String lastModified) + throws IOException { + + ObjectMapper mapper = new ObjectMapper(); + System.out.println("Received etag : " + eTag); + System.out.println("Received last modified date : " + lastModified); + VectorClockWrapper vcWrapper = mapper.readValue(eTag, VectorClockWrapper.class); + List> results = new ArrayList>(2); + + byte[] bytes = new byte[entity.length()]; + entity.copyBytes(bytes, 0); + VectorClock clock = new VectorClock(vcWrapper.getVersions(), vcWrapper.getTimestamp()); + results.add(new Versioned(bytes, clock)); return results; } diff --git a/src/java/voldemort/coordinator/RESTErrorHandler.java b/src/java/voldemort/coordinator/RESTErrorHandler.java new file mode 100644 index 0000000000..04f6ca77a2 --- /dev/null +++ b/src/java/voldemort/coordinator/RESTErrorHandler.java @@ -0,0 +1,70 @@ +package voldemort.coordinator; + +import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR; +import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; + +import org.apache.log4j.Logger; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.handler.codec.http.DefaultHttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponse; +import org.jboss.netty.handler.codec.http.HttpResponseStatus; +import org.jboss.netty.util.CharsetUtil; + +public class RESTErrorHandler { + + private static final Logger logger = Logger.getLogger(RESTErrorHandler.class); + + public static void handleError(HttpResponseStatus status, + MessageEvent e, + boolean keepAlive, + String message) { + // 1. Create the Response object + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status); + + response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8"); + response.setContent(ChannelBuffers.copiedBuffer("Failure: " + status.toString() + ". " + + message + "\r\n", CharsetUtil.UTF_8)); + + // Write the response to the Netty Channel + ChannelFuture future = e.getChannel().write(response); + + // Close the non-keep-alive connection after the write operation is + // done. + if(!keepAlive) { + future.addListener(ChannelFutureListener.CLOSE); + } + } + + public static void handleBadRequestError(MessageEvent e, boolean keepAlive) { + // 1. Create the Response object + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, BAD_REQUEST); + + // Write the response to the Netty Channel + ChannelFuture future = e.getChannel().write(response); + + // Close the non-keep-alive connection after the write operation is + // done. + if(!keepAlive) { + future.addListener(ChannelFutureListener.CLOSE); + } + } + + public static void handleInternalServerError(MessageEvent e, boolean keepAlive) { + // 1. Create the Response object + HttpResponse response = new DefaultHttpResponse(HTTP_1_1, INTERNAL_SERVER_ERROR); + + // Write the response to the Netty Channel + ChannelFuture future = e.getChannel().write(response); + + // Close the non-keep-alive connection after the write operation is + // done. + if(!keepAlive) { + future.addListener(ChannelFutureListener.CLOSE); + } + } +} diff --git a/src/java/voldemort/coordinator/SampleRESTClient.java b/src/java/voldemort/coordinator/SampleRESTClient.java index 07cc71ee80..b0d6a6583d 100644 --- a/src/java/voldemort/coordinator/SampleRESTClient.java +++ b/src/java/voldemort/coordinator/SampleRESTClient.java @@ -9,7 +9,8 @@ public static void main(String[] args) { "test"); // Sample put - clientStore.put("a", "Be Meinnnnnnnn !!!"); + clientStore.put("a", + "Hola Senior !!! Bonjournooo sdafasdfsdfasadf sadfasdfasdfasdfsad fsad fsadfsadfsF!!!!"); // Do a sample operation: System.out.println("Received response : " + clientStore.get("a")); diff --git a/src/java/voldemort/coordinator/VectorClockWrapper.java b/src/java/voldemort/coordinator/VectorClockWrapper.java new file mode 100644 index 0000000000..1b2b984cf3 --- /dev/null +++ b/src/java/voldemort/coordinator/VectorClockWrapper.java @@ -0,0 +1,38 @@ +package voldemort.coordinator; + +import java.util.ArrayList; +import java.util.List; + +import voldemort.versioning.ClockEntry; +import voldemort.versioning.VectorClock; + +public class VectorClockWrapper { + + private List versions; + private long timestamp; + + public VectorClockWrapper() { + this.versions = new ArrayList(); + } + + public VectorClockWrapper(VectorClock vc) { + this.versions = vc.getEntries(); + this.setTimestamp(vc.getTimestamp()); + } + + public List getVersions() { + return versions; + } + + public void setVersions(List vectorClock) { + versions = vectorClock; + } + + public long getTimestamp() { + return timestamp; + } + + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } +} diff --git a/src/java/voldemort/coordinator/HttpRequestHandler.java b/src/java/voldemort/coordinator/VoldemortHttpRequestHandler.java similarity index 63% rename from src/java/voldemort/coordinator/HttpRequestHandler.java rename to src/java/voldemort/coordinator/VoldemortHttpRequestHandler.java index 13b73bd809..0ef070d547 100644 --- a/src/java/voldemort/coordinator/HttpRequestHandler.java +++ b/src/java/voldemort/coordinator/VoldemortHttpRequestHandler.java @@ -1,26 +1,11 @@ package voldemort.coordinator; -/* - * Copyright 2009 Red Hat, Inc. - * - * Red Hat licenses this file to you under the Apache License, version 2.0 (the - * "License"); you may not use this file except in compliance with the License. - * You may obtain a copy of the License at: - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - import static org.jboss.netty.handler.codec.http.HttpHeaders.isKeepAlive; import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_LENGTH; import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.COOKIE; import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.SET_COOKIE; +import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST; import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK; import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1; @@ -28,8 +13,9 @@ import java.util.Map; import java.util.Set; +import org.apache.commons.codec.binary.Base64; +import org.apache.log4j.Logger; import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; import org.jboss.netty.channel.ChannelHandlerContext; @@ -42,58 +28,45 @@ import org.jboss.netty.handler.codec.http.DefaultHttpResponse; import org.jboss.netty.handler.codec.http.HttpChunk; import org.jboss.netty.handler.codec.http.HttpChunkTrailer; +import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.HttpResponse; -import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.jboss.netty.util.CharsetUtil; import voldemort.utils.ByteArray; import voldemort.versioning.VectorClock; import voldemort.versioning.Versioned; -/** - * @author The Netty Project - * @author Andy Taylor (andy.taylor@jboss.org) - * @author Trustin Lee - * - * @version $Rev: 2288 $, $Date: 2010-05-27 21:40:50 +0900 (Thu, 27 May 2010) $ - */ -public class HttpRequestHandler extends SimpleChannelUpstreamHandler { +public class VoldemortHttpRequestHandler extends SimpleChannelUpstreamHandler { public HttpRequest request; private boolean readingChunks; /** Buffer that stores the response content */ private final StringBuilder buf = new StringBuilder(); public ChannelBuffer responseContent; - private final static String STORE_NAME = "store_name"; - private FatClientWrapper fatClientWrapper = null; + private Map fatClientMap; + private final Logger logger = Logger.getLogger(VoldemortHttpRequestHandler.class); public static enum OP_TYPE { GET, PUT } - public HttpRequestHandler() { - String[] bootstrapURLs = new String[1]; - bootstrapURLs[0] = "tcp://localhost:6666"; - this.fatClientWrapper = new FatClientWrapper("test", bootstrapURLs); + // Implicit constructor defined for the derived classes + public VoldemortHttpRequestHandler() {} + + public VoldemortHttpRequestHandler(Map fatClientMap) { + this.fatClientMap = fatClientMap; } - public OP_TYPE getOperationType(String path) { - if(path.equals("/put")) { + public OP_TYPE getOperationType(HttpMethod httpMethod) { + if(httpMethod.equals(HttpMethod.PUT)) { return OP_TYPE.PUT; } return OP_TYPE.GET; } - public ByteArray readKey(ChannelBuffer content) { - int keySize = content.readInt(); - byte[] key = new byte[keySize]; - content.readBytes(key); - return new ByteArray(key); - } - public void writeResults(List> values) { responseContent.writeInt(values.size()); for(Versioned v: values) { @@ -105,60 +78,63 @@ public void writeResults(List> values) { } } - private byte[] readValue(ChannelBuffer content) { - int valueSize = content.readInt(); - byte[] value = new byte[valueSize]; - content.readBytes(value); - return value; - } - @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { + String storeName = ""; + if(!readingChunks) { HttpRequest request = this.request = (HttpRequest) e.getMessage(); - QueryStringDecoder queryStringDecoder = new QueryStringDecoder(request.getUri()); - - // Decode the operation type - OP_TYPE operation = getOperationType(queryStringDecoder.getPath()); - - Map> params = queryStringDecoder.getParameters(); - if(params == null || !params.containsKey(STORE_NAME)) { - System.err.println("Store Name missing. Critical error"); - this.responseContent = ChannelBuffers.copiedBuffer("Store Name missing. Critical error".getBytes()); + OP_TYPE operation = getOperationType(this.request.getMethod()); + String requestURI = this.request.getUri(); + logger.info(requestURI); + + storeName = getStoreName(requestURI); + if(storeName == null) { + String errorMessage = "Invalid store name. Critical error."; + // this.responseContent = + // ChannelBuffers.copiedBuffer("Invalid store name. Critical error.".getBytes()); + logger.error(errorMessage); + RESTErrorHandler.handleError(BAD_REQUEST, e, false, errorMessage); return; - // TODO: Return the right error code here } if(request.isChunked()) { readingChunks = true; } else { - ChannelBuffer content = request.getContent(); - if(!content.readable()) { - System.err.println("Contents not readable"); - this.responseContent = ChannelBuffers.copiedBuffer("Contents not readable".getBytes()); - return; - } - // TODO: Check for correct number of parameters and Decoding switch(operation) { case GET: - // System.out.println("GET operation"); - ByteArray getKey = readKey(content); - this.fatClientWrapper.submitGetRequest(getKey, e); + ByteArray getKey = readKey(requestURI); + this.fatClientMap.get(storeName).submitGetRequest(getKey, e); break; case PUT: - // System.out.println("PUT operation"); - ByteArray putKey = readKey(content); + ChannelBuffer content = request.getContent(); + if(!content.readable()) { + String errorMessage = "Contents not readable"; + // this.responseContent = + // ChannelBuffers.copiedBuffer("Contents not readable".getBytes()); + logger.error(errorMessage); + RESTErrorHandler.handleError(BAD_REQUEST, + e, + isKeepAlive(request), + errorMessage); + return; + } + + ByteArray putKey = readKey(requestURI); byte[] putValue = readValue(content); - this.fatClientWrapper.submitPutRequest(putKey, putValue, e); + this.fatClientMap.get(storeName).submitPutRequest(putKey, putValue, e); break; default: - System.err.println("Illegal operation."); - this.responseContent = ChannelBuffers.copiedBuffer("Illegal operation.".getBytes()); - writeResponse(e); + String errorMessage = "Illegal operation."; + logger.error(errorMessage); + RESTErrorHandler.handleError(BAD_REQUEST, + e, + isKeepAlive(request), + errorMessage); return; } @@ -180,7 +156,6 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex buf.append("\r\n"); } - writeResponse(e); } else { buf.append("CHUNK: " + chunk.getContent().toString(CharsetUtil.UTF_8) + "\r\n"); } @@ -188,6 +163,32 @@ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Ex } } + private byte[] readValue(ChannelBuffer content) { + byte[] value = new byte[content.capacity()]; + content.readBytes(value); + return value; + } + + private ByteArray readKey(String requestURI) { + ByteArray key = null; + String[] parts = requestURI.split("/"); + if(parts.length > 2) { + String base64Key = parts[2]; + key = new ByteArray(Base64.decodeBase64(base64Key.getBytes())); + } + return key; + } + + private String getStoreName(String requestURI) { + String storeName = null; + String[] parts = requestURI.split("/"); + if(parts.length > 1 && this.fatClientMap.containsKey(parts[1])) { + storeName = parts[1]; + } + + return storeName; + } + public void writeResponse(MessageEvent e) { // Decide whether to close the connection or not. boolean keepAlive = isKeepAlive(request); diff --git a/src/java/voldemort/versioning/ClockEntry.java b/src/java/voldemort/versioning/ClockEntry.java index 481026cfa3..27bc0cac19 100644 --- a/src/java/voldemort/versioning/ClockEntry.java +++ b/src/java/voldemort/versioning/ClockEntry.java @@ -32,8 +32,16 @@ public final class ClockEntry implements Cloneable, Serializable { private static final long serialVersionUID = 1; - private final short nodeId; - private final long version; + private short nodeId; + private long version; + + /** + * Default constructor + */ + public ClockEntry() { + this.nodeId = -1; + this.version = -1; + } /** * Create a new Version from constituate parts @@ -99,4 +107,12 @@ public String toString() { return nodeId + ":" + version; } + public void setNodeId(short nodeId) { + this.nodeId = nodeId; + } + + public void setVersion(long version) { + this.version = version; + } + }