diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 4e00d11572..af22c70826 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -7,12 +7,11 @@ Note that ``PHAB_ID=#`` and ``RB_ID=#`` correspond to associated messages in com Unreleased ---------- -* util: Add initial support for JDK 11 compatibility. ``PHAB_ID=D365075`` - - New Features ~~~~~~~~~~~~ +* util: Add initial support for JDK 11 compatibility. ``PHAB_ID=D365075`` + * util-core: Created public method Closable.stopCollectClosablesThread that stops CollectClosables thread. ``PHAB_ID=D382800`` @@ -21,6 +20,13 @@ Runtime Behavior Changes * util: Upgrade to caffeine 2.8.0 ``PHAB_ID=D384592`` +Breaking API Changes +~~~~~~~~~~~~~~~~~~~~ + +* util-core: Add `c.t.io.BufReader.readAll` to consume a `Reader[Buf]` and concat values to a Buf. + Replace `c.t.io.Reader.readAll` with `Reader.readAllItems`, the new API consumes a generic Reader[T], + and return a Seq of items. ``PHAB_ID=D391346`` + 19.10.0 ------- diff --git a/util-core/src/main/java/com/twitter/io/BufReaders.java b/util-core/src/main/java/com/twitter/io/BufReaders.java new file mode 100644 index 0000000000..a6c5b53ae6 --- /dev/null +++ b/util-core/src/main/java/com/twitter/io/BufReaders.java @@ -0,0 +1,23 @@ +package com.twitter.io; + +import com.twitter.util.Future; + +/** + * Better Java APIs of BufReader for Scala 2.11. + * This will be removed when drop Scala 2.11 support. + * + * @see com.twitter.io.BufReader + */ +public final class BufReaders { + + private BufReaders() { + throw new IllegalStateException(); + } + + /** + * See {@code com.twitter.io.BufReader.readAll}. + */ + public static Future readAll(Reader r) { + return BufReader$.MODULE$.readAll(r); + } +} diff --git a/util-core/src/main/java/com/twitter/io/Readers.java b/util-core/src/main/java/com/twitter/io/Readers.java index 89ba0c2543..e746e508eb 100644 --- a/util-core/src/main/java/com/twitter/io/Readers.java +++ b/util-core/src/main/java/com/twitter/io/Readers.java @@ -14,7 +14,8 @@ import scala.runtime.BoxedUnit; /** - * Java APIs for Reader. + * Better Java APIs of Reader for Scala 2.11. + * This will be removed when drop Scala 2.11 support. * * @see com.twitter.io.Reader */ @@ -33,13 +34,6 @@ public static Reader newBufReader(Buf buf, int chunkSize) { return Reader$.MODULE$.fromBuf(buf, chunkSize); } - /** - * See {@code com.twitter.io.Reader.readAll}. - */ - public static Future readAll(Reader r) { - return Reader$.MODULE$.readAll(r); - } - /** * See {@code com.twitter.io.Reader.concat}. */ diff --git a/util-core/src/main/scala/com/twitter/io/BufReader.scala b/util-core/src/main/scala/com/twitter/io/BufReader.scala index 357afca546..445efebf74 100644 --- a/util-core/src/main/scala/com/twitter/io/BufReader.scala +++ b/util-core/src/main/scala/com/twitter/io/BufReader.scala @@ -52,4 +52,17 @@ object BufReader { */ def apply(buf: Buf, chunkSize: Int): Reader[Buf] = if (buf.isEmpty) Reader.empty[Buf] else new BufReader(buf, chunkSize) + + /** + * Read the entire bytestream presented by `r`. + */ + def readAll(r: Reader[Buf]): Future[Buf] = { + def loop(left: Buf): Future[Buf] = + r.read().flatMap { + case Some(right) => loop(left.concat(right)) + case _ => Future.value(left) + } + + loop(Buf.Empty) + } } diff --git a/util-core/src/main/scala/com/twitter/io/Reader.scala b/util-core/src/main/scala/com/twitter/io/Reader.scala index 6e404c29a5..d61f8afe8d 100644 --- a/util-core/src/main/scala/com/twitter/io/Reader.scala +++ b/util-core/src/main/scala/com/twitter/io/Reader.scala @@ -248,16 +248,18 @@ object Reader { def exception[A](e: Throwable): Reader[A] = fromFuture[A](Future.exception(e)) /** - * Read the entire bytestream presented by `r`. + * Read all items from the Reader r. + * @return A Sequence of items. */ - def readAll(r: Reader[Buf]): Future[Buf] = { - def loop(left: Buf): Future[Buf] = - r.read().flatMap { - case Some(right) => loop(left concat right) - case _ => Future.value(left) - } - - loop(Buf.Empty) + def readAllItems[A](r: Reader[A]): Future[Seq[A]] = { + val acc = List.newBuilder[A] + def loop(): Future[List[A]] = r.read().flatMap { + case None => Future.value(acc.result()) + case Some(t) => + acc += t + loop() + } + loop() } /** diff --git a/util-core/src/main/scala/com/twitter/io/exp/ActivitySource.scala b/util-core/src/main/scala/com/twitter/io/exp/ActivitySource.scala index 4c24508126..ed939121b7 100644 --- a/util-core/src/main/scala/com/twitter/io/exp/ActivitySource.scala +++ b/util-core/src/main/scala/com/twitter/io/exp/ActivitySource.scala @@ -1,8 +1,8 @@ package com.twitter.io.exp -import com.twitter.io.{InputStreamReader, Buf, Reader} +import com.twitter.io.{Buf, BufReader, InputStreamReader} import com.twitter.util._ -import java.io.{FileInputStream, File} +import java.io.{File, FileInputStream} import java.lang.ref.{ReferenceQueue, WeakReference} import java.util.concurrent.atomic.AtomicBoolean import java.util.HashMap @@ -138,7 +138,7 @@ class FilePollingActivitySource private[exp] ( InputStreamReader.DefaultMaxBufferSize, pool ) - Reader.readAll(reader) respond { + BufReader.readAll(reader) respond { case Return(buf) => value() = Activity.Ok(buf) case Throw(cause) => @@ -187,7 +187,7 @@ class ClassLoaderActivitySource private[exp] (classLoader: ClassLoader, pool: Fu case stream => val reader = new InputStreamReader(stream, InputStreamReader.DefaultMaxBufferSize, pool) - Reader.readAll(reader) respond { + BufReader.readAll(reader) respond { case Return(buf) => p.setValue(Activity.Ok(buf)) case Throw(cause) => diff --git a/util-core/src/test/java/com/twitter/io/ReaderCompilationTest.java b/util-core/src/test/java/com/twitter/io/ReaderCompilationTest.java index 4d1ec8db43..61c6d54180 100644 --- a/util-core/src/test/java/com/twitter/io/ReaderCompilationTest.java +++ b/util-core/src/test/java/com/twitter/io/ReaderCompilationTest.java @@ -18,7 +18,7 @@ public void testNewBufReader() { @Test public void testReadAll() { - Readers.readAll(Readers.newEmptyReader()); + BufReaders.readAll(Readers.newEmptyReader()); } @Test diff --git a/util-core/src/test/scala/com/twitter/io/BufReaderTest.scala b/util-core/src/test/scala/com/twitter/io/BufReaderTest.scala index f66780c60a..bb32b91eaf 100644 --- a/util-core/src/test/scala/com/twitter/io/BufReaderTest.scala +++ b/util-core/src/test/scala/com/twitter/io/BufReaderTest.scala @@ -13,7 +13,7 @@ class BufReaderTest extends FunSuite with Checkers { check { bytes: String => val buf = Buf.Utf8(bytes) val r = Reader.fromBuf(buf, 8) - await(Reader.readAll(r)) == buf + await(BufReader.readAll(r)) == buf } } @@ -30,4 +30,10 @@ class BufReaderTest extends FunSuite with Checkers { ) } } + test("BufReader - readAll") { + check { bytes: String => + val r = Reader.fromBuf(Buf.Utf8(bytes)) + await(BufReader.readAll(r)) == Buf.Utf8(bytes) + } + } } diff --git a/util-core/src/test/scala/com/twitter/io/InputStreamReaderTest.scala b/util-core/src/test/scala/com/twitter/io/InputStreamReaderTest.scala index be8458e7a7..40bd4e1102 100644 --- a/util-core/src/test/scala/com/twitter/io/InputStreamReaderTest.scala +++ b/util-core/src/test/scala/com/twitter/io/InputStreamReaderTest.scala @@ -62,16 +62,16 @@ class InputStreamReaderTest extends FunSuite { val r1 = new InputStreamReader(s1, 100) val r2 = new InputStreamReader(s2, 500) - val f1 = Reader.readAll(r1) + val f1 = BufReader.readAll(r1) assert(Await.result(f1, 5.seconds) == buf(0, 250)) - val f2 = Reader.readAll(r1) + val f2 = BufReader.readAll(r1) assert(Await.result(f2, 5.seconds).isEmpty) - val f3 = Reader.readAll(r2) + val f3 = BufReader.readAll(r2) assert(Await.result(f3, 5.seconds) == buf(0, 250)) - val f4 = Reader.readAll(r2) + val f4 = BufReader.readAll(r2) assert(Await.result(f4, 5.seconds).isEmpty) } @@ -108,7 +108,7 @@ class InputStreamReaderTest extends FunSuite { test("reading EOF closes the InputStream") { val in = spy(new ByteArrayInputStream(arr(0, 10))) val reader = new InputStreamReader(in, 100, FuturePool.immediatePool) - val f = Reader.readAll(reader) + val f = BufReader.readAll(reader) assert(Await.result(f, 5.seconds) == buf(0, 10)) verify(in, times(1)).close() } diff --git a/util-core/src/test/scala/com/twitter/io/PipeTest.scala b/util-core/src/test/scala/com/twitter/io/PipeTest.scala index d2b79646b0..4420f00776 100644 --- a/util-core/src/test/scala/com/twitter/io/PipeTest.scala +++ b/util-core/src/test/scala/com/twitter/io/PipeTest.scala @@ -72,7 +72,7 @@ class PipeTest extends FunSuite with Matchers { test("Reader.readAll") { val rw = new Pipe[Buf] - val all = Reader.readAll(rw) + val all = BufReader.readAll(rw) assert(!all.isDefined) assertWrite(rw, 0, 3) assertWrite(rw, 3, 6) @@ -430,7 +430,7 @@ class PipeTest extends FunSuite with Matchers { rw.close(1.second) assert(!writef.isDefined) - assert(await(Reader.readAll(rw)) == buf) + assert(await(BufReader.readAll(rw)) == buf) assertReadNone(rw) assert(writef.isDefined) diff --git a/util-core/src/test/scala/com/twitter/io/ReaderTest.scala b/util-core/src/test/scala/com/twitter/io/ReaderTest.scala index 8b756546df..1093e82aef 100644 --- a/util-core/src/test/scala/com/twitter/io/ReaderTest.scala +++ b/util-core/src/test/scala/com/twitter/io/ReaderTest.scala @@ -3,7 +3,7 @@ package com.twitter.io import com.twitter.concurrent.AsyncStream import com.twitter.conversions.DurationOps._ import com.twitter.conversions.StorageUnitOps._ -import com.twitter.util.{Await, Awaitable, Future, Promise, Return, Try} +import com.twitter.util.{Await, Awaitable, Future, Promise} import java.io.{ByteArrayInputStream, ByteArrayOutputStream} import java.nio.charset.{StandardCharsets => JChar} import java.util.concurrent.atomic.AtomicBoolean @@ -55,16 +55,6 @@ class ReaderTest } } - private def readAllString(r: Reader[String]): Future[String] = { - def loop(left: StringBuilder): Future[String] = { - r.read.flatMap { - case Some(right) => loop(left.append(right)) - case _ => Future.value(left.toString()) - } - } - loop(StringBuilder.newBuilder) - } - private def writeLoop[T](from: List[T], to: Writer[T]): Future[Unit] = from match { case h :: t => to.write(h).flatMap(_ => writeLoop(t, to)) @@ -150,9 +140,10 @@ class ReaderTest test("Reader.concat") { forAll { ss: List[String] => - val readers = ss.map(s => Reader.fromBuf(Buf.Utf8(s), 16)) - val buf = Reader.readAll(Reader.concat(AsyncStream.fromSeq(readers))) - assert(await(buf) == Buf.Utf8(ss.mkString)) + val readers: List[Reader[String]] = ss.map(s => Reader.value(s)) + val concatedReaders = Reader.concat(AsyncStream.fromSeq(readers)) + val values = Reader.readAllItems(concatedReaders) + assert(await(values) == ss) } } @@ -205,7 +196,7 @@ class ReaderTest } val combined = Reader.concat(head +:: tail) - val buf = Reader.readAll(combined) + val buf = Reader.readAllItems(combined) intercept[Exception] { await(buf) } @@ -214,17 +205,17 @@ class ReaderTest test("Reader.flatten") { forAll { ss: List[String] => - val readers = ss.map(s => Reader.fromBuf(Buf.Utf8(s), 16)) - val buf = Reader.readAll(Reader.flatten(Reader.fromSeq(readers))) - assert(await(buf) == Buf.Utf8(ss.mkString)) + val readers: List[Reader[String]] = ss.map(s => Reader.value(s)) + val value = Reader.readAllItems(Reader.flatten(Reader.fromSeq(readers))) + assert(await(value) == ss) } } test("Reader#flatten") { forAll { ss: List[String] => - val readers = ss.map(s => Reader.fromBuf(Buf.Utf8(s), 16)) - val buf = Reader.readAll((Reader.fromSeq(readers).flatten)) - assert(await(buf) == Buf.Utf8(ss.mkString)) + val readers: List[Reader[String]] = ss.map(s => Reader.value(s)) + val buf = Reader.readAllItems((Reader.fromSeq(readers).flatten)) + assert(await(buf) == ss) } } @@ -337,8 +328,8 @@ class ReaderTest test("Reader.fromStream closes resources on EOF read") { val in = spy(new ByteArrayInputStream(arr(0, 10))) - val r = Reader.fromStream(in, 4) - val f = Reader.readAll(r) + val r: Reader[Buf] = Reader.fromStream(in, 4) + val f = BufReader.readAll(r) assert(await(f) == buf(0, 10)) eventually { verify(in).close() @@ -362,7 +353,7 @@ class ReaderTest test("Reader.fromAsyncStream fails on exceptional stream") { val as = AsyncStream.exception(new Exception()) val r = Reader.fromAsyncStream(as) - val f = Reader.readAll(r) + val f = Reader.readAllItems(r) intercept[Exception] { await(f) } @@ -441,7 +432,7 @@ class ReaderTest pipe } - assert(Buf.decodeString(await(Reader.readAll(reader2)), JChar.UTF_8) == s) + assert(Buf.decodeString(await(BufReader.readAll(reader2)), JChar.UTF_8) == s) } } @@ -564,7 +555,7 @@ class ReaderTest val pipe = new Pipe[Int] writeLoop(l, pipe) val reader2 = pipe.map(_.toString) - assert(await(readAllString(reader2)) == l.mkString) + assert(await(Reader.readAllItems(reader2)) == l.map(_.toString)) } } @@ -586,6 +577,25 @@ class ReaderTest } assert(await(reader.onClose) == StreamTermination.Discarded) } + + test("Reader.readAll") { + val genBuf: Gen[Buf] = { + for { + // limit arrays to a few kilobytes, otherwise we may generate a very large amount of data + numBytes <- Gen.choose(0.bytes.inBytes, 2.kilobytes.inBytes) + bytes <- Gen.containerOfN[Array, Byte](numBytes.toInt, Arbitrary.arbitrary[Byte]) + } yield { + Buf.ByteArray.Owned(bytes) + } + } + + val genList = + Gen.listOf(Gen.oneOf(Gen.alphaLowerStr, Gen.alphaNumStr, Gen.choose(1, 100), genBuf)) + forAll(genList) { l => + val r = Reader.fromSeq(l) + assert(await(Reader.readAllItems(r)) == l) + } + } } object ReaderTest {