Skip to content

Commit

Permalink
Add InMemoryPeerGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
EzequielPostan committed Jun 24, 2019
1 parent ab83077 commit 6b81852
Show file tree
Hide file tree
Showing 2 changed files with 200 additions and 0 deletions.
171 changes: 171 additions & 0 deletions scalanet/src/io/iohk/scalanet/peergroup/InMemoryPeerGroup.scala
@@ -0,0 +1,171 @@
package io.iohk.scalanet.peergroup

import monix.eval.Task
import monix.reactive.{MulticastStrategy, Observable}
import monix.reactive.subjects.ConcurrentSubject
import monix.execution.Scheduler.Implicits.global
import InMemoryPeerGroup._
import monix.execution.Scheduler

import scala.util.Random

// For simplicity we can model the network with three main interactions.
// 1. Register to the network.
// 2. Disconnect from the network.
// 3. Send messages.
class Network[A, M] {
private var peers: Map[A, InMemoryPeerGroup[A, M]] = Map()

def register(peer: InMemoryPeerGroup[A, M]): Result[Unit] =
if (peers.get(peer.processAddress).isEmpty) {
peers += (peer.processAddress -> peer)
allGood(())
} else error("The address is already in use")

def disconnect(peer: InMemoryPeerGroup[A, M]): Unit = peers -= peer.processAddress

def deliverMessage(channelID: ChannelID, from: A, to: A, message: M): Result[Unit] =
peers.get(to) match {
case None => error(s"Unreachable peer $to")
case Some(destination) =>
allGood(destination.receiveMessage(channelID, from, message))
}
}

class InMemoryPeerGroup[A, M](address: A)(implicit network: Network[A, M]) extends PeerGroup[A, M] {

private var status: PeerStatus = PeerStatus.NotInitialized
private val channelStream = ConcurrentSubject[InMemoryChannel[A, M]](MulticastStrategy.publish)
private var channelsMap: Map[ChannelID, InMemoryChannel[A, M]] = Map()
def receiveMessage(channelID: ChannelID, from: A, msg: M): Unit = {
channelsMap.get(channelID) match {
case None =>
val newChannel = new InMemoryChannel(channelID, processAddress, from)
channelsMap += (channelID -> newChannel)
channelStream.feed(List(newChannel))
newChannel.depositMessage(msg)
case Some(ch) =>
ch.depositMessage(msg)
}
}

// Public interface
def processAddress: A = address
def initialize(): Result[Unit] =
if (status == PeerStatus.Listening) error("Peer already connected")
else {
status = PeerStatus.Listening
network.register(this)
}
def client(to: A): Result[InMemoryChannel[A, M]] =
allGood(new InMemoryChannel[A, M](newChannelID(), processAddress, to))
def server(): Observable[InMemoryChannel[A, M]] = {
status match {
case PeerStatus.NotInitialized => throw new Exception(s"Peer $processAddress is not initialized yet")
case PeerStatus.Listening => channelStream
case PeerStatus.ShutDowned => throw new Exception(s"Peer $processAddress was shut downed")
}
}
def shutdown(): Result[Unit] = {
status match {
case PeerStatus.NotInitialized =>
allGood(()) // Should we return an error here?
case PeerStatus.Listening =>
status = PeerStatus.ShutDowned
network.disconnect(this)
allGood(())
case PeerStatus.ShutDowned =>
error("The peer group was already down") // Should we return an error here?
}
}
}

object InMemoryPeerGroup {

type ChannelID = java.util.UUID
def newChannelID(): ChannelID = java.util.UUID.randomUUID()

sealed trait PeerStatus
object PeerStatus {
case object NotInitialized extends PeerStatus
case object Listening extends PeerStatus
case object ShutDowned extends PeerStatus
}

trait ChannelStatus
object ChannelStatus {
case object Open extends ChannelStatus
case object Close extends ChannelStatus
}

type Result[A] = Task[A]
def allGood[A](x: A): Result[A] = Task.now(x)
def error(msg: String): Result[Nothing] = Task.now(throw new Exception(msg))

// Reference Channel trait
class InMemoryChannel[A, M](channelID: ChannelID, myAddress: A, destination: A)(implicit network: Network[A, M])
extends Channel[A, M] {
private var channelStatus: ChannelStatus = ChannelStatus.Open
private val messagesQueue: ConcurrentSubject[M, M] = ConcurrentSubject[M](MulticastStrategy.replay)
def depositMessage(m: M): Unit = messagesQueue.feed(List(m))

// Public interface
def to: A = destination
def sendMessage(message: M): Result[Unit] = network.deliverMessage(channelID, myAddress, to, message)
def in: Observable[M] = messagesQueue
def close(): Result[Unit] = {
// Note, what else should be done by close method? E.g. block messages that come to `in`?
// what should happen to the other side of the channel?
channelStatus match {
case ChannelStatus.Open =>
channelStatus = ChannelStatus.Close
allGood(())
case ChannelStatus.Close =>
error("Channel was already closed")
}
}
}

import scala.language.higherKinds

implicit def inMemoryPeerGroupTestUtils(implicit n: Network[Int, String]): TestUtils[Int, String, InMemoryPeerGroup] =
TestUtils.instance(
_.status == PeerStatus.Listening,
() => new InMemoryPeerGroup[Int, String](Random.nextInt()),
_.shutdown()
)

object TestUtils {

def apply[A, M, PG[_, _]](implicit tu: TestUtils[A, M, PG]): TestUtils[A, M, PG] = tu

def instance[A, M, PG[_, _]](
isLis: PG[A, M] => Boolean,
generateRandomPG: () => PG[A, M],
shutd: PG[A, M] => Task[Unit]
): TestUtils[A, M, PG] = new TestUtils[A, M, PG] {
override def isListening(peer: PG[A, M]): Boolean = isLis(peer)
override def generateRandomPeerGroup(): PG[A, M] = generateRandomPG()
override def shutdown(peer: PG[A, M]): Task[Unit] = shutd(peer)
}
}

trait TestUtils[A, M, PG[_, _]] {
def isListening(peer: PG[A, M]): Boolean
def generateRandomPeerGroup(): PG[A, M]
def shutdown(peer: PG[A, M]): Task[Unit]

def withTwoRandomPeerGroups(
testFunction: (PG[A, M], PG[A, M]) => Any
)(implicit n: Network[A, M], sc: Scheduler): Unit = {
val alice = generateRandomPeerGroup()
val bob = generateRandomPeerGroup()
try {
testFunction(alice, bob)
} finally {
shutdown(alice).runAsync(sc)
shutdown(bob).runAsync(sc)
}
}
}
}
@@ -0,0 +1,29 @@
package io.iohk.scalanet.peergroup

import org.scalatest.FlatSpec
import org.scalatest.Matchers._

import monix.execution.Scheduler.Implicits.global

class InMemoryPeerGroupSpec extends FlatSpec {

import InMemoryPeerGroup._

implicit val n: Network[Int, String] = new Network[Int, String]()

def isListening(p: InMemoryPeerGroup[Int, String]): Boolean =
TestUtils[Int, String, InMemoryPeerGroup].isListening(p)
def generateRandomPeerGroup(): InMemoryPeerGroup[Int, String] =
TestUtils[Int, String, InMemoryPeerGroup].generateRandomPeerGroup()

behavior of "InMemoryPeerGroupSpec.scala"

it should "not listen to the network before initialization" in {
val peer = generateRandomPeerGroup()
isListening(peer) shouldBe false
peer.initialize().runAsync
isListening(peer) shouldBe true
peer.shutdown().runAsync
}

}

0 comments on commit 6b81852

Please sign in to comment.