Skip to content

Commit

Permalink
validate actual bytes written in MultiSetSend
Browse files Browse the repository at this point in the history
  • Loading branch information
Jun Rao committed May 23, 2011
1 parent 0a54ad4 commit ffa2cf2
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
19 changes: 14 additions & 5 deletions core/src/main/scala/kafka/network/Transmission.scala
Expand Up @@ -84,18 +84,27 @@ private[kafka] trait Send extends Transmission {
/**
* A set of composite sends, sent one after another
*/
class MultiSend[S <: Send](val sends: List[S]) extends Send {

abstract class MultiSend[S <: Send](val sends: List[S]) extends Send {
val expectedBytesToWrite: Int
private var current = sends

var totalWritten = 0

def writeTo(channel: WritableByteChannel): Int = {
expectIncomplete
val written = current.head.writeTo(channel)
totalWritten += written
if(current.head.complete)
current = current.tail
written
}

def complete = current == Nil

def complete: Boolean = {
if (current == Nil) {
if (totalWritten != expectedBytesToWrite)
logger.error("mismatch in sending bytes over socket; expected: " + expectedBytesToWrite + " actual: " + totalWritten)
return true
}
else
return false
}
}
4 changes: 3 additions & 1 deletion core/src/main/scala/kafka/server/MultiMessageSetSend.scala
Expand Up @@ -29,7 +29,9 @@ import kafka.utils._
private[server] class MultiMessageSetSend(val sets: List[MessageSetSend]) extends MultiSend(new ByteBufferSend(6) :: sets) {

val buffer = this.sends.head.asInstanceOf[ByteBufferSend].buffer
buffer.putInt(2 + sets.foldLeft(0)(_ + _.sendSize))
val allMessageSetSize: Int = sets.foldLeft(0)(_ + _.sendSize)
val expectedBytesToWrite: Int = 4 + 2 + allMessageSetSize
buffer.putInt(2 + allMessageSetSize)
buffer.putShort(0)
buffer.rewind()

Expand Down

0 comments on commit ffa2cf2

Please sign in to comment.