Skip to content

Commit

Permalink
util-core: Introduce Reader.readAllItems for generic types
Browse files Browse the repository at this point in the history
Problem:

Reader.readAll was Buf biased, and it should be in BufReader.

Solution:

Move `readAll(r: Reader[Buf])` to `BufReader`, while introduce
`readAllItems[A](r: Reader[A])` to the `Reader` object.

JIRA Issues: CSL-8869

Differential Revision: https://phabricator.twitter.biz/D391346
  • Loading branch information
yufangong authored and jenkins committed Oct 30, 2019
1 parent e6970ed commit a47a219
Show file tree
Hide file tree
Showing 11 changed files with 113 additions and 59 deletions.
12 changes: 9 additions & 3 deletions CHANGELOG.rst
Expand Up @@ -7,12 +7,11 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com
Unreleased
----------

* util: Add initial support for JDK 11 compatibility. ``PHAB_ID=D365075``


New Features
~~~~~~~~~~~~

* util: Add initial support for JDK 11 compatibility. ``PHAB_ID=D365075``

* util-core: Created public method Closable.stopCollectClosablesThread that stops CollectClosables
thread. ``PHAB_ID=D382800``

Expand All @@ -21,6 +20,13 @@ Runtime Behavior Changes

* util: Upgrade to caffeine 2.8.0 ``PHAB_ID=D384592``

Breaking API Changes
~~~~~~~~~~~~~~~~~~~~

* util-core: Add `c.t.io.BufReader.readAll` to consume a `Reader[Buf]` and concat values to a Buf.
Replace `c.t.io.Reader.readAll` with `Reader.readAllItems`, the new API consumes a generic Reader[T],
and return a Seq of items. ``PHAB_ID=D391346``

19.10.0
-------

Expand Down
23 changes: 23 additions & 0 deletions util-core/src/main/java/com/twitter/io/BufReaders.java
@@ -0,0 +1,23 @@
package com.twitter.io;

import com.twitter.util.Future;

/**
* Better Java APIs of BufReader for Scala 2.11.
* This will be removed when drop Scala 2.11 support.
*
* @see com.twitter.io.BufReader
*/
public final class BufReaders {

private BufReaders() {
throw new IllegalStateException();
}

/**
* See {@code com.twitter.io.BufReader.readAll}.
*/
public static Future<Buf> readAll(Reader<Buf> r) {
return BufReader$.MODULE$.readAll(r);
}
}
10 changes: 2 additions & 8 deletions util-core/src/main/java/com/twitter/io/Readers.java
Expand Up @@ -14,7 +14,8 @@
import scala.runtime.BoxedUnit;

/**
* Java APIs for Reader.
* Better Java APIs of Reader for Scala 2.11.
* This will be removed when drop Scala 2.11 support.
*
* @see com.twitter.io.Reader
*/
Expand All @@ -33,13 +34,6 @@ public static Reader<Buf> newBufReader(Buf buf, int chunkSize) {
return Reader$.MODULE$.fromBuf(buf, chunkSize);
}

/**
* See {@code com.twitter.io.Reader.readAll}.
*/
public static Future<Buf> readAll(Reader<Buf> r) {
return Reader$.MODULE$.readAll(r);
}

/**
* See {@code com.twitter.io.Reader.concat}.
*/
Expand Down
13 changes: 13 additions & 0 deletions util-core/src/main/scala/com/twitter/io/BufReader.scala
Expand Up @@ -52,4 +52,17 @@ object BufReader {
*/
def apply(buf: Buf, chunkSize: Int): Reader[Buf] =
if (buf.isEmpty) Reader.empty[Buf] else new BufReader(buf, chunkSize)

/**
* Read the entire bytestream presented by `r`.
*/
def readAll(r: Reader[Buf]): Future[Buf] = {
def loop(left: Buf): Future[Buf] =
r.read().flatMap {
case Some(right) => loop(left.concat(right))
case _ => Future.value(left)
}

loop(Buf.Empty)
}
}
20 changes: 11 additions & 9 deletions util-core/src/main/scala/com/twitter/io/Reader.scala
Expand Up @@ -248,16 +248,18 @@ object Reader {
def exception[A](e: Throwable): Reader[A] = fromFuture[A](Future.exception(e))

/**
* Read the entire bytestream presented by `r`.
* Read all items from the Reader r.
* @return A Sequence of items.
*/
def readAll(r: Reader[Buf]): Future[Buf] = {
def loop(left: Buf): Future[Buf] =
r.read().flatMap {
case Some(right) => loop(left concat right)
case _ => Future.value(left)
}

loop(Buf.Empty)
def readAllItems[A](r: Reader[A]): Future[Seq[A]] = {
val acc = List.newBuilder[A]
def loop(): Future[List[A]] = r.read().flatMap {
case None => Future.value(acc.result())
case Some(t) =>
acc += t
loop()
}
loop()
}

/**
Expand Down
@@ -1,8 +1,8 @@
package com.twitter.io.exp

import com.twitter.io.{InputStreamReader, Buf, Reader}
import com.twitter.io.{Buf, BufReader, InputStreamReader}
import com.twitter.util._
import java.io.{FileInputStream, File}
import java.io.{File, FileInputStream}
import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.atomic.AtomicBoolean
import java.util.HashMap
Expand Down Expand Up @@ -138,7 +138,7 @@ class FilePollingActivitySource private[exp] (
InputStreamReader.DefaultMaxBufferSize,
pool
)
Reader.readAll(reader) respond {
BufReader.readAll(reader) respond {
case Return(buf) =>
value() = Activity.Ok(buf)
case Throw(cause) =>
Expand Down Expand Up @@ -187,7 +187,7 @@ class ClassLoaderActivitySource private[exp] (classLoader: ClassLoader, pool: Fu
case stream =>
val reader =
new InputStreamReader(stream, InputStreamReader.DefaultMaxBufferSize, pool)
Reader.readAll(reader) respond {
BufReader.readAll(reader) respond {
case Return(buf) =>
p.setValue(Activity.Ok(buf))
case Throw(cause) =>
Expand Down
Expand Up @@ -18,7 +18,7 @@ public void testNewBufReader() {

@Test
public void testReadAll() {
Readers.readAll(Readers.newEmptyReader());
BufReaders.readAll(Readers.newEmptyReader());
}

@Test
Expand Down
8 changes: 7 additions & 1 deletion util-core/src/test/scala/com/twitter/io/BufReaderTest.scala
Expand Up @@ -13,7 +13,7 @@ class BufReaderTest extends FunSuite with Checkers {
check { bytes: String =>
val buf = Buf.Utf8(bytes)
val r = Reader.fromBuf(buf, 8)
await(Reader.readAll(r)) == buf
await(BufReader.readAll(r)) == buf
}
}

Expand All @@ -30,4 +30,10 @@ class BufReaderTest extends FunSuite with Checkers {
)
}
}
test("BufReader - readAll") {
check { bytes: String =>
val r = Reader.fromBuf(Buf.Utf8(bytes))
await(BufReader.readAll(r)) == Buf.Utf8(bytes)
}
}
}
Expand Up @@ -62,16 +62,16 @@ class InputStreamReaderTest extends FunSuite {
val r1 = new InputStreamReader(s1, 100)
val r2 = new InputStreamReader(s2, 500)

val f1 = Reader.readAll(r1)
val f1 = BufReader.readAll(r1)
assert(Await.result(f1, 5.seconds) == buf(0, 250))

val f2 = Reader.readAll(r1)
val f2 = BufReader.readAll(r1)
assert(Await.result(f2, 5.seconds).isEmpty)

val f3 = Reader.readAll(r2)
val f3 = BufReader.readAll(r2)
assert(Await.result(f3, 5.seconds) == buf(0, 250))

val f4 = Reader.readAll(r2)
val f4 = BufReader.readAll(r2)
assert(Await.result(f4, 5.seconds).isEmpty)
}

Expand Down Expand Up @@ -108,7 +108,7 @@ class InputStreamReaderTest extends FunSuite {
test("reading EOF closes the InputStream") {
val in = spy(new ByteArrayInputStream(arr(0, 10)))
val reader = new InputStreamReader(in, 100, FuturePool.immediatePool)
val f = Reader.readAll(reader)
val f = BufReader.readAll(reader)
assert(Await.result(f, 5.seconds) == buf(0, 10))
verify(in, times(1)).close()
}
Expand Down
4 changes: 2 additions & 2 deletions util-core/src/test/scala/com/twitter/io/PipeTest.scala
Expand Up @@ -72,7 +72,7 @@ class PipeTest extends FunSuite with Matchers {

test("Reader.readAll") {
val rw = new Pipe[Buf]
val all = Reader.readAll(rw)
val all = BufReader.readAll(rw)
assert(!all.isDefined)
assertWrite(rw, 0, 3)
assertWrite(rw, 3, 6)
Expand Down Expand Up @@ -430,7 +430,7 @@ class PipeTest extends FunSuite with Matchers {
rw.close(1.second)

assert(!writef.isDefined)
assert(await(Reader.readAll(rw)) == buf)
assert(await(BufReader.readAll(rw)) == buf)
assertReadNone(rw)
assert(writef.isDefined)

Expand Down
62 changes: 36 additions & 26 deletions util-core/src/test/scala/com/twitter/io/ReaderTest.scala
Expand Up @@ -3,7 +3,7 @@ package com.twitter.io
import com.twitter.concurrent.AsyncStream
import com.twitter.conversions.DurationOps._
import com.twitter.conversions.StorageUnitOps._
import com.twitter.util.{Await, Awaitable, Future, Promise, Return, Try}
import com.twitter.util.{Await, Awaitable, Future, Promise}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.{StandardCharsets => JChar}
import java.util.concurrent.atomic.AtomicBoolean
Expand Down Expand Up @@ -55,16 +55,6 @@ class ReaderTest
}
}

private def readAllString(r: Reader[String]): Future[String] = {
def loop(left: StringBuilder): Future[String] = {
r.read.flatMap {
case Some(right) => loop(left.append(right))
case _ => Future.value(left.toString())
}
}
loop(StringBuilder.newBuilder)
}

private def writeLoop[T](from: List[T], to: Writer[T]): Future[Unit] =
from match {
case h :: t => to.write(h).flatMap(_ => writeLoop(t, to))
Expand Down Expand Up @@ -150,9 +140,10 @@ class ReaderTest

test("Reader.concat") {
forAll { ss: List[String] =>
val readers = ss.map(s => Reader.fromBuf(Buf.Utf8(s), 16))
val buf = Reader.readAll(Reader.concat(AsyncStream.fromSeq(readers)))
assert(await(buf) == Buf.Utf8(ss.mkString))
val readers: List[Reader[String]] = ss.map(s => Reader.value(s))
val concatedReaders = Reader.concat(AsyncStream.fromSeq(readers))
val values = Reader.readAllItems(concatedReaders)
assert(await(values) == ss)
}
}

Expand Down Expand Up @@ -205,7 +196,7 @@ class ReaderTest
}

val combined = Reader.concat(head +:: tail)
val buf = Reader.readAll(combined)
val buf = Reader.readAllItems(combined)
intercept[Exception] {
await(buf)
}
Expand All @@ -214,17 +205,17 @@ class ReaderTest

test("Reader.flatten") {
forAll { ss: List[String] =>
val readers = ss.map(s => Reader.fromBuf(Buf.Utf8(s), 16))
val buf = Reader.readAll(Reader.flatten(Reader.fromSeq(readers)))
assert(await(buf) == Buf.Utf8(ss.mkString))
val readers: List[Reader[String]] = ss.map(s => Reader.value(s))
val value = Reader.readAllItems(Reader.flatten(Reader.fromSeq(readers)))
assert(await(value) == ss)
}
}

test("Reader#flatten") {
forAll { ss: List[String] =>
val readers = ss.map(s => Reader.fromBuf(Buf.Utf8(s), 16))
val buf = Reader.readAll((Reader.fromSeq(readers).flatten))
assert(await(buf) == Buf.Utf8(ss.mkString))
val readers: List[Reader[String]] = ss.map(s => Reader.value(s))
val buf = Reader.readAllItems((Reader.fromSeq(readers).flatten))
assert(await(buf) == ss)
}
}

Expand Down Expand Up @@ -337,8 +328,8 @@ class ReaderTest

test("Reader.fromStream closes resources on EOF read") {
val in = spy(new ByteArrayInputStream(arr(0, 10)))
val r = Reader.fromStream(in, 4)
val f = Reader.readAll(r)
val r: Reader[Buf] = Reader.fromStream(in, 4)
val f = BufReader.readAll(r)
assert(await(f) == buf(0, 10))
eventually {
verify(in).close()
Expand All @@ -362,7 +353,7 @@ class ReaderTest
test("Reader.fromAsyncStream fails on exceptional stream") {
val as = AsyncStream.exception(new Exception())
val r = Reader.fromAsyncStream(as)
val f = Reader.readAll(r)
val f = Reader.readAllItems(r)
intercept[Exception] {
await(f)
}
Expand Down Expand Up @@ -441,7 +432,7 @@ class ReaderTest
pipe
}

assert(Buf.decodeString(await(Reader.readAll(reader2)), JChar.UTF_8) == s)
assert(Buf.decodeString(await(BufReader.readAll(reader2)), JChar.UTF_8) == s)
}
}

Expand Down Expand Up @@ -564,7 +555,7 @@ class ReaderTest
val pipe = new Pipe[Int]
writeLoop(l, pipe)
val reader2 = pipe.map(_.toString)
assert(await(readAllString(reader2)) == l.mkString)
assert(await(Reader.readAllItems(reader2)) == l.map(_.toString))
}
}

Expand All @@ -586,6 +577,25 @@ class ReaderTest
}
assert(await(reader.onClose) == StreamTermination.Discarded)
}

test("Reader.readAll") {
val genBuf: Gen[Buf] = {
for {
// limit arrays to a few kilobytes, otherwise we may generate a very large amount of data
numBytes <- Gen.choose(0.bytes.inBytes, 2.kilobytes.inBytes)
bytes <- Gen.containerOfN[Array, Byte](numBytes.toInt, Arbitrary.arbitrary[Byte])
} yield {
Buf.ByteArray.Owned(bytes)
}
}

val genList =
Gen.listOf(Gen.oneOf(Gen.alphaLowerStr, Gen.alphaNumStr, Gen.choose(1, 100), genBuf))
forAll(genList) { l =>
val r = Reader.fromSeq(l)
assert(await(Reader.readAllItems(r)) == l)
}
}
}

object ReaderTest {
Expand Down

0 comments on commit a47a219

Please sign in to comment.