# Ejemplo final

Implementamos el mismo ejemplo anterior pero esta vez utilizando **cluster sharding**.

## Índice
- Protocolo de mensajería
  + De entrada
  + De salida
  + Eventos
- Cuentas y transferencias
  + Actor cuenta
    - Funciones, a partir del mensaje: 
      + Id de cuenta a partir del mensaje
      + Región de sharding a partir del mensaje
  + Interfaz cuenta
  + Definición de una transferencia
- Implementaciones
  + Logica de negocio: actualización del balance
  + Publicación de eventos
- Probandolo todo
  + Se crea el sistema de actores con la configuración necesaria
  + Se crea la región de cluster sharding
- Bonus track
  + Utilizar `akka-management`
  + Escuchar los eventos del cluster 

###  Se importan la librerías de akka 

> Ahora se importan las librerías de cluster sharding

In [1]:
import $ivy.`com.typesafe.akka::akka-cluster-sharding:2.5.14`
import $ivy.`com.lightbend.akka.management::akka-management:0.17.0`
import $ivy.`com.lightbend.akka.management::akka-management-cluster-http:0.17.0`

[32mimport [39m[36m$ivy.$                                                
[39m
[32mimport [39m[36m$ivy.$                                                      
[39m
[32mimport [39m[36m$ivy.$                                                                   [39m

### Implicitos necesarios

In [2]:
import scala.concurrent.ExecutionContext 
import java.util.concurrent.Executors
import akka.util.Timeout
import scala.concurrent.duration._

implicit val ec  = ExecutionContext.fromExecutorService( Executors.newFixedThreadPool( 20 ) )
implicit val timeout = Timeout( 5 seconds )   

[32mimport [39m[36mscala.concurrent.ExecutionContext 
[39m
[32mimport [39m[36mjava.util.concurrent.Executors
[39m
[32mimport [39m[36makka.util.Timeout
[39m
[32mimport [39m[36mscala.concurrent.duration._

[39m
[36mec[39m: [32mconcurrent[39m.[32mExecutionContextExecutorService[39m = scala.concurrent.impl.ExecutionContextImpl$$anon$1@54b6bfae
[36mtimeout[39m: [32makka[39m.[32mutil[39m.[32mTimeout[39m = [33mTimeout[39m(5 seconds)

---

---
## Se define el protocolo de mensajería

### Protocolo de entrada

In [3]:
sealed trait AccountIn {
    
    val accountId : String
}

// Commands

sealed trait AccountCommand extends AccountIn { 
    val amount : Int
}

final case class Withdrawal(accountId : String, amount : Int) extends AccountCommand 
final case class Income(accountId : String, amount : Int) extends AccountCommand 


// Queries

sealed trait AccountQuery extends AccountIn

final case class GetBalance(accountId : String) extends AccountQuery

defined [32mtrait[39m [36mAccountIn[39m
defined [32mtrait[39m [36mAccountCommand[39m
defined [32mclass[39m [36mWithdrawal[39m
defined [32mclass[39m [36mIncome[39m
defined [32mtrait[39m [36mAccountQuery[39m
defined [32mclass[39m [36mGetBalance[39m

### Protocolo de salida

In [4]:
sealed trait AccountOut

final case class CurrentBalance( balance: Int ) extends AccountOut
final case class DeltaBalance( delta : Int ) extends AccountOut

defined [32mtrait[39m [36mAccountOut[39m
defined [32mclass[39m [36mCurrentBalance[39m
defined [32mclass[39m [36mDeltaBalance[39m

### Eventos

In [5]:
sealed trait AccountEvent {    
    val idAccount : String
    val amount: Int
}

case class WithdrawalCreated( val idAccount: String, val amount : Int ) extends AccountEvent
case class IncomeCreated( val idAccount: String, val amount : Int ) extends AccountEvent

defined [32mtrait[39m [36mAccountEvent[39m
defined [32mclass[39m [36mWithdrawalCreated[39m
defined [32mclass[39m [36mIncomeCreated[39m

---

---
## Cuentas y transferencias
### Actor 'Cuenta'

In [6]:
import akka.actor._
import scala.collection.mutable.Queue
import scala.util._

class ActorAccount( private val updateBalance : (Int, Int) => Try[Int], 
                       private val queueCQRS: Queue[AccountEvent] ) extends Actor {
    
    val id = self.path.name
    
    var balance : Int = 0
    
    override def receive = {
        
        case command : AccountCommand => manageCommads( command )
        case querry  : AccountQuery   => manageQueries( querry )
        case other                    => unhandled( other )
        
    }
    
    private def manageCommads( command: AccountCommand ) : Unit = {
        
        command match {
            case Withdrawal(_, amount ) => execUpdateBalance( -1 * amount, command)
            case Income( _, amount )     => execUpdateBalance( amount, command )
            
        }           
        
    }
    
    private def execUpdateBalance( amount : Int, command : AccountCommand) = {        
        updateBalance( amount, balance ) match {            
            case Success( newBalance ) => {
                sender() ! DeltaBalance( newBalance - balance ) 
                sendEvent( command )
                balance = newBalance
            }
            case Failure(  error ) => sender() ! Status.Failure( error )         
        }
    }
    
    private def sendEvent( command: AccountCommand ) {
        
        val event : AccountEvent = command match {
            case Withdrawal(_, amount ) => WithdrawalCreated( id, amount ) 
            case Income( _, amount )     => IncomeCreated( id, amount ) 
        }
        
        queueCQRS.enqueue( event )
        
    }
    
    private def manageQueries( queries : AccountQuery ) : Unit = queries match {
        case GetBalance(_) => sender() !  CurrentBalance( balance )
    }
}


/*

#### Se crea un objeto acompañante
Tendra las funciones necesarias para la creación de la región de sharding
*/

object ActorAccount {
    
    import akka.cluster.sharding.{ShardRegion, ClusterSharding, ClusterShardingSettings}
    import akka.actor.ActorRef
    
    
    def props( updateBalance : (Int, Int) => Try[Int], queueCQRS: Queue[AccountEvent] ) = Props {
        new ActorAccount( updateBalance, queueCQRS ) 
    }        
    
    def extractShardId: ShardRegion.ExtractShardId = {
        case a : AccountIn =>  ( Math.abs( a.accountId.hashCode % 3 ) ).toString

    }

    def  extractEntityId: ShardRegion.ExtractEntityId = {
        case a : AccountIn =>  ( a.accountId, a)
    }
    
}


[32mimport [39m[36makka.actor._
[39m
[32mimport [39m[36mscala.collection.mutable.Queue
[39m
[32mimport [39m[36mscala.util._

[39m
defined [32mclass[39m [36mActorAccount[39m
defined [32mobject[39m [36mActorAccount[39m

### Interfaz 'Cuenta'

In [7]:
import scala.concurrent.Future

trait Account {
   def makeWithdrawal( amount : Int ) : Future[ Int ] 
   def makeIncome( amount : Int ) : Future[Int] 
   def getBalance: Future[Int]
}

object Account {
    
    import akka.pattern._
    import akka.actor._
    import akka.util.Timeout
    
    def apply( accountId : String, accountSharding : ActorRef )
                ( implicit ec : ExecutionContext, timeout : Timeout ) = new Account {
      
     def makeWithdrawal( amount : Int ) : Future[ Int] = {
        (
            accountSharding ? Withdrawal( accountId, amount ) 
        ).mapTo[DeltaBalance].map( _.delta )            
     }

     def makeIncome( amount : Int ) : Future[Int] = {
         ( 
            accountSharding ? Income( accountId,  amount ) 
         ).mapTo[DeltaBalance].map( _.delta )             
     }
        
      def getBalance : Future[Int] = {
         ( 
             accountSharding ? GetBalance( accountId )
         ). mapTo[ CurrentBalance ].map( _.balance ) 
      }
        
    }
}



[32mimport [39m[36mscala.concurrent.Future

[39m
defined [32mtrait[39m [36mAccount[39m
defined [32mobject[39m [36mAccount[39m

### Defincición de una transferencia

Se simula una operación/compensacion siguiendo el patrón sagas

In [8]:
object Transfer {
    
    import scala.concurrent._
    
    def transfer( from : Account, to: Account )( amount : Int )( implicit ec : ExecutionContext) = {
        
        from.makeWithdrawal( amount ).flatMap {
             _ => to.makeIncome( amount )
                    .map( _ => true )
                    .recoverWith{ 
                        case _ => from.makeIncome( amount )
                                      .map( _=> false ) 
                    }
        }
        
   }
    
}



defined [32mobject[39m [36mTransfer[39m

---

----
## Implementaciones

### Lógica de negocio
Se define una lógica de negocio simple. En este caso no se admiten descubiertos, pero por ejemplo se pueden implementar diferentes lógicas como un porcentaje de descubierto dependiendo del balance. 
> El objetivo final es que la lógica puede estar separada del actor y puede ser validada y probada aparte

In [9]:
import scala.util._

val updateBalance : (Int,Int) => Try[Int] = ( amount, balance ) => {
   
    val newBalance = amount + balance
    
    if( newBalance >= 0 ) {
    
        Success( newBalance )
        
    } else {
        
        Failure( new IllegalStateException( s"It should not be in red( ${newBalance} )" ) )
    }
    
}

[32mimport [39m[36mscala.util._

[39m
[36mupdateBalance[39m: ([32mInt[39m, [32mInt[39m) => [32mTry[39m[[32mInt[39m] = $sess.cmd8Wrapper$Helper$$Lambda$3409/401254090@16f7f608

### Indirección de publicación de eventos
Se define una cola que será la indirección de publicación de eventos.
En este caso para esta prueba será una cola mitable de Scala.   
> En un sistema real puede ser un akka stream con su fuente '_materializada_' en una cola

In [10]:
import scala.collection.mutable.Queue

val queueCQRS = Queue[AccountEvent]()

[32mimport [39m[36mscala.collection.mutable.Queue

[39m
[36mqueueCQRS[39m: [32mQueue[39m[[32mAccountEvent[39m] = [33mQueue[39m()

---

---
## Probandolo todo

### _Testing: Utilidades_

In [11]:
object TestUtil {
    
    import scala.concurrent._, duration._
    import akka.pattern._
    import akka.util.Timeout


    val tm = 5 seconds
    val timeout = Timeout( tm )

    def result[T]( future : => Future[T] ) =  Try {
        Await.result( future, tm )
    }
    
}

defined [32mobject[39m [36mTestUtil[39m

---
### Iniciando el entorno

#### Singlenton de utilidades del sistema de actores

Se crea un objeto con los métodos necesarios para crear el sistema de actores con la configuración necesaria que requiere akka sharding. También permite parar el sistema de actores de una manera ordenada.

> En este caso existen dos _seed nodes_ configurados para permitir comprobar el comportamiento del cluster

In [12]:

object SystemUtil {
    
    import com.typesafe.config.ConfigFactory 
    import akka.actor._
    import akka.cluster.Cluster
    import scala.concurrent.Future

    val SystemName = "test2"
    
    val AkkaPort = 2554
    
    val AkkaManagementPort = 8554
    
    val akkaCfg =
      s"""
        |akka {
        |  
        |  remote {
        |    netty.tcp {
        |      hostname = "127.0.0.1"
        |      port = ${AkkaPort}
        |    }
        |  }
        |  
        |  cluster {
        |     seed-nodes = [
        |                     "akka.tcp://${SystemName}@127.0.0.1:2554",
        |                     "akka.tcp://${SystemName}@127.0.0.1:2553"
        |                   ]
        |      sharding.state-store-mode = ddata
        |    }
        |
        |  actor {
        |    provider = "akka.cluster.ClusterActorRefProvider"
        |  }
        |
        |  management {
        |     http {
        |       hostname = "127.0.0.1"
        |       port = ${AkkaManagementPort} 
        |     }
        |
        |  }
        |
        |}
      """.stripMargin
    
    lazy val system = ActorSystem.create( SystemName, 
                                          ConfigFactory.parseString( akkaCfg ).resolve() 
                                         )
    def terminate : Unit = {
        val cluster = Cluster.get( system )
        cluster.registerOnMemberRemoved( system.terminate )
        cluster.leave( cluster.selfAddress )
    }
    
}



defined [32mobject[39m [36mSystemUtil[39m

####  Se crea la región de cluster sharding

In [13]:
import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
import akka.actor.ActorRef

val accountsSharding : ActorRef = ClusterSharding( SystemUtil.system ).start(
      typeName = "accounts",
      entityProps = ActorAccount.props( updateBalance, queueCQRS ),
      settings = ClusterShardingSettings( SystemUtil.system ),
      extractShardId = ActorAccount.extractShardId,
      extractEntityId = ActorAccount.extractEntityId
    )

[INFO] [09/04/2018 22:17:15.737] [pool-6-thread-15] [akka.remote.Remoting] Starting remoting
[INFO] [09/04/2018 22:17:15.931] [pool-6-thread-15] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://test2@127.0.0.1:2554]
[INFO] [09/04/2018 22:17:15.937] [pool-6-thread-15] [akka.remote.Remoting] Remoting now listens on addresses: [akka.tcp://test2@127.0.0.1:2554]
[INFO] [09/04/2018 22:17:15.961] [pool-6-thread-15] [akka.cluster.Cluster(akka://test2)] Cluster Node [akka.tcp://test2@127.0.0.1:2554] - Starting up...
[INFO] [09/04/2018 22:17:16.117] [pool-6-thread-15] [akka.cluster.Cluster(akka://test2)] Cluster Node [akka.tcp://test2@127.0.0.1:2554] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [09/04/2018 22:17:16.118] [pool-6-thread-15] [akka.cluster.Cluster(akka://test2)] Cluster Node [akka.tcp://test2@127.0.0.1:2554] - Started up successfully
[WARN] [09/04/2018 22:17:16.318] [New I/O boss #3] [NettyTransport(akka://test2)] Remote connection to [null] 

[32mimport [39m[36makka.cluster.sharding.{ClusterSharding, ClusterShardingSettings}
[39m
[32mimport [39m[36makka.actor.ActorRef

[39m
[36maccountsSharding[39m: [32mActorRef[39m = Actor[akka://test2/system/sharding/accounts#-1643427668]

#### Se crean dos 'entidades' cuenta

In [14]:
val accountOne = Account( "accountOne", accountsSharding )
val accountTwo = Account( "accountTwo", accountsSharding )

[36maccountOne[39m: [32mAnyRef[39m with [32mAccount[39m = $sess.cmd6Wrapper$Helper$Account$$anon$1@56309941
[36maccountTwo[39m: [32mAnyRef[39m with [32mAccount[39m = $sess.cmd6Wrapper$Helper$Account$$anon$1@5bad7878

#### Se hace un ingreso incial a las dos cuentas

Después se comprueba el balance y se obtiene el total del dinero (la suma de los dos balances)

> Aquí se hace `Await` sólo por motivos de testing

In [15]:

TestUtil.result{
    for {
        _  <- accountOne.makeIncome( 1000 )
        _  <- accountTwo.makeIncome( 1000 ) 
        b1 <- accountOne.getBalance  
        b2 <- accountTwo.getBalance 
    } yield {( b1, b2, b1 + b2 )}
    
}

[36mres14[39m: [32mTry[39m[([32mInt[39m, [32mInt[39m, [32mInt[39m)] = [33mSuccess[39m(([32m1000[39m, [32m1000[39m, [32m2000[39m))

#### Se comprueban los eventos

In [16]:
queueCQRS.toList ; queueCQRS.clear

[36mres15_0[39m: [32mList[39m[[32mAccountEvent[39m] = [33mList[39m(IncomeCreated(accountOne,1000), IncomeCreated(accountTwo,1000))

---
### _Probando, probando_

#### Funciones de utilidades
Para poder testear transferencias de una cuenta a otra de una manera más cómoda

In [17]:
val transfersOneToTwo =  Transfer.transfer( accountOne, accountTwo)( _ )
val transfersTwoToOne =  Transfer.transfer( accountTwo, accountOne)( _ )

[36mtransfersOneToTwo[39m: [32mInt[39m => [32mFuture[39m[[32mBoolean[39m] = $sess.cmd16Wrapper$Helper$$Lambda$4119/2098623858@1b60cff4
[36mtransfersTwoToOne[39m: [32mInt[39m => [32mFuture[39m[[32mBoolean[39m] = $sess.cmd16Wrapper$Helper$$Lambda$4120/958369353@5edc1284

#### Primera prueba

In [18]:
TestUtil.result{
       for {
           a <- transfersOneToTwo( 500 ) 
           b <- transfersTwoToOne( 500 ) 
       } yield( a && b )
}

[36mres17[39m: [32mTry[39m[[32mBoolean[39m] = [33mSuccess[39m([32mtrue[39m)

#### Se vuelen a compruebar los balances

> `Await` sólo por motivos de testing

In [19]:
TestUtil.result {
    for {
        b1 <- accountOne.getBalance
        b2 <- accountTwo.getBalance  
    } yield{  (b1, b2, b1 +b2) }    
}

[36mres18[39m: [32mTry[39m[([32mInt[39m, [32mInt[39m, [32mInt[39m)] = [33mSuccess[39m(([32m1000[39m, [32m1000[39m, [32m2000[39m))

#### Se vuelven a comprobar los eventos

In [20]:
queueCQRS.toList ; queueCQRS.clear

[36mres19_0[39m: [32mList[39m[[32mAccountEvent[39m] = [33mList[39m(
  WithdrawalCreated(accountOne,500),
  IncomeCreated(accountTwo,500),
  WithdrawalCreated(accountTwo,500),
  IncomeCreated(accountOne,500)
)

---
### Bonus track


#### Akka management

Se arranca akka-management. En este caso arranca un api rest en el puerto definido en `AkkaManagementPort` en `SystemUtil`

In [21]:
import akka.management.AkkaManagement

TestUtil.result {
    AkkaManagement( SystemUtil.system ).start()    
}



[32mimport [39m[36makka.management.AkkaManagement

[39m
[36mres20_1[39m: [32mTry[39m[[32makka[39m.[32mhttp[39m.[32mscaladsl[39m.[32mmodel[39m.[32mUri[39m] = [33mSuccess[39m(http://127.0.0.1:8554)

#### Escuchar eventos del estado del cluster

Actor '_listener_' que escucha los eventos del cluster `MemberEvent` y `ReachabilityEvent` y los almacena en una variable.

Se puede obtener esa información eviando un mensaje del tipo `GetClusterStateEvent`. Después de devolver estos datos se incializa la variable.

In [22]:
sealed trait EventClusterListenerIn 
final case object GetClusterStateEvent extends EventClusterListenerIn

class EventClusterListener extends Actor {
    
    import akka.cluster.Cluster
    
    import akka.cluster.ClusterEvent._
    
    val cluster = Cluster( context.system  )
    
    cluster.subscribe(self, 
                      initialStateMode = InitialStateAsEvents, 
                      classOf[MemberEvent], 
                      classOf[ReachabilityEvent] )
    
    var listDomain = Set[ClusterDomainEvent]()       
    
    
    override def receive = {
        
        case a : ClusterDomainEvent => {
            listDomain = listDomain + a
        }
        
        case GetClusterStateEvent => {
            sender() ! listDomain
            listDomain = Set()
        }       
    }
    
}


defined [32mtrait[39m [36mEventClusterListenerIn[39m
defined [32mobject[39m [36mGetClusterStateEvent[39m
defined [32mclass[39m [36mEventClusterListener[39m

Se crea un objeto que envuelve al actor para gestionar estos mensajes

In [23]:
object EventClusterListener {
    
    import akka.pattern._
    import akka.actor._
    import akka.util.Timeout
    
    import akka.cluster.ClusterEvent._
    
    lazy val listener = SystemUtil.system.actorOf( Props( new EventClusterListener() ) )
    
     def getEvents: Future[Set[ClusterDomainEvent]] = {
       ( listener ? GetClusterStateEvent ) .mapTo[Set[ClusterDomainEvent]]           
     }
}



defined [32mobject[39m [36mEventClusterListener[39m

Se obtienes los eventos escuchados por este nodo

> Se utiliza `TestUtil` por motivos de testing

In [26]:
TestUtil.result {
   EventClusterListener.getEvents
}

[36mres25[39m: [32mTry[39m[[32mSet[39m[[32makka[39m.[32mcluster[39m.[32mClusterEvent[39m.[32mClusterDomainEvent[39m]] = [33mSuccess[39m([33mSet[39m())