Skip to content

Commit

Permalink
KRZ-182 Future Handling
Browse files Browse the repository at this point in the history
  • Loading branch information
nob13 committed Feb 29, 2024
1 parent e1cc09d commit 0435611
Show file tree
Hide file tree
Showing 9 changed files with 170 additions and 168 deletions.
16 changes: 12 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ val scalaXmlVersion = "2.1.0"
val scalaTagsVersion = "0.12.0"
val circeVersion = "0.14.6"

val isIntelliJ = {
val isIdea = sys.props.get("idea.managed").contains("true")
if (isIdea) {
println("Using IntelliJ workarounds. Do not publish")
}
isIdea
}

def publishSettings = Seq(
publishTo := sonatypePublishToBundle.value,
sonatypeBundleDirectory := (ThisBuild / baseDirectory).value / "target" / "sonatype-staging" / s"${version.value}",
Expand Down Expand Up @@ -70,7 +78,7 @@ lazy val lib = (crossProject(JSPlatform, JVMPlatform, NativePlatform) in file("l
testSettings,
publishSettings,
libraryDependencies += (
"dev.zio" %%% "zio" % zioVersion % Provided
"dev.zio" %%% "zio" % zioVersion % (if (isIntelliJ) Compile else Provided)
)
)
.jsSettings(
Expand Down Expand Up @@ -126,9 +134,9 @@ lazy val rpc = (crossProject(JSPlatform, JVMPlatform, NativePlatform) in file("r
.settings(
name := "kreuzberg-rpc",
libraryDependencies ++= Seq(
"io.circe" %%% "circe-core" % circeVersion,
"io.circe" %%% "circe-parser" % circeVersion,
"dev.zio" %%% "zio" % zioVersion % Provided
"io.circe" %%% "circe-core" % circeVersion,
"io.circe" %%% "circe-parser" % circeVersion,
"dev.zio" %%% "zio" % zioVersion % (if (isIntelliJ) Compile else Provided)
),
evictionErrorLevel := Level.Warn,
testSettings,
Expand Down
180 changes: 88 additions & 92 deletions engine-naive/src/main/scala/kreuzberg/engine/naive/EventManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package kreuzberg.engine.naive
import kreuzberg.*
import kreuzberg.engine.naive.utils.MutableMultimap
import kreuzberg.dom.*
import kreuzberg.engine.common.{TreeNode, ModelValues}
import kreuzberg.engine.common.{ModelValues, TreeNode}

import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future}
import scala.ref.WeakReference
import scala.util.Try
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal
import scala.scalajs.concurrent.JSExecutionContext.Implicits.queue

Expand Down Expand Up @@ -136,35 +136,92 @@ class EventManager(delegate: EventManagerDelegate)(using ServiceRepository) {

private def transformSink[T](node: TreeNode, eventSink: EventSink[T]): T => Unit = {
eventSink match
case EventSink.ModelChange(modelId, f) =>
case EventSink.ModelChange(modelId, f) =>
eventData =>
val change = PendingModelChange(
modelId,
fn = f(eventData, _)
)
_pending.append(change)
ensureNextIteration()
case EventSink.ChannelSink(channel) =>
case EventSink.ChannelSink(channel) =>
eventData => triggerChannel(channel, eventData)
case EventSink.ExecuteCode(f) => f
case EventSink.Multiple(sinks) =>
val converted = sinks.map(transformSink(node, _))
eventData => converted.foreach(x => x(eventData))
case EventSink.ContraCollect(underlying, pf) =>
val converted = transformSink(node, underlying)
eventData =>
if (pf.isDefinedAt(eventData)) {
converted(pf(eventData))
} else {
()
}
case EventSink.ContraMap(underlying, f) =>
val converted = transformSink(node, underlying)
eventData => converted(f(eventData))
case EventSink.SetProperty(property) =>
case EventSink.ExecuteCode(f) => f
case EventSink.SetProperty(property) =>
eventData => {
updateJsProperty(eventData, property)
}
case EventSink.PreTransformer(underlying, transformer) =>
preTransformSink(node, transformer, underlying)
}

private def preTransformSink[E, F](
node: TreeNode,
transformer: EventTransformer[E, F],
sink: EventSink[F]
): E => Unit = {
val underlying = transformSink(node, sink)
buildTransformer(node, transformer, underlying)
}

private def buildTransformer[E, F](
node: TreeNode,
transformer: EventTransformer[E, F],
underlying: F => Unit
): E => Unit = {
transformer match {
case EventTransformer.Map(fn) => in => underlying(fn(in))
case EventTransformer.Collect(fn) => in => if (fn.isDefinedAt(in)) { underlying(fn(in)) }
case EventTransformer.Tapped(fn) =>
in => {
try {
fn(in)
} catch {
case NonFatal(e) =>
Logger.warn(s"Error in tap: ${e.getMessage}")
}
underlying(in)
}
case EventTransformer.WithState(runtimeState) => { in =>
try {
val state = fetchStateUnsafe(runtimeState)
underlying((in, state))
} catch {
case NonFatal(e) =>
Logger.warn(s"Error fetching runtime state: ${e.getMessage}")
}
}
case EventTransformer.WithEffect(effectFn) => { in =>
effectFn(in).fn(implicitly[ExecutionContext]).andThen { case result =>
underlying((in, result))
}
}
case EventTransformer.TryUnpack1(failureSink) => {
val failureSinkTransformed = transformSink(node, failureSink)
in => {
in match {
case Success(ok) => underlying(ok)
case Failure(failure) => failureSinkTransformed(failure)
}
}
}
case EventTransformer.TryUnpack2(failureSink) => {
val failureSinkTransformed = transformSink(node, failureSink)
in => {
in match {
case (v, Success(ok)) => underlying(v -> ok)
case (v, Failure(failure)) => failureSinkTransformed(v -> failure)
}
}
}
case EventTransformer.And(other) => {
val othersTransformed = transformSink(node, other)
in => {
othersTransformed.apply(in)
underlying(in)
}
}
}
}

private def triggerChannel[T](ref: WeakReference[Channel[T]], data: T): Unit = {
Expand All @@ -177,7 +234,7 @@ class EventManager(delegate: EventManagerDelegate)(using ServiceRepository) {

private def bindEventSource[E](ownNode: TreeNode, eventSource: EventSource[E], sink: E => Unit): Unit = {
eventSource match
case j: EventSource.Js[_] =>
case j: EventSource.Js[_] =>
j.jsEvent.componentId match {
case None =>
val handler: ScalaJsEvent => Unit = { in =>
Expand Down Expand Up @@ -219,57 +276,20 @@ class EventManager(delegate: EventManagerDelegate)(using ServiceRepository) {
}
bindJsEvent(source, j.jsEvent, sink)
}
case EventSource.Assembled =>
case EventSource.Assembled =>
scalajs.js.timers.setTimeout(0) {
sink(())
}
case ws: EventSource.WithState[_, _] =>
bindWithState(ownNode, ws, (a, b) => sink((a, b)))
case EventSource.MapSource(from, fn) =>
bindEventSource(
ownNode,
from,
x => {
val mapped = fn(x)
sink(mapped)
}
)
case EventSource.CollectEvent(from, fn) =>
bindEventSource(
ownNode,
from,
x => {
if (fn.isDefinedAt(x)) {
sink(fn(x))
} else {
// nothing
}
}
)
case e: EventSource.EffectEvent[_, _] =>
case e: EventSource.EffectEvent[_, _] =>
bindEffect(ownNode, e, sink)
case a: EventSource.AndSource[_] =>
case a: EventSource.AndSource[_] =>
bindAnd(ownNode, a, sink)
case c: EventSource.ChannelSource[_] =>
case c: EventSource.ChannelSource[_] =>
bindChannel(ownNode, c, sink)
case o: EventSource.OrSource[_] =>
case o: EventSource.OrSource[_] =>
bindEventSource(ownNode, o.left, sink)
bindEventSource(ownNode, o.right, sink)
case t: EventSource.TapSource[_] =>
bindEventSource(
ownNode,
t.inner,
{ value =>
try {
t.fn(value)
} catch {
case NonFatal(e) =>
Logger.warn(s"Error in tap: ${e.getMessage}")
}
sink(value)
}
)
case t: EventSource.Timer =>
case t: EventSource.Timer =>
val timer = if (t.periodic) {
val handle = scalajs.js.timers.setInterval(t.duration) { sink(()) }
RegisteredTimer(
Expand All @@ -282,26 +302,9 @@ class EventManager(delegate: EventManagerDelegate)(using ServiceRepository) {
)
}
_registeredTimers.add(ownNode.id, timer)
}

private def bindWithState[E, S](
ownNode: TreeNode,
eventSource: EventSource.WithState[E, S],
sink: (E, S) => Unit
): Unit = {
bindEventSource[E](
ownNode,
eventSource.inner,
x => {
try {
val state = fetchStateUnsafe(eventSource.runtimeState)
sink(x, state)
} catch {
case NonFatal(e) =>
Logger.warn(s"Error fetching runtime state: ${e.getMessage}")
}
}
)
case EventSource.PostTransformer(source, transformer) =>
val transformedSink = buildTransformer(ownNode, transformer, sink)
bindEventSource(ownNode, source, transformedSink)
}

private def fetchStateUnsafe[S](s: RuntimeState[S]): S = {
Expand Down Expand Up @@ -340,19 +343,12 @@ class EventManager(delegate: EventManagerDelegate)(using ServiceRepository) {
own,
event.trigger,
v =>
runEffect(event.effectOperation(v)).andThen { case result =>
event.effectOperation(v).fn(implicitly[ExecutionContext]).andThen { case result =>
sink((v, result))
}
)
}

private def runEffect[T](in: Effect[T]): Future[T] = {
in match {
case Effect.LazyFuture(fn) => fn(implicitly[ExecutionContext])
case Effect.Const(value) => Future.successful(value)
}
}

private def bindAnd[T](ownNode: TreeNode, event: EventSource.AndSource[T], sink1: T => Unit): Unit = {
val sink0 = transformSink(ownNode, event.binding.sink)
val combined: T => Unit = { value =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ object App extends SimpleComponentBase {
PathCodec.prefix("/lazy/"),
eagerTitle = path => s"Lazy...",
routingTarget = path => {
Effect.future { _ =>
Effect.future {
SlowApiMock.timer(
1.second,
RoutingTarget(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ object TodoAdderForm extends SimpleComponentBase {
object LazyTodoViewer extends LazyLoader[TodoList] {
override def load()(using c: ServiceRepository): Effect[TodoList] = {
val api = provide[Api]
Effect.future { _ => api.todoApi.listItems() }.map(response => TodoList.apply(response.items))
Effect.future { api.todoApi.listItems() }.map(response => TodoList.apply(response.items))
}

override def ok(data: TodoList)(using c: SimpleContext): Html = {
Expand Down
25 changes: 9 additions & 16 deletions lib/shared/src/main/scala/kreuzberg/Effect.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,18 @@ package kreuzberg

import scala.concurrent.{ExecutionContext, Future}

/** Wraps an Effect. */
sealed trait Effect[+T] {
def map[U](f: T => U): Effect[U]
}

object Effect {
case class LazyFuture[T](fn: ExecutionContext => Future[T]) extends Effect[T] {
override def map[U](f: T => U): Effect[U] = {
LazyFuture { ec =>
fn(ec).map(f)(ec)
}
/** Wraps an Effect so that it is lazy */
case class Effect[T](fn: ExecutionContext => Future[T]) {
def map[U](mapFn: T => U): Effect[U] = {
Effect { ec =>
fn(ec).map(mapFn)(ec)
}
}
}

case class Const[T](value: T) extends Effect[T] {
override def map[U](f: T => U): Effect[U] = Const(f(value))
}
object Effect {

def const[T](value: T): Const[T] = Const(value)
def const[T](value: T): Effect[T] = Effect(_ => Future.successful(value))

def future[T](f: ExecutionContext => Future[T]): LazyFuture[T] = LazyFuture(f)
inline def future[T](f: ExecutionContext ?=> Future[T]): Effect[T] = Effect(ec => f(using ec))
}

0 comments on commit 0435611

Please sign in to comment.