Skip to content
This repository has been archived by the owner on Apr 23, 2019. It is now read-only.

Use Akka Streams instead of Akka Actors #55

Merged
merged 3 commits into from Aug 13, 2017

Conversation

wsargent
Copy link
Member

@wsargent wsargent commented Aug 1, 2017

This PR moves the reactive stocks example to use streams instead of actors as the underlying mechanic. The underlying code is the same as the Scala version, only with some tweaks necessitated by Jackson's serialization.

One thing that was surprising is that the websocket client ignores the value set for "Origin" and instead sends the same value as serverURL. It fails because it's on a different port rather than because it's checking the header and finding a difference.

Fixes #46

Supersedes #51

Compare with playframework/play-scala-websocket-example#74 (scala version of websocket example)

Copy link
Member

@richdougherty richdougherty left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks good to me, but you might also want to get a review from someone who knows this app and streams a bit better. :)

.hasMessageContaining("Invalid Status Code 403");
String serverURL = "ws://localhost:37117/ws";
WebSocketClient.LoggingListener listener = new WebSocketClient.LoggingListener(message -> {
// System.out.println(message);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. BTW, I lost the ability to merge PRs on the examples somewhere down the line (github status altered?)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll check up on your Github status for the examples. If you like you can email me if there's anything you think you should have access to that you don't and I'll sort it out. :)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wsargent, I've now given you (and the rest of the Core team in Github) Write access to all example repos. Let me know how you get on next time you try to merge.

@marcospereira
Copy link
Member

Will, I'm getting the following error if I close the browser tab:

[error] a.a.RepointableActorRef - Error in stage [akka.stream.scaladsl.MergeHub$$anon$2@164b5b45]: Upstream producer failed with exception, removing from MergeHub now
akka.stream.scaladsl.MergeHub$ProducerFailed: Upstream producer failed with exception, removing from MergeHub now
	at akka.stream.scaladsl.MergeHub$$anon$2$$anon$4.onUpstreamFailure(Hub.scala:261)
	at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:485)
	at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:368)
	at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:571)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:457)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:546)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:725)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:740)
	at akka.actor.Actor.aroundReceive(Actor.scala:513)
	at akka.actor.Actor.aroundReceive$(Actor.scala:511)
Caused by: java.util.concurrent.TimeoutException: No demand signalled in the last 1 second.
	at akka.stream.impl.Timers$BackpressureTimeout$$anon$4.onTimer(Timers.scala:133)
	at akka.stream.stage.TimerGraphStageLogic.onInternalTimer(GraphStage.scala:1228)
	at akka.stream.stage.TimerGraphStageLogic.$anonfun$getTimerAsyncCallback$1(GraphStage.scala:1217)
	at akka.stream.stage.TimerGraphStageLogic.$anonfun$getTimerAsyncCallback$1$adapted(GraphStage.scala:1217)
	at akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:443)
	at akka.stream.impl.fusing.GraphInterpreterShell$AsyncInput.execute(ActorGraphInterpreter.scala:453)
	at akka.stream.impl.fusing.GraphInterpreterShell.processEvent(ActorGraphInterpreter.scala:546)
	at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$processEvent(ActorGraphInterpreter.scala:725)
	at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:740)
	at akka.actor.Actor.aroundReceive(Actor.scala:513)

We should "fail" more gracefully here, I think.

@wsargent
Copy link
Member Author

From

at akka.stream.impl.Timers$BackpressureTimeout$$anon$4.onTimer(Timers.scala:133)

that is the backpressure timeout -- I removed it.

@wsargent wsargent merged commit ef399f0 into playframework:2.6.x Aug 13, 2017
@wsargent wsargent deleted the use-streams branch August 13, 2017 23:54
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants