Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
First cut at integrating transactions from Jonathan.
Browse files Browse the repository at this point in the history
  • Loading branch information
John Kalucki committed Nov 29, 2010
1 parent e67816e commit 39e3f27
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 24 deletions.
2 changes: 1 addition & 1 deletion ivy/ivy.xml
@@ -1,7 +1,7 @@
<ivy-module version="1.0" xmlns:e="http://ant.apache.org/ivy/extra">
<info organisation="com.twitter"
module="grabbyhands"
revision="1.1"
revision="1.2"
e:buildpackage="com.twitter.grabbyhands"
e:testclass="com.twitter.grabbyhands.TestRunner"
e:stresstestclass="com.twitter.grabbyhands.StressTestRunner"
Expand Down
6 changes: 2 additions & 4 deletions src/main/scala/com/twitter/grabbyhands/ConnectionRecv.scala
Expand Up @@ -37,7 +37,7 @@ protected[grabbyhands] class ConnectionRecv(
val command = new StringBuffer("get ")
command.append(queueName)
command.append("/t=").append(grabbyHands.config.kestrelReadTimeoutMs)
if(queue.transactional) command.append("/close/open")
if (queue.transactional) command.append("/close/open")
command.append("\r\n")

val abortCommand = new StringBuffer("get ")
Expand Down Expand Up @@ -197,9 +197,7 @@ protected[grabbyhands] class ConnectionRecv(
def abortRead() {
// Send request
abortRequest.rewind()
if (!writeBuffer(abortRequest)) {
return false
}
writeBuffer(abortRequest)
}

// readTimeoutMs is assumed to be the saftey factor accounting for typical glitches, etc.
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/com/twitter/grabbyhands/GrabbyHands.scala
Expand Up @@ -48,13 +48,13 @@ class GrabbyHands(val config: Config) {

/** Returns an internal queue that delivers new messages from Kestrel. */
def getRecvQueue(queue: String): BlockingQueue[ByteBuffer] = {
if(config.recvTransactional) throw new IllegalStateException("Transactional read set")
if (config.recvTransactional) throw new IllegalStateException("Transactional read set")
queues(queue).recvQueue
}

/** Returns an internal queue that delivers new messages from Kestrel. */
def getRecvTransQueue(queue: String): BlockingQueue[Read] = {
if (!config.recvTransactional) throw new IllegalStateException("No transactional read")
if (!config.recvTransactional) throw new IllegalStateException("Transactional read not set")
queues(queue).transRecvQueue
}

Expand Down
13 changes: 7 additions & 6 deletions src/main/scala/com/twitter/grabbyhands/Read.scala
Expand Up @@ -20,7 +20,7 @@ import java.nio.ByteBuffer
import java.util.concurrent.{CountDownLatch, TimeUnit}

/** Wraps outgoing messages. */
class Read(val message: ByteBuffer, val connection:ConnectionRecv )
class Read(val message: ByteBuffer, val connection: ConnectionRecv)
{
def this(str: String) = this(ByteBuffer.wrap(str.getBytes()), null)
def this(bytes: Array[Byte]) = this(ByteBuffer.wrap(bytes), null)
Expand All @@ -30,7 +30,6 @@ class Read(val message: ByteBuffer, val connection:ConnectionRecv )
def getMessage(): ByteBuffer = message
def getConnection(): ConnectionRecv = connection


def awaitComplete(timeout: Int, units: TimeUnit) {
completedLatch.await(timeout, units)
}
Expand All @@ -42,12 +41,15 @@ class Read(val message: ByteBuffer, val connection:ConnectionRecv )

/** Returns true if transaction is completed or aborted */
def completed(): Boolean = {
completedLatch.getCount == 0
completedLatch.getCount() == 0
}

protected[grabbyhands] def close(success:Boolean) {
if(success) completedLatch.countDown()
else abort()
if (success) {
completedLatch.countDown()
} else {
abort()
}
}

/** Cancels a write waiting in the local queue. */
Expand All @@ -60,5 +62,4 @@ class Read(val message: ByteBuffer, val connection:ConnectionRecv )
def cancelled(): Boolean = {
abortLatch.getCount == 0
}

}
4 changes: 2 additions & 2 deletions src/main/scala/com/twitter/grabbyhands/Write.scala
Expand Up @@ -21,8 +21,8 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}

/** Wraps outgoing messages. */
class Write(val message: ByteBuffer, val watcher: Boolean => Unit) {
def this(read:Read ) = this(read.message, read.close(_:Boolean))
def this(message:ByteBuffer ) = this(message, (Boolean) => false)
def this(read:Read) = this(read.message, read.close(_:Boolean))
def this(message:ByteBuffer) = this(message, (Boolean) => false)
def this(str: String) = this(ByteBuffer.wrap(str.getBytes()))
def this(bytes: Array[Byte]) = this(ByteBuffer.wrap(bytes))
protected val writtenLatch = new CountDownLatch(1)
Expand Down
2 changes: 1 addition & 1 deletion src/test/scala/com/twitter/grabbyhands/LifecycleSpec.scala
Expand Up @@ -105,7 +105,7 @@ object LifecycleSpec extends SpecBase(3) {
config.addServer(host + ":" + port)
config.sendNumConnections = 1
config.recvTransactional = true;
config.addQueues(queues.slice(0, 1).force)
config.addQueues(queues.slice(0, 1).toArray)
config.queues.size must be_==(1)

grab = new GrabbyHands(config)
Expand Down
Expand Up @@ -21,9 +21,10 @@ import java.util.concurrent.{BlockingQueue,TimeUnit}
object TransactionalSpecStress extends SpecBase(50) {

def transactionalStress(testMessages:Int, testNumQueues: Int,
testConnectionsPerServer: Int, testLength: Int, serial: Boolean) {
log.fine("testNumQueues " + testNumQueues + " testConnectionsPerServer " +
testConnectionsPerServer + " serial " + serial)
testConnectionsPerServer: Int, testLength: Int, serial: Boolean) {
log.fine(
"testNumQueues " + testNumQueues + " testConnectionsPerServer " + testConnectionsPerServer +
" serial " + serial)
testMessages % testNumQueues must be_==(0)
config = new Config()
config.addServer(host + ":" + port)
Expand All @@ -33,7 +34,7 @@ object TransactionalSpecStress extends SpecBase(50) {
config.reconnectHolddownMs = 50
config.recvTransactional = true
testNumQueues must be_<=(numQueues)
val testQueues = queues.slice(0, testNumQueues).force
val testQueues = queues.slice(0, testNumQueues).toArray
val queueConfig = config.addQueues(testQueues)

ctor()
Expand Down Expand Up @@ -63,8 +64,9 @@ object TransactionalSpecStress extends SpecBase(50) {
}
var endMs = System.currentTimeMillis
var deltaMs = endMs - startMs
log.info("enqueue " + testMessages + " in " + deltaMs + " ms, " +
((1000*testMessages) / deltaMs) + " messages/sec")
log.info(
"enqueue " + testMessages + " in " + deltaMs + " ms, " + ((1000*testMessages) / deltaMs) +
" messages/sec")

startMs = System.currentTimeMillis
for (idx <- 1 to testMessages) {
Expand All @@ -79,8 +81,9 @@ object TransactionalSpecStress extends SpecBase(50) {
endMs = System.currentTimeMillis
deltaMs = endMs- startMs

log.info("dequeue " + testMessages + " in " + deltaMs + " ms, " +
((1000*testMessages) / deltaMs) + " messages/sec")
log.info(
"dequeue " + testMessages + " in " + deltaMs + " ms, " + ((1000*testMessages) / deltaMs) +
" messages/sec")
}

"basicstress" should {
Expand Down

0 comments on commit 39e3f27

Please sign in to comment.