Skip to content
Permalink
Browse files

Small improvements to the stream decoding algo for framed buffers.

  • Loading branch information...
jtownson committed Jun 16, 2019
1 parent 1aef0fb commit 99077a4472d79bb3fd81964efc8a5bb043921125
@@ -61,6 +61,7 @@ object scalanet extends ScoverageModule with PublishModule {
object test extends ScoverageTests {
def ivyDeps = Agg(
ivy"org.scalatest::scalatest:3.0.5",
ivy"org.scalacheck::scalacheck:1.14.0",
ivy"ch.qos.logback:logback-core:1.2.3",
ivy"ch.qos.logback:logback-classic:1.2.3"
)
@@ -4,19 +4,18 @@ import java.nio.ByteBuffer

import io.iohk.scalanet.format.StreamDecoder1.State._
import io.iohk.scalanet.format.StreamDecoder1._
import io.netty.buffer.{CompositeByteBuf, Unpooled}

import scala.collection.mutable

class StreamDecoder1 {
val cb: CompositeByteBuf = Unpooled.compositeBuffer()

var state: State = LengthExpected
var length: Int = 0
var db: ByteBuffer = null

val nlb = ByteBuffer.allocate(4)

def fmfn(b: ByteBuffer): Seq[ByteBuffer] = {
def streamDecode(b: ByteBuffer): Seq[ByteBuffer] = {

val s = mutable.ListBuffer[ByteBuffer]()
while (b.remaining() > 0) {
@@ -27,23 +26,22 @@ class StreamDecoder1 {
nlb.put(b.get())
}
if (nlb.position() == 4) {
// nlb.position(0)
length = nlb.getInt(0)
nlb.clear()
db = ByteBuffer.allocate(length)
state = BytesExpected
}

// else
case BytesExpected =>

// we expect a length field, N and there are 4 or more bytes remaining
// we expect a length field, N and there are less than 4 bytes remaining

// we expect N bytes and there are M = N remaining

// we expect N bytes and there are M < N remaining

// we expect N bytes and there are M > N remaining
val remainingBytes = length - db.position()
if (b.remaining() >= remainingBytes) {
read(remainingBytes, b, db)
db.position(0)
s += db
state = LengthExpected
} else { // (b.remaining() < remainingBytes)
read(b.remaining(), b, db)
}
}
}
s
@@ -62,4 +60,11 @@ object StreamDecoder1 {

}

}
private[StreamDecoder1] def read(n: Int, from: ByteBuffer, to: ByteBuffer): Unit = {
var m: Int = 0
while (m < n) {
to.put(from.get())
m += 1
}
}
}
@@ -8,14 +8,18 @@ import org.scalatest.Matchers._

import scala.util.Random

// TODO this could all be a single property test
// just need to simulate TCP's fixed size write
// buffer (poss for arbitrary write buffer size)
// whose contents are sent whenever full.
class StreamDecoder1Spec extends FlatSpec {

behavior of "StreamDecoder1"

it should "handle an empty buffer" in {
val d = new StreamDecoder1

val buffers: Seq[ByteBuffer] = d.fmfn(buffFromBytes())
val buffers: Seq[ByteBuffer] = d.streamDecode(buffFromBytes())

buffers shouldBe Seq.empty
d.state shouldBe StreamDecoder1.State.LengthExpected
@@ -24,7 +28,7 @@ class StreamDecoder1Spec extends FlatSpec {
it should "handle a one byte buffer" in {
val d = new StreamDecoder1

val buffers: Seq[ByteBuffer] = d.fmfn(buffFromBytes(0.toByte))
val buffers: Seq[ByteBuffer] = d.streamDecode(buffFromBytes(0.toByte))

buffers shouldBe Seq.empty
d.nlb.get(0) shouldBe 0.toByte
@@ -34,7 +38,7 @@ class StreamDecoder1Spec extends FlatSpec {
it should "handle a two byte buffer" in {
val d = new StreamDecoder1

val buffers: Seq[ByteBuffer] = d.fmfn(buffFromBytes(0.toByte, 1.toByte))
val buffers: Seq[ByteBuffer] = d.streamDecode(buffFromBytes(0.toByte, 1.toByte))

buffers shouldBe Seq.empty
d.nlb.get(0) shouldBe 0.toByte
@@ -45,7 +49,7 @@ class StreamDecoder1Spec extends FlatSpec {
it should "handle a four byte buffer" in {
val d = new StreamDecoder1

val buffers: Seq[ByteBuffer] = d.fmfn(buffFrom(1))
val buffers: Seq[ByteBuffer] = d.streamDecode(buffFrom(1))

buffers shouldBe Seq.empty
d.length shouldBe 1
@@ -55,16 +59,54 @@ class StreamDecoder1Spec extends FlatSpec {
it should "handle a buffer with less than a complete message" in {
val d = new StreamDecoder1

val buffers: Seq[ByteBuffer] = d.fmfn(generatePartMessage(1024))
val buffers: Seq[ByteBuffer] = d.streamDecode(generatePartMessage(1024))

buffers shouldBe Seq.empty
d.length shouldBe 1024
}

it should "handle a buffer with a complete message" in {
val d = new StreamDecoder1
val message = generateMessage(1024)

val buffers: Seq[ByteBuffer] = d.streamDecode(message)

buffers shouldBe Seq(subset(4, 1028, message))
}

it should "handle a buffer with a complete message plus a bit" in {
val d = new StreamDecoder1
val message = generateMessagePlus(1024, 512)

val buffers: Seq[ByteBuffer] = d.streamDecode(message)

buffers shouldBe Seq(subset(4, 1028, message))
}

it should "handle a buffer with a complete message and a bit more" in {}
it should "handle a message split over several packets" in {
val d = new StreamDecoder1
val sourceMessage = generateMessage(12)

val packets = split(sourceMessage, 4)
val decode0 = d.streamDecode(packets(0))
val decode1 = d.streamDecode(packets(1))
val decode2 = d.streamDecode(packets(2))
val decode3 = d.streamDecode(packets(3))
val decode4 = d.streamDecode(ByteBuffer.allocate(0))

decode0 shouldBe Seq.empty
decode1 shouldBe Seq.empty
decode2 shouldBe Seq.empty
decode3 shouldBe Seq(subset(4, 16, sourceMessage))
decode4 shouldBe Seq.empty
}

it should "handle a buffer with two complete messages and a bit more" in {}
private def subset(start: Int, end: Int, b: ByteBuffer): ByteBuffer = {
b.position(0)
val bytes = NetUtils.toArray(b)
val slice = bytes.slice(start, end)
ByteBuffer.wrap(slice)
}

private def buffFrom(i: Int): ByteBuffer = {
val bb = ByteBuffer.allocate(4)
@@ -89,13 +131,25 @@ class StreamDecoder1Spec extends FlatSpec {
bb
}

private def generateMessage(messageLength: Int, garbageLength: Int): ByteBuffer = {
val bb = ByteBuffer.allocate(4 + messageLength + garbageLength)
private def generateMessage(messageLength: Int): ByteBuffer = {
val bb = ByteBuffer.allocate(4 + messageLength)
bb.putInt(messageLength)
bb.put(NetUtils.randomBytes(messageLength))
bb.clear()
bb
}

private def generateMessagePlus(messageLength: Int, garbageLength: Int): ByteBuffer = {
val bb = ByteBuffer.allocate(4 + messageLength + 4 + garbageLength - 1)
bb.putInt(messageLength)
bb.put(NetUtils.randomBytes(messageLength))
bb.putInt(garbageLength)
bb.put(NetUtils.randomBytes(garbageLength))
bb.put(NetUtils.randomBytes(garbageLength - 1))
bb.clear()
bb
}

private def split(buffer: ByteBuffer, packetSize: Int): Seq[ByteBuffer] = {
buffer.array().grouped(packetSize).map(chunk => ByteBuffer.wrap(chunk)).toSeq
}
}

0 comments on commit 99077a4

Please sign in to comment.
You can’t perform that action at this time.