Skip to content
This repository has been archived by the owner on Dec 31, 2020. It is now read-only.

Implement initial Phoenix Channels module #15

Merged
merged 22 commits into from
Aug 22, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,26 @@ libraryDependencies += "io.taig" %% "communicator" % "3.0.0-RC2"
## Quickstart

```scala
scala> import io.taig.communicator._; import monix.eval.Task
scala> import io.taig.communicator._; import request._; import monix.eval.Task
import io.taig.communicator._
import request._
import monix.eval.Task

scala> // To build request tasks, an implicit OkHttpClient should be in scope
| implicit val client = Client()
client: io.taig.communicator.Client = okhttp3.OkHttpClient@457fda5e
client: io.taig.communicator.Client = okhttp3.OkHttpClient@62903ff9

scala> // Simple OkHttp request builder
| val builder = Request.Builder().url( "http://taig.io/" )
builder: okhttp3.Request.Builder = okhttp3.Request$Builder@62736268
builder: okhttp3.Request.Builder = okhttp3.Request$Builder@4766c6

scala> // Construct a Task[Response]
| val request: Request = Request( builder.build() )
request: io.taig.communicator.Request = io.taig.communicator.Request@555461ef
request: io.taig.communicator.request.Request = io.taig.communicator.request.Request@12be74a9

scala> // Parse the response to a String
| val requestContent: Task[Response.With[String]] = request.parse[String]
requestContent: monix.eval.Task[io.taig.communicator.Response.With[String]] = BindAsync(<function3>,<function1>)
requestContent: monix.eval.Task[io.taig.communicator.request.Response.With[String]] = BindAsync(<function3>,<function1>)

scala> // Kick off the actual request
| import monix.execution.Scheduler.Implicits.global
Expand All @@ -68,7 +69,7 @@ scala> requestContent.runAsync.andThen {
| case Success( content ) => "Success"
| case Failure( exception ) => "Failure"
| }
res5: monix.execution.CancelableFuture[io.taig.communicator.Response.With[String]] = monix.execution.CancelableFuture$Implementation@3e8834f0
res5: monix.execution.CancelableFuture[io.taig.communicator.request.Response.With[String]] = monix.execution.CancelableFuture$Implementation@69987e6
```

## Usage
Expand Down
30 changes: 24 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ lazy val communicator = project.in( file( "." ) )
test <<= test in tests in Test,
tut <<= tut in documentation
)
.aggregate( common, request, websocket )
.dependsOn( common, request, websocket )
.aggregate( common, request, websocket, phoenix )
.dependsOn( common, request, websocket, phoenix )

lazy val common = project
.settings( Settings.common )
.settings(
libraryDependencies ++=
"com.squareup.okhttp3" % "okhttp" % Settings.dependency.okhttp ::
"com.typesafe.scala-logging" %% "scala-logging" % "3.4.0" ::
"io.monix" %% "monix-eval" % Settings.dependency.monix ::
"ch.qos.logback" % "logback-classic" % "1.1.7" ::
Nil,
name := "common",
startYear := Some( 2016 )
Expand All @@ -45,6 +47,19 @@ lazy val websocket = project
)
.dependsOn( common )

lazy val phoenix = project
.settings( Settings.common )
.settings(
libraryDependencies ++=
"io.circe" %% "circe-core" % Settings.dependency.circe ::
"io.circe" %% "circe-generic" % Settings.dependency.circe ::
"io.circe" %% "circe-parser" % Settings.dependency.circe ::
Nil,
name := "phoenix",
startYear := Some( 2016 )
)
.dependsOn( websocket )

lazy val documentation = project
.settings( tutSettings ++ Settings.common )
.settings(
Expand All @@ -55,14 +70,17 @@ lazy val documentation = project
Nil,
tutTargetDirectory := file( "." )
)
.dependsOn( common, request, websocket )
.dependsOn( common, request, websocket, phoenix )

lazy val tests = project
.settings( Settings.common )
.settings (
libraryDependencies ++=
"com.squareup.okhttp3" % "mockwebserver" % Settings.dependency.okhttp % "test" ::
"org.scalatest" %% "scalatest" % "3.0.0-RC4" % "test" ::
Nil
"ch.qos.logback" % "logback-classic" % "1.1.7" % "test" ::
"io.backchat.hookup" %% "hookup" % "0.4.2" % "test" ::
"org.scalatest" %% "scalatest" % "3.0.0" % "test" ::
Nil,
testOptions in Test += Tests.Argument( "-oFD" )
)
.dependsOn( common, request, websocket )
.dependsOn( common, request, websocket, phoenix )
2 changes: 2 additions & 0 deletions common/src/main/scala/io/taig/communicator/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ package object communicator {
}
}

type OkHttpRequest = okhttp3.Request

type Client = OkHttpClient

object Client {
Expand Down
2 changes: 1 addition & 1 deletion documentation/src/main/tut/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ libraryDependencies += "io.taig" %% "communicator" % "3.0.0-RC2"
## Quickstart

```tut
import io.taig.communicator._; import monix.eval.Task
import io.taig.communicator._; import request._; import monix.eval.Task

// To build request tasks, an implicit OkHttpClient should be in scope
implicit val client = Client()
Expand Down
44 changes: 44 additions & 0 deletions phoenix/src/main/scala/io/taig/communicator/phoenix/Channel.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.taig.communicator.phoenix

import io.circe.Json
import io.taig.communicator.phoenix.message.{ Request, Response }
import io.taig.communicator.phoenix.message.Response.Payload
import monix.reactive.Observable

class Channel( phoenix: Phoenix, val topic: Topic ) { self ⇒
private[phoenix] def send( request: Request ): Unit = {
phoenix.writer.send( request )
}

private[phoenix] def send( event: Event, payload: Json ): Unit = {
send( Request( topic, event, payload, phoenix.ref ) )
}

val reader: Observable[Payload] = {
phoenix.reader
.filter( _.topic == topic )
.collect {
case Response( _, _, Some( payload ), _ ) ⇒ payload
}
}

val writer: ChannelWriter = new ChannelWriter {
override def send( event: String, payload: Json ) = {
self.send( Event( event ), payload )
}
}

def leave(): Unit = {
logger.info( s"Leaving channel $topic" )
send( Event.Leave, Json.Null )
}

def close(): Unit = {
leave()
phoenix.close()
}
}

object Channel {
type EventPayload = ( String, Json )
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.taig.communicator.phoenix

import io.circe.Json

trait ChannelWriter {
def send( event: String, payload: Json ): Unit
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.taig.communicator.phoenix

import scala.concurrent.duration._
import scala.language.postfixOps

object Default {
val heartbeat: Option[FiniteDuration] = Some( 7 seconds )
}
27 changes: 27 additions & 0 deletions phoenix/src/main/scala/io/taig/communicator/phoenix/Event.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.taig.communicator.phoenix

import cats.data.Xor
import io.circe.{ Decoder, Encoder }

sealed case class Event( name: String )

object Event {
object Close extends Event( "phx_close" )
object Error extends Event( "phx_error" )
object Join extends Event( "phx_join" )
object Reply extends Event( "phx_reply" )
object Leave extends Event( "phx_leave" )

val all = Close :: Error :: Join :: Reply :: Leave :: Nil

implicit val encoderEvent: Encoder[Event] = Encoder[String].contramap( _.name )

implicit val decoderEvent: Decoder[Event] = {
Decoder[String].emap { name ⇒
Xor.fromOption(
all.find( _.name == name ),
s"Event $name does not exist"
)
}
}
}
155 changes: 155 additions & 0 deletions phoenix/src/main/scala/io/taig/communicator/phoenix/Phoenix.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
package io.taig.communicator.phoenix

import io.circe.Json
import io.taig.communicator._
import io.taig.communicator.phoenix.message.Response.{ Payload, Status }
import io.taig.communicator.phoenix.message.{ Request, Response }
import io.taig.communicator.websocket.{ WebSocketChannels, WebSocketWriter }
import monix.eval.Task
import monix.execution.{ Cancelable, Scheduler }
import monix.reactive.OverflowStrategy

import scala.concurrent.duration._
import scala.language.postfixOps

class Phoenix(
channels: WebSocketChannels[Response, Request],
heartbeat: Option[FiniteDuration]
)(
implicit
scheduler: Scheduler
) {
private val iterator: Iterator[Ref] = {
Stream.iterate( 0L )( _ + 1 ).map( Ref( _ ) ).iterator
}

private var periodicHeartbeat: Option[Cancelable] = None

private[phoenix] def ref = synchronized( iterator.next() )

private[phoenix] def withRef[T]( f: Ref ⇒ T ): T = f( ref )

/**
* A Reader that only cares about message events
*
* Also the heartbeat is started with the first subscription.
*/
private[phoenix] val reader = {
channels.reader.collect {
case websocket.Event.Message( response ) ⇒ response
}.doOnStart { _ ⇒
heartbeat.foreach( startHeartbeat )
}.publish
}

/**
* A Writer that proxies the Channel Writer in order the reschedule
* the heartbeat
*/
private[phoenix] val writer = {
heartbeat.fold[WebSocketWriter[Request]]( channels.writer ) { heartbeat ⇒
new HeartbeatWebSocketWriterProxy(
channels.writer,
() ⇒ startHeartbeat( heartbeat ),
stopHeartbeat
)
}
}

private def startHeartbeat( heartbeat: FiniteDuration ): Unit = synchronized {
logger.debug( "Starting heartbeat" )

periodicHeartbeat.foreach { heartbeat ⇒
logger.warn( "Overriding existing heartbeat" )
heartbeat.cancel()
}

val scheduler = Scheduler.singleThread( "heartbeat" )

val cancelable = scheduler.scheduleWithFixedDelay( heartbeat, heartbeat ) {
logger.debug( "Sending heartbeat" )

channels.writer.sendNow {
Request(
Topic( "phoenix" ),
Event( "heartbeat" ),
Json.Null,
ref
)
}
}

periodicHeartbeat = Some( cancelable )
}

private def stopHeartbeat(): Unit = synchronized {
logger.debug( "Stopping heartbeat" )

periodicHeartbeat.foreach( _.cancel() )
periodicHeartbeat = None
}

def join( topic: Topic, payload: Json = Json.Null ): Task[Channel] = withRef { ref ⇒
val send = Task {
logger.info( s"Requesting to join channel $topic" )
val request = Request( topic, Event.Join, payload, ref )
writer.send( request )
reader.connect()
}

val receive = reader.collect {
case Response( `topic`, _, Some( Payload( Status.Ok, _ ) ), `ref` ) ⇒
logger.info( s"Successfully joined channel $topic" )
new Channel( this, topic )
}.firstL

for {
_ ← send
receive ← receive
} yield receive
}

def close(): Unit = {
logger.debug( "Closing" )
stopHeartbeat()
channels.close()
}
}

object Phoenix {
def apply(
request: OkHttpRequest,
strategy: OverflowStrategy.Synchronous[websocket.Event[Response]] = websocket.Default.strategy,
heartbeat: Option[FiniteDuration] = Default.heartbeat
)(
implicit
client: Client,
scheduler: Scheduler
): Phoenix = {
val channels = WebSocketChannels[Response, Request]( request, strategy )
new Phoenix( channels, heartbeat )
}
}

private class HeartbeatWebSocketWriterProxy(
writer: WebSocketWriter[Request],
start: () ⇒ Unit,
stop: () ⇒ Unit
) extends WebSocketWriter[Request] {
override def send( value: Request ) = {
stop()
writer.send( value )
start()
}

override def ping( value: Option[Request] ) = {
stop()
writer.ping( value )
start()
}

override def close( code: Int, reason: Option[String] ) = {
stop()
writer.close( code, reason )
}
}
11 changes: 11 additions & 0 deletions phoenix/src/main/scala/io/taig/communicator/phoenix/Ref.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.taig.communicator.phoenix

import io.circe.{ Decoder, Encoder }

case class Ref( value: Long ) extends AnyVal

object Ref {
implicit val encoderRef: Encoder[Ref] = Encoder[Long].contramap( _.value )

implicit val decoderRef: Decoder[Ref] = Decoder[Long].map( Ref( _ ) )
}
Loading