In [None]:
import $ivy.`com.typesafe.akka::akka-stream:2.6.14`

In [None]:
import java.time._
import scala.concurrent._, duration._
import akka._
import akka.actor._
import akka.stream._
import akka.stream.scaladsl._

In [None]:
// Actor system shared by the whole notebook. It is implicit because the custom stages
// defined further down take an implicit ActorSystem and write their trace messages to
// its log; it is lazy so that it is only created the first time it is used.
implicit lazy val system: ActorSystem = ActorSystem("akka-stream-primer")

# Design your own operator with `GraphStage`

## The identity BidiFlow

In [None]:
import akka.stream.scaladsl._
import akka.stream._
import akka.stream.stage._

/**
 * Bidirectional pass-through stage that traces back-pressure signals.
 *
 * Elements arriving on `inC` are forwarded unchanged to `outC`, and elements arriving on
 * `inS` are forwarded unchanged to `outS`. Every pull and push handled by the stage is
 * written at debug level to the log of the implicit actor system.
 *
 * @tparam A element type of the `inC` -> `outC` direction
 * @tparam B element type of the `inS` -> `outS` direction
 * @param system actor system whose `log` receives the trace messages
 */
case class IdentityBidy[A, B]()(implicit system: ActorSystem) extends GraphStage[BidiShape[A, A, B, B]] {

    override def toString: String = "Identity Bidi"

    val inC: Inlet[A] = Inlet("inC")
    val outC: Outlet[A] = Outlet("outC")
    val inS: Inlet[B] = Inlet("inS")
    val outS: Outlet[B] = Outlet("outS")

    override val shape = BidiShape(inC, outC, inS, outS)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {

            setHandler(inC, new InHandler {
                override def onPush(): Unit = {
                    val elem = grab(inC)
                    // inC is only ever pulled from outC's onPull, so outC is expected to
                    // have demand here; the else branch only reports a broken invariant.
                    if (isAvailable(outC)) {
                        push(outC, elem)
                        system.log.debug(s"inC (onPush): push $elem to outC")
                    } else system.log.debug(s"inC (onPush): outC unavailable")
                }

                // Close only the forward direction. completeStage() would close inS and
                // outS at the same time, so elements still travelling back through
                // whatever is connected between outC and inS could be discarded. The
                // return direction finishes on its own once inS reports completion.
                override def onUpstreamFinish(): Unit =
                    complete(outC)
            })

            setHandler(outC, new OutHandler {
                // Demand on outC is translated into demand on inC, at most once at a time.
                override def onPull(): Unit =
                    if (!hasBeenPulled(inC)) {
                        pull(inC)
                        system.log.debug(s"outC (onPull): pull inC")
                    } else system.log.debug(s"outC (onPull): inC has already been pulled")
            })

            // No onUpstreamFinish override: the handler's default is kept for inS.
            setHandler(inS, new InHandler {
                override def onPush(): Unit = {
                    val elem = grab(inS)
                    // inS is only ever pulled from outS's onPull, mirroring the inC/outC pair.
                    if (isAvailable(outS)) {
                        push(outS, elem)
                        system.log.debug(s"inS (onPush): push $elem to outS")
                    } else system.log.debug(s"inS (onPush): outS unavailable")
                }
            })

            setHandler(outS, new OutHandler {
                // Demand on outS is translated into demand on inS, at most once at a time.
                override def onPull(): Unit =
                    if (!hasBeenPulled(inS)) {
                        pull(inS)
                        system.log.debug(s"outS (onPull): pull inS")
                    } else system.log.debug(s"outS (onPull): inS has already been pulled")
            })
        }
}

In [None]:
// Wrap the custom stage as a BidiFlow so that it can be joined with a Flow.
val idBidy: BidiFlow[String, String, Int, Int, NotUsed] =
    BidiFlow.fromGraph(IdentityBidy[String, Int]())

In [None]:
// Close the bidi stage around a String => Int step: outC feeds the step, its result re-enters on inS.
val flow: Flow[String, Int, NotUsed] =
    idBidy.join(Flow[String].map(_.length))

In [None]:
// Set the event-stream log level to debug (numeric level 4) so the stage's trace
// messages, which are logged with `debug`, are emitted.
system.eventStream.setLogLevel(akka.event.Logging.DebugLevel)

In [None]:
// Run four strings through the joined flow and print each resulting length.
Source(List("a", "aa", "aaa", "aaaa"))
    .via(flow)
    .runWith(Sink.foreach[Int](println))

## The OneInOneOut BidiFlow

In [None]:
import akka.stream.scaladsl._
import akka.stream._
import akka.stream.stage._

/**
 * Bidirectional stage that lets a single element at a time travel from `inC` to `outC`
 * and waits for a result on `inS` before asking `inC` for the next one.
 *
 * Elements are forwarded unchanged in both directions (`inC` -> `outC`, `inS` -> `outS`).
 * Every pull and push handled by the stage is written at debug level to the log of the
 * implicit actor system.
 *
 * @tparam A element type of the `inC` -> `outC` direction
 * @tparam B element type of the `inS` -> `outS` direction
 * @param system actor system whose `log` receives the trace messages
 */
case class OneInOneOut[A, B]()(implicit system: ActorSystem) extends GraphStage[BidiShape[A, A, B, B]] {

    override def toString: String = "OneInOneOut Bidi"

    val inC: Inlet[A] = Inlet("inC")
    val outC: Outlet[A] = Outlet("outC")
    val inS: Inlet[B] = Inlet("inS")
    val outS: Outlet[B] = Outlet("outS")

    override val shape = BidiShape(inC, outC, inS, outS)

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {

            // True from the moment an element is pushed to outC until an element
            // arrives on inS; while it is set, no new element is requested from inC.
            var beingProcessed: Boolean = false

            setHandler(inC, new InHandler {
                override def onPush(): Unit = {
                    val elem = grab(inC)
                    // Both places that pull inC do so only while outC has demand, so the
                    // else branch only reports a broken invariant.
                    if (isAvailable(outC)) {
                        push(outC, elem)
                        beingProcessed = true
                        system.log.debug(s"inC (onPush): push $elem to outC")
                    } else
                        system.log.debug(s"inC (onPush): outC unavailable")
                }

                // Close only the forward direction. completeStage() would close inS and
                // outS at the same time and could discard the result of the element that
                // is still being processed.
                override def onUpstreamFinish(): Unit =
                    complete(outC)
            })

            setHandler(outC, new OutHandler {
                override def onPull(): Unit =
                    (hasBeenPulled(inC), beingProcessed) match {
                        case (false, false) =>
                            pull(inC)
                            system.log.debug(s"outC (onPull): pull inC")
                        case (false, true) =>
                            // The pull is deferred; inS.onPush issues it once the result is back.
                            system.log.debug(s"outC (onPull): waiting until it's been processed")
                        case (true, true) =>
                            system.log.debug(s"outC (onPull): inC has already been pulled & being processed: should not happen!")
                        case (true, false) =>
                            system.log.debug(s"outC (onPull): inC has already been pulled")
                    }
            })

            // No onUpstreamFinish override: the handler's default is kept for inS.
            setHandler(inS, new InHandler {
                override def onPush(): Unit = {
                    val elem = grab(inS)

                    beingProcessed = false

                    // Request the next element only if outC can take it right away.
                    // Without demand on outC the element would be grabbed and dropped by
                    // inC.onPush; in that case outC.onPull issues the pull later. A closed
                    // inC must not be pulled at all.
                    if (isClosed(inC))
                        system.log.debug("inS (onPush): inC is closed")
                    else if (hasBeenPulled(inC))
                        system.log.debug("inS (onPush): inC has already been pulled")
                    else if (!isAvailable(outC))
                        system.log.debug("inS (onPush): outC has no demand yet")
                    else {
                        pull(inC)
                        system.log.debug(s"inS (onPush): pull inC")
                    }

                    // inS is only ever pulled from outS's onPull, so outS is expected to
                    // have demand here.
                    if (isAvailable(outS)) {
                        push(outS, elem)
                        system.log.debug(s"inS (onPush): push $elem to outS")
                    } else
                        system.log.debug(s"inS (onPush): outS unavailable")
                }
            })

            setHandler(outS, new OutHandler {
                // Demand on outS is translated into demand on inS, at most once at a time.
                override def onPull(): Unit =
                    if (!hasBeenPulled(inS)) {
                        pull(inS)
                        system.log.debug(s"outS (onPull): pull inS")
                    } else
                        system.log.debug(s"outS (onPull): inS has already been pulled")
            })
        }
}

In [None]:
// Wrap the custom stage as a BidiFlow so that it can be joined with a Flow.
val oneToOneBidy: BidiFlow[String, String, Int, Int, NotUsed] =
    BidiFlow.fromGraph(OneInOneOut[String, Int]())

In [None]:
// Close the bidi stage around a String => Int step: outC feeds the step, its result re-enters on inS.
val flow: Flow[String, Int, NotUsed] =
    oneToOneBidy.join(Flow[String].map(_.length))

In [None]:
// Set the event-stream log level to debug (numeric level 4) so the stage's trace
// messages, which are logged with `debug`, are emitted.
system.eventStream.setLogLevel(akka.event.Logging.DebugLevel)

In [None]:
// Run four strings through the joined flow and print each resulting length.
Source(List("a", "aa", "aaa", "aaaa"))
    .via(flow)
    .runWith(Sink.foreach[Int](println))