Skip to content
Permalink
Browse files

Test commented and for both UDP and TCP

  • Loading branch information...
shaileshp0110 committed May 15, 2019
2 parents 665b6be + 95383a9 commit 341d714ca62ad7090400c7c06bc6b987bc0d6ff6
@@ -41,8 +41,7 @@ class SimplePeerGroup[A, AA, M](
underLyingPeerGroup.server().map { underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]] =>
val reverseLookup: mutable.Map[AA, A] = routingTable.map(_.swap)
val a = reverseLookup(underlyingChannel.to)
// debug(s"Received new server channel from $a " +
// s"with underlying id ${underlyingChannel.asInstanceOf[UDPPeerGroup.ChannelImpl[Either[ControlMessage[A, AA], M]]].nettyChannel.id()}")
debug(s"Received new server channel from $a")
new ChannelImpl(a, underlyingChannel)
}
}
@@ -54,7 +53,7 @@ class SimplePeerGroup[A, AA, M](

underLyingPeerGroup
.server()
.flatMap(channel => channel.in)
.mergeMap(channel => channel.in)
.collect {
case Left(e: EnrolMe[A, AA]) => e
}
@@ -66,7 +65,7 @@ class SimplePeerGroup[A, AA, M](

val enrolledTask: Task[Unit] = underLyingPeerGroup
.server()
.flatMap(channel => channel.in)
.mergeMap(channel => channel.in)
.collect {
case Left(e: Enrolled[A, AA]) =>
routingTable.clear()
@@ -9,8 +9,11 @@ import org.slf4j.LoggerFactory

/**
* Trivial example of a higher-level peer group.
* Demonstrates the mapping of addresses and messages to an underlying transport
* where control messages may be sent in addition to those from the user.
* Demonstrates
* 1. the mapping of addresses to an underlying address scheme.
* 2. the mapping of channel and message notifications from an underlying peer group.
* 3. The use of Either to support both user and internal/control message protocols on
* the same underlying peer group.
* There is no enrollment process. Instances are configured with a static table of all known peers.
*/
class SimplestPeerGroup[A, AA, M](

This file was deleted.

Oops, something went wrong.
@@ -32,11 +32,7 @@ class SimplePeerGroupSpec extends FlatSpec {
val alicesMessage = "hi bob, from alice"
val bobsMessage = "hi alice, from bob"

val bobReceived: Future[String] = bob.server()
.flatMap(channel => channel.in)
.filter(msg => msg == alicesMessage)
.headL.runToFuture

val bobReceived = bob.server().mergeMap(_.in).headL.runToFuture
bob.server().foreach(channel => channel.sendMessage(bobsMessage).evaluated)

val aliceClient = alice.client(bob.processAddress).evaluated
@@ -49,19 +45,19 @@ class SimplePeerGroupSpec extends FlatSpec {
}
}
}
//

it should "send a message to itself" in new SimpleTerminalPeerGroups {
terminalPeerGroups.foreach { terminalGroup =>
withASimplePeerGroup(terminalGroup, "Alice") { alice =>
val message = Random.alphanumeric.take(1044).mkString
val aliceReceived = alice.server().flatMap(_.in).headL.runToFuture
val aliceReceived = alice.server().mergeMap(_.in).headL.runToFuture
val aliceClient: Channel[String, String] = alice.client(alice.processAddress).evaluated
aliceClient.sendMessage(message).runToFuture
aliceReceived.futureValue shouldBe message
}
}
}
//

// it should "send a message to another peer's multicast address" in new SimpleTerminalPeerGroups {
// terminalPeerGroups.foreach { terminalGroup =>
// withTwoSimplePeerGroups(
@@ -75,7 +71,7 @@ class SimplePeerGroupSpec extends FlatSpec {
//
// val aliceReceived = alice
// .server()
// .flatMap { channel =>
// .mergeMap { channel =>
// channel.sendMessage(alicesMessage).runToFuture
// channel.in
// }
@@ -86,13 +82,12 @@ class SimplePeerGroupSpec extends FlatSpec {
// bobsClient.sendMessage(bobsMessage).runToFuture
// val bobReceived = bobsClient.in.headL.runToFuture
// aliceReceived.futureValue shouldBe bobsMessage
// bobReceived.futureValue shouldBe alicesMessage
//
// val bobReceivedNews = bob
// .server()
// .flatMap { channel =>
// .mergeMap { channel =>
// channel.in
// }
// }.drop(1)
// .headL
// .runToFuture
// val messageNews = "Latest News"
@@ -103,7 +98,7 @@ class SimplePeerGroupSpec extends FlatSpec {
//
// val bobReceivedSports = bob
// .server()
// .flatMap { channel =>
// .mergeMap { channel =>
// channel.in
// }
// .headL
@@ -167,7 +162,7 @@ class SimplePeerGroupSpec extends FlatSpec {
// }

trait SimpleTerminalPeerGroups {
val terminalPeerGroups = List(TcpTerminalPeerGroup , UdpTerminalPeerGroup )
val terminalPeerGroups = List(TcpTerminalPeerGroup, UdpTerminalPeerGroup)
}

private def withASimplePeerGroup(
@@ -27,7 +27,7 @@ class TCPPeerGroupSpec extends FlatSpec with BeforeAndAfterAll {
val bobsMessage = Random.alphanumeric.take(1024).mkString

bob.server().foreachL(channel => channel.sendMessage(bobsMessage).evaluated).runToFuture
val bobReceived: Future[String] = bob.server().flatMap(channel => channel.in).headL.runToFuture
val bobReceived: Future[String] = bob.server().mergeMap(channel => channel.in).headL.runToFuture

val aliceClient = alice.client(bob.processAddress).evaluated
val aliceReceived = aliceClient.in.headL.runToFuture
@@ -24,7 +24,7 @@ class UDPPeerGroupSpec extends FlatSpec {
val alicesMessage = Random.alphanumeric.take(1024 * 4).mkString
val bobsMessage = Random.alphanumeric.take(1024 * 4).mkString

val bobReceived: Future[String] = bob.server().flatMap(channel => channel.in).headL.runToFuture
val bobReceived: Future[String] = bob.server().mergeMap(channel => channel.in).headL.runToFuture
bob.server().foreach(channel => channel.sendMessage(bobsMessage).runToFuture)

val aliceClient = alice.client(bob.processAddress).evaluated

0 comments on commit 341d714

Please sign in to comment.
You can’t perform that action at this time.