Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Adds random order transaction closing/aborting. To retain backwards c…

…ompatibility, new options to GET were added: syn, ack, fail, analogous to open, close, abort respectively. Also ack/fail require id=xid parameter. XIDs are returned when using syn option.
  • Loading branch information...
commit 085d7cb3e7f8cfbd8aeb416ae9e56eed18d38bcb 1 parent 552df6a
Jason Jackson authored
View
84 src/main/scala/net/lag/kestrel/KestrelHandler.scala
@@ -35,6 +35,37 @@ abstract class KestrelHandler(val queues: QueueCollection, val maxOpenTransactio
val sessionId = Kestrel.sessionId.incrementAndGet()
+
+ object pendingRATransactions { // pending Random Access Transactions.
+ // used for syn, ack, fail
+ private val transactions = new mutable.HashMap[String, mutable.HashSet[Int]] {
+ override def default(key: String) = {
+ val rv = new mutable.HashSet[Int]
+ this(key) = rv
+ rv
+ }
+ }
+
+ def remove(name: String, xid: Int): Boolean = synchronized {
+ transactions(name).remove(xid)
+ }
+
+ def add(name: String, xid: Int) = synchronized {
+ transactions(name) += xid
+ }
+
+ def size(name: String): Int = synchronized { transactions(name).size }
+
+ def cancelAll() {
+ synchronized {
+ transactions.foreach { case (name, xids) =>
+ xids.foreach { xid => queues.unremove(name, xid) }
+ }
+ transactions.clear()
+ }
+ }
+ }
+
object pendingTransactions {
private val transactions = new mutable.HashMap[String, mutable.ListBuffer[Int]] {
override def default(key: String) = {
@@ -98,6 +129,32 @@ abstract class KestrelHandler(val queues: QueueCollection, val maxOpenTransactio
queues.queueNames.foreach { qName => queues.flush(qName) }
}
+ def failTransaction(key: String, xid: Int): Boolean = {
+ pendingRATransactions.remove(key, xid) match {
+ case true =>
+ log.debug("fail -> q=%s, xid=%d", key, xid)
+ queues.unremove(key, xid)
+ true
+ case false =>
+ log.warning("Attempt to fail a non-existent transaction on '%s [xid=%d]' (sid %d, %s)",
+ key, xid, sessionId, clientDescription)
+ false
+ }
+ }
+
+ def ackTransaction(key: String, xid: Int): Boolean = {
+ pendingRATransactions.remove(key, xid) match {
+ case true =>
+ log.debug("ack -> q=%s, xid=%d", key, xid)
+ queues.confirmRemove(key, xid)
+ true
+ case false =>
+ log.warning("Attempt to ack a non-existent transaction on '%s [xid=%d]' (sid %d, %s)",
+ key, xid, sessionId, clientDescription)
+ false
+ }
+ }
+
// returns true if a transaction was actually aborted.
def abortTransaction(key: String): Boolean = {
pendingTransactions.pop(key) match {
@@ -137,12 +194,14 @@ abstract class KestrelHandler(val queues: QueueCollection, val maxOpenTransactio
def closeAllTransactions(key: String): Int = {
val xids = pendingTransactions.popAll(key)
xids.foreach { xid => queues.confirmRemove(key, xid) }
+ // TODO: Not implemented for pendingRATransactions
xids.size
}
// will do a continuous transactional fetch on a queue until time runs out or transactions are full.
final def monitorUntil(key: String, timeLimit: Time)(f: Option[QItem] => Unit) {
- if (timeLimit <= Time.now || pendingTransactions.size(key) >= maxOpenTransactions) {
+ if (timeLimit <= Time.now
+ || (pendingTransactions.size(key) + pendingRATransactions.size(key)) >= maxOpenTransactions) {
f(None)
} else {
queues.remove(key, Some(timeLimit), true, false) {
@@ -156,8 +215,28 @@ abstract class KestrelHandler(val queues: QueueCollection, val maxOpenTransactio
}
}
+ def getItemSyn(key: String, timeout: Option[Time])(f: Option[QItem] => Unit) {
+ if (pendingTransactions.size(key) + pendingRATransactions.size(key) >= maxOpenTransactions) {
+ log.warning("Attempt to open too many transactions on '%s' (sid %d, %s)", key, sessionId,
+ clientDescription)
+ throw TooManyOpenTransactionsException
+ }
+
+ log.debug("get -> q=%s t=%s syn=true", key, timeout)
+ Stats.incr("cmd_get")
+
+ queues.remove(key, timeout, true, false) {
+ case None =>
+ f(None)
+ case Some(item) =>
+ log.debug("get <- %s", item)
+ pendingRATransactions.add(key, item.xid)
+ f(Some(item))
+ }
+ }
+
def getItem(key: String, timeout: Option[Time], opening: Boolean, peeking: Boolean)(f: Option[QItem] => Unit) {
- if (opening && pendingTransactions.size(key) >= maxOpenTransactions) {
+ if (opening && (pendingTransactions.size(key) + pendingRATransactions.size(key) >= maxOpenTransactions)) {
log.warning("Attempt to open too many transactions on '%s' (sid %d, %s)", key, sessionId,
clientDescription)
throw TooManyOpenTransactionsException
@@ -181,6 +260,7 @@ abstract class KestrelHandler(val queues: QueueCollection, val maxOpenTransactio
protected def abortAnyTransaction() = {
pendingTransactions.cancelAll()
+ pendingRATransactions.cancelAll()
}
def setItem(key: String, flags: Int, expiry: Option[Time], data: Array[Byte]) = {
View
52 src/main/scala/net/lag/kestrel/MemcacheHandler.scala
@@ -48,6 +48,7 @@ extends NettyHandler[MemcacheRequest](channelGroup, queueCollection, maxOpenTran
} else {
channel.write(new MemcacheResponse("ERROR"))
}
+
case "set" =>
val now = Time.now
val expiry = request.line(3).toInt
@@ -116,10 +117,14 @@ extends NettyHandler[MemcacheRequest](channelGroup, queueCollection, maxOpenTran
private def get(name: String): Unit = {
var key = name
var timeout: Option[Time] = None
+ var xid: Int = -1
var closing = false
var opening = false
var aborting = false
var peeking = false
+ var ack = false
+ var fail = false
+ var syn = false
if (name contains '/') {
val options = name.split("/")
@@ -129,10 +134,17 @@ extends NettyHandler[MemcacheRequest](channelGroup, queueCollection, maxOpenTran
if (opt startsWith "t=") {
timeout = Some(opt.substring(2).toInt.milliseconds.fromNow)
}
+ if (opt startsWith "id=") {
+ xid = opt.substring(3).toInt
+ }
if (opt == "close") closing = true
if (opt == "open") opening = true
if (opt == "abort") aborting = true
if (opt == "peek") peeking = true
+
+ if (opt == "syn") syn = true
+ if (opt == "ack") ack = true
+ if (opt == "fail") fail = true
}
}
@@ -142,15 +154,51 @@ extends NettyHandler[MemcacheRequest](channelGroup, queueCollection, maxOpenTran
return
}
+ if ((syn || ack || fail) && (opening || closing || peeking || aborting)
+ || (ack && fail)) {
+ channel.write(new MemcacheResponse("CLIENT_ERROR"))
+ channel.close()
+ return
+ }
+
if (aborting) {
abortTransaction(key)
channel.write(new MemcacheResponse("END"))
+ } else if (fail) {
+ if (xid == -1) {
+ log.warning("Attempt to fail transaction without xid");
+ return
+ }
+ failTransaction(key, xid)
+ channel.write(new MemcacheResponse("END"))
} else {
if (closing) {
closeTransaction(key)
- if (!opening) channel.write(new MemcacheResponse("END"))
+ if (!opening && !syn) channel.write(new MemcacheResponse("END"))
}
- if (opening || !closing) {
+ if (ack) {
+ if (xid == -1) {
+ log.warning("Attempt to ack transaction without xid");
+ return
+ }
+ ackTransaction(key, xid)
+ if (!opening && !syn) channel.write(new MemcacheResponse("END"))
+ }
+ if (syn) {
+ try {
+ getItemSyn(key, timeout) {
+ case None =>
+ channel.write(new MemcacheResponse("END"))
+ case Some(item) =>
+ channel.write(new MemcacheResponse("VALUE %s 0 %d\nID %d".format(key, item.data.length, item.xid), item.data))
+ }
+ } catch {
+ case e: TooManyOpenTransactionsException =>
+ channel.write(new MemcacheResponse("ERROR"))
+ channel.close()
+ return
+ }
+ } else if (opening || (!closing && !ack)) {
if (pendingTransactions.size(key) > 0 && !peeking && !opening) {
log.warning("Attempt to perform a non-transactional fetch with an open transaction on " +
" '%s' (sid %d, %s)", key, sessionId, clientDescription)
Please sign in to comment.
Something went wrong with that request. Please try again.