Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/libkestrel_mmap_experiment' into…
Browse files Browse the repository at this point in the history
… halp

Conflicts:
	project/build/KestrelProject.scala
  • Loading branch information
Robey Pointer committed Mar 7, 2012
2 parents 8bb9efd + 1d98944 commit a7ef67e
Show file tree
Hide file tree
Showing 13 changed files with 134 additions and 92 deletions.
5 changes: 3 additions & 2 deletions src/main/scala/net/lag/kestrel/KestrelHandler.scala
Expand Up @@ -23,6 +23,7 @@ import com.twitter.logging.Logger
import com.twitter.ostrich.admin.{BackgroundProcess, ServiceTracker}
import com.twitter.ostrich.stats.Stats
import com.twitter.util._
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.collection.Set
Expand Down Expand Up @@ -217,8 +218,8 @@ abstract class KestrelHandler(
Stats.incr("cmd_get_open_dropped", cancelAllPendingReads())
}

def setItem(key: String, flags: Int, expiry: Option[Time], data: Array[Byte]) = {
log.debug("set -> q=%s flags=%d expiry=%s size=%d", key, flags, expiry, data.length)
def setItem(key: String, flags: Int, expiry: Option[Time], data: ByteBuffer) = {
log.debug("set -> q=%s flags=%d expiry=%s size=%d", key, flags, expiry, data.remaining)
Stats.incr("cmd_set")
val (rv, nsec) = Duration.inNanoseconds {
queues.add(key, data, expiry, Time.now)
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/net/lag/kestrel/MemcacheHandler.scala
Expand Up @@ -164,7 +164,7 @@ class MemcacheHandler(
case None =>
new MemcacheResponse("END")
case Some(item) =>
new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(item.data))
new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.remaining), Some(item.data))
}
}
} catch {
Expand All @@ -184,7 +184,7 @@ class MemcacheHandler(
case None =>
channel.send(new MemcacheResponse("END") then Codec.EndStream)
case Some(item) =>
channel.send(new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.length), Some(item.data)))
channel.send(new MemcacheResponse("VALUE %s 0 %d".format(key, item.data.remaining), Some(item.data)))
}
}
new MemcacheResponse("") then Codec.Stream(channel)
Expand Down
7 changes: 4 additions & 3 deletions src/main/scala/net/lag/kestrel/QueueCollection.scala
Expand Up @@ -18,6 +18,7 @@
package net.lag.kestrel

import java.io.File
import java.nio.ByteBuffer
import java.util.concurrent.{CountDownLatch, ScheduledExecutorService}
import scala.collection.mutable
import com.twitter.conversions.time._
Expand Down Expand Up @@ -157,7 +158,7 @@ class QueueCollection(
*
* @return true if the item was added; false if the server is shutting down
*/
def add(key: String, data: Array[Byte], expiry: Option[Time], addTime: Time): Boolean = {
def add(key: String, data: ByteBuffer, expiry: Option[Time], addTime: Time): Boolean = {
writer(key) flatMap { q =>
q.put(data, addTime, expiry) map { future =>
future map { _ => Stats.incr("total_items") }
Expand All @@ -166,8 +167,8 @@ class QueueCollection(
} getOrElse(false)
}

def add(key: String, item: Array[Byte]): Boolean = add(key, item, None, Time.now)
def add(key: String, item: Array[Byte], expiry: Option[Time]): Boolean = add(key, item, expiry, Time.now)
def add(key: String, item: ByteBuffer): Boolean = add(key, item, None, Time.now)
def add(key: String, item: ByteBuffer, expiry: Option[Time]): Boolean = add(key, item, expiry, Time.now)

/**
* Retrieve an item from a queue and pass it to a continuation. If no item is available within
Expand Down
13 changes: 7 additions & 6 deletions src/main/scala/net/lag/kestrel/TextHandler.scala
Expand Up @@ -26,6 +26,7 @@ import codec.{MemcacheResponse, MemcacheRequest}
import com.twitter.naggati.Stages._
import org.jboss.netty.buffer.{ChannelBuffer, ChannelBuffers}
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import com.twitter.finagle.{ClientConnection, Service}
import com.twitter.util.{Future, Duration, Time}

Expand All @@ -38,18 +39,18 @@ object TextCodec {
val read = readLine(true, "ISO-8859-1") { line =>
if (line.endsWith(":")) {
val segments = line.substring(0, line.length - 1).split(" ")
readData(segments, new mutable.ListBuffer[Array[Byte]])
readData(segments, new mutable.ListBuffer[ByteBuffer])
} else {
val segments = line.split(" ")
emit(TextRequest(segments(0).toLowerCase, segments.drop(1).toList, Nil))
}
}

private def readData(segments: Seq[String], items: mutable.ListBuffer[Array[Byte]]): Stage = readLine(true, "ISO-8859-1") { line =>
private def readData(segments: Seq[String], items: mutable.ListBuffer[ByteBuffer]): Stage = readLine(true, "ISO-8859-1") { line =>
if (line == "" || (items.size >= MAX_PUT_BUFFER)) {
emit(TextRequest(segments(0).toLowerCase, segments.drop(1).toList, items.toList))
} else {
items += line.getBytes("UTF-8")
items += ByteBuffer.wrap(line.getBytes("UTF-8"))
readData(segments, items)
}
}
Expand All @@ -59,7 +60,7 @@ object TextCodec {
}
}

case class TextRequest(command: String, args: List[String], items: List[Array[Byte]])
case class TextRequest(command: String, args: List[String], items: List[ByteBuffer])

object TextResponse {
val NO_ITEM = Some(ChannelBuffers.wrappedBuffer("*\n".getBytes))
Expand All @@ -70,11 +71,11 @@ object TextResponse {
abstract class TextResponse extends Codec.Signalling {
def toBuffer: Option[ChannelBuffer]
}
case class ItemResponse(data: Option[Array[Byte]]) extends TextResponse {
case class ItemResponse(data: Option[ByteBuffer]) extends TextResponse {
def toBuffer = {
if (data.isDefined) {
val bytes = data.get
val buffer = ChannelBuffers.buffer(bytes.size + 2)
val buffer = ChannelBuffers.buffer(bytes.remaining + 2)
buffer.writeByte(TextResponse.COLON)
buffer.writeBytes(bytes)
buffer.writeByte(TextResponse.LF)
Expand Down
8 changes: 3 additions & 5 deletions src/main/scala/net/lag/kestrel/ThriftHandler.scala
Expand Up @@ -167,9 +167,7 @@ class ThriftHandler (
def put(queueName: String, items: Seq[ByteBuffer], expirationMsec: Int): Future[Int] = {
var count = 0
var expiry = if (expirationMsec == 0) None else Some(expirationMsec.milliseconds.fromNow)
items.foreach { item =>
val data = new Array[Byte](item.remaining)
item.get(data)
items.foreach { data =>
if (!handler.setItem(queueName, 0, expiry, data)) return Future(count)
count += 1
}
Expand All @@ -187,7 +185,7 @@ class ThriftHandler (
}
case Some(item) => {
val externalXid = externalXidOption.getOrElse(0L)
rv += new thrift.Item(ByteBuffer.wrap(item.data), externalXid)
rv += new thrift.Item(item.data, externalXid)
}
}
}
Expand Down Expand Up @@ -217,7 +215,7 @@ class ThriftHandler (

def peek(queueName: String): Future[thrift.QueueInfo] = {
handler.getItem(queueName, None, false, true).map { itemOption =>
val data = itemOption.map { item => ByteBuffer.wrap(item.data) }
val data = itemOption.map { item => item.data }
queueCollection.reader(queueName) map { queue =>
new thrift.QueueInfo(data, queue.items, queue.bytes, queue.writer.journalBytes,
queue.age.inMilliseconds, queue.waiterCount, queue.openItems)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/net/lag/kestrel/oldjournal/Journal.scala
Expand Up @@ -99,7 +99,7 @@ class Journal(queuePath: File, queueName: String, syncScheduler: ScheduledExecut
def this(fullPath: String) = this(fullPath, Duration.MaxValue)

private def open(file: File) {
writer = new PeriodicSyncFile(file, syncScheduler, syncJournal)
writer = new PeriodicSyncFile(file, syncScheduler, syncJournal, None)
}

def open() {
Expand Down
4 changes: 3 additions & 1 deletion src/main/scala/net/lag/kestrel/tools/QueueConverter.scala
Expand Up @@ -23,6 +23,7 @@ import com.twitter.conversions.string._
import com.twitter.libkestrel
import com.twitter.util.Duration
import java.io.{File, FileNotFoundException, IOException}
import java.nio.ByteBuffer
import java.security.MessageDigest
import net.lag.kestrel.oldjournal
import scala.collection.mutable
Expand Down Expand Up @@ -76,7 +77,8 @@ object QueueConverter {
def addItem(item: oldjournal.QItem): String = {
val hash = md5.digest(item.data).hexlify
if (allowDupes || !(seenIds contains hash)) {
val qitem = newJournal.put(item.data, item.addTime, item.expiry).map {
val buffer = ByteBuffer.wrap(item.data)
val qitem = newJournal.put(buffer, item.addTime, item.expiry).map {
case (qitem, sync) => qitem
}.get()
seenIds(hash) = qitem.id
Expand Down
47 changes: 25 additions & 22 deletions src/test/scala/net/lag/kestrel/KestrelHandlerSpec.scala
Expand Up @@ -18,6 +18,7 @@
package net.lag.kestrel

import java.io.{File, FileInputStream}
import java.nio.ByteBuffer
import java.util.concurrent.ScheduledThreadPoolExecutor
import scala.collection.mutable
import scala.util.Sorting
Expand All @@ -34,11 +35,13 @@ class FakeKestrelHandler(queues: QueueCollection, maxOpenTransactions: Int)
extends KestrelHandler(queues, maxOpenTransactions, () => "none", 0) with SimplePendingReads

class KestrelHandlerSpec extends Specification with TempFolder with TestLogging {
import TestBuffers.{stringToBuffer, bufferToString}

val config = new QueueBuilder { name = "test" }

case class beString(expected: String) extends Matcher[Option[QueueItem]]() {
def apply(v: => Option[QueueItem]) = {
val actual = v.map { item => new String(item.data) }
val actual = v.map { item => bufferToString(item.data) }
(actual == Some(expected), "ok", "item " + actual + " != " + expected)
}
}
Expand All @@ -56,8 +59,8 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
withTempFolder {
queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("test", 0, None, "one".getBytes)
handler.setItem("test", 0, None, "two".getBytes)
handler.setItem("test", 0, None, stringToBuffer("one"))
handler.setItem("test", 0, None, stringToBuffer("two"))
handler.getItem("test", None, false, false).get() must beString("one")
handler.getItem("test", None, false, false).get() must beString("two")
}
Expand All @@ -74,7 +77,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
Stats.getCounter("get_hits")() mustEqual 0
Stats.getCounter("get_misses")() mustEqual 0

handler.setItem("test", 0, None, "one".getBytes)
handler.setItem("test", 0, None, stringToBuffer("one"))
Stats.getCounter("cmd_set")() mustEqual 1
Stats.getCounter("cmd_get")() mustEqual 0

Expand All @@ -96,7 +99,7 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
withTempFolder {
queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("test", 0, None, "one".getBytes)
handler.setItem("test", 0, None, stringToBuffer("one"))
handler.getItem("test", None, true, false)() must beString("one")
handler.getItem("test", None, true, false)() mustEqual None
handler.abortRead("test") mustEqual true
Expand All @@ -111,9 +114,9 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
withTempFolder {
queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("test", 0, None, "one".getBytes)
handler.setItem("test", 0, None, "two".getBytes)
handler.setItem("test", 0, None, "three".getBytes)
handler.setItem("test", 0, None, stringToBuffer("one"))
handler.setItem("test", 0, None, stringToBuffer("two"))
handler.setItem("test", 0, None, stringToBuffer("three"))
handler.getItem("test", None, true, false)() must beString("one")
handler.getItem("test", None, true, false)() must beString("two")
handler.abortRead("test") mustEqual true
Expand All @@ -129,12 +132,12 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
withTempFolder {
queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 10)
handler.setItem("red", 0, None, "red1".getBytes)
handler.setItem("red", 0, None, "red2".getBytes)
handler.setItem("green", 0, None, "green1".getBytes)
handler.setItem("green", 0, None, "green2".getBytes)
handler.setItem("blue", 0, None, "blue1".getBytes)
handler.setItem("blue", 0, None, "blue2".getBytes)
handler.setItem("red", 0, None, stringToBuffer("red1"))
handler.setItem("red", 0, None, stringToBuffer("red2"))
handler.setItem("green", 0, None, stringToBuffer("green1"))
handler.setItem("green", 0, None, stringToBuffer("green2"))
handler.setItem("blue", 0, None, stringToBuffer("blue1"))
handler.setItem("blue", 0, None, stringToBuffer("blue2"))

handler.getItem("red", None, true, false)() must beString("red1")
handler.getItem("green", None, true, false)() must beString("green1")
Expand All @@ -158,8 +161,8 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
withTempFolder {
queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 1)
handler.setItem("red", 0, None, "red1".getBytes)
handler.setItem("red", 0, None, "red2".getBytes)
handler.setItem("red", 0, None, stringToBuffer("red1"))
handler.setItem("red", 0, None, stringToBuffer("red2"))
handler.getItem("red", None, true, false)() must beString("red1")
handler.getItem("red", None, true, false)() must throwA[TooManyOpenReadsException]
}
Expand All @@ -170,22 +173,22 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 5)
val got = new mutable.ListBuffer[QueueItem]()
handler.setItem("red", 0, None, "red1".getBytes)
handler.setItem("red", 0, None, "red2".getBytes)
handler.setItem("red", 0, None, "red3".getBytes)
handler.setItem("red", 0, None, stringToBuffer("red1"))
handler.setItem("red", 0, None, stringToBuffer("red2"))
handler.setItem("red", 0, None, stringToBuffer("red3"))
handler.monitorUntil("red", Some(1.hour.fromNow), 2, true) { (itemOption, _) =>
itemOption.foreach { got += _ }
}
got.toList.map { x => new String(x.data) } mustEqual List("red1", "red2")
got.toList.map { x => bufferToString(x.data) } mustEqual List("red1", "red2")
}
}

"close all reads" in {
withTempFolder {
queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
val handler = new FakeKestrelHandler(queues, 2)
handler.setItem("red", 0, None, "red1".getBytes)
handler.setItem("red", 0, None, "red2".getBytes)
handler.setItem("red", 0, None, stringToBuffer("red1"))
handler.setItem("red", 0, None, stringToBuffer("red2"))
handler.getItem("red", None, true, false)() must beString("red1")
handler.getItem("red", None, true, false)() must beString("red2")
handler.closeAllReads("red") mustEqual 2
Expand Down

0 comments on commit a7ef67e

Please sign in to comment.