Skip to content
Permalink
Browse files

Merge branch 'CE-613/sessions-multiplexing' of github.com:input-outpu…

…t-hk/scalanet into CE-613/sessions-multiplexing
  • Loading branch information...
jtownson committed May 16, 2019
2 parents 96ab987 + b3da2a0 commit 5ec08524eca5a87c8bed78fec22de451dd65e664
@@ -32,17 +32,22 @@ class SimplePeerGroup[A, AA, M](

override def processAddress: A = config.processAddress

override def client(to: A): Task[Channel[A, M]] =
underLyingPeerGroup.client(routingTable(to)).map { underlyingChannel =>
new ChannelImpl(to, underlyingChannel)
}
override def client(to: A): Task[Channel[A, M]] = {
val underlyingAddresses: List[AA] = if (routingTable.contains(to)) List(routingTable(to)) else multiCastTable(to)

val underlyingChannels: Task[List[Channel[AA, Either[ControlMessage[A, AA], M]]]] =
Task.gatherUnordered(underlyingAddresses.map { aa =>
underLyingPeerGroup.client(aa)
})
underlyingChannels.map(new ChannelImpl(to, _))
}

override def server(): Observable[Channel[A, M]] = {
underLyingPeerGroup.server().map { underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]] =>
val reverseLookup: mutable.Map[AA, A] = routingTable.map(_.swap)
val a = reverseLookup(underlyingChannel.to)
debug(s"Received new server channel from $a")
new ChannelImpl(a, underlyingChannel)
new ChannelImpl(a, List(underlyingChannel))
}
}

@@ -91,23 +96,33 @@ class SimplePeerGroup[A, AA, M](
}
}

private class ChannelImpl(val to: A, underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]])
private class ChannelImpl(val to: A, underlyingChannel: List[Channel[AA, Either[ControlMessage[A, AA], M]]])
extends Channel[A, M] {

override def sendMessage(message: M): Task[Unit] = {
underlyingChannel.sendMessage(Right(message))
debug(
s"message from local address $processAddress to remote address $to , $message"
)
Task.gatherUnordered(underlyingChannel.map(_.sendMessage(Right(message)))).map(_ => ())
}

override def in: Observable[M] = {
underlyingChannel.in.collect {
case Right(message) =>
debug(s"Processing inbound message from remote address $to to local address $processAddress, $message")
message
}
Observable
.fromIterable(underlyingChannel.map {
_.in.collect {
case Right(message) =>
debug(
s"Processing inbound message from remote address $to to local address $processAddress, $message"
)
message
}
})
.merge

}

override def close(): Task[Unit] =
underlyingChannel.close()
Task.gatherUnordered(underlyingChannel.map(_.close())).map(_ => ())
}

private def handleEnrollment(enrolMe: EnrolMe[A, AA]): Unit = {
@@ -0,0 +1,18 @@
package io.iohk.scalanet.peergroup

import monix.execution.Scheduler

private[scalanet] class Subscribers[T](id: String = "")(implicit scheduler: Scheduler) {

import monix.reactive.subjects.ReplaySubject
val messageStream = ReplaySubject[T]()

// import monix.reactive.subjects.PublishSubject
// val messageStream = PublishSubject[T]()
def notify(t: T): Unit = {
messageStream.onNext(t)
}

def onComplete(): Unit =
messageStream.onComplete()
}
@@ -13,14 +13,26 @@ import scala.concurrent.Future
import scala.concurrent.duration._
import io.iohk.scalanet.TaskValues._

//import scala.util.Random
import scala.util.Random

class SimplePeerGroupSpec extends FlatSpec {

implicit val patienceConfig = ScalaFutures.PatienceConfig(timeout = 5 second, interval = 1000 millis)

behavior of "SimplePeerGroup"

it should "send a message to itself" in new SimpleTerminalPeerGroups {
terminalPeerGroups.foreach { terminalGroup =>
withASimplePeerGroup(terminalGroup, "Alice") { alice =>
val message = Random.alphanumeric.take(1044).mkString
val aliceReceived = alice.server().mergeMap(_.in).headL.runToFuture
val aliceClient: Channel[String, String] = alice.client(alice.processAddress).evaluated
aliceClient.sendMessage(message).runToFuture
aliceReceived.futureValue shouldBe message
}
}
}

it should "send and receive a message to another peer of SimplePeerGroup" in new SimpleTerminalPeerGroups {
terminalPeerGroups.foreach { terminalGroup =>
withTwoSimplePeerGroups(
@@ -46,123 +58,138 @@ class SimplePeerGroupSpec extends FlatSpec {
}
}

// it should "send a message to itself" in new SimpleTerminalPeerGroups {
// terminalPeerGroups.foreach { terminalGroup =>
// withASimplePeerGroup(terminalGroup, "Alice") { alice =>
// val message = Random.alphanumeric.take(1044).mkString
// val aliceReceived = alice.server().flatMap(_.in).headL.runToFuture
// val aliceClient: Channel[String, String] = alice.client(alice.processAddress).evaluated
// aliceClient.sendMessage(message).runToFuture
// aliceReceived.futureValue shouldBe message
// }
// }
// }
//
// it should "send a message to another peer's multicast address" in new SimpleTerminalPeerGroups {
// terminalPeerGroups.foreach { terminalGroup =>
// withTwoSimplePeerGroups(
// terminalGroup,
// List("news", "sports"),
// "Alice",
// "Bob"
// ) { (alice, bob) =>
// val bobsMessage = "HI Alice"
// val alicesMessage = "HI Bob"
//
// val aliceReceived = alice
// .server()
// .flatMap { channel =>
// channel.sendMessage(alicesMessage).runToFuture
// channel.in
// }
// .headL
// .runToFuture
//
// val bobsClient: Channel[String, String] = bob.client(alice.processAddress).evaluated
// bobsClient.sendMessage(bobsMessage).runToFuture
// val bobReceived = bobsClient.in.headL.runToFuture
// aliceReceived.futureValue shouldBe bobsMessage
//
// val bobReceivedNews = bob
// .server()
// .flatMap { channel =>
// channel.in
// }
// .headL
// .runToFuture
// val messageNews = "Latest News"
// val aliceClient: Channel[String, String] = alice.client(bob.processAddress).evaluated
// aliceClient.sendMessage(messageNews).runToFuture
//
// bobReceivedNews.futureValue shouldBe messageNews
//
// val bobReceivedSports = bob
// .server()
// .flatMap { channel =>
// channel.in
// }
// .headL
// .runToFuture
// val messageSports = "Sports Updates"
//
// val aliceClientNews: Channel[String, String] = alice.client(bob.processAddress).evaluated
//
// aliceClientNews.sendMessage(messageSports).runToFuture
// bobReceivedSports.futureValue shouldBe messageSports
//
// }
// }
// }

// it should "send a message to 2 peers sharing a multicast address" in new SimpleTerminalPeerGroups {
// terminalPeerGroups.foreach { terminalGroup =>
// withThreeSimplePeerGroups(
// terminalGroup,
// List("news", "sports"),
// "Alice",
// "Bob",
// "Charlie"
// ) { (alice, bob, charlie) =>
// val message = "HI!! Alice"
//
// val messageReceivedF = alice.messageChannel[String].headL.runToFuture
// val messageReceivedByBobF = bob.messageChannel[String].headL.runToFuture
//
// bob.sendMessage("Alice", message).runToFuture.futureValue
// val messageReceived = messageReceivedF.futureValue
// messageReceived shouldBe (bob.processAddress, message)
//
// val aliceMessage = "HI!! Bob"
// alice.sendMessage("Bob", aliceMessage).runToFuture.futureValue
// val messageReceivedByBob = messageReceivedByBobF.futureValue
// messageReceivedByBob shouldBe (alice.processAddress, aliceMessage)
//
// val messageReceivedByBobNewsF = bob.messageChannel[String].headL.runToFuture
// val messageReceivedByCharlieNewsF = charlie.messageChannel[String].headL.runToFuture
//
// val messageNews = "Latest News"
// alice.sendMessage("news", messageNews).runToFuture.futureValue
// val messageReceivedByBobNews = messageReceivedByBobNewsF.futureValue
// messageReceivedByBobNews shouldBe (alice.processAddress, messageNews)
// val messageReceivedByCharlieNews = messageReceivedByCharlieNewsF.futureValue
// messageReceivedByCharlieNews shouldBe (alice.processAddress, messageNews)
//
// val messageReceivedByBobSportsF = bob.messageChannel[String].headL.runToFuture
// val messageReceivedByCharlieSportsF = charlie.messageChannel[String].headL.runToFuture
//
// val messageSports = "Sports Updates"
// alice.sendMessage("sports", messageSports).runToFuture.futureValue
// val messageReceivedByBobSports = messageReceivedByBobSportsF.futureValue
// messageReceivedByBobSports shouldBe (alice.processAddress, messageSports)
// val messageReceivedByCharlieSports = messageReceivedByCharlieSportsF.futureValue
// messageReceivedByCharlieSports shouldBe (alice.processAddress, messageSports)
//
// }
// }
// }
it should "send a message to another peer's multicast address" in new SimpleTerminalPeerGroups {
terminalPeerGroups.foreach { terminalGroup =>
withTwoSimplePeerGroups(
terminalGroup,
List("news", "sports"),
"Alice",
"Bob"
) { (alice, bob) =>
val bobsMessage = "HI Alice"
val alicesMessage = "HI Bob"

val bobReceived = bob.server().mergeMap(_.in).headL.runToFuture
bob.server().foreach(channel => channel.sendMessage(bobsMessage).evaluated)

val aliceClient = alice.client(bob.processAddress).evaluated

val aliceReceived = aliceClient.in.filter(msg => msg == bobsMessage).headL.runToFuture
aliceClient.sendMessage(alicesMessage).evaluated

bobReceived.futureValue shouldBe alicesMessage
aliceReceived.futureValue shouldBe bobsMessage

val messageNews = "Latest News"

val bobReceivedNews = bob
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == messageNews)
}
.headL
.runToFuture

val sportUpdates = "Sports Updates"

val bobSportsUpdate = bob
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == sportUpdates)
}
.headL
.runToFuture

val aliceClientNews = alice.client("news").evaluated
val aliceClientSports = alice.client("sports").evaluated

aliceClientNews.sendMessage(messageNews).evaluated
bobReceivedNews.futureValue shouldBe messageNews

aliceClientSports.sendMessage(sportUpdates).evaluated
bobSportsUpdate.futureValue shouldBe sportUpdates

}
}
}

it should "send a message to 2 peers sharing a multicast address" in new SimpleTerminalPeerGroups {
terminalPeerGroups.foreach { terminalGroup =>
withThreeSimplePeerGroups(
terminalGroup,
List("news", "sports"),
"Alice",
"Bob",
"Charlie"
) { (alice, bob, charlie) =>
val bobsMessage = "HI Alice"
val alicesMessage = "HI Bob"

val bobReceived = bob.server().mergeMap(_.in).headL.runToFuture
bob.server().foreach(channel => channel.sendMessage(bobsMessage).evaluated)

val aliceClient = alice.client(bob.processAddress).evaluated

val aliceReceived = aliceClient.in.filter(msg => msg == bobsMessage).headL.runToFuture
aliceClient.sendMessage(alicesMessage).evaluated

bobReceived.futureValue shouldBe alicesMessage
aliceReceived.futureValue shouldBe bobsMessage

val messageNews = "Latest News"

val bobReceivedNews = bob
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == messageNews)
}
.headL
.runToFuture

val charlieReceivedNews = charlie
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == messageNews)
}
.headL
.runToFuture

val aliceClientNews = alice.client("news").evaluated

aliceClientNews.sendMessage(messageNews).evaluated
bobReceivedNews.futureValue shouldBe messageNews
charlieReceivedNews.futureValue shouldBe messageNews

val sportUpdates = "Sports Updates"

val bobSportsUpdate = bob
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == sportUpdates)
}
.headL
.runToFuture

val charlieSportsUpdate = charlie
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == sportUpdates)
}
.headL
.runToFuture

val aliceSportsClient = alice.client("sports").evaluated

aliceSportsClient.sendMessage(sportUpdates).evaluated
bobSportsUpdate.futureValue shouldBe sportUpdates
charlieSportsUpdate.futureValue shouldBe sportUpdates

}
}
}

trait SimpleTerminalPeerGroups {
val terminalPeerGroups = List(TcpTerminalPeerGroup /* UdpTerminalPeerGroup*/ )
val terminalPeerGroups = List(TcpTerminalPeerGroup, UdpTerminalPeerGroup)
}

private def withASimplePeerGroup(
@@ -29,7 +29,7 @@ class SimplestPeerGroupSpec extends FlatSpec {

val bobReceived: Future[String] = bob
.server()
.mergeMap { channel =>
.flatMap { channel =>
channel.in
}
.filter { msg =>

0 comments on commit 5ec0852

Please sign in to comment.
You can’t perform that action at this time.