Skip to content

Commit

Permalink
Add ZSink.splitLines and ZSink.splitLinesChunk
Browse files Browse the repository at this point in the history
  • Loading branch information
iravid committed Aug 17, 2019
1 parent 37a8985 commit 52a3859
Show file tree
Hide file tree
Showing 3 changed files with 161 additions and 1 deletion.
78 changes: 77 additions & 1 deletion streams-tests/jvm/src/test/scala/zio/stream/SinkSpec.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package zio.stream

import org.scalacheck.Arbitrary
import org.scalacheck.{ Arbitrary, Gen }
import org.specs2.ScalaCheck
import scala.{ Stream => _ }
import zio._
Expand Down Expand Up @@ -222,6 +222,14 @@ class SinkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRunt

fromOutputStream $fromOutputStream

splitLines
preserves data $splitLines
handles leftovers $splitLinesLeftovers
transduces $splitLinesTransduce
single newline edgecase $splitLinesEdgecase
no newlines in data $splitLinesNoNewlines
\r\n on the boundary $splitLinesBoundary

throttleEnforce $throttleEnforce
with burst $throttleEnforceWithBurst

Expand Down Expand Up @@ -1203,6 +1211,74 @@ class SinkSpec(implicit ee: org.specs2.concurrent.ExecutionEnv) extends TestRunt
}
}

private def splitLines = {
  // Lists of ASCII lines that contain no line-break characters. When the last
  // generated line is empty, an extra "a" is appended so that the joined input
  // never ends with a newline.
  val lineLists =
    Gen
      .listOf(Gen.asciiStr.map(_.filterNot(c => c == '\n' || c == '\r')))
      .map(l => if (l.nonEmpty && l.last == "") l ++ List("a") else l)

  val sink = ZSink.splitLines

  // Feeding the joined lines through a single step and re-joining what the sink
  // extracted plus what it handed back as leftover must reproduce the input.
  prop { (lines: List[String]) =>
    val data = lines.mkString("\n")

    unsafeRun {
      for {
        start   <- sink.initial.map(Step.state(_))
        stepped <- sink.step(start, data)
        emitted <- sink.extract(Step.state(stepped))
        rest    = Step.leftover(stepped)
      } yield ((emitted ++ rest).toArray[String].mkString("\n") must_=== data)
    }
  }.setGen(lineLists)
}

private def splitLinesLeftovers = {
  val sink = ZSink.splitLines

  // After one step over "abc\nbc" the completed line is extracted and the
  // unterminated tail is reported as leftover.
  unsafeRun {
    for {
      start   <- sink.initial.map(Step.state(_))
      stepped <- sink.step(start, "abc\nbc")
      emitted <- sink.extract(Step.state(stepped))
      rest    = Step.leftover(stepped)
    } yield (emitted.toArray[String].mkString("\n") must_=== "abc") and (rest
      .toArray[String]
      .mkString must_=== "bc")
  }
}

private def splitLinesTransduce = {
  // Each newline closes one output chunk; the trailing "bcd", "bcd" pieces have
  // no terminator and are expected to come out merged as a single line.
  val expected = List(Chunk("abc"), Chunk("bc"), Chunk("bcdbcd"))

  val collected =
    Stream("abc", "\n", "bc", "\n", "bcd", "bcd")
      .transduce(ZSink.splitLines)
      .runCollect

  unsafeRun(collected.map(_ must_=== expected))
}

private def splitLinesEdgecase = {
  // A stream consisting of a lone line break is expected to yield exactly one
  // empty line.
  val collected =
    Stream("\n")
      .transduce(ZSink.splitLines)
      .mapConcat(identity)
      .runCollect

  unsafeRun(collected.map(_ must_=== List("")))
}

private def splitLinesNoNewlines = {
  // With no line break anywhere in the input, all pieces are expected to be
  // concatenated into one line.
  val collected =
    Stream("abc", "abc", "abc")
      .transduce(ZSink.splitLines)
      .mapConcat(identity)
      .runCollect

  unsafeRun(collected.map(_ must_=== List("abcabcabc")))
}

private def splitLinesBoundary = {
  // The "\r\n" terminator is cut in half by the element boundary; it must still
  // be treated as a single line break.
  val collected =
    Stream("abc\r", "\nabc")
      .transduce(ZSink.splitLines)
      .mapConcat(identity)
      .runCollect

  unsafeRun(collected.map(_ must_=== List("abc", "abc")))
}

private def throttleEnforce = {

def sinkTest(sink: ZSink[Clock, Nothing, Nothing, Int, Option[Int]]) =
Expand Down
10 changes: 10 additions & 0 deletions streams/shared/src/main/scala/zio/stream/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,16 @@ object Sink {
final def read1[E, A](e: Option[A] => E)(p: A => Boolean): Sink[E, A, A, A] =
// Thin alias: both argument lists are forwarded unchanged to ZSink.read1.
ZSink.read1(e)(p)

/**
 * Alias for [[ZSink.splitLines]] exposed with the `Sink` type: consumes
 * `String` inputs and produces a `Chunk[String]`.
 *
 * see [[ZSink.splitLines]]
 */
final val splitLines: Sink[Nothing, String, String, Chunk[String]] = ZSink.splitLines

/**
 * Alias for [[ZSink.splitLinesChunk]] exposed with the `Sink` type: consumes
 * `Chunk[String]` inputs and produces a `Chunk[String]`.
 *
 * see [[ZSink.splitLinesChunk]]
 */
final val splitLinesChunk: Sink[Nothing, Chunk[String], Chunk[String], Chunk[String]] = ZSink.splitLinesChunk

/**
* see [[ZSink.succeed]]
*/
Expand Down
74 changes: 74 additions & 0 deletions streams/shared/src/main/scala/zio/stream/ZSink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package zio.stream
import zio._
import zio.clock.Clock
import zio.duration.Duration

import scala.collection.mutable
import scala.language.postfixOps

/**
Expand Down Expand Up @@ -1293,6 +1295,78 @@ object ZSink extends ZSinkPlatformSpecific {
}
}

/**
 * A sink that breaks incoming strings into lines. Both `\n` and `\r\n` are
 * recognised as line terminators, including a `\r\n` pair whose two
 * characters arrive in separate inputs.
 */
final val splitLines: ZSink[Any, Nothing, String, String, Chunk[String]] =
  new SinkPure[Nothing, String, String, Chunk[String]] {
    // (lines completed so far,
    //  text received after the last terminator, if any,
    //  whether that text ends in a '\r' that may be the first half of a "\r\n")
    type State = (Chunk[String], Option[String], Boolean)

    override val initialPure: Step[State, Nothing] = Step.more((Chunk.empty, None, false))

    override def stepPure(s: State, a: String): Step[State, String] = {
      val (finished, pending, pendingEndsInCR) = s
      val text                                 = pending.getOrElse("") + a

      if (text.isEmpty) Step.more(s)
      else {
        val found     = mutable.ArrayBuffer[String]()
        val lastIndex = text.length - 1

        // The pending text was already scanned by an earlier step, so scanning
        // resumes right after it. The one exception is a pending trailing '\r':
        // it has to be revisited because the new input may begin with its '\n'.
        var cursor =
          if (pendingEndsInCR) pending.fold(0)(_.length - 1)
          else pending.fold(0)(_.length)

        var lineStart    = 0
        var endsInBareCR = false

        while (cursor <= lastIndex) {
          text.charAt(cursor) match {
            case '\n' =>
              found += text.substring(lineStart, cursor)
              cursor += 1
              lineStart = cursor

            case '\r' if cursor < lastIndex && text.charAt(cursor + 1) == '\n' =>
              found += text.substring(lineStart, cursor)
              cursor += 2
              lineStart = cursor

            case '\r' if cursor == lastIndex =>
              // Cannot tell yet whether this is half of a "\r\n"; remember it.
              endsInBareCR = true
              cursor += 1

            case _ =>
              cursor += 1
          }
        }

        if (found.isEmpty) Step.more((finished, Some(text), endsInBareCR))
        else {
          val allLines: Chunk[String] = finished ++ Chunk.fromArray(found.toArray[String])
          val tail                    = text.substring(lineStart, text.length)

          // With a dangling '\r' the sink keeps consuming so the next input can
          // resolve it; otherwise it finishes and returns the tail as leftover.
          if (endsInBareCR) Step.more((allLines, Some(tail), endsInBareCR))
          else {
            val remainder = if (tail.nonEmpty) Chunk.single(tail) else Chunk.empty
            Step.done((allLines, None, endsInBareCR), remainder)
          }
        }
      }
    }

    // The result is every completed line followed by the pending text, if any.
    override def extractPure(s: State): Either[Nothing, Chunk[String]] = {
      val (finished, pending, _) = s
      Right(finished ++ pending.map(Chunk.single(_)).getOrElse(Chunk.empty))
    }
  }

/**
 * Variant of `splitLines` for chunked input: every incoming chunk of strings
 * is joined into one string before being split on `\n` / `\r\n`, and any
 * leftover string is handed back wrapped in a single-element chunk.
 */
final val splitLinesChunk: ZSink[Any, Nothing, Chunk[String], Chunk[String], Chunk[String]] = {
  // Adapt the input side: concatenate the strings of each incoming chunk.
  val overJoinedChunks = splitLines.contramap[Chunk[String]](_.mkString)

  // Adapt the remainder side so that leftovers match the chunked input type.
  overJoinedChunks.mapRemainder(Chunk.single)
}

/**
* Creates a single-value sink from a value.
*/
Expand Down

0 comments on commit 52a3859

Please sign in to comment.