Permalink
Browse files

corrupted index rebuild test

  • Loading branch information...
1 parent 350b8f8 commit a6222b48b91e855d657ae19cf460d5caeb82b57c Przemek committed Jan 19, 2012
@@ -48,7 +48,7 @@ class Bytecask(val dir: String, name: String = Utils.randomString(8), maxFileSiz
checkArgument(key.length > 0, "Key cannot be empty")
checkArgument(value.length > 0, "Value cannot be empty")
val entry = index.get(key)
- val (pos, length, timestamp) = io.appendEntry(key, processor.before(value))
+ val (pos, length, timestamp) = io.appendDataEntry(key, processor.before(value))
if (!entry.isEmpty && entry.get.isInactive) merger.entryChanged(entry.get)
index.update(key, pos, length, timestamp)
if (io.pos > maxFileSize) split()
@@ -64,7 +64,7 @@ class Bytecask(val dir: String, name: String = Utils.randomString(8), maxFileSiz
checkArgument(key.length > 0, "Key cannot be empty")
val entry = index.get(key)
if (!entry.isEmpty) {
- io.appendEntry(key, TOMBSTONE_VALUE)
+ io.appendDataEntry(key, TOMBSTONE_VALUE)
index.delete(key)
if (entry.get.isInactive) merger.entryChanged(entry.get)
Some(entry)
@@ -32,9 +32,9 @@ object IO extends Logging {
val HEADER_SIZE = 14 //crc, ts, ks, vs -> 4 + 4 + 2 + 4 bytes
val DEFAULT_MAX_FILE_SIZE = Int.MaxValue // 2GB
val ACTIVE_FILE_NAME = "0"
- val FILE_REGEX = "^[0-9]+$"
+ val DATA_FILE_REGEX = "^[0-9]+$"
- def appendEntry(appender: RandomAccessFile, key: Bytes, value: Bytes) = appender.synchronized {
+ def appendDataEntry(appender: RandomAccessFile, key: Bytes, value: Bytes) = appender.synchronized {
val pos = appender.getFilePointer
val timestamp = (Utils.now / 1000).intValue()
val keySize = key.size
@@ -55,11 +55,15 @@ object IO extends Logging {
(pos.toInt, length, timestamp)
}
+ /**
+  * TODO(review): empty stub. Merger.merge calls this for every surviving
+  * entry (and opens the hint file "rw" alongside the merge target), but
+  * nothing is ever written here, so the hint file is always created empty.
+  * Presumably this should serialize the entry as ts,ks,vs,vpos,key per the
+  * inline comment in Merger.merge — and it will need access to the hint
+  * appender (a2), which the current signature does not receive. Confirm
+  * intended format before implementing.
+  */
+ def appendHintEntry(entry: IndexEntry) {
+
+ }
+
/*
Indexed read
*/
- def readEntry(reader: RandomAccessFile, entry: IndexEntry) = {
+ def readDataEntry(reader: RandomAccessFile, entry: IndexEntry) = {
reader.getChannel.position(entry.pos)
val buffer = ByteBuffer.allocate(entry.length)
val read = reader.getChannel.read(buffer)
@@ -78,14 +82,14 @@ object IO extends Logging {
Array.copy(a, IO.HEADER_SIZE, key, 0, keySize)
val value = new Array[Byte](valueSize)
Array.copy(a, IO.HEADER_SIZE + keySize, value, 0, valueSize)
- FileEntry(entry.pos, actualCrc, keySize, valueSize, timestamp, key, value)
+ DataEntry(entry.pos, actualCrc, keySize, valueSize, timestamp, key, value)
}
/*
Iterative non-indexed read
*/
- def readEntry(reader: RandomAccessFile) = {
+ def readDataEntry(reader: RandomAccessFile) = {
val pos = reader.getFilePointer
val header = new Array[Byte](IO.HEADER_SIZE)
reader.readOrThrow(header, "Failed to read chunk of %s bytes".format(IO.HEADER_SIZE))
@@ -103,22 +107,22 @@ object IO extends Logging {
crc.update(value, 0, valueSize)
val actualCrc = crc.getValue.toInt
if (expectedCrc != actualCrc) throw new IOException("CRC check failed: %s != %s".format(expectedCrc, actualCrc))
- FileEntry(pos.toInt, actualCrc, keySize, valueSize, timestamp, key, value)
+ DataEntry(pos.toInt, actualCrc, keySize, valueSize, timestamp, key, value)
}
@inline
- def readEntry(pool: RandomAccessFilePool, dir: String, entry: IndexEntry): FileEntry = {
+ def readEntry(pool: RandomAccessFilePool, dir: String, entry: IndexEntry): DataEntry = {
withPooled(pool, dir + "/" + entry.file) {
- reader => readEntry(reader, entry)
+ reader => readDataEntry(reader, entry)
}
}
- def readEntries(file: File, callback: (File, FileEntry) => Any): Boolean = {
+ def readDataEntries(file: File, callback: (File, DataEntry) => Any): Boolean = {
val length = file.length()
val reader = new RandomAccessFile(file, "r")
try {
while (reader.getFilePointer < length) {
- val entry = readEntry(reader)
+ val entry = readDataEntry(reader)
callback(file, entry)
}
true
@@ -131,8 +135,8 @@ object IO extends Logging {
}
}
- private def readAll(file: File, reader: RandomAccessFile, callback: (File, FileEntry) => Any) {
- val entry = readEntry(reader)
+ private def readAll(file: File, reader: RandomAccessFile, callback: (File, DataEntry) => Any) {
+ val entry = readDataEntry(reader)
callback(file, entry)
readAll(file, reader, callback)
}
@@ -174,8 +178,8 @@ final class IO(val dir: String, maxConcurrentReaders: Int = 10) extends Closeabl
var appender = createAppender()
lazy val readers = new RandomAccessFilePool(maxConcurrentReaders)
- def appendEntry(key: Bytes, value: Bytes) = {
- IO.appendEntry(appender, key, value)
+ def appendDataEntry(key: Bytes, value: Bytes) = {
+ IO.appendDataEntry(appender, key, value)
}
def readValue(entry: IndexEntry): Array[Byte] = {
@@ -201,7 +205,7 @@ final class IO(val dir: String, maxConcurrentReaders: Int = 10) extends Closeabl
*/
private def nextFile() = {
- val files = ls(dir).filter(f => f.isFile && f.getName.matches(IO.FILE_REGEX)).map(_.getName.toInt).sortWith(_ < _)
+ val files = ls(dir).filter(f => f.isFile && f.getName.matches(IO.DATA_FILE_REGEX)).map(_.getName.toInt).sortWith(_ < _)
val slot = firstSlot(files)
val next = if (!slot.isEmpty) slot.get else (files.last + 1)
(dir / next).mkFile
@@ -228,6 +232,6 @@ final class IO(val dir: String, maxConcurrentReaders: Int = 10) extends Closeabl
}
}
-final case class FileEntry(pos: Int, crc: Int, keySize: Int, valueSize: Int, timestamp: Int, key: Array[Byte], value: Array[Byte]) {
+final case class DataEntry(pos: Int, crc: Int, keySize: Int, valueSize: Int, timestamp: Int, key: Array[Byte], value: Array[Byte]) {
def size = IO.HEADER_SIZE + key.length + value.length
}
@@ -39,10 +39,10 @@ final class Index(io: IO) extends Logging with Locking with Tracking {
}
private def indexFile(file: File) = {
- IO.readEntries(file, addEntry)
+ IO.readDataEntries(file, addEntry)
}
- private def addEntry(file: File, entry: FileEntry) = writeLock {
+ private def addEntry(file: File, entry: DataEntry) = writeLock {
index.put(entry.key, IndexEntry(file.getName, entry.pos, entry.size, entry.timestamp))
}
@@ -68,7 +68,7 @@ final class Index(io: IO) extends Logging with Locking with Tracking {
}
}
- def hasEntry(entry: FileEntry) = {
+ def hasEntry(entry: DataEntry) = {
val e = index.get(entry.key)
//debug("hasEntry: " + e + " -> " + entry)
!e.isEmpty && e.get.timestamp == entry.timestamp
@@ -53,19 +53,27 @@ final class Merger(io: IO, index: Index) extends Logging {
merge(files)
}
+ /**
+ * Merges files and creates a "hint" file out of the compacted data file
+ */
+
private def merge(files: Iterable[String]) = {
if (files.size > 1) {
val target = files.head
debug("Merging files: %s -> '%s'".format(collToString(files), target))
- val tmp = temporary(target)
+ val tmp = temporaryFile(target)
+ val hint = hintFile(target)
val subIndex = Map[Bytes, IndexEntry]()
- withResource(new RandomAccessFile(tmp, "rw")) {
- appender =>
+ //hint: ts,ks,vs,vpos,key
+ withResources(new RandomAccessFile(tmp, "rw"), new RandomAccessFile(hint, "rw")) {
+ (a1, a2) =>
files.foreach {
- file => IO.readEntries(dbFile(file), (file: File, entry: FileEntry) => {
+ file => IO.readDataEntries(dbFile(file), (file: File, entry: DataEntry) => {
if (entry.valueSize > 0 && index.hasEntry(entry)) {
- val (pos, length, timestamp) = IO.appendEntry(appender, entry.key, entry.value)
- subIndex.put(entry.key, IndexEntry(file.getName, pos, length, timestamp))
+ val (pos, length, timestamp) = IO.appendDataEntry(a1, entry.key, entry.value)
+ val indexEntry = IndexEntry(file.getName, pos, length, timestamp)
+ subIndex.put(entry.key, indexEntry)
+ IO.appendHintEntry(indexEntry)
}
})
}
@@ -87,15 +95,15 @@ final class Merger(io: IO, index: Index) extends Logging {
merge(ls(io.dir).map(_.getName).filter(_ != IO.ACTIVE_FILE_NAME).map(_.toInt).sortWith(_ < _).map(_.toString))
}
- private def temporary(file: String) = (io.dir + "/" + file + "_").mkFile
+ private def temporaryFile(file: String) = (io.dir + "/" + file + "_").mkFile
- private def hint(file: String) = (io.dir + "/" + file + "h").mkFile
+ private def hintFile(file: String) = (io.dir + "/" + file + "h").mkFile
private def dbFile(file: String) = (io.dir + "/" + file).mkFile
}
/*
-Represents change measure for a file - how many entries and how much space
+Represents the "change measure" for a file - i.e. how many entries and how much space
is to be potentially regained
*/
@@ -122,6 +122,15 @@ object Utils {
}
}
+ /**
+  * Runs `f` with two closeable resources and guarantees BOTH are closed
+  * afterwards, in acquisition order (resource1 first, then resource2).
+  *
+  * Bug fix over the naive sequential close: closing is nested in
+  * try/finally so that an exception thrown by resource1.close() no longer
+  * skips (and therefore leaks) resource2. If both closes throw, the
+  * exception from resource2.close() propagates, matching plain
+  * try/finally semantics.
+  *
+  * NOTE(review): the structural type {def close()} is reflection-based and
+  * slow; a small Closeable trait (or java.io.Closeable) would be cheaper —
+  * kept as-is here to preserve the existing call sites.
+  */
+ def withResources[X <: {def close()}, A](resource1: X, resource2: X)(f: (X, X) => A) = {
+   try {
+     f(resource1, resource2)
+   } finally {
+     try resource1.close()
+     finally resource2.close()
+   }
+ }
+
def withPooled[A](pool: RandomAccessFilePool, file: String)(f: RandomAccessFile => A) = {
val reader = pool.get(file)
try {
@@ -44,7 +44,7 @@ class MergeSuite extends FunSuite with ShouldMatchers with BeforeAndAfterEach {
ls(db.dir).size should be(4)
db.merge()
val s1 = dirSize(db.dir)
- ls(db.dir).size should be(2)
+ ls(db.dir).size should be(2 + 1)
println("size before: %s, after: %s ".format(s0, s1))
assert(s1 < s0)
assert(db.get("foo2").isEmpty)

0 comments on commit a6222b4

Please sign in to comment.