Skip to content

Commit

Permalink
util-core: Move chunked and framed to BufReader
Browse files Browse the repository at this point in the history
Problem/Solution:

`Reader.chunked` and `Reader.framed` are `Buf`-specific, so move
them to `BufReader`.

JIRA Issues: CSL-6756

Differential Revision: https://phabricator.twitter.biz/D392198
  • Loading branch information
yufangong authored and jenkins committed Oct 31, 2019
1 parent 5e73766 commit 459038f
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 190 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.rst
Expand Up @@ -32,6 +32,9 @@ Breaking API Changes
Replace `c.t.io.Reader.readAll` with `Reader.readAllItems`; the new API consumes a generic Reader[T]
and returns a Seq of items. ``PHAB_ID=D391346``

* util-core: Moved `c.t.io.Reader.chunked` to `c.t.io.BufReader.chunked`, and `Reader.framed` to
`BufReader.framed`. ``PHAB_ID=D392198``

19.10.0
-------

Expand Down
74 changes: 74 additions & 0 deletions util-core/src/main/scala/com/twitter/io/BufReader.scala
Expand Up @@ -2,6 +2,8 @@ package com.twitter.io

import com.twitter.util.Future
import java.util.NoSuchElementException
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

object BufReader {

Expand Down Expand Up @@ -49,4 +51,76 @@ object BufReader {

loop(Buf.Empty)
}

// see BufReader.chunked
//
// Splits a `Buf` into consecutive slices of exactly `chunkSize` bytes followed by one final
// slice holding the remaining (possibly zero) bytes. The result is therefore never empty: an
// input whose length is an exact multiple of `chunkSize` (including the empty `Buf`) ends
// with an empty slice.
//
// Construction fails with `IllegalArgumentException` (via `require`) when `chunkSize <= 0`.
private final class ChunkedFramer(chunkSize: Int) extends (Buf => Seq[Buf]) {
  require(chunkSize > 0, s"chunkSize should be > 0 but was $chunkSize")

  @tailrec
  private def loop(acc: ListBuffer[Buf], in: Buf): Seq[Buf] = {
    if (in.length < chunkSize) {
      // `+=` appends in place; the non-mutating `:+` would copy the accumulator on every step.
      acc += in
      // hand out an immutable `List` so consumers get cheap `head`/`tail`
      acc.toList
    } else {
      acc += in.slice(0, chunkSize)
      loop(acc, in.slice(chunkSize, in.length))
    }
  }

  // The accumulator is created per call and never escapes, so mutating it is safe.
  def apply(in: Buf): Seq[Buf] = loop(ListBuffer.empty[Buf], in)
}

// see BufReader.framed
//
// A `Reader[Buf]` that passes every `Buf` read from `r` through `framer` and hands out the
// resulting frames one per `read()`. The instance doubles as the continuation function given
// to `flatMap` on the underlying read.
private final class Framed(r: Reader[Buf], framer: Buf => Seq[Buf])
    extends Reader[Buf]
    with (Option[Buf] => Future[Option[Buf]]) {

  // frames produced by the latest `framer` call that have not been returned yet
  private[this] var frames: Seq[Buf] = Nil

  // Continuation of `r.read()`: we only get here when `frames` has been drained.
  def apply(in: Option[Buf]): Future[Option[Buf]] = synchronized {
    in match {
      case None =>
        // underlying reader reached its end
        Future.None
      case Some(chunk) =>
        frames = framer(chunk)
        // go through `read()` again: the framer may have produced no frame at all
        read()
    }
  }

  def read(): Future[Option[Buf]] = synchronized {
    if (frames.nonEmpty) {
      val pending = frames
      frames = pending.tail
      Future.value(Some(pending.head))
    } else {
      // passing `this` as the continuation avoids allocating a closure
      r.read().flatMap(this)
    }
  }

  // Drops any buffered frames and discards the underlying reader.
  def discard(): Unit = synchronized {
    frames = Seq.empty
    r.discard()
  }

  def onClose: Future[StreamTermination] = r.onClose
}

/**
 * Re-chunks the output of the given [[Reader]] so that every emitted `Buf` holds at most
 * `chunkSize` bytes. This consumes the reader.
 *
 * @param r the reader whose output is re-chunked
 * @param chunkSize upper bound, in bytes, for each emitted chunk; must be greater than zero
 */
def chunked(r: Reader[Buf], chunkSize: Int): Reader[Buf] = {
  val chunker = new ChunkedFramer(chunkSize)
  new Framed(r, chunker)
}

/**
 * Wraps a `Reader[Buf]` and emits frames as decided by `framer`.
 *
 * Every `Buf` read from `r` is passed to `framer`; the frames it returns are emitted one per
 * `read()` before `r` is read again.
 *
 * @note The returned `Reader` may not be thread safe depending on the behavior
 *       of the framer.
 */
def framed(r: Reader[Buf], framer: Buf => Seq[Buf]): Reader[Buf] =
  new Framed(r, framer)
}
74 changes: 0 additions & 74 deletions util-core/src/main/scala/com/twitter/io/Reader.scala
Expand Up @@ -4,8 +4,6 @@ import com.twitter.concurrent.AsyncStream
import com.twitter.util.Promise.Detachable
import com.twitter.util._
import java.io.{File, FileInputStream, InputStream}
import scala.annotation.tailrec
import scala.collection.mutable.ListBuffer

/**
* A reader exposes a pull-based API to model a potentially infinite stream of arbitrary elements.
Expand Down Expand Up @@ -172,63 +170,6 @@ object Reader {
def onClose: Future[StreamTermination] = closep
}

// see Reader.chunked
//
// Splits a `Buf` into consecutive slices of exactly `chunkSize` bytes followed by one final
// slice holding the remaining (possibly zero) bytes. The result is therefore never empty: an
// input whose length is an exact multiple of `chunkSize` (including the empty `Buf`) ends
// with an empty slice.
//
// Construction fails with `IllegalArgumentException` (via `require`) when `chunkSize <= 0`.
private final class ChunkedFramer(chunkSize: Int) extends (Buf => Seq[Buf]) {
  require(chunkSize > 0, s"chunkSize should be > 0 but was $chunkSize")

  @tailrec
  private def loop(acc: ListBuffer[Buf], in: Buf): Seq[Buf] = {
    if (in.length < chunkSize) {
      // `+=` appends in place; the non-mutating `:+` would copy the accumulator on every step.
      acc += in
      // hand out an immutable `List` so consumers get cheap `head`/`tail`
      acc.toList
    } else {
      acc += in.slice(0, chunkSize)
      loop(acc, in.slice(chunkSize, in.length))
    }
  }

  // The accumulator is created per call and never escapes, so mutating it is safe.
  def apply(in: Buf): Seq[Buf] = loop(ListBuffer.empty[Buf], in)
}

// see Reader.framed
//
// A `Reader[Buf]` that passes every `Buf` read from `r` through `framer` and hands out the
// resulting frames one per `read()`. The instance doubles as the continuation function given
// to `flatMap` on the underlying read.
private final class Framed(r: Reader[Buf], framer: Buf => Seq[Buf])
    extends Reader[Buf]
    with (Option[Buf] => Future[Option[Buf]]) {

  // frames produced by the latest `framer` call that have not been returned yet
  private[this] var frames: Seq[Buf] = Nil

  // Continuation of `r.read()`: we only get here when `frames` has been drained.
  def apply(in: Option[Buf]): Future[Option[Buf]] = synchronized {
    in match {
      case None =>
        // underlying reader reached its end
        Future.None
      case Some(chunk) =>
        frames = framer(chunk)
        // go through `read()` again: the framer may have produced no frame at all
        read()
    }
  }

  def read(): Future[Option[Buf]] = synchronized {
    if (frames.nonEmpty) {
      val pending = frames
      frames = pending.tail
      Future.value(Some(pending.head))
    } else {
      // passing `this` as the continuation avoids allocating a closure
      r.read().flatMap(this)
    }
  }

  // Drops any buffered frames and discards the underlying reader.
  def discard(): Unit = synchronized {
    frames = Seq.empty
    r.discard()
  }

  def onClose: Future[StreamTermination] = r.onClose
}

/**
* Construct a `Reader` from a `Future`
*
Expand Down Expand Up @@ -262,13 +203,6 @@ object Reader {
loop()
}

/**
 * Re-chunks the output of the given [[Reader]] so that every emitted `Buf` holds at most
 * `chunkSize` bytes. This consumes the reader.
 *
 * @param r the reader whose output is re-chunked
 * @param chunkSize upper bound, in bytes, for each emitted chunk; must be greater than zero
 */
def chunked(r: Reader[Buf], chunkSize: Int): Reader[Buf] = {
  val chunker = new ChunkedFramer(chunkSize)
  new Framed(r, chunker)
}

/**
* Create a new [[Reader]] from a given [[Buf]]. The output of a returned reader is chunked by
* a least `chunkSize` (bytes).
Expand Down Expand Up @@ -504,12 +438,4 @@ object Reader {
p.setInterruptHandler { case _ => r.discard() }
p
}

/**
 * Wraps a `Reader[Buf]` and emits frames as decided by `framer`.
 *
 * Every `Buf` read from `r` is passed to `framer`; the frames it returns are emitted one per
 * `read()` before `r` is read again.
 *
 * @note The returned `Reader` may not be thread safe depending on the behavior
 *       of the framer.
 */
def framed(r: Reader[Buf], framer: Buf => Seq[Buf]): Reader[Buf] =
  new Framed(r, framer)
}
139 changes: 109 additions & 30 deletions util-core/src/test/scala/com/twitter/io/BufReaderTest.scala
@@ -1,69 +1,148 @@
package com.twitter.io

import com.twitter.conversions.DurationOps._
import com.twitter.conversions.StorageUnitOps._
import com.twitter.util.{Await, Future}
import org.scalacheck.Prop.forAll
import org.scalacheck.Gen
import org.scalacheck.{Arbitrary, Gen}
import org.scalatest.FunSuite
import org.scalatestplus.scalacheck.Checkers
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
import scala.annotation.tailrec

class BufReaderTest extends FunSuite with Checkers {
class BufReaderTest extends FunSuite with ScalaCheckDrivenPropertyChecks {

private def await[A](f: Future[A]): A = Await.result(f, 5.seconds)

test("BufReader") {
check { bytes: String =>
forAll { bytes: String =>
val buf = Buf.Utf8(bytes)
val r = Reader.fromBuf(buf, 8)
await(BufReader.readAll(r)) == buf
assert(await(BufReader.readAll(r)) == buf)
}
}

test("BufReader - discard") {
val alphaString = for {
val stringAndChunk = for {
s <- Gen.alphaStr
} yield s
val positiveInt = for {
n <- Gen.posNum[Int]
} yield n
i <- Gen.posNum[Int].suchThat(_ <= s.length)
} yield (s, i)

check {
forAll(alphaString, positiveInt) { (bytes, n) =>
forAll(stringAndChunk) {
case (bytes, n) =>
val r = Reader.fromBuf(Buf.Utf8(bytes), n)
r.discard()

bytes.length == 0 ||
Await
.ready(r.read(), 5.seconds).poll.exists(
_.throwable.isInstanceOf[ReaderDiscardedException]
)
}
assert(
bytes.length == 0 ||
Await
.ready(r.read(), 5.seconds).poll
.exists(_.throwable.isInstanceOf[ReaderDiscardedException]))
}
}

test("BufReader - iterator") {
val alphaString = for {
val stringAndChunk = for {
s <- Gen.alphaStr
} yield s
val positiveInt = for {
n <- Gen.posNum[Int]
} yield n
i <- Gen.posNum[Int].suchThat(_ <= s.length)
} yield (s, i)

check {
forAll(alphaString, positiveInt) { (bytes, n) =>
forAll(stringAndChunk) {
case (bytes, n) =>
var result = Buf.Empty
val iterator = BufReader.iterator(Buf.Utf8(bytes), n)
while (iterator.hasNext) {
result = result.concat(iterator.next)
}
Buf.Utf8(bytes) == result
}
assert(Buf.Utf8(bytes) == result)
}
}

test("BufReader - readAll") {
check { bytes: String =>
forAll { bytes: String =>
val r = Reader.fromBuf(Buf.Utf8(bytes))
await(BufReader.readAll(r)) == Buf.Utf8(bytes)
assert(await(BufReader.readAll(r)) == Buf.Utf8(bytes))
}
}

test("BufReader.chunked") {
  // an alphabetic string paired with a positive chunk size no larger than its length
  val inputs = for {
    text <- Gen.alphaStr
    limit <- Gen.posNum[Int].suchThat(_ <= text.length)
  } yield (text, limit)

  forAll(inputs) {
    case (text, limit) =>
      val source = Reader.fromBuf(Buf.Utf8(text), 32)
      val chunkedReader = BufReader.chunked(source, limit)

      // drain the reader, checking that no emitted chunk exceeds the requested size
      def drain(): Unit = await(chunkedReader.read()) match {
        case None => ()
        case Some(chunk) =>
          assert(chunk.length <= limit)
          drain()
      }

      drain()
  }
}

test("BufReader.framed reads framed data") {
  // limit arrays to a few kilobytes, otherwise we may generate a very large amount of data
  val payload = for {
    numBytes <- Gen.choose(0.bytes.inBytes, 2.kilobytes.inBytes)
    bytes <- Gen.containerOfN[Array, Byte](numBytes.toInt, Arbitrary.arbitrary[Byte])
  } yield Buf.ByteArray.Owned(bytes)
  val getByteArrays: Gen[Seq[Buf]] = Gen.listOf(payload)

  forAll(getByteArrays) { buffers: Seq[Buf] =>
    // prefix each payload with its length as a 32-bit big-endian value
    val encoded = buffers.map { payloadBuf =>
      Buf.U32BE(payloadBuf.length).concat(payloadBuf)
    }
    val source = BufReader(Buf(encoded))
    val r = BufReader.framed(source, new BufReaderTest.U32BEFramer())

    // every payload must come back as its own frame, in order
    for (expected <- buffers) {
      assert(await(r.read()).contains(expected))
    }

    // after the last frame the reader must signal EOF
    assert(await(r.read()).isEmpty)
  }
}

test("BufReader.framed reads empty frames") {
  // a lone zero length prefix encodes a single empty frame
  val source = BufReader(Buf.U32BE(0))
  val r = BufReader.framed(source, new BufReaderTest.U32BEFramer())

  val first = await(r.read())
  assert(first.contains(Buf.Empty))

  val second = await(r.read())
  assert(second.isEmpty)
}

test("Framed works on non-list collections") {
  // the framer returns a Vector rather than a List
  val source = BufReader(Buf.U32BE(0))
  val r = BufReader.framed(source, buf => Vector(buf))
  val first = await(r.read())
  assert(first.isDefined)
}
}

object BufReaderTest {

  /**
   * Framer used to test BufReader.framed. Input is interpreted as a sequence of
   * frames, each preceded by its length as a 32-bit BE value. Bytes that do not
   * yet form a complete frame are kept in `state` and prepended to the next input.
   */
  private class U32BEFramer() extends (Buf => Seq[Buf]) {
    // leftover bytes from the previous call
    var state: Buf = Buf.Empty

    @tailrec
    private def loop(acc: Seq[Buf], buf: Buf): Seq[Buf] = buf match {
      case Buf.U32BE(size, rest) if size <= rest.length =>
        // a complete frame is available: split it off and keep going
        val frame = rest.slice(0, size)
        val remainder = rest.slice(size, rest.length)
        loop(acc :+ frame, remainder)
      case _ =>
        // incomplete frame (or nothing left): remember it for the next call
        state = buf
        acc
    }

    def apply(buf: Buf): Seq[Buf] = synchronized {
      val pending = state.concat(buf)
      loop(Seq.empty, pending)
    }
  }
}

0 comments on commit 459038f

Please sign in to comment.