Skip to content

Commit

Permalink
newlines not whitespace
Browse files Browse the repository at this point in the history
  • Loading branch information
calvinlfer committed May 18, 2023
1 parent 41b3aef commit cd23a52
Showing 1 changed file with 13 additions and 10 deletions.
23 changes: 13 additions & 10 deletions src/main/scala/io/kaizensolutions/zio/streams/circe/Parser.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ object Parser {
private val supportParser: CirceSupportParser = new CirceSupportParser(maxValueSize = None, allowDuplicateKeys = true)

private def go(parser: AsyncParser[Json]): ZChannel[Any, ParseException, Chunk[Byte], Any, ParseException, Chunk[Json], Any] =
ZChannel.readWith[Any, ParseException, Chunk[Byte], Any, ParseException, Chunk[Json], Any](chunkByte =>
parseWith(parser)(chunkByte) match {
case Left(error) =>
ZChannel.fail(error)

case Right(jsonChunk) =>
ZChannel.write(jsonChunk) *> go(parser)
},
ZChannel.readWith[Any, ParseException, Chunk[Byte], Any, ParseException, Chunk[Json], Any](
chunkByte =>
parseWith(parser)(chunkByte) match {
case Left(error) =>
ZChannel.fail(error)

case Right(jsonChunk) =>
ZChannel.write(jsonChunk) *> go(parser)
},
error => ZChannel.fail(error),
done => ZChannel.succeed(done)
)
Expand All @@ -26,7 +27,8 @@ object Parser {
parser.absorb(in.toArray)(supportParser.facade).map(Chunk.fromIterable(_))

private def configuredPipeline(mode: AsyncParser.Mode): ZPipeline[Any, ParseException, Byte, Json] =
ZChannel.fromZIO(ZIO.succeed(supportParser.async(mode)))
ZChannel
.fromZIO(ZIO.succeed(supportParser.async(mode)))
.flatMap(go)
.toPipeline

Expand All @@ -39,7 +41,8 @@ object Parser {
configuredPipeline(AsyncParser.UnwrapArray)

/**
* Use this pipeline when you have a stream of JSON values separated by whitespace
* Use this pipeline when you have a stream of JSON values separated by new
* lines
*
* @return
*/
Expand Down

0 comments on commit cd23a52

Please sign in to comment.