-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
5d6db0d
commit cd0995f
Showing
5 changed files
with
155 additions
and
43 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
# Circe ZIO Streams | ||
This project integrates the streaming JSON parsing capabilities of the [Jawn](https://github.com/typelevel/jawn) parser | ||
with the [Circe library](https://github.com/circe/circe/tree/series/0.14.x/modules/jawn) and the | ||
[ZIO Streams library](https://zio.dev/docs/datatypes/datatypes_stream) allowing for the parsing of JSON streams. | ||
|
||
[![Latest Version](https://jitpack.io/v/kaizen-solutions/circe-zio-streams.svg)](https://jitpack.io/#kaizen-solutions/circe-zio-streams) | ||
|
||
```sbt | ||
libraryDependencies += "com.github.kaizen-solutions.circe-zio-streams" %% "circe-zio-streams" % "Tag" | ||
``` | ||
|
||
## Examples | ||
See the [examples](src/test/scala/io/kaizensolutions/zio/streams/circe/examples/Examples.scala) directory for examples | ||
of how to use this library. | ||
|
||
### Recommended Usage | ||
* Use the `jsonStreamPipeline` to parse newline separated JSON values | ||
* Use the `jsonArrayStreamPipeline` to parse JSON arrays | ||
|
||
## Notes | ||
Please ensure you select the right parser based on your JSON content. For example, do not use the `jsonStreamPipeline` if | ||
your content is a JSON array. The `jsonStreamPipeline` is for newline separated JSON values. Similarly, do not use the | ||
`jsonArrayStreamPipeline` if your content is newline separated JSON values. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
39 changes: 0 additions & 39 deletions
39
src/test/scala/io/kaizensolutions/zio/streams/circe/ExampleJsonArrayStream.scala
This file was deleted.
Oops, something went wrong.
81 changes: 81 additions & 0 deletions
81
src/test/scala/io/kaizensolutions/zio/streams/circe/ParserSpec.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,81 @@ | ||
package io.kaizensolutions.zio.streams.circe | ||
|
||
import io.circe._ | ||
import io.circe.generic.semiauto._ | ||
import io.circe.syntax._ | ||
import org.typelevel.jawn.ParseException | ||
import zio.{System => _, _} | ||
import zio.stream._ | ||
import zio.test._ | ||
import zio.test.magnolia.DeriveGen | ||
|
||
object ParserSpec extends ZIOSpecDefault { | ||
override def spec = | ||
suite("Parser Specification")( | ||
suite("Streaming JSON Arrays")( | ||
test("parses valid streaming bodies with no spaces")( | ||
testcase(Parser.jsonArrayPipeline)(JsonStream.jsonArrayStream(_.noSpaces)) | ||
) + | ||
test("parses valid streaming bodies with spaces and new lines")( | ||
testcase(Parser.jsonArrayPipeline)(JsonStream.jsonArrayStream(_.spaces2)) | ||
) + | ||
test("parses valid streaming bodies with more spaces and new lines")( | ||
testcase(Parser.jsonArrayPipeline)(JsonStream.jsonArrayStream(_.spaces4)) | ||
) | ||
) + | ||
suite("Streaming JSON bodies separated by new lines")( | ||
test("parses valid streaming bodies with no spaces in JSON")( | ||
testcase(Parser.jsonStreamPipeline)(JsonStream.valueStream(_.noSpaces)) | ||
) + | ||
test("parses valid streaming bodies with spaces and new lines")( | ||
testcase(Parser.jsonStreamPipeline)(JsonStream.valueStream(_.spaces2)) | ||
) + | ||
test("parses valid streaming bodies with more spaces and new lines")( | ||
testcase(Parser.jsonStreamPipeline)(JsonStream.valueStream(_.spaces4)) | ||
) | ||
) | ||
) | ||
|
||
private def testcase( | ||
pipeline: ZPipeline[Any, ParseException, Byte, Json] | ||
)(fn: UStream[Example] => ZStream[Any, Throwable, Byte]): Task[TestResult] = | ||
check(Example.genStream) { examples => | ||
val expected = examples.runCollect | ||
val actual = | ||
fn(examples) | ||
.via(pipeline) | ||
.mapChunks(_.map(_.as[Example]).collect { case Right(value) => value }) | ||
.runCollect | ||
|
||
for { | ||
actual <- actual | ||
expected <- expected | ||
} yield assertTrue(actual == expected) | ||
} | ||
|
||
} | ||
|
||
final case class Example(a: Int, b: String, c: Boolean) | ||
object Example { | ||
implicit val exampleCodec: Codec[Example] = deriveCodec[Example] | ||
|
||
val gen: Gen[Any, Example] = DeriveGen[Example] | ||
|
||
val genStream: Gen[Any, UStream[Example]] = | ||
Gen.chunkOf1(gen).map(ZStream.fromChunk(_)) | ||
} | ||
|
||
object JsonStream { | ||
def valueStream[A: Encoder](stringifyJson: Json => String)(stream: UStream[A]): ZStream[Any, Throwable, Byte] = | ||
stream | ||
.mapChunks(_.map(e => stringifyJson(e.asJson))) | ||
.intersperse(System.lineSeparator()) | ||
.via(ZPipeline.utf8Encode) | ||
|
||
def jsonArrayStream[A: Encoder](stringifyJson: Json => String)(stream: UStream[A]): ZStream[Any, Throwable, Byte] = | ||
( | ||
ZStream("[") ++ | ||
stream.map(e => stringifyJson(e.asJson)).intersperse(",") ++ | ||
ZStream("]") | ||
).via(ZPipeline.utf8Encode) | ||
} |
42 changes: 42 additions & 0 deletions
42
src/test/scala/io/kaizensolutions/zio/streams/circe/examples/Examples.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
package io.kaizensolutions.zio.streams.circe.examples | ||
|
||
import io.kaizensolutions.zio.streams.circe.Parser | ||
import zio.stream._ | ||
import zio.{System => _, _} | ||
|
||
object Examples { | ||
object ExampleJsonArrayStream extends ZIOAppDefault { | ||
val jsonArrayStream: UStream[String] = { | ||
val begin = ZStream("[") | ||
val end = ZStream("]") | ||
val json = | ||
(ZStream("""{"foo": "bar"}""") ++ ZStream(",") ++ ZStream("""{"bar": "baz"}""") ++ ZStream(",")) | ||
.repeat(Schedule.recurs(1000)) ++ ZStream("""{"baz": "qux"}""") | ||
begin ++ json ++ end | ||
} | ||
|
||
override val run = | ||
jsonArrayStream | ||
.throttleShape(10, 1.second)(_.length.toLong) | ||
.via(ZPipeline.utf8Encode) | ||
.via(Parser.jsonArrayPipeline) | ||
.map(_.spaces2) | ||
.debug("emit>") | ||
.runDrain | ||
} | ||
|
||
object ExampleJsonValuesStream extends ZIOAppDefault { | ||
val jsonStream: UStream[String] = | ||
ZStream("""{"foo": "bar"}""", System.lineSeparator(), """{"bar": "baz"}""") | ||
.repeat(Schedule.recurs(1000)) | ||
|
||
override val run = | ||
jsonStream | ||
.throttleShape(10, 1.second)(_.length.toLong) | ||
.via(ZPipeline.utf8Encode) | ||
.via(Parser.jsonStreamPipeline) | ||
.map(_.spaces2) | ||
.debug("emit>") | ||
.runDrain | ||
} | ||
} |