Skip to content

Commit

Permalink
Fix/json huge parsing (#37)
Browse files Browse the repository at this point in the history
* named param
* sequenceCardanoApiResponses
* use foldLeft
  • Loading branch information
maciejbak85 committed Nov 17, 2020
1 parent 011fe9b commit fa43c30
Show file tree
Hide file tree
Showing 8 changed files with 53,016 additions and 31 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea
target
.bsp
10 changes: 9 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
val akkaVersion = "2.6.8"
val akkaHttpVersion = "10.2.0"
val akkaHttpCirce = "1.31.0"
val akkaJsonStreaming = "2.0.2"
val circeVersion = "0.13.0"
val scalaTestVersion = "3.1.2"
val commonsCodecVersion = "1.15"
Expand Down Expand Up @@ -47,9 +48,16 @@ lazy val rootProject = (project in file("."))
"com.typesafe.akka" %% "akka-http" % akkaHttpVersion,
"com.typesafe.akka" %% "akka-actor-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-stream" % akkaVersion,
"com.lightbend.akka" %% "akka-stream-alpakka-json-streaming" % akkaJsonStreaming,
"io.circe" %% "circe-generic-extras" % circeVersion,
"de.heikoseeberger" %% "akka-http-circe" % akkaHttpCirce,
"commons-codec" % "commons-codec" % commonsCodecVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % "it, test",
)
),
assemblyMergeStrategy in assembly := {
case "module-info.class" => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
oldStrategy(x)
}
)
2 changes: 1 addition & 1 deletion cmdline.sh
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#!/bin/bash

VER=0.1.3-SNAPSHOT
#BASE_URL="http://cardano-wallet-testnet.iog.solutions:8090/v2/"
#BASE_URL="http://cardano-wallet-testnet3.iog.solutions:8090/v2/"
#BASE_URL="http://localhost:8090/v2/"

#run sbt assembly to create this jar
Expand Down
66 changes: 42 additions & 24 deletions src/main/scala/iog/psg/cardano/CardanoApiCodec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import java.time.format.DateTimeFormatter

import akka.http.scaladsl.model.ContentType.WithFixedCharset
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.unmarshalling.Unmarshaller.eitherUnmarshaller
import akka.http.scaladsl.unmarshalling.{Unmarshal, Unmarshaller}
import akka.stream.Materializer
import akka.stream.alpakka.json.scaladsl.JsonReader
import akka.stream.scaladsl.Sink
import akka.util.ByteString
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe._
Expand All @@ -24,6 +26,7 @@ import iog.psg.cardano.CardanoApiCodec.TxState.TxState
import iog.psg.cardano.CardanoApiCodec.Units.Units
import org.apache.commons.codec.binary.Hex

import scala.annotation.tailrec
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
Expand Down Expand Up @@ -385,27 +388,39 @@ object CardanoApiCodec {

private def strictEntityF: Future[HttpEntity.Strict] = response.entity.toStrict(timeout)

private def errorUnparseableResult[T](err: Throwable): CardanoApiResponse[T] = Left(ErrorMessage(err.getMessage, "UNPARSEABLE RESULT"))

private def extractErrorResponse[T](strictEntity: Future[HttpEntity.Strict]): Future[CardanoApiResponse[T]] = {
private def extractErrorResponse[T](strictEntity: Future[HttpEntity.Strict]): Future[CardanoApiResponse[T]] =
strictEntity.map(e => toErrorMessage(e.data) match {
case Left(err) => Left(ErrorMessage(err.getMessage, "UNPARSEABLE RESULT"))
case Left(err) => errorUnparseableResult(err)
case Right(v) => Left(v)
})

}

def toNetworkInfoResponse: Future[CardanoApiResponse[NetworkInfo]]
= to[NetworkInfo](Unmarshal(_).to[CardanoApiResponse[NetworkInfo]])

def to[T](f: HttpEntity.Strict => Future[CardanoApiResponse[T]]): Future[CardanoApiResponse[T]] =
decodeResponseEntityOrHandleError(response, () => strictEntityF.flatMap(f))

def toWallet: Future[CardanoApiResponse[Wallet]]
= to[Wallet](Unmarshal(_).to[CardanoApiResponse[Wallet]])

def toWallets: Future[CardanoApiResponse[Seq[Wallet]]]
= to[Seq[Wallet]](Unmarshal(_).to[CardanoApiResponse[Seq[Wallet]]])

def toWalletAddressIds: Future[CardanoApiResponse[Seq[WalletAddressId]]]
= to[Seq[WalletAddressId]](Unmarshal(_).to[CardanoApiResponse[Seq[WalletAddressId]]])

def toFundPaymentsResponse: Future[CardanoApiResponse[FundPaymentsResponse]]
= to[FundPaymentsResponse](Unmarshal(_).to[CardanoApiResponse[FundPaymentsResponse]])

def to[T](f: HttpEntity.Strict => Future[CardanoApiResponse[T]]): Future[CardanoApiResponse[T]] = {
def toCreateTransactionsResponse: Future[CardanoApiResponse[Seq[CreateTransactionResponse]]]
= decodeInStream[CreateTransactionResponse](response, "$[*]")

private def decodeResponseEntityOrHandleError[T](response: HttpResponse, decodeF: () => Future[CardanoApiResponse[T]]) = {
response.entity.contentType match {
case WithFixedCharset(MediaTypes.`application/json`) =>
// Load into memory using toStrict
// a. no responses utilise streaming and
// b. the Either unmarshaller requires it
strictEntityF.flatMap(f)
decodeF()

case c: ContentType
if c.mediaType == MediaTypes.`text/plain` ||
Expand All @@ -418,21 +433,24 @@ object CardanoApiCodec {
}
}

def toWallet: Future[CardanoApiResponse[Wallet]]
= to[Wallet](Unmarshal(_).to[CardanoApiResponse[Wallet]])

def toWallets: Future[CardanoApiResponse[Seq[Wallet]]]
= to[Seq[Wallet]](Unmarshal(_).to[CardanoApiResponse[Seq[Wallet]]])


def toWalletAddressIds: Future[CardanoApiResponse[Seq[WalletAddressId]]]
= to[Seq[WalletAddressId]](Unmarshal(_).to[CardanoApiResponse[Seq[WalletAddressId]]])

def toFundPaymentsResponse: Future[CardanoApiResponse[FundPaymentsResponse]]
= to[FundPaymentsResponse](Unmarshal(_).to[CardanoApiResponse[FundPaymentsResponse]])
final def decodeInStream[T](response: HttpResponse, jsonPath: String)(implicit um: Unmarshaller[String, T]): Future[CardanoApiResponse[Seq[T]]] =
decodeResponseEntityOrHandleError(response, () =>
response.entity.dataBytes
.via(JsonReader.select(jsonPath))
.mapAsync(parallelism = 4)(bs => unmarshalOrRecoverToUnparseable[T](bs.utf8String))
.runWith(Sink.seq).map(sequenceCardanoApiResponses)
)

private def unmarshalOrRecoverToUnparseable[T](utf8String: String)(implicit um: Unmarshaller[String, T]): Future[CardanoApiResponse[T]] =
Unmarshal(utf8String).to[T].map(Right(_)).recover {
case e: Exception => errorUnparseableResult(e)
}

def toCreateTransactionResponses: Future[CardanoApiResponse[Seq[CreateTransactionResponse]]]
= to[Seq[CreateTransactionResponse]](Unmarshal(_).to[CardanoApiResponse[Seq[CreateTransactionResponse]]])
private def sequenceCardanoApiResponses[T](responses: Seq[CardanoApiResponse[T]]): CardanoApiResponse[Seq[T]] =
responses.foldLeft[CardanoApiResponse[Seq[T]]](Right(Seq.empty)) {
case (_, Left(error)) => Left(error)
case (acc, Right(elem)) => acc.map(_ :+ elem)
}

def toCreateTransactionResponse: Future[CardanoApiResponse[CreateTransactionResponse]]
= to[CreateTransactionResponse](Unmarshal(_).to[CardanoApiResponse[CreateTransactionResponse]])
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/iog/psg/cardano/CardanoApiImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private class CardanoApiImpl(baseUriWithPort: String)(implicit ec: ExecutionCont
uri = uriWithQueries,
method = GET
),
_.toCreateTransactionResponses
_.toCreateTransactionsResponse
)
}

Expand Down

0 comments on commit fa43c30

Please sign in to comment.