Skip to content

Commit

Permalink
For #5207: use IO instead of Future
Browse files Browse the repository at this point in the history
  • Loading branch information
ebruchez committed Dec 1, 2023
1 parent 41de625 commit 4440279
Show file tree
Hide file tree
Showing 25 changed files with 114 additions and 140 deletions.
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
package org.orbeon.connection

import cats.effect.IO
import cats.effect.unsafe.implicits.global

import java.io.{ByteArrayInputStream, InputStream}
import scala.concurrent.{ExecutionContext, Future}


object ConnectionSupport {

// Convert the `fs2.Stream` to an `InputStream`. Of course, We'd like to stream all the way ideally, but this is a
// first step. We cannot use `fs2.io.toInputStream` because it requires running two threads, which doesn't work in
// JavaScript. So we go through an in-memory `Array` for now. Note that sending data also works with `Array`s.
def fs2StreamToInputStreamInMemory(s: fs2.Stream[IO, Byte]): Future[InputStream] =
s.compile.to(Array).map(new ByteArrayInputStream(_)).unsafeToFuture()
def fs2StreamToInputStreamInMemory(s: fs2.Stream[IO, Byte]): IO[InputStream] =
s.compile.to(Array).map(new ByteArrayInputStream(_))

def asyncToSyncStreamedContent(content: AsyncStreamedContent)(implicit ec: ExecutionContext): Future[StreamedContent] =
def asyncToSyncStreamedContent(content: AsyncStreamedContent): IO[StreamedContent] =
fs2StreamToInputStreamInMemory(content.stream).map(is =>
StreamedContent(
inputStream = is,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.orbeon.oxf.util

import cats.effect.IO
import org.orbeon.connection.{AsyncConnectionResult, AsyncStreamedContent, ConnectionResult, StreamedContent}
import org.orbeon.io.UriScheme
import org.orbeon.oxf.externalcontext.ExternalContext
Expand All @@ -23,7 +24,6 @@ import org.orbeon.oxf.util.CoreUtils._
import org.orbeon.oxf.util.Logging.debug

import java.net.URI
import scala.concurrent.Future
import scala.jdk.CollectionConverters._


Expand Down Expand Up @@ -52,7 +52,7 @@ trait ConnectionTrait {
logBody : Boolean)(implicit
logger : IndentedLogger,
externalContext : ExternalContext
): Future[AsyncConnectionResult]
): IO[AsyncConnectionResult]

def isInternalPath(path: String): Boolean

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.orbeon.oxf.fr

import cats.effect.unsafe.implicits.global
import cats.syntax.option._
import org.orbeon.oxf.externalcontext.ExternalContext
import org.orbeon.oxf.fr.FormRunner._
Expand Down Expand Up @@ -98,7 +99,7 @@ trait FormRunnerPublish {
password = password.trimAllToOpt,
formVersion = dstFormVersionTrimmedOpt,
workflowStage = None
),
).unsafeToFuture(),
Duration.Inf
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.orbeon.oxf.fr

import cats.effect.unsafe.implicits.global
import org.orbeon.oxf.externalcontext.ExternalContext
import org.orbeon.oxf.fr.FormRunner._
import org.orbeon.oxf.fr.FormRunnerPersistence.{DataFormatVersionName, DataXml}
Expand Down Expand Up @@ -110,7 +111,7 @@ trait FormRunnerSummary {
forceAttachments = true,
formVersion = Some(formVersion),
workflowStage = Some(workflowStage)
),
).unsafeToFuture(),
Duration.Inf
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.orbeon.oxf.fr.process

import cats.effect.IO
import org.orbeon.oxf.fr.process.ProcessInterpreter._
import org.orbeon.oxf.fr.process.ProcessParser._
import org.orbeon.oxf.test.{DocumentTestBase, ResourceManagerSupport}
Expand Down Expand Up @@ -51,7 +52,7 @@ class SimpleProcessTest
def writeSuspendedProcess(processId: String, process: String): Unit = _suspendedProcess = Some(processId -> process)
def readSuspendedProcess: Try[(String, String)] = Try(_suspendedProcess.get)

def submitContinuation[T](actionResultF: Future[T], continuation: Try[T] => Unit): Unit = ???
def submitContinuation[T](actionResultF: IO[T], continuation: Try[T] => Unit): Unit = ??? // xxx test

// Constant so that we can test properly
override def createUniqueProcessId: String = ConstantProcessId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.orbeon.oxf.fr

import cats.effect.IO
import cats.effect.unsafe.implicits.global
import cats.syntax.option._
import enumeratum.EnumEntry.Lowercase
import enumeratum._
Expand All @@ -32,7 +31,7 @@ import org.orbeon.oxf.http.Headers._
import org.orbeon.oxf.http.{BasicCredentials, Headers, HttpMethod}
import org.orbeon.oxf.properties.Property
import org.orbeon.oxf.util.ContentTypes.isTextOrXMLOrJSONContentType
import org.orbeon.oxf.util.CoreCrossPlatformSupport.{properties, shiftExternalContext}
import org.orbeon.oxf.util.CoreCrossPlatformSupport.properties
import org.orbeon.oxf.util.CoreUtils._
import org.orbeon.oxf.util.MarkupUtils._
import org.orbeon.oxf.util.PathUtils._
Expand Down Expand Up @@ -684,18 +683,14 @@ trait FormRunnerPersistence {
standaloneOpt = None
)

IO.fromFuture(
shiftExternalContext[IO, Future[AsyncConnectionResult]](t => IO(t)) {
Connection.connectAsync(
method = HttpMethod.PUT,
url = resolvedPutUri,
credentials = credentials,
content = StreamedContent.asyncFromBytes(bytes, ContentTypes.XmlContentType.some).some,
headers = allPutHeaders,
loadState = true,
logBody = false
)
}
Connection.connectAsync(
method = HttpMethod.PUT,
url = resolvedPutUri,
credentials = credentials,
content = StreamedContent.asyncFromBytes(bytes, ContentTypes.XmlContentType.some).some,
headers = allPutHeaders,
loadState = true,
logBody = false
)
}

Expand All @@ -710,18 +705,14 @@ trait FormRunnerPersistence {

val (resolvedGetUri, allGetHeaders) = getAttachmentUriAndHeaders(fromBasePaths, beforeUrl)

IO.fromFuture(
shiftExternalContext[IO, Future[AsyncConnectionResult]](t => IO(t)) {
Connection.connectAsync(
method = HttpMethod.GET,
url = resolvedGetUri,
credentials = None,
content = None,
headers = allGetHeaders,
loadState = true,
logBody = false
)
}
Connection.connectAsync(
method = HttpMethod.GET,
url = resolvedGetUri,
credentials = None,
content = None,
headers = allGetHeaders,
loadState = true,
logBody = false
)
}

Expand Down Expand Up @@ -752,18 +743,14 @@ trait FormRunnerPersistence {
getHeader = xfcd.headersGetter
)

IO.fromFuture(
shiftExternalContext[IO, Future[AsyncConnectionResult]](t => IO(t)) {
Connection.connectAsync(
method = HttpMethod.PUT,
url = resolvedPutUri,
credentials = credentials,
content = StreamedContent(stream, ContentTypes.OctetStreamContentType.some, contentLength = None).some,
headers = allPutHeaders,
loadState = true,
logBody = false
)
}
Connection.connectAsync(
method = HttpMethod.PUT,
url = resolvedPutUri,
credentials = credentials,
content = StreamedContent(stream, ContentTypes.OctetStreamContentType.some, contentLength = None).some,
headers = allPutHeaders,
loadState = true,
logBody = false
)
}

Expand Down Expand Up @@ -807,7 +794,7 @@ trait FormRunnerPersistence {
externalContext : ExternalContext,
coreCrossPlatformSupport: CoreCrossPlatformSupportTrait,
xfcd : XFormsContainingDocument
): Future[(List[AttachmentWithEncryptedAtRest], Option[Int], Option[String])] = {
): IO[(List[AttachmentWithEncryptedAtRest], Option[Int], Option[String])] = {

val credentials =
username map (BasicCredentials(_, password, preemptiveAuth = true, domain = None))
Expand Down Expand Up @@ -844,7 +831,6 @@ trait FormRunnerPersistence {
AttachmentMatch.BasePaths(includes = fromBasePaths.map(_._1), excludes = List(toBasePath))
)


def rewriteServiceUrl(url: String) =
URLRewriterUtils.rewriteServiceURL(
externalContext.getRequest,
Expand All @@ -871,22 +857,19 @@ trait FormRunnerPersistence {
val putUrl =
URI.create(rewriteServiceUrl(PathUtils.appendQueryString(toBaseURI + toBasePath + filename, commonQueryString)))

val resultIo =
for {
savedAttachments <- saveAllAttachmentsStream.compile.toList
_ = updateAttachments(preparedDataDocumentInfo, savedAttachments)
cxr <- saveXmlDataIo(preparedData, putUrl, formVersion, credentials, workflowStage)
versionOpt = Headers.firstItemIgnoreCase(cxr.headers, OrbeonFormDefinitionVersion).map(_.toInt) // will throw if the version is not an integer
bytesOpt <- if (cxr.content.contentType.exists(isTextOrXMLOrJSONContentType)) cxr.content.stream.compile.to(Array).map(Some.apply) else IO.pure(None)
stringOpt = bytesOpt.flatMap(b => SubmissionUtils.readTextContent(StreamedContent.fromBytes(b, cxr.content.contentType)))
} yield
(
savedAttachments,
versionOpt,
stringOpt
)

resultIo.unsafeToFuture()
for {
savedAttachments <- saveAllAttachmentsStream.compile.toList
_ = updateAttachments(preparedDataDocumentInfo, savedAttachments)
cxr <- saveXmlDataIo(preparedData, putUrl, formVersion, credentials, workflowStage)
versionOpt = Headers.firstItemIgnoreCase(cxr.headers, OrbeonFormDefinitionVersion).map(_.toInt) // will throw if the version is not an integer
bytesOpt <- if (cxr.content.contentType.exists(isTextOrXMLOrJSONContentType)) cxr.content.stream.compile.to(Array).map(Some.apply) else IO.pure(None)
stringOpt = bytesOpt.flatMap(b => SubmissionUtils.readTextContent(StreamedContent.fromBytes(b, cxr.content.contentType)))
} yield
(
savedAttachments,
versionOpt,
stringOpt
)

// In our persistence implementation, we do not remove attachments if saving the data fails.
// However, some custom persistence implementations do. So we don't think we can assume that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.orbeon.oxf.fr.process

import cats.effect.IO
import org.orbeon.oxf.common.OXFException
import org.orbeon.oxf.externalcontext.ExternalContext
import org.orbeon.oxf.externalcontext.ExternalContext.EmbeddableParam
Expand Down Expand Up @@ -181,7 +182,7 @@ trait FormRunnerActionsCommon {
)

// Saving is an asynchronous operation
val future: Future[(List[AttachmentWithEncryptedAtRest], Option[Int], Option[String])] =
val computation: IO[(List[AttachmentWithEncryptedAtRest], Option[Int], Option[String])] =
frc.putWithAttachments(
liveData = frc.formInstance.root,
migrate = Some(maybeMigrateData),
Expand Down Expand Up @@ -233,7 +234,7 @@ trait FormRunnerActionsCommon {
}
} .flatten

(future, continuation _)
(computation, continuation _)
}

def tryRelinquishLease(params: ActionParams): ActionResult = ActionResult.trySync {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package org.orbeon.oxf.fr.process

import cats.effect.IO
import org.orbeon.exception.OrbeonFormatter
import org.orbeon.oxf.externalcontext.ExternalContext
import org.orbeon.oxf.fr.XMLNames
Expand All @@ -32,7 +33,6 @@ import org.orbeon.xforms.XFormsNames._
import org.orbeon.xml.NamespaceMapping

import scala.annotation.tailrec
import scala.concurrent.Future
import scala.util.control.NoStackTrace
import scala.util.{Failure, Success, Try}

Expand All @@ -54,7 +54,7 @@ trait ProcessInterpreter extends Logging {
def clearSuspendedProcess(): Unit
def writeSuspendedProcess(processId: String, process: String): Unit
def readSuspendedProcess: Try[(String, String)]
def submitContinuation[T](actionResultF: Future[T], continuation: Try[T] => Unit): Unit
def submitContinuation[T](actionResultF: IO[T], continuation: Try[T] => Unit): Unit
def createUniqueProcessId: String = CoreCrossPlatformSupport.randomHexId
def transactionStart(): Unit
def transactionRollback(): Unit
Expand Down Expand Up @@ -420,11 +420,11 @@ object ProcessInterpreter {
sealed trait InternalActionResult extends ActionResult
object ActionResult {
case class Sync(value: Try[Any]) extends InternalActionResult
case class Async[T](value: Try[(Future[T], Try[T] => Try[Any])]) extends ActionResult
case class Async[T](value: Try[(IO[T], Try[T] => Try[Any])]) extends ActionResult
case class Interrupt(message: Option[String], value: Option[Try[Any]]) extends InternalActionResult

def trySync(body: => Any): ActionResult = ActionResult.Sync(Try(body))
def tryAsync[T](body: => (Future[T], Try[T] => Try[Any])): ActionResult = ActionResult.Async(Try(body))
def tryAsync[T](body: => (IO[T], Try[T] => Try[Any])): ActionResult = ActionResult.Async(Try(body))
}

type ActionParams = Map[Option[String], String]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,10 @@
*/
package org.orbeon.oxf.fr.process

import cats.effect.IO
import org.orbeon.dom.Document
import org.orbeon.oxf.fr.FormRunner._
import org.orbeon.oxf.fr.process.ProcessInterpreter.{Action, ActionResult}
import org.orbeon.oxf.fr.process.ProcessInterpreter.Action
import org.orbeon.oxf.fr.process.ProcessParser.{RecoverCombinator, ThenCombinator}
import org.orbeon.oxf.fr.{DataStatus, FormRunnerParams, Names}
import org.orbeon.oxf.util.StringUtils._
Expand All @@ -31,7 +32,6 @@ import org.orbeon.scaxon.NodeInfoConversions
import org.orbeon.scaxon.SimplePath._

import scala.collection.mutable.ListBuffer
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

Expand Down Expand Up @@ -80,12 +80,12 @@ trait SimpleProcessCommon
def clearSuspendedProcess(): Unit =
setvalue(topLevelInstance(Names.PersistenceModel, "fr-processes-instance").get.rootElement, "")

def submitContinuation[T](actionResultF: Future[T], continuation: Try[T] => Unit): Unit =
def submitContinuation[T](actionResultF: IO[T], continuation: Try[T] => Unit): Unit =
inScopeContainingDocument
.getAsynchronousSubmissionManager
.addAsynchronousCompletion(
description = s"process process id: $runningProcessId ",
future = actionResultF,
computation = actionResultF,
continuation = continuation,
awaitInCurrentRequest = Some(Duration.Inf)
)
Expand Down
Loading

0 comments on commit 4440279

Please sign in to comment.