Skip to content

Commit

Permalink
[split] Merge branch 'master' of http://git.local.twitter.com/birdcage
Browse files Browse the repository at this point in the history
…into b3receiver
  • Loading branch information
johanoskarsson committed Apr 27, 2011
2 parents 4c13a0c + 3d6b087 commit cca72e7
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 46 deletions.
Expand Up @@ -13,46 +13,60 @@ import org.jboss.netty.buffer.ChannelBuffer
import org.jboss.netty.handler.codec.http._

import com.twitter.finagle.ChannelBufferUsageException
import com.twitter.util.StorageUnit
import com.twitter.conversions.storage._

class ChannelBufferUsageTracker(
limit: StorageUnit,
statsReceiver: StatsReceiver = NullStatsReceiver
) {
private[this] object state {
var currentUsage = 0L
var maxUsage = 0L
var usageLimit = limit
}

class ChannelBufferUsageTracker(limit: Long, statsReceiver: StatsReceiver = NullStatsReceiver) {
private[this] var currentUsage = 0L
private[this] var maxUsage = 0L
private[this] var usageLimit = limit

// It is probably not necessary to use synchronized methods here. We can change this if there is a performance problem.
private[this] val currentUsageStat = statsReceiver.addGauge("channel_buffer_current_usage") { currentUsage() }
private[this] val maxUsageStat = statsReceiver.addGauge("channel_buffer_max_usage") { maxUsage() }
// It is probably not necessary to use synchronized methods here. We
// can change this if there is a performance problem.
private[this] val currentUsageStat =
statsReceiver.addGauge("channel_buffer_current_usage") { currentUsage.inBytes }
private[this] val maxUsageStat =
statsReceiver.addGauge("channel_buffer_max_usage") { maxUsage.inBytes }

def currentUsage(): Long = synchronized { currentUsage}
def currentUsage: StorageUnit = synchronized { state.currentUsage.bytes }

def maxUsage(): Long = synchronized { maxUsage }
def maxUsage: StorageUnit = synchronized { state.maxUsage.bytes }

def usageLimit(): Long = synchronized { usageLimit }
def usageLimit(): StorageUnit = synchronized { state.usageLimit }

def setUsageLimit(limit: Long) = synchronized { usageLimit = limit }
def setUsageLimit(limit: StorageUnit) = synchronized { state.usageLimit = limit }

def increase(size: Long) = synchronized {
if (currentUsage + size > usageLimit) {
if (state.currentUsage + size > state.usageLimit.inBytes) {
throw new ChannelBufferUsageException(
"Channel buffer usage exceeded limit (" + currentUsage + ", " + size + " vs. " + usageLimit + ")")
"Channel buffer usage exceeded limit ("
+ currentUsage + ", " + size + " vs. " + usageLimit + ")")
} else {
currentUsage += size
state.currentUsage += size
if (currentUsage > maxUsage)
maxUsage = currentUsage
state.maxUsage = state.currentUsage
}
}

def decrease(size: Long) = synchronized {
if (currentUsage < size) {
if (state.currentUsage < size) {
throw new ChannelBufferUsageException(
"invalid ChannelBufferUsageTracker decrease operation (" + size + " vs. " + currentUsage + ")")
"invalid ChannelBufferUsageTracker decrease operation ("
+ size + " vs. " + currentUsage + ")")
} else {
currentUsage -= size
state.currentUsage -= size
}
}
}

class ChannelBufferManager(usageTracker: ChannelBufferUsageTracker) extends SimpleChannelHandler {
class ChannelBufferManager(usageTracker: ChannelBufferUsageTracker)
extends SimpleChannelHandler
{
private[this] var bufferUsage = 0L

override def messageReceived(ctx: ChannelHandlerContext, e: MessageEvent) {
Expand Down
Expand Up @@ -10,6 +10,7 @@ import org.jboss.netty.handler.codec.http._
import org.jboss.netty.buffer.ChannelBuffers

import com.twitter.finagle.ChannelBufferUsageException
import com.twitter.conversions.storage._

object ChannelBufferManagerSpec extends Specification with Mockito {
val me = mock[MessageEvent]
Expand All @@ -25,71 +26,71 @@ object ChannelBufferManagerSpec extends Specification with Mockito {
}

"the channel buffer manager" should {
val usageTracker = new ChannelBufferUsageTracker(1000)
val usageTracker = new ChannelBufferUsageTracker(1000.bytes)
val handler = new ChannelBufferManager(usageTracker)
usageTracker.usageLimit must be_==(1000)
usageTracker.usageLimit must be_==(1000.bytes)

"track the capacity of the channel buffer" in {
makeGetMessage(256)
handler.messageReceived(ctx, me)
usageTracker.currentUsage() must be_==(256)
usageTracker.currentUsage must be_==(256.bytes)

makeGetMessage(512)
handler.messageReceived(ctx, me)
usageTracker.currentUsage() must be_==(768)
usageTracker.currentUsage must be_==(768.bytes)

handler.writeComplete(ctx, wce)
usageTracker.currentUsage() must be_==(0)
usageTracker.currentUsage must be_==(0.bytes)

makeGetMessage(128)
handler.messageReceived(ctx, me)
usageTracker.currentUsage() must be_==(128)
usageTracker.currentUsage must be_==(128.bytes)

handler.channelClosed(ctx, e)
usageTracker.currentUsage() must be_==(0)
usageTracker.maxUsage() must be_==(768)
usageTracker.currentUsage must be_==(0.bytes)
usageTracker.maxUsage must be_==(768.bytes)
}

"throw exception if usage exceeds limit at the beginning of the request" in {
usageTracker.setUsageLimit(10)
usageTracker.usageLimit() must be_==(10)
usageTracker.currentUsage() must be_==(0)
usageTracker.setUsageLimit(10.bytes)
usageTracker.usageLimit must be_==(10.bytes)
usageTracker.currentUsage must be_==(0.bytes)

makeGetMessage(20)
handler.messageReceived(ctx, me) must throwA[ChannelBufferUsageException]
usageTracker.currentUsage() must be_==(0)
usageTracker.currentUsage must be_==(0.bytes)

handler.channelClosed(ctx, e)
usageTracker.currentUsage() must be_==(0)
usageTracker.maxUsage() must be_==(0)
usageTracker.currentUsage must be_==(0.bytes)
usageTracker.maxUsage must be_==(0.bytes)
}

"throw exception if usage exceeds limit in the middle of the request" in {
usageTracker.setUsageLimit(300)
usageTracker.usageLimit() must be_==(300)
usageTracker.currentUsage() must be_==(0)
usageTracker.setUsageLimit(300.bytes)
usageTracker.usageLimit must be_==(300.bytes)
usageTracker.currentUsage must be_==(0.bytes)

makeGetMessage(100)
handler.messageReceived(ctx, me)
usageTracker.currentUsage() must be_==(100)
usageTracker.currentUsage must be_==(100.bytes)

makeGetMessage(350)
handler.messageReceived(ctx, me) must throwA[ChannelBufferUsageException]
usageTracker.currentUsage() must be_==(100)
usageTracker.maxUsage() must be_==(100)
usageTracker.currentUsage must be_==(100.bytes)
usageTracker.maxUsage must be_==(100.bytes)

makeGetMessage(50)
handler.messageReceived(ctx, me)
usageTracker.currentUsage() must be_==(150)
usageTracker.maxUsage() must be_==(150)
usageTracker.currentUsage must be_==(150.bytes)
usageTracker.maxUsage must be_==(150.bytes)

makeGetMessage(150)
handler.messageReceived(ctx, me)
usageTracker.currentUsage() must be_==(300)
usageTracker.currentUsage must be_==(300.bytes)

handler.channelClosed(ctx, e)
usageTracker.currentUsage() must be_==(0)
usageTracker.maxUsage() must be_==(300)
usageTracker.currentUsage must be_==(0.bytes)
usageTracker.maxUsage must be_==(300.bytes)
}
}
}
Expand Up @@ -6,7 +6,7 @@ import org.apache.thrift.protocol.TProtocolFactory
import com.twitter.finagle.Codec

class ThriftClientBufferedCodec(protocolFactory: TProtocolFactory)
extends ThriftClientFramedCodec
extends ThriftClientFramedCodec(protocolFactory)
{
override def pipelineFactory = {
val framedPipelineFactory = super.pipelineFactory
Expand Down

0 comments on commit cca72e7

Please sign in to comment.