Skip to content
Permalink
Browse files

Fix the the test multicast address and made changes to simple peer gr…

…oup for multicast address
  • Loading branch information...
shaileshp0110 committed May 15, 2019
1 parent 1ddede0 commit b564b1486d6aaf1574d86ffbf1581c421e6cec18
@@ -32,17 +32,22 @@ class SimplePeerGroup[A, AA, M](

override def processAddress: A = config.processAddress

override def client(to: A): Task[Channel[A, M]] =
underLyingPeerGroup.client(routingTable(to)).map { underlyingChannel =>
new ChannelImpl(to, underlyingChannel)
}
override def client(to: A): Task[Channel[A, M]] = {
val underlyingAddresses: List[AA] = if (routingTable.contains(to)) List(routingTable(to)) else multiCastTable(to)

val underlyingChannels: Task[List[Channel[AA, Either[ControlMessage[A, AA], M]]]] =
Task.sequence(underlyingAddresses.map { aa =>
underLyingPeerGroup.client(aa)
})
underlyingChannels.map(new ChannelImpl(to, _))
}

override def server(): Observable[Channel[A, M]] = {
underLyingPeerGroup.server().map { underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]] =>
val reverseLookup: mutable.Map[AA, A] = routingTable.map(_.swap)
val a = reverseLookup(underlyingChannel.to)
debug(s"Received new server channel from $a")
new ChannelImpl(a, underlyingChannel)
new ChannelImpl(a, List(underlyingChannel))
}
}

@@ -91,32 +96,45 @@ class SimplePeerGroup[A, AA, M](
}
}

private class ChannelImpl(val to: A, underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]])
private class ChannelImpl(val to: A, underlyingChannel: List[Channel[AA, Either[ControlMessage[A, AA], M]]])
extends Channel[A, M] {

override def sendMessage(message: M): Task[Unit] = {
debug(
s" ++++++SimplePeerGroup sendMessage message from local address $processAddress to remote address $to , $message"
)
underlyingChannel.sendMessage(Right(message))
Task.sequence(underlyingChannel.map(_.sendMessage(Right(message)))).map(_ => ())
}

override def in: Observable[M] = {
debug(
s" ++++++IN++++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress"
)

underlyingChannel.in.collect {
case Right(message) =>
debug(
s" ++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress, $message"
)
message
}
Observable
.fromIterable(underlyingChannel.map {
_.in.collect {
case Right(message) =>
debug(
s" ++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress, $message"
)
message
}
})
.merge

// underlyingChannel.foldLeft(Observable.empty)(x => x)
// underlyingChannel.in.collect {
// case Right(message) =>
// debug(
// s" ++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress, $message"
// )
// message
// }
}

override def close(): Task[Unit] =
underlyingChannel.close()
Task.sequence(underlyingChannel.map(_.close())).map(_ => ())
}

private def handleEnrollment(enrolMe: EnrolMe[A, AA]): Unit = {
@@ -82,82 +82,111 @@ class SimplePeerGroupSpec extends FlatSpec {

val messageNews = "Latest News"

val aliceReceivedNews = alice
val bobReceivedNews = bob
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == messageNews)
}
.headL
.runToFuture

val messageSports = "Sports Updates"
val sportUpdates = "Sports Updates"

val aliceMessageSports = alice
val bobSportsUpdate = bob
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == messageSports)
channel.in.filter(msg => msg == sportUpdates)
}
.headL
.runToFuture

val bobClient = bob.client(alice.processAddress).evaluated
val aliceClientNews = alice.client("news").evaluated
val aliceClientSports = alice.client("sports").evaluated

bobClient.sendMessage(messageNews).evaluated
aliceReceivedNews.futureValue shouldBe messageNews
aliceClientNews.sendMessage(messageNews).evaluated
bobReceivedNews.futureValue shouldBe messageNews

bobClient.sendMessage(messageSports).evaluated
aliceMessageSports.futureValue shouldBe messageSports
aliceClientSports.sendMessage(sportUpdates).evaluated
bobSportsUpdate.futureValue shouldBe sportUpdates

}
}
}

// it should "send a message to 2 peers sharing a multicast address" in new SimpleTerminalPeerGroups {
// terminalPeerGroups.foreach { terminalGroup =>
// withThreeSimplePeerGroups(
// terminalGroup,
// List("news", "sports"),
// "Alice",
// "Bob",
// "Charlie"
// ) { (alice, bob, charlie) =>
// val message = "HI!! Alice"
//
// val messageReceivedF = alice.messageChannel[String].headL.runToFuture
// val messageReceivedByBobF = bob.messageChannel[String].headL.runToFuture
//
// bob.sendMessage("Alice", message).runToFuture.futureValue
// val messageReceived = messageReceivedF.futureValue
// messageReceived shouldBe (bob.processAddress, message)
//
// val aliceMessage = "HI!! Bob"
// alice.sendMessage("Bob", aliceMessage).runToFuture.futureValue
// val messageReceivedByBob = messageReceivedByBobF.futureValue
// messageReceivedByBob shouldBe (alice.processAddress, aliceMessage)
//
// val messageReceivedByBobNewsF = bob.messageChannel[String].headL.runToFuture
// val messageReceivedByCharlieNewsF = charlie.messageChannel[String].headL.runToFuture
//
// val messageNews = "Latest News"
// alice.sendMessage("news", messageNews).runToFuture.futureValue
// val messageReceivedByBobNews = messageReceivedByBobNewsF.futureValue
// messageReceivedByBobNews shouldBe (alice.processAddress, messageNews)
// val messageReceivedByCharlieNews = messageReceivedByCharlieNewsF.futureValue
// messageReceivedByCharlieNews shouldBe (alice.processAddress, messageNews)
//
// val messageReceivedByBobSportsF = bob.messageChannel[String].headL.runToFuture
// val messageReceivedByCharlieSportsF = charlie.messageChannel[String].headL.runToFuture
//
// val messageSports = "Sports Updates"
// alice.sendMessage("sports", messageSports).runToFuture.futureValue
// val messageReceivedByBobSports = messageReceivedByBobSportsF.futureValue
// messageReceivedByBobSports shouldBe (alice.processAddress, messageSports)
// val messageReceivedByCharlieSports = messageReceivedByCharlieSportsF.futureValue
// messageReceivedByCharlieSports shouldBe (alice.processAddress, messageSports)
//
// }
// }
// }
it should "send a message to 2 peers sharing a multicast address" in new SimpleTerminalPeerGroups {
terminalPeerGroups.foreach { terminalGroup =>
withThreeSimplePeerGroups(
terminalGroup,
List("news", "sports"),
"Alice",
"Bob",
"Charlie"
) { (alice, bob, charlie) =>
val bobsMessage = "HI Alice"
val alicesMessage = "HI Bob"

val bobReceived = bob.server().mergeMap(_.in).headL.runToFuture
bob.server().foreach(channel => channel.sendMessage(bobsMessage).evaluated)

val aliceClient = alice.client(bob.processAddress).evaluated

val aliceReceived = aliceClient.in.filter(msg => msg == bobsMessage).headL.runToFuture
aliceClient.sendMessage(alicesMessage).evaluated

bobReceived.futureValue shouldBe alicesMessage
aliceReceived.futureValue shouldBe bobsMessage

val messageNews = "Latest News"

val bobReceivedNews = bob
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == messageNews)
}
.headL
.runToFuture

val charlieReceivedNews = charlie
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == messageNews)
}
.headL
.runToFuture

val aliceClientNews = alice.client("news").evaluated

aliceClientNews.sendMessage(messageNews).evaluated
bobReceivedNews.futureValue shouldBe messageNews
charlieReceivedNews.futureValue shouldBe messageNews

val sportUpdates = "Sports Updates"

val bobSportsUpdate = bob
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == sportUpdates)
}
.headL
.runToFuture

val charlieSportsUpdate = charlie
.server()
.mergeMap { channel =>
channel.in.filter(msg => msg == sportUpdates)
}
.headL
.runToFuture

val aliceSportsClient = alice.client("sports").evaluated

aliceSportsClient.sendMessage(sportUpdates).evaluated
bobSportsUpdate.futureValue shouldBe sportUpdates
charlieSportsUpdate.futureValue shouldBe sportUpdates

}
}
}

trait SimpleTerminalPeerGroups {
val terminalPeerGroups = List(TcpTerminalPeerGroup, UdpTerminalPeerGroup)

0 comments on commit b564b14

Please sign in to comment.
You can’t perform that action at this time.