Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge remote branch 'trunk/master' into li-r1008

  • Loading branch information...
commit 68feb27dec7e6940afa61f086fa311adb762a226 2 parents 9dce097 + d5ad745
@afeinberg afeinberg authored
View
94 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/ConfigurableSocketFactory.java
@@ -0,0 +1,94 @@
+package voldemort.store.readonly.fetcher;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.UnknownHostException;
+import java.nio.channels.SocketChannel;
+
+import javax.net.SocketFactory;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+/**
+ * A lot of boilerplate code to tell hadoop to use a non-standard socket buffer
+ * size. This is necessary for good performance over a higher latency link such
+ * as between datacenters.
+ *
+ * @author jay
+ *
+ */
+class ConfigurableSocketFactory extends SocketFactory implements Configurable {
+
+ private static final Logger logger = Logger.getLogger(ConfigurableSocketFactory.class);
+
+ static {
+ logger.info("----- Using configurable socket factory -------");
+ }
+
+ private Configuration configuration;
+ private int socketReceiveBufferSize = -1;
+
+ @Override
+ public Socket createSocket() throws IOException {
+ return applySettings(SocketChannel.open().socket());
+ }
+
+ @Override
+ public Socket createSocket(InetAddress address,
+ int port,
+ InetAddress localAddress,
+ int localPort) throws IOException {
+ Socket socket = applySettings(createSocket());
+ socket.bind(new InetSocketAddress(address, localPort));
+ socket.connect(new InetSocketAddress(localAddress, port));
+ return socket;
+ }
+
+ @Override
+ public Socket createSocket(InetAddress host, int port) throws IOException {
+ Socket socket = applySettings(createSocket());
+ socket.connect(new InetSocketAddress(host, port));
+ return socket;
+ }
+
+ @Override
+ public Socket createSocket(String host, int port, InetAddress localHost, int localPort)
+ throws IOException, UnknownHostException {
+ Socket socket = applySettings(createSocket());
+ socket.bind(new InetSocketAddress(host, localPort));
+ socket.connect(new InetSocketAddress(host, port));
+ return socket;
+ }
+
+ @Override
+ public Socket createSocket(String host, int port) throws IOException, UnknownHostException {
+ Socket socket = applySettings(createSocket());
+ socket.connect(new InetSocketAddress(host, port));
+ return socket;
+ }
+
+ public Configuration getConf() {
+ return this.configuration;
+ }
+
+ public void setConf(Configuration conf) {
+ this.configuration = conf;
+ this.socketReceiveBufferSize = conf.getInt("io.socket.receive.buffer", 100 * 1024 * 1024);
+ }
+
+ private Socket applySettings(Socket s) throws IOException {
+ if(logger.isDebugEnabled())
+ logger.debug("Attempting to set socket receive buffer of "
+ + this.socketReceiveBufferSize + " bytes");
+ s.setReceiveBufferSize(socketReceiveBufferSize);
+ if(logger.isDebugEnabled())
+ logger.info("Actually set socket receive buffer to " + s.getReceiveBufferSize()
+ + " bytes");
+ return s;
+ }
+
+}
View
25 contrib/hadoop-store-builder/src/java/voldemort/store/readonly/fetcher/HdfsFetcher.java
@@ -88,7 +88,9 @@ public HdfsFetcher(Long maxBytesPerSecond, File tempDir, int bufferSize) {
public File fetch(String fileUrl) throws IOException {
Path path = new Path(fileUrl);
Configuration config = new Configuration();
- config.setInt("io.file.buffer.size", bufferSize);
+ config.setInt("io.socket.receive.buffer", bufferSize);
+ config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
+ ConfigurableSocketFactory.class.getName());
FileSystem fs = path.getFileSystem(config);
EventThrottler throttler = null;
if(maxBytesPerSecond != null)
@@ -204,7 +206,7 @@ public long getTotalBytesCopied() {
@JmxGetter(name = "bytesPerSecond", description = "The rate of the transfer in bytes/second.")
public double getBytesPerSecond() {
- double ellapsedSecs = System.nanoTime() - lastReportNs / (double) Time.NS_PER_SECOND;
+ double ellapsedSecs = (System.nanoTime() - lastReportNs) / (double) Time.NS_PER_SECOND;
return bytesSinceLastReport / ellapsedSecs;
}
@@ -232,24 +234,31 @@ else if(fs1.getPath().getName().endsWith(".index"))
else
return 0;
}
-
}
/*
* Main method for testing fetching
*/
public static void main(String[] args) throws Exception {
- if(args.length != 2)
- Utils.croak("USAGE: java " + HdfsFetcher.class.getName() + " url maxBytesPerSec");
+ if(args.length != 1)
+ Utils.croak("USAGE: java " + HdfsFetcher.class.getName() + " url");
String url = args[0];
- long maxBytesPerSec = Long.parseLong(args[1]);
+ long maxBytesPerSec = 1024 * 1024 * 1024;
Path p = new Path(url);
- FileStatus status = p.getFileSystem(new Configuration()).getFileStatus(p);
+ Configuration config = new Configuration();
+ config.setInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
+ config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
+ ConfigurableSocketFactory.class.getName());
+ config.setInt("io.socket.receive.buffer", 1 * 1024 * 1024 - 10000);
+ FileStatus status = p.getFileSystem(config).getFileStatus(p);
long size = status.getLen();
HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec, null, DEFAULT_BUFFER_SIZE);
long start = System.currentTimeMillis();
File location = fetcher.fetch(url);
double rate = size * Time.MS_PER_SECOND / (double) (System.currentTimeMillis() - start);
- System.out.println("Fetch to " + location + " completed: " + rate + " bytes/sec.");
+ NumberFormat nf = NumberFormat.getInstance();
+ nf.setMaximumFractionDigits(2);
+ System.out.println("Fetch to " + location + " completed: "
+ + nf.format(rate / (1024.0 * 1024.0)) + " MB/sec.");
}
}
View
2  src/java/voldemort/server/http/HttpService.java
@@ -91,7 +91,7 @@ public void startInner() {
httpServer.setSendServerVersion(false);
httpServer.setSendDateHeader(false);
Context context = new Context(httpServer, "/", Context.NO_SESSIONS);
- context.setAttribute(VoldemortServletContextListener.SERVER_CONFIG_KEY, server);
+ context.setAttribute(VoldemortServletContextListener.SERVER_KEY, server);
context.setAttribute(VoldemortServletContextListener.VELOCITY_ENGINE_KEY,
velocityEngine);
context.addServlet(new ServletHolder(new AdminServlet(server, velocityEngine)),
View
2  src/java/voldemort/server/http/StoreServlet.java
@@ -62,7 +62,7 @@ public void init() throws ServletException {
// servlet context
if(this.requestHandler == null) {
ServletContext context = this.getServletContext();
- VoldemortServer server = (VoldemortServer) Utils.notNull(context.getAttribute(VoldemortServletContextListener.SERVER_CONFIG_KEY));
+ VoldemortServer server = (VoldemortServer) Utils.notNull(context.getAttribute(VoldemortServletContextListener.SERVER_KEY));
HttpService httpService = (HttpService) server.getService(ServiceType.HTTP);
this.requestHandler = httpService.getRequestHandler();
}
View
2  src/java/voldemort/server/http/gui/AdminServlet.java
@@ -54,7 +54,7 @@ public AdminServlet(VoldemortServer server, VelocityEngine engine) {
@Override
public void init() throws ServletException {
super.init();
- this.server = (VoldemortServer) Utils.notNull(getServletContext().getAttribute(VoldemortServletContextListener.SERVER_CONFIG_KEY));
+ this.server = (VoldemortServer) Utils.notNull(getServletContext().getAttribute(VoldemortServletContextListener.SERVER_KEY));
this.velocityEngine = (VelocityEngine) Utils.notNull(getServletContext().getAttribute(VoldemortServletContextListener.VELOCITY_ENGINE_KEY));
}
View
2  src/java/voldemort/server/http/gui/ReadOnlyStoreManagementServlet.java
@@ -77,7 +77,7 @@ public ReadOnlyStoreManagementServlet(VoldemortServer server, VelocityEngine eng
@Override
public void init() throws ServletException {
super.init();
- VoldemortServer server = (VoldemortServer) getServletContext().getAttribute(VoldemortServletContextListener.SERVER_CONFIG_KEY);
+ VoldemortServer server = (VoldemortServer) getServletContext().getAttribute(VoldemortServletContextListener.SERVER_KEY);
this.stores = getReadOnlyStores(server);
this.velocityEngine = (VelocityEngine) Utils.notNull(getServletContext().getAttribute(VoldemortServletContextListener.VELOCITY_ENGINE_KEY));
View
2  src/java/voldemort/server/http/gui/StatusServlet.java
@@ -64,7 +64,7 @@ public StatusServlet(VoldemortServer server, VelocityEngine engine) {
@Override
public void init() throws ServletException {
super.init();
- this.server = (VoldemortServer) Utils.notNull(getServletContext().getAttribute(VoldemortServletContextListener.SERVER_CONFIG_KEY));
+ this.server = (VoldemortServer) Utils.notNull(getServletContext().getAttribute(VoldemortServletContextListener.SERVER_KEY));
this.velocityEngine = (VelocityEngine) Utils.notNull(getServletContext().getAttribute(VoldemortServletContextListener.VELOCITY_ENGINE_KEY));
}
View
19 src/java/voldemort/server/protocol/admin/AdminServiceRequestHandler.java
@@ -244,9 +244,10 @@ private void fetchEntries(StorageEngine<ByteArray, byte[]> storageEngine,
counter++;
if(0 == counter % 100000) {
long totalTime = (System.currentTimeMillis() - startTime) / 1000;
- logger.debug("fetchEntries() scanned " + counter + " entries, fetched "
- + fetched + " entries for store:" + storageEngine.getName()
- + " partition:" + partitionList + " in " + totalTime + " s");
+ if(logger.isDebugEnabled())
+ logger.debug("fetchEntries() scanned " + counter + " entries, fetched "
+ + fetched + " entries for store:" + storageEngine.getName()
+ + " partition:" + partitionList + " in " + totalTime + " s");
}
}
} finally {
@@ -287,9 +288,10 @@ private void fetchKeys(StorageEngine<ByteArray, byte[]> storageEngine,
counter++;
if(0 == counter % 100000) {
long totalTime = (System.currentTimeMillis() - startTime) / 1000;
- logger.debug("fetchKeys() scanned " + counter + " keys, fetched " + fetched
- + " keys for store:" + storageEngine.getName() + " partition:"
- + partitionList + " in " + totalTime + " s");
+ if(logger.isDebugEnabled())
+ logger.debug("fetchKeys() scanned " + counter + " keys, fetched " + fetched
+ + " keys for store:" + storageEngine.getName() + " partition:"
+ + partitionList + " in " + totalTime + " s");
}
}
} finally {
@@ -336,8 +338,9 @@ public void handleUpdatePartitionEntries(VAdminProto.UpdatePartitionEntriesReque
counter++;
if(0 == counter % 100000) {
long totalTime = (System.currentTimeMillis() - startTime) / 1000;
- logger.debug("updateEntries() updated " + counter + " entries for store:"
- + storageEngine.getName() + " in " + totalTime + " s");
+ if(logger.isDebugEnabled())
+ logger.debug("updateEntries() updated " + counter + " entries for store:"
+ + storageEngine.getName() + " in " + totalTime + " s");
}
int size = inputStream.readInt();
View
18 src/java/voldemort/store/rebalancing/RedirectingStore.java
@@ -16,6 +16,7 @@
package voldemort.store.rebalancing;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -41,9 +42,6 @@
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
-
/**
* The RedirectingStore extends {@link DelegatingStore}
* <p>
@@ -120,13 +118,13 @@ private boolean redirectingKey(ByteArray key) {
@Override
public Map<ByteArray, List<Versioned<byte[]>>> getAll(Iterable<ByteArray> keys)
throws VoldemortException {
-
- proxyGetAllAndLocalPut(Iterables.filter(keys, new Predicate<ByteArray>() {
-
- public boolean apply(ByteArray key) {
- return redirectingKey(key);
- }
- }));
+ List<ByteArray> redirectingKeys = new ArrayList<ByteArray>();
+ for (ByteArray key: keys) {
+ if (redirectingKey(key))
+ redirectingKeys.add(key);
+ }
+ if (!redirectingKeys.isEmpty())
+ proxyGetAllAndLocalPut(redirectingKeys);
return getInnerStore().getAll(keys);
}
Please sign in to comment.
Something went wrong with that request. Please try again.