Permalink
Browse files

Merge branch 'master' of git://github.com/voldemort/voldemort

  • Loading branch information...
2 parents 5e52308 + 62f6228 commit e7dc90a3e5c729a20388e91faa6d0de20ae06aa8 @readams readams committed Jun 15, 2009
View
72 src/java/voldemort/store/readonly/ChunkedFileSet.java
@@ -12,20 +12,30 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.log4j.Logger;
+
import voldemort.VoldemortException;
import voldemort.store.PersistenceFailureException;
import voldemort.utils.Utils;
+/**
+ * A set of chunked data and index files for a read-only store
+ *
+ * @author jay
+ *
+ */
public class ChunkedFileSet {
+ private static Logger logger = Logger.getLogger(ChunkedFileSet.class);
+
private final int numChunks;
private final int numBuffersPerChunk;
private final File baseDir;
private final long bufferWaitTimeoutMs;
private final List<Integer> indexFileSizes;
private final List<Integer> dataFileSizes;
private final List<BlockingQueue<MappedByteBuffer>> indexFiles;
- private final List<BlockingQueue<MappedByteBuffer>> dataFiles;
+ private final List<FileChannel> dataFiles;
public ChunkedFileSet(File directory, int numBuffersPerChunk, long bufferWaitTimeoutMs) {
this.baseDir = directory;
@@ -37,13 +47,14 @@ public ChunkedFileSet(File directory, int numBuffersPerChunk, long bufferWaitTim
this.indexFileSizes = new ArrayList<Integer>();
this.dataFileSizes = new ArrayList<Integer>();
this.indexFiles = new ArrayList<BlockingQueue<MappedByteBuffer>>();
- this.dataFiles = new ArrayList<BlockingQueue<MappedByteBuffer>>();
+ this.dataFiles = new ArrayList<FileChannel>();
// if the directory is empty create empty files
if(baseDir.list() != null && baseDir.list().length == 0) {
try {
new File(baseDir, "0.index").createNewFile();
new File(baseDir, "0.data").createNewFile();
+ logger.info("No index or data files found, creating empty files 0.index and 0.data.");
} catch(IOException e) {
throw new VoldemortException("Error creating empty read-only files.", e);
}
@@ -59,19 +70,25 @@ public ChunkedFileSet(File directory, int numBuffersPerChunk, long bufferWaitTim
else if(index.exists() ^ data.exists())
throw new VoldemortException("One of the following does not exist: "
+ index.toString() + " and " + data.toString() + ".");
+
+ /* Deal with file sizes */
long indexLength = index.length();
long dataLength = data.length();
validateFileSizes(indexLength, dataLength);
indexFileSizes.add((int) indexLength);
dataFileSizes.add((int) dataLength);
+
+ /* Add the file channel for data */
+ dataFiles.add(openChannel(data));
+
+ /*
+ * Add multiple MappedByteBuffers for the index since we cannot
+ * share handles
+ */
BlockingQueue<MappedByteBuffer> indexFds = new ArrayBlockingQueue<MappedByteBuffer>(numBuffersPerChunk);
- BlockingQueue<MappedByteBuffer> dataFds = new ArrayBlockingQueue<MappedByteBuffer>(numBuffersPerChunk);
- for(int i = 0; i < numBuffersPerChunk; i++) {
+ for(int i = 0; i < numBuffersPerChunk; i++)
indexFds.add(mapFile(index));
- dataFds.add(mapFile(data));
- }
indexFiles.add(indexFds);
- dataFiles.add(dataFds);
chunkId++;
}
if(chunkId == 0)
@@ -98,11 +115,24 @@ public void close() {
for(int chunk = 0; chunk < this.numChunks; chunk++) {
for(int i = 0; i < this.numBuffersPerChunk; i++) {
checkoutIndexFile(chunk);
- checkoutDataFile(chunk);
+ FileChannel channel = getDataFile(chunk);
+ try {
+ channel.close();
+ } catch(IOException e) {
+ logger.error("Error while closing file.", e);
+ }
}
}
}
+ private FileChannel openChannel(File file) {
+ try {
+ return new FileInputStream(file).getChannel();
+ } catch(IOException e) {
+ throw new VoldemortException(e);
+ }
+ }
+
private MappedByteBuffer mapFile(File file) {
try {
FileChannel channel = new FileInputStream(file).getChannel();
@@ -123,19 +153,15 @@ public int getChunkForKey(byte[] key) {
}
public MappedByteBuffer checkoutIndexFile(int chunk) {
- return checkoutFile(indexFiles.get(chunk));
- }
-
- public void checkinIndexFile(MappedByteBuffer mmap, int chunk) {
- checkinFile(mmap, indexFiles.get(chunk));
+ return checkout(indexFiles.get(chunk));
}
- public MappedByteBuffer checkoutDataFile(int chunk) {
- return checkoutFile(dataFiles.get(chunk));
+ public void checkinIndexFile(MappedByteBuffer file, int chunk) {
+ checkin(file, indexFiles.get(chunk));
}
- public void checkinDataFile(MappedByteBuffer mmap, int chunk) {
- checkinFile(mmap, dataFiles.get(chunk));
+ public FileChannel getDataFile(int chunk) {
+ return dataFiles.get(chunk);
}
public int getIndexFileSize(int chunk) {
@@ -146,22 +172,22 @@ public int getDataFileSize(int chunk) {
return this.indexFileSizes.get(chunk);
}
- private void checkinFile(MappedByteBuffer map, BlockingQueue<MappedByteBuffer> mmaps) {
+ private <T> void checkin(T item, BlockingQueue<T> items) {
try {
- mmaps.put(map);
+ items.put(item);
} catch(InterruptedException e) {
throw new VoldemortException("Interrupted while waiting for file to checking.");
}
}
- private MappedByteBuffer checkoutFile(BlockingQueue<MappedByteBuffer> mmaps) {
+ private <T> T checkout(BlockingQueue<T> pool) {
try {
- MappedByteBuffer map = mmaps.poll(bufferWaitTimeoutMs, TimeUnit.MILLISECONDS);
- if(map == null)
+ T item = pool.poll(bufferWaitTimeoutMs, TimeUnit.MILLISECONDS);
+ if(item == null)
throw new VoldemortException("Timeout after waiting for " + bufferWaitTimeoutMs
+ " ms to acquire file descriptor");
else
- return map;
+ return item;
} catch(InterruptedException e) {
throw new PersistenceFailureException("Interrupted while waiting for file descriptor.",
e);
View
31 src/java/voldemort/store/readonly/ReadOnlyStorageEngine.java
@@ -18,7 +18,9 @@
import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collections;
@@ -330,15 +332,16 @@ private void shiftBackupsRight(int beginShift) {
}
private byte[] readValue(int chunk, int valueLocation) {
- MappedByteBuffer data = fileSet.checkoutDataFile(chunk);
+ FileChannel dataFile = fileSet.getDataFile(chunk);
try {
- data.position(valueLocation);
- int size = data.getInt();
- byte[] value = new byte[size];
- data.get(value);
- return value;
- } finally {
- fileSet.checkinDataFile(data, chunk);
+ ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
+ dataFile.read(sizeBuffer, valueLocation);
+ int size = sizeBuffer.getInt(0);
+ ByteBuffer valueBuffer = ByteBuffer.allocate(size);
+ dataFile.read(valueBuffer, valueLocation + 4);
+ return valueBuffer.array();
+ } catch(IOException e) {
+ throw new VoldemortException(e);
}
}
@@ -384,19 +387,11 @@ private int getValueLocation(int chunk, byte[] keyMd5) {
* Read the key, potentially from the cache
*/
private byte[] readKey(MappedByteBuffer index, int indexByteOffset, byte[] foundKey) {
- readFrom(index, indexByteOffset, foundKey);
+ index.position(indexByteOffset);
+ index.get(foundKey);
return foundKey;
}
- /*
- * Seek to the given object and read into the buffer exactly buffer.length
- * bytes
- */
- private static void readFrom(MappedByteBuffer file, int indexByteOffset, byte[] buffer) {
- file.position(indexByteOffset);
- file.get(buffer);
- }
-
/**
* Not supported, throws UnsupportedOperationException if called
*/
View
168 test/integration/voldemort/performance/MysqlBench.java
@@ -16,14 +16,27 @@
package voldemort.performance;
+import static java.util.Arrays.asList;
+
+import java.io.BufferedReader;
+import java.io.FileReader;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.util.Set;
import javax.sql.DataSource;
+import joptsimple.OptionParser;
+import joptsimple.OptionSet;
+
import org.apache.commons.dbcp.BasicDataSource;
+import voldemort.utils.CmdUtils;
+import voldemort.utils.Utils;
+
+import com.google.common.base.Join;
+
/**
* A simple MySQL benchmark
*
@@ -33,46 +46,97 @@
public class MysqlBench {
private final DataSource dataSource;
+ private final String table;
+ private final String requestFile;
private final int numRequests;
private final int numThreads;
-
- private static void croak(String message) {
- System.err.println(message);
- System.err.println("USAGE: java MysqlBench jdbc-url db-user db-password num-requests num-threads");
- System.exit(1);
- }
+ private final boolean doReads;
+ private final boolean doWrites;
public static void main(String[] args) throws Exception {
- if(args.length != 5)
- croak("Invalid number of command line arguments: expected 5 but got " + args.length
- + ".");
- String jdbcUrl = args[0];
- String user = args[1];
- String password = args[2];
- int numRequests = Integer.parseInt(args[3]);
- int numThreads = Integer.parseInt(args[4]);
- MysqlBench bench = new MysqlBench(numThreads, numRequests, jdbcUrl, user, password);
+ OptionParser parser = new OptionParser();
+ parser.accepts("help", "print usage information");
+ parser.acceptsAll(asList("r", "reads"), "Enable reads.");
+ parser.acceptsAll(asList("w", "writes"), "Enable writes.");
+ parser.acceptsAll(asList("d", "deletes"), "Enable deletes.");
+ parser.accepts("table", "Table name").withRequiredArg();
+ parser.accepts("db", "Database name").withRequiredArg();
+ parser.acceptsAll(asList("u", "user"), "DB username.").withRequiredArg();
+ parser.acceptsAll(asList("P", "password"), "DB password").withRequiredArg();
+ parser.acceptsAll(asList("p", "port"), "DB port").withRequiredArg();
+ parser.acceptsAll(asList("h", "host"), "DB host").withRequiredArg();
+ parser.accepts("requests").withRequiredArg().ofType(Integer.class);
+ parser.accepts("request-file").withRequiredArg();
+ parser.accepts("threads").withRequiredArg().ofType(Integer.class);
+ OptionSet options = parser.parse(args);
+
+ if(options.has("help")) {
+ parser.printHelpOn(System.out);
+ System.exit(0);
+ }
+
+ Set<String> missing = CmdUtils.missing(options, "table", "requests", "db");
+ if(missing.size() > 0)
+ Utils.croak("Missing required arguments: " + Join.join(", ", missing));
+
+ String host = CmdUtils.valueOf(options, "host", "localhost");
+ String table = (String) options.valueOf("table");
+ int port = CmdUtils.valueOf(options, "port", 3306);
+ String database = (String) options.valueOf("db");
+ String jdbcUrl = "jdbc:mysql://" + host + ":" + port + "/" + database;
+ String user = CmdUtils.valueOf(options, "user", "root");
+ String password = CmdUtils.valueOf(options, "password", "");
+ String requestFile = (String) options.valueOf("request-file");
+ int numRequests = (Integer) options.valueOf("requests");
+ int numThreads = CmdUtils.valueOf(options, "threads", 10);
+ boolean doReads = false;
+ boolean doWrites = false;
+ if(options.has("reads") || options.has("writes")) {
+ doReads = options.has("reads");
+ doWrites = options.has("writes");
+ } else {
+ doReads = true;
+ doWrites = true;
+ }
+ MysqlBench bench = new MysqlBench(table,
+ numThreads,
+ numRequests,
+ jdbcUrl,
+ user,
+ password,
+ requestFile,
+ doReads,
+ doWrites);
bench.benchmark();
}
- public MysqlBench(int numThreads,
+ public MysqlBench(String table,
+ int numThreads,
int numRequests,
String connectionString,
String username,
- String password) {
+ String password,
+ String requestFile,
+ boolean doReads,
+ boolean doWrites) {
+ this.table = table;
this.numThreads = numThreads;
this.numRequests = numRequests;
BasicDataSource ds = new BasicDataSource();
ds.setDriverClassName("com.mysql.jdbc.Driver");
ds.setUsername(username);
ds.setPassword(password);
ds.setUrl(connectionString);
+ this.requestFile = requestFile;
this.dataSource = ds;
+ this.doReads = doReads;
+ this.doWrites = doWrites;
}
private void upsert(String key, String value) throws Exception {
Connection conn = dataSource.getConnection();
- String upsert = "insert into test_table (key_, value_) values (?, ?) on duplicate key update value_ = ?";
+ String upsert = "insert into " + table
+ + " (key_, val_) values (?, ?) on duplicate key update val_ = ?";
PreparedStatement stmt = conn.prepareStatement(upsert);
try {
stmt.setString(1, key);
@@ -91,7 +155,7 @@ private void upsert(String key, String value) throws Exception {
private void deleteAll() throws Exception {
Connection conn = dataSource.getConnection();
- String delete = "delete from test_table";
+ String delete = "delete from " + table;
PreparedStatement stmt = conn.prepareStatement(delete);
try {
stmt.executeUpdate();
@@ -107,7 +171,7 @@ private void deleteAll() throws Exception {
private String select(String key) throws Exception {
Connection conn = dataSource.getConnection();
- String upsert = "select value_ from test_table where key_ = ?";
+ String upsert = "select val_ from " + table + " where key_ = ?";
PreparedStatement stmt = conn.prepareStatement(upsert);
ResultSet results = null;
try {
@@ -128,31 +192,47 @@ private String select(String key) throws Exception {
}
public void benchmark() throws Exception {
- System.out.println("WRITE TEST");
- PerformanceTest writeTest = new PerformanceTest() {
-
- @Override
- public void doOperation(int index) throws Exception {
- upsert(Integer.toString(index), Integer.toString(index));
- }
- };
- writeTest.run(numRequests, numThreads);
- writeTest.printStats();
-
- System.out.println();
-
- System.out.println("READ TEST");
- PerformanceTest readTest = new PerformanceTest() {
+ if(doWrites) {
+ deleteAll();
+ System.out.println("WRITE TEST");
+ PerformanceTest writeTest = new PerformanceTest() {
+
+ @Override
+ public void doOperation(int index) throws Exception {
+ upsert(Integer.toString(index), Integer.toString(index));
+ }
+ };
+ writeTest.run(numRequests, numThreads);
+ writeTest.printStats();
+
+ System.out.println();
+ }
- @Override
- public void doOperation(int index) throws Exception {
- select(Integer.toString(index));
+ if(doReads) {
+ System.out.println("READ TEST");
+
+ PerformanceTest readTest;
+ if(this.requestFile == null) {
+ readTest = new PerformanceTest() {
+
+ @Override
+ public void doOperation(int index) throws Exception {
+ select(Integer.toString(index));
+ }
+ };
+ } else {
+ final BufferedReader reader = new BufferedReader(new FileReader(requestFile),
+ 1024 * 1024);
+ readTest = new PerformanceTest() {
+
+ @Override
+ public void doOperation(int index) throws Exception {
+ select(reader.readLine().trim());
+ }
+ };
}
- };
- readTest.run(numRequests, numThreads);
- readTest.printStats();
-
- deleteAll();
+ readTest.run(numRequests, numThreads);
+ readTest.printStats();
+ }
}
-
}
View
4 test/integration/voldemort/performance/ReadOnlyStorePerformanceTest.java
@@ -96,6 +96,7 @@ public static void main(String[] args) throws FileNotFoundException, IOException
requestGenerator = new Runnable() {
public void run() {
+ System.out.println("Generating random requests.");
Random random = new Random();
try {
while(true)
@@ -110,8 +111,9 @@ public void run() {
public void run() {
try {
+ System.out.println("Using request file to generate requests.");
BufferedReader reader = new BufferedReader(new FileReader(inputFile),
- 100000);
+ 1000000);
while(true) {
String line = reader.readLine();
if(line == null)

0 comments on commit e7dc90a

Please sign in to comment.