Skip to content

Commit

Permalink
active channels to IntetSocketAddress
Browse files Browse the repository at this point in the history
  • Loading branch information
shaileshp0110 committed May 15, 2019
1 parent 09ec551 commit 86b272d
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 69 deletions.
9 changes: 6 additions & 3 deletions src/io/iohk/scalanet/peergroup/SimplePeerGroup.scala
Expand Up @@ -41,8 +41,8 @@ class SimplePeerGroup[A, AA, M](
underLyingPeerGroup.server().map { underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]] =>
val reverseLookup: mutable.Map[AA, A] = routingTable.map(_.swap)
val a = reverseLookup(underlyingChannel.to)
debug(s"Received new server channel from $a " +
s"with underlying id ${underlyingChannel.asInstanceOf[TCPPeerGroup.ServerChannelImpl[Either[ControlMessage[A, AA], M]]].nettyChannel.id()}")
// debug(s"Received new server channel from $a " +
// s"with underlying id ${underlyingChannel.asInstanceOf[UDPPeerGroup.ChannelImpl[Either[ControlMessage[A, AA], M]]].nettyChannel.id()}")
new ChannelImpl(a, underlyingChannel)
}
}
Expand Down Expand Up @@ -96,13 +96,16 @@ class SimplePeerGroup[A, AA, M](
extends Channel[A, M] {

override def sendMessage(message: M): Task[Unit] = {
debug(s" ++++++SimplePeerGroup sendMessage message from local address $processAddress to remote address $to , $message")
underlyingChannel.sendMessage(Right(message))
}

override def in: Observable[M] = {
debug(s" ++++++IN++++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress")

underlyingChannel.in.collect {
case Right(message) =>
debug(s"Processing inbound message from remote address $to to local address $processAddress, $message")
debug(s" ++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress, $message")
message
}
}
Expand Down
32 changes: 27 additions & 5 deletions src/io/iohk/scalanet/peergroup/UDPPeerGroup.scala
Expand Up @@ -32,8 +32,9 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
new Subscribers[Channel[InetMultiAddress, M]](s"Channel Subscribers for UDPPeerGroup@'$processAddress'")

private val workerGroup = new NioEventLoopGroup()
private val activeChannels = new ConcurrentHashMap[InetSocketAddress, Subscribers[M]]().asScala

private val activeChannels = new ConcurrentHashMap[ChannelId, Subscribers[M]]().asScala
// private val activeChannels = new ConcurrentHashMap[ChannelId, Subscribers[M]]().asScala

/**
* 64 kilobytes is the theoretical maximum size of a complete IP datagram
Expand Down Expand Up @@ -70,7 +71,7 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
_ <- toTask(workerGroup.shutdownGracefully())
} yield ()

private class ChannelImpl(
private [scalanet] class ChannelImpl(
val nettyChannel: NioDatagramChannel,
promisedRemoteAddress: Promise[InetMultiAddress]
)(implicit codec: Codec[M])
Expand All @@ -85,7 +86,11 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
log.debug(
s"New channel created with id ${nettyChannel.id()} from ${nettyChannel.localAddress()} to $remoteAddress"
)
activeChannels.getOrElseUpdate(nettyChannel.id, new Subscribers[M])

val x = new Subscribers[M]
activeChannels.getOrElseUpdate(remoteAddress.inetSocketAddress, x)

// activeChannels.getOrElseUpdate(nettyChannel.id, x)
}

override def to: InetMultiAddress = Await.result(promisedRemoteAddress.future, Duration.Inf)
Expand All @@ -96,20 +101,32 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
sendResult <- sendMessage(message, remoteAddress, nettyChannel)
} yield sendResult

override def in: Observable[M] = Await.result(messageSubscribersF, Duration.Inf).messageStream
override def in: Observable[M] ={

log.debug(
s"Processing Observable inbound message from remote address remote ${nettyChannel.remoteAddress()} " +
s"to local address ${nettyChannel.localAddress()} via channel id ChannelId ${nettyChannel.id()}."
)
Await.result(messageSubscribersF, Duration.Inf).messageStream
}

override def close(): Task[Unit] = {
activeChannels.remove(nettyChannel.id)
activeChannels.remove(nettyChannel.remoteAddress())
Task.unit
}

override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
val datagram = msg.asInstanceOf[DatagramPacket]
val remoteAddress = datagram.sender()
log.debug(
s"Processing ******channelRead message from remote address remote ${ctx.channel().remoteAddress()} " +
s"to local address ${ctx.channel().localAddress()} via channel id ChannelId ${ctx.channel().id()}."
)
if (!promisedRemoteAddress.isCompleted) {
promisedRemoteAddress.complete(Success(new InetMultiAddress(remoteAddress)))
channelSubscribers.notify(this)
}
// if (activeChannels2.contains(remoteAddress)) channelSubscribers.notify(this)

codec.decode(datagram.content().nioBuffer().asReadOnlyBuffer()).map { m =>
messageSubscribersF.foreach { messageSubscribers =>
Expand All @@ -123,6 +140,11 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
}

private def sendMessage(message: M, to: InetMultiAddress, nettyChannel: NioDatagramChannel): Task[Unit] = {

log.debug(
s"sendMessage message to remote address remote ${to.inetSocketAddress} " +
s"from local address ${processAddress.inetSocketAddress} via channel id ChannelId ${nettyChannel.id()}."
)
val nettyBuffer = Unpooled.wrappedBuffer(codec.encode(message))
toTask(nettyChannel.writeAndFlush(new DatagramPacket(nettyBuffer, to.inetSocketAddress, processAddress.inetSocketAddress)))
}
Expand Down
123 changes: 62 additions & 61 deletions src/io/iohk/scalanet/test/peergroup/SimplePeerGroupSpec.scala
Expand Up @@ -37,19 +37,19 @@ class SimplePeerGroupSpec extends FlatSpec {
.filter(msg => msg == alicesMessage)
.headL.runToFuture

bob.server().foreach(channel => channel.sendMessage(bobsMessage).evaluated)

val aliceClient = alice.client(bob.processAddress).evaluated

val aliceReceived = aliceClient.in.filter(msg => msg == bobsMessage).headL.runToFuture

bob.server().foreach(channel => channel.sendMessage(bobsMessage).evaluated)

aliceClient.sendMessage(alicesMessage).evaluated

bobReceived.futureValue shouldBe alicesMessage
aliceReceived.futureValue shouldBe bobsMessage
}
}
}

//
it should "send a message to itself" in new SimpleTerminalPeerGroups {
terminalPeerGroups.foreach { terminalGroup =>
withASimplePeerGroup(terminalGroup, "Alice") { alice =>
Expand All @@ -61,62 +61,63 @@ class SimplePeerGroupSpec extends FlatSpec {
}
}
}

it should "send a message to another peer's multicast address" in new SimpleTerminalPeerGroups {
terminalPeerGroups.foreach { terminalGroup =>
withTwoSimplePeerGroups(
terminalGroup,
List("news", "sports"),
"Alice",
"Bob"
) { (alice, bob) =>
val bobsMessage = "HI Alice"
val alicesMessage = "HI Bob"

val aliceReceived = alice
.server()
.flatMap { channel =>
channel.sendMessage(alicesMessage).runToFuture
channel.in
}
.headL
.runToFuture

val bobsClient: Channel[String, String] = bob.client(alice.processAddress).evaluated
bobsClient.sendMessage(bobsMessage).runToFuture
val bobReceived = bobsClient.in.headL.runToFuture
aliceReceived.futureValue shouldBe bobsMessage

val bobReceivedNews = bob
.server()
.flatMap { channel =>
channel.in
}
.headL
.runToFuture
val messageNews = "Latest News"
val aliceClient: Channel[String, String] = alice.client(bob.processAddress).evaluated
aliceClient.sendMessage(messageNews).runToFuture

bobReceivedNews.futureValue shouldBe messageNews

val bobReceivedSports = bob
.server()
.flatMap { channel =>
channel.in
}
.headL
.runToFuture
val messageSports = "Sports Updates"

val aliceClientNews: Channel[String, String] = alice.client(bob.processAddress).evaluated

aliceClientNews.sendMessage(messageSports).runToFuture
bobReceivedSports.futureValue shouldBe messageSports

}
}
}
//
// it should "send a message to another peer's multicast address" in new SimpleTerminalPeerGroups {
// terminalPeerGroups.foreach { terminalGroup =>
// withTwoSimplePeerGroups(
// terminalGroup,
// List("news", "sports"),
// "Alice",
// "Bob"
// ) { (alice, bob) =>
// val bobsMessage = "HI Alice"
// val alicesMessage = "HI Bob"
//
// val aliceReceived = alice
// .server()
// .flatMap { channel =>
// channel.sendMessage(alicesMessage).runToFuture
// channel.in
// }
// .headL
// .runToFuture
//
// val bobsClient: Channel[String, String] = bob.client(alice.processAddress).evaluated
// bobsClient.sendMessage(bobsMessage).runToFuture
// val bobReceived = bobsClient.in.headL.runToFuture
// aliceReceived.futureValue shouldBe bobsMessage
// bobReceived.futureValue shouldBe alicesMessage
//
// val bobReceivedNews = bob
// .server()
// .flatMap { channel =>
// channel.in
// }
// .headL
// .runToFuture
// val messageNews = "Latest News"
// val aliceClient: Channel[String, String] = alice.client(bob.processAddress).evaluated
// aliceClient.sendMessage(messageNews).runToFuture
//
// bobReceivedNews.futureValue shouldBe messageNews
//
// val bobReceivedSports = bob
// .server()
// .flatMap { channel =>
// channel.in
// }
// .headL
// .runToFuture
// val messageSports = "Sports Updates"
//
// val aliceClientNews: Channel[String, String] = alice.client(bob.processAddress).evaluated
//
// aliceClientNews.sendMessage(messageSports).runToFuture
// bobReceivedSports.futureValue shouldBe messageSports
//
// }
// }
// }

// it should "send a message to 2 peers sharing a multicast address" in new SimpleTerminalPeerGroups {
// terminalPeerGroups.foreach { terminalGroup =>
Expand Down Expand Up @@ -166,7 +167,7 @@ class SimplePeerGroupSpec extends FlatSpec {
// }

trait SimpleTerminalPeerGroups {
val terminalPeerGroups = List(TcpTerminalPeerGroup/*, UdpTerminalPeerGroup*/)
val terminalPeerGroups = List(UdpTerminalPeerGroup/*, UdpTerminalPeerGroup*/)
}

private def withASimplePeerGroup(
Expand Down

0 comments on commit 86b272d

Please sign in to comment.