Skip to content

Commit

Permalink
Just a bit of refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
zygfryd committed Mar 4, 2019
1 parent b269770 commit 155a8cb
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 37 deletions.
2 changes: 1 addition & 1 deletion src/main/java/zygf/jackshaft/impl/JacksonMiddleware.java
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public J parseValue(final JsonParser jax) throws IOException {
}
}

public boolean parse(final JsonParser jax, final ParsingMode mode, final java.util.function.Consumer<J> consumer) throws IOException {
public boolean parseAsync(final JsonParser jax, final ParsingMode mode, final java.util.function.Consumer<J> consumer) throws IOException {
switch (mode) {
case VALUE:
if (depth != -2) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/zygf/jackshaft/impl/ParsingMiddleware.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ public abstract class ParsingMiddleware<J>
* @param consumer A callback that gets called for each JSON value
* @return True if the array has finished parsing, false otherwise
*/
public abstract boolean parse(JsonParser jax, ParsingMode mode, java.util.function.Consumer<J> consumer) throws IOException;
public abstract boolean parseAsync(JsonParser jax, ParsingMode mode, java.util.function.Consumer<J> consumer) throws IOException;

/**
* Parse a single JSON value.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
package zygf.jackshaft.impl

import java.nio.ByteBuffer
import java.util.function.{Consumer, Supplier}

import scala.language.higherKinds

import akka.util.ByteString
import com.fasterxml.jackson.core.JsonFactory
import com.fasterxml.jackson.core.async.ByteArrayFeeder

class ByteStringParser[J >: Null](val middleware: ParsingMiddleware[J])
class ByteBufferParser[J >: Null](val middleware: ParsingMiddleware[J])
{
import ByteStringParser._
import ByteBufferParser._

private val jax = factory.createNonBlockingByteArrayParser()
private val feeder = jax.getNonBlockingInputFeeder().asInstanceOf[ByteArrayFeeder]

/** Parse a single value from a single ByteString */
def parseValue(input: ByteString): J = {
val it = input.asByteBuffers.iterator
while (it.hasNext) {
val bb = it.next()
/** Parse a single value synchronously */
def parseValue(input: Iterator[ByteBuffer]): J = {
while (input.hasNext) {
val bb = input.next()
if (bb.hasArray) {
val off = bb.arrayOffset
feeder.feedInput(bb.array, off + (bb.position: Int), off + (bb.limit: Int))
Expand All @@ -44,19 +41,19 @@ class ByteStringParser[J >: Null](val middleware: ParsingMiddleware[J])
}
}
}

feeder.endOfInput()
middleware.parseValue(jax)
}

def parse(input: ByteString, mode: ParsingMode)(callback: Consumer[J]): Boolean = {
val it = input.asByteBuffers.iterator
while (it.hasNext) {
val bb = it.next()
/** Parse values from a stream of byte buffers asynchronously */
def parseAsync(input: Iterator[ByteBuffer], mode: ParsingMode)(callback: Consumer[J]): Boolean = {
while (input.hasNext) {
val bb = input.next()
if (bb.hasArray) {
val off = bb.arrayOffset
feeder.feedInput(bb.array, off + (bb.position: Int), off + (bb.limit: Int))
if (middleware.parse(jax, mode, callback)) {
if (middleware.parseAsync(jax, mode, callback)) {
return true
}
}
Expand All @@ -71,23 +68,23 @@ class ByteStringParser[J >: Null](val middleware: ParsingMiddleware[J])
left -= chunk

feeder.feedInput(buf, 0, chunk)
if (middleware.parse(jax, mode, callback)) {
if (middleware.parseAsync(jax, mode, callback)) {
return true
}
}
}
}

false
}

def finish(mode: ParsingMode)(callback: Consumer[J]): Boolean = {
def finishAsync(mode: ParsingMode)(callback: Consumer[J]): Boolean = {
feeder.endOfInput()
middleware.parse(jax, mode, callback)
middleware.parseAsync(jax, mode, callback)
}
}

object ByteStringParser
object ByteBufferParser
{
private val factory = (new JsonFactory)
.enable(JsonFactory.Feature.CANONICALIZE_FIELD_NAMES)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package zygf.jackshaft.impl
package zygf.jackshaft.impl.akka

import java.util.function.Consumer

Expand All @@ -10,8 +10,9 @@ import akka.stream._
import akka.stream.stage._
import akka.util.ByteString
import zygf.jackshaft.exceptions.UnexpectedEndOfInputException
import zygf.jackshaft.impl.{ByteBufferParser, ParsingMode}

abstract class JsonParserStage[J >: Null](val makeParser: () => ByteStringParser[J])
abstract class JsonParserStage[J >: Null](val makeParser: () => ByteBufferParser[J])
extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[J]]
{
private val bytesIn = Inlet[ByteString]("bytesIn")
Expand All @@ -38,7 +39,7 @@ abstract class JsonParserStage[J >: Null](val makeParser: () => ByteStringParser
override def onPush(): Unit = {
if (! promise.isCompleted) {
try {
parser.parse(grab(bytesIn), ParsingMode.VALUE)(consumer)
parser.parseAsync(grab(bytesIn).asByteBuffers.iterator, ParsingMode.VALUE)(consumer)
}
catch {
case NonFatal(e) =>
Expand All @@ -52,7 +53,7 @@ abstract class JsonParserStage[J >: Null](val makeParser: () => ByteStringParser

override def onUpstreamFinish(): Unit = {
try {
parser.finish(ParsingMode.VALUE)(consumer)
parser.finishAsync(ParsingMode.VALUE)(consumer)
}
catch {
case NonFatal(e) =>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package zygf.jackshaft.impl
package zygf.jackshaft.impl.akka

import java.util.function.Consumer

Expand All @@ -8,9 +8,10 @@ import scala.util.control.NonFatal
import akka.stream._
import akka.stream.stage._
import akka.util.ByteString
import zygf.jackshaft.impl.{ByteBufferParser, ParsingMode}

abstract class StreamingJsonParserStage[J >: Null](val mode: ParsingMode,
val makeParser: () => ByteStringParser[J])
val makeParser: () => ByteBufferParser[J])
extends GraphStage[FlowShape[ByteString, J]]
{
private val bytesIn = Inlet[ByteString]("bytesIn")
Expand All @@ -29,7 +30,7 @@ abstract class StreamingJsonParserStage[J >: Null](val mode: ParsingMode,

override def onPush(): Unit = {
try {
parser.parse(grab(bytesIn), mode)(consumer)
parser.parseAsync(grab(bytesIn).asByteBuffers.iterator, mode)(consumer)
}
catch {
case NonFatal(e) =>
Expand All @@ -56,7 +57,7 @@ abstract class StreamingJsonParserStage[J >: Null](val mode: ParsingMode,

override def onUpstreamFinish(): Unit = {
try {
parser.finish(mode)(consumer)
parser.finishAsync(mode)(consumer)
}
catch {
case NonFatal(e) =>
Expand Down
13 changes: 7 additions & 6 deletions src/main/scala/zygf/jackshaft/spray/AkkaSprayJsonSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,16 @@ import akka.stream.scaladsl.{Keep, Source}
import akka.util.ByteString
import spray.json._
import zygf.jackshaft.exceptions.UnexpectedEndOfInputException
import zygf.jackshaft.impl.{ByteStringParser, JsonParserStage, ParsingMode, StreamingJsonParserStage}
import zygf.jackshaft.impl.akka.{JsonParserStage, StreamingJsonParserStage}
import zygf.jackshaft.impl.{ByteBufferParser, ParsingMode, StreamingJsonParserStage}

/**
* A trait providing automatic to and from JSON marshalling/unmarshalling using an in-scope *spray-json* protocol.
*/
trait AkkaSprayJsonSupport {
private def parseStrict(input: ByteString): Future[JsValue] = {
val parser = new ByteStringParser(new SprayMiddleware)
val json = parser.parseValue(input)
val parser = new ByteBufferParser(new SprayMiddleware)
val json = parser.parseValue(input.asByteBuffers.iterator)
if (json eq null) {
FastFuture.failed(UnexpectedEndOfInputException)
}
Expand Down Expand Up @@ -65,9 +66,9 @@ trait AkkaSprayJsonSupport {
implicit def singleJsonReadableFromEntityUnmarshallerConverter[T](reader: RootJsonReader[T]): FromEntityUnmarshaller[T] =
singleJsonReadableFromEntityUnmarshaller(reader)

object ByteStringsToValueStage extends JsonParserStage(() => new ByteStringParser(new SprayMiddleware))
object ByteStringsToArrayStage extends StreamingJsonParserStage(ParsingMode.ARRAY, () => new ByteStringParser(new SprayMiddleware))
object ByteStringsToStreamStage extends StreamingJsonParserStage(ParsingMode.STREAM, () => new ByteStringParser(new SprayMiddleware))
object ByteStringsToValueStage extends JsonParserStage(() => new ByteBufferParser(new SprayMiddleware))
object ByteStringsToArrayStage extends StreamingJsonParserStage(ParsingMode.ARRAY, () => new ByteBufferParser(new SprayMiddleware))
object ByteStringsToStreamStage extends StreamingJsonParserStage(ParsingMode.STREAM, () => new ByteBufferParser(new SprayMiddleware))

implicit val streamingJsArrayFromEntityUnmarshaller: FromEntityUnmarshaller[Source[JsValue, NotUsed]] = {
Unmarshaller.withMaterializer[HttpEntity, Source[JsValue, NotUsed]] {
Expand Down

0 comments on commit 155a8cb

Please sign in to comment.