Permalink
Browse files

auto-transaction-fail on session

  • Loading branch information...
1 parent f264c80 commit 411095f930c0235fed4534dae39a436b97180f1a Jason Jackson committed Sep 22, 2011
Showing with 60 additions and 27 deletions.
  1. +60 −27 src/main/scala/com/twitter/kestrelthrift/KestrelthriftServiceImpl.scala
@@ -1,6 +1,6 @@
package com.twitter.kestrelthrift
-import scala.collection.mutable.Map
+import scala.collection.mutable
import scala.collection.Set
import com.twitter.util._
import config._
@@ -13,60 +13,93 @@ import com.twitter.finagle.{Service}
-class KestrelthriftServiceImpl(qs: QueueCollection) extends KestrelthriftService{
- println("New Connection")
+class KestrelthriftServiceImpl(queues: QueueCollection) extends KestrelthriftService{
+ object pendingRATransactions { // pending Random Access Transactions.
+ // used for syn, ack, fail
+ private val transactions = new mutable.HashMap[String, mutable.HashSet[Int]] {
+ override def default(key: String) = {
+ val rv = new mutable.HashSet[Int]
+ this(key) = rv
+ rv
+ }
+ }
- def release() {
- println("Close Connection")
+ def remove(name: String, xid: Int): Boolean = synchronized {
+ transactions(name).remove(xid)
+ }
+ def add(name: String, xid: Int) = synchronized {
+ transactions(name) += xid
+ }
+
+ def size(name: String): Int = synchronized { transactions(name).size }
+
+ def cancelAll() {
+ println("cancelAll")
+ synchronized {
+ transactions.foreach { case (name, xids) =>
+ xids.foreach { xid => queues.unremove(name, xid) }
+ }
+ transactions.clear()
+ }
+ }
}
- def get(queueName: String, transaction: Boolean) = {
- qs.remove(queueName, None, transaction).map { item =>
+ println("New Session")
+
+ def release() {
+ pendingRATransactions.cancelAll()
+ println("Session Ended")
+ }
+
+ def get(key: String, transaction: Boolean) = {
+ queues.remove(key, None, transaction).map { item =>
item match {
case None => null
- case Some(item) => new Item(ByteBuffer.wrap(item.data), item.xid)
+ case Some(item) => {
+ pendingRATransactions.add(key, item.xid)
+ new Item(ByteBuffer.wrap(item.data), item.xid)
+ }
}
}
}
- def multiget(queueName: String, maxItems: Int, transaction: Boolean) = {
+ def multiget(key: String, maxItems: Int, transaction: Boolean) = {
val futureList = for(i <- 1 to maxItems)
- yield qs.remove(queueName, None, transaction).map { item =>
- item match {
- case None => null
- case Some(item) => new Item(ByteBuffer.wrap(item.data), item.xid)
- }
- }
+ yield get(key, transaction)
val agg = Future.collect(futureList.toSeq)
agg.map(seq => seq.filter(_ != null))
}
- def put(queueName: String, item: ByteBuffer) = {
- qs.add(queueName, item.array)
+ def put(key: String, item: ByteBuffer) = {
+ queues.add(key, item.array)
Future.void
}
- def multiput(queueName: String, items: Seq[ByteBuffer]) = {
+ def multiput(key: String, items: Seq[ByteBuffer]) = {
for(i <- items)
- qs.add(queueName, i.array)
+ queues.add(key, i.array)
Future.void
}
- def ack(queueName: String, xids: Set[Int]) = {
- for(id <- xids)
- qs.confirmRemove(queueName, id)
+ def ack(key: String, xids: Set[Int]) = {
+ for(xid <- xids) {
+ pendingRATransactions.remove(key, xid)
+ queues.confirmRemove(key, xid)
+ }
Future.void
}
- def fail(queueName: String, xids: Set[Int]) = {
- for(id <- xids)
- qs.unremove(queueName, id)
+ def fail(key: String, xids: Set[Int]) = {
+ for(xid <- xids) {
+ pendingRATransactions.remove(key, xid)
+ queues.unremove(key, xid)
+ }
Future.void
}
- def flush(queueName: String) = {
- qs.flush(queueName)
+ def flush(key: String) = {
+ queues.flush(key)
Future.void
}

0 comments on commit 411095f

Please sign in to comment.