Skip to content
Permalink
Browse files

Adding InetMultiAddress to help with tcp address confusions.

  • Loading branch information...
jtownson committed May 10, 2019
1 parent 950d41e commit b2bd25d5c9fd7a71294e954f48cd652ee32ea8c7
@@ -0,0 +1,22 @@
package io.iohk.scalanet.peergroup

import java.net.{InetAddress, InetSocketAddress}

case class InetMultiAddress(private[scalanet] val inetSocketAddress: InetSocketAddress) {

val inetAddress: InetAddress = inetSocketAddress.getAddress

def canEqual(other: Any): Boolean = other.isInstanceOf[InetMultiAddress]

override def equals(other: Any): Boolean = other match {
case that: InetMultiAddress =>
(that canEqual this) &&
inetAddress == that.inetAddress
case _ => false
}

override def hashCode(): Int = {
val state = Seq(inetAddress)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
@@ -19,7 +19,6 @@ private[scalanet] class Subscribers[T](id: String = "") {
})

def notify(t: T): Unit = {
println("********notification******" + t)
subscriberSet.foreach(_.onNext(t))
}
}
@@ -23,12 +23,12 @@ import scala.concurrent.{Future, Promise}
import scala.util.Success

class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec: Codec[M])
extends TerminalPeerGroup[InetSocketAddress, M]() {
extends TerminalPeerGroup[InetMultiAddress, M]() {

private val log = LoggerFactory.getLogger(getClass)

private val channelSubscribers =
new Subscribers[Channel[InetSocketAddress, M]](s"Channel Subscribers for TCPPeerGroup@'$processAddress'")
new Subscribers[Channel[InetMultiAddress, M]](s"Channel Subscribers for TCPPeerGroup@'$processAddress'")

private val workerGroup = new NioEventLoopGroup()

@@ -45,6 +45,8 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
override def initChannel(ch: SocketChannel): Unit = {
val newChannel = new ServerChannelImpl(ch)
channelSubscribers.notify(newChannel)
log.debug(s"$processAddress received inbound from ${ch.remoteAddress()}. " +
s"Notified ${channelSubscribers.subscriberSet.size} subscribers.")
}
})
.option[Integer](ChannelOption.SO_BACKLOG, 128)
@@ -56,13 +58,13 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
override def initialize(): Task[Unit] =
toTask(serverBind).map(_ => log.info(s"Server bound to address ${config.bindAddress}"))

override def processAddress: InetSocketAddress = config.processAddress
override def processAddress: InetMultiAddress = config.processAddress

override def client(to: InetSocketAddress): Task[Channel[InetSocketAddress, M]] = {
new ClientChannelImpl(to).initialize
override def client(to: InetMultiAddress): Task[Channel[InetMultiAddress, M]] = {
new ClientChannelImpl(to.inetSocketAddress).initialize
}

override def server(): Observable[Channel[InetSocketAddress, M]] = channelSubscribers.messageStream
override def server(): Observable[Channel[InetMultiAddress, M]] = channelSubscribers.messageStream

override def shutdown(): Task[Unit] =
for {
@@ -77,8 +79,10 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
Task.fromFuture(promisedCompletion.future)
}

private class ClientChannelImpl(val to: InetSocketAddress)(implicit codec: Codec[M])
extends Channel[InetSocketAddress, M] {
private class ClientChannelImpl(inetSocketAddress: InetSocketAddress)(implicit codec: Codec[M])
extends Channel[InetMultiAddress, M] {

val to: InetMultiAddress = InetMultiAddress(inetSocketAddress)

private val activation = Promise[ChannelHandlerContext]()
private val activationF = activation.future
@@ -109,15 +113,16 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
}
})

def initialize: Task[ClientChannelImpl] = toTask(bootstrap.connect(to)).map(_ => this)
def initialize: Task[ClientChannelImpl] = toTask(bootstrap.connect(inetSocketAddress)).map(_ => this)

override def sendMessage(message: M): Task[Unit] = {

val f: Future[Unit] =
activationF
.map(ctx => {
println(
s"****My Remote Address :${ctx.channel().remoteAddress()} *******${ctx.channel().id()}*****Client*********My Local Address ${ctx.channel().localAddress()}"
log.debug(
s"Processing outbound message from local address ${ctx.channel().localAddress()} " +
s"to remote address ${ctx.channel().remoteAddress()} via channel id ${ctx.channel().id()}"
)
ctx.writeAndFlush(Unpooled.wrappedBuffer(codec.encode(message)))
})
@@ -137,16 +142,16 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:

private class MessageNotifier(val messageSubscribers: Subscribers[M]) extends ChannelInboundHandlerAdapter {
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
println(
s"******remote*${ctx.channel().remoteAddress()}***local***${ctx.channel().localAddress()}**message tcp channel read *********${codec
.decode(msg.asInstanceOf[ByteBuf].nioBuffer().asReadOnlyBuffer())}"
log.debug(
s"Processing inbound message from remote address ${ctx.channel().remoteAddress()} " +
s"to local address ${ctx.channel().localAddress()}"
)
codec.decode(msg.asInstanceOf[ByteBuf].nioBuffer().asReadOnlyBuffer()).map(messageSubscribers.notify)
codec.decode(msg.asInstanceOf[ByteBuf].nioBuffer().asReadOnlyBuffer()).foreach(messageSubscribers.notify)
}
}

private class ServerChannelImpl(val nettyChannel: SocketChannel)(implicit codec: Codec[M])
extends Channel[InetSocketAddress, M] {
extends Channel[InetMultiAddress, M] {

private val messageSubscribers = new Subscribers[M]

@@ -155,14 +160,8 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4))
.addLast("frameEncoder", new LengthFieldPrepender(4))
.addLast(new MessageNotifier(messageSubscribers))
//def getInetSocketAddress = new InetSocketAddress(nettyChannel.remoteAddress().getAddress, config.remoteHostConfig(nettyChannel.remoteAddress().getAddress))

override val to: InetSocketAddress = {
println(s"*My remote Address: ${nettyChannel.remoteAddress()} **********${nettyChannel
.id()}****Server*******My Local Address: ${nettyChannel.localAddress()}")

nettyChannel.remoteAddress()
}
override val to: InetMultiAddress = InetMultiAddress(nettyChannel.remoteAddress())

override def sendMessage(message: M): Task[Unit] = {
toTask({
@@ -182,14 +181,12 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:

object TCPPeerGroup {
case class Config(
bindAddress: InetSocketAddress,
processAddress: InetSocketAddress,
remoteHostConfig: Map[InetAddress, Int] = Map.empty[InetAddress, Int]
bindAddress: InetSocketAddress,
processAddress: InetMultiAddress,
remoteHostConfig: Map[InetAddress, Int] = Map.empty[InetAddress, Int]
)
object Config {
def apply(bindAddress: InetSocketAddress): Config = Config(bindAddress, bindAddress)
def apply(bindAddress: InetSocketAddress, remoteHostConfig: Map[InetAddress, Int]): Config =
Config(bindAddress, bindAddress, remoteHostConfig)
def apply(bindAddress: InetSocketAddress): Config = Config(bindAddress, new InetMultiAddress(bindAddress))
}

}
@@ -24,12 +24,12 @@ import scala.concurrent.{Await, Future, Promise}
import scala.util.Success

class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec: Codec[M])
extends TerminalPeerGroup[InetSocketAddress, M]() {
extends TerminalPeerGroup[InetMultiAddress, M]() {

private val log = LoggerFactory.getLogger(getClass)

private val channelSubscribers =
new Subscribers[Channel[InetSocketAddress, M]](s"Channel Subscribers for UDPPeerGroup@'$processAddress'")
new Subscribers[Channel[InetMultiAddress, M]](s"Channel Subscribers for UDPPeerGroup@'$processAddress'")

private val workerGroup = new NioEventLoopGroup()

@@ -45,7 +45,7 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
.option[RecvByteBufAllocator](ChannelOption.RCVBUF_ALLOCATOR, new DefaultMaxBytesRecvByteBufAllocator)
.handler(new ChannelInitializer[NioDatagramChannel]() {
override def initChannel(ch: NioDatagramChannel): Unit = {
new ChannelImpl(ch, Promise[InetSocketAddress]())
new ChannelImpl(ch, Promise[InetMultiAddress]())
}
})

@@ -54,15 +54,15 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
override def initialize(): Task[Unit] =
toTask(serverBind).map(_ => log.info(s"Server bound to address ${config.bindAddress}"))

override def processAddress: InetSocketAddress = config.processAddress
override def processAddress: InetMultiAddress = config.processAddress

override def client(to: InetSocketAddress): Task[Channel[InetSocketAddress, M]] = {
val cf = bootstrap.connect(to)
override def client(to: InetMultiAddress): Task[Channel[InetMultiAddress, M]] = {
val cf = bootstrap.connect(to.inetSocketAddress)
val ct: Task[NioDatagramChannel] = toTask(cf).map(_ => cf.channel().asInstanceOf[NioDatagramChannel])
ct.map(nettyChannel => new ChannelImpl(nettyChannel, Promise().complete(Success(to))))
}

override def server(): Observable[Channel[InetSocketAddress, M]] = channelSubscribers.messageStream
override def server(): Observable[Channel[InetMultiAddress, M]] = channelSubscribers.messageStream

override def shutdown(): Task[Unit] =
for {
@@ -72,10 +72,10 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:

private class ChannelImpl(
val nettyChannel: NioDatagramChannel,
promisedRemoteAddress: Promise[InetSocketAddress]
promisedRemoteAddress: Promise[InetMultiAddress]
)(implicit codec: Codec[M])
extends ChannelInboundHandlerAdapter
with Channel[InetSocketAddress, M] {
with Channel[InetMultiAddress, M] {

nettyChannel.pipeline().addLast(this)

@@ -88,7 +88,7 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
activeChannels.getOrElseUpdate(nettyChannel.id, new Subscribers[M])
}

override def to: InetSocketAddress = Await.result(promisedRemoteAddress.future, Duration.Inf)
override def to: InetMultiAddress = Await.result(promisedRemoteAddress.future, Duration.Inf)

override def sendMessage(message: M): Task[Unit] =
for {
@@ -107,23 +107,25 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
val datagram = msg.asInstanceOf[DatagramPacket]
val remoteAddress = datagram.sender()
if (!promisedRemoteAddress.isCompleted) {
promisedRemoteAddress.complete(Success(remoteAddress))
promisedRemoteAddress.complete(Success(new InetMultiAddress(remoteAddress)))
channelSubscribers.notify(this)
}

codec.decode(datagram.content().nioBuffer().asReadOnlyBuffer()).map { m =>
messageSubscribersF.foreach { messageSubscribers =>
log.debug(
s"ChannelId ${ctx.channel().id()} remote : ${ctx.channel().remoteAddress()} local : ${ctx.channel().localAddress()} NOTIFYING SUBSCRIBERS OF THE Message: $m. Subscriber count = ${messageSubscribers.subscriberSet.size}"
s"Processing inbound message from remote address remote ${ctx.channel().remoteAddress()} " +
s"to local address ${ctx.channel().localAddress()} via channel id ChannelId ${ctx.channel().id()}. " +
s"Notifying ${messageSubscribers.subscriberSet.size} subscribers."
)
messageSubscribers.notify(m)
}
}
}

private def sendMessage(message: M, to: InetSocketAddress, nettyChannel: NioDatagramChannel): Task[Unit] = {
private def sendMessage(message: M, to: InetMultiAddress, nettyChannel: NioDatagramChannel): Task[Unit] = {
val nettyBuffer = Unpooled.wrappedBuffer(codec.encode(message))
toTask(nettyChannel.writeAndFlush(new DatagramPacket(nettyBuffer, to, processAddress)))
toTask(nettyChannel.writeAndFlush(new DatagramPacket(nettyBuffer, to.inetSocketAddress, processAddress.inetSocketAddress)))
}

}
@@ -137,9 +139,9 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:

object UDPPeerGroup {

case class Config(bindAddress: InetSocketAddress, processAddress: InetSocketAddress)
case class Config(bindAddress: InetSocketAddress, processAddress: InetMultiAddress)

object Config {
def apply(bindAddress: InetSocketAddress): Config = Config(bindAddress, bindAddress)
def apply(bindAddress: InetSocketAddress): Config = Config(bindAddress, new InetMultiAddress(bindAddress))
}
}
@@ -4,7 +4,7 @@ import java.net._
import java.nio.ByteBuffer

import io.iohk.decco.Codec
import io.iohk.scalanet.peergroup.{PeerGroup, TCPPeerGroup, UDPPeerGroup}
import io.iohk.scalanet.peergroup.{InetMultiAddress, PeerGroup, TCPPeerGroup, UDPPeerGroup}
import monix.execution.Scheduler

import scala.concurrent.Await
@@ -79,7 +79,7 @@ object NetUtils {

def randomTerminalPeerGroup[M](
t: SimpleTerminalPeerGroup
)(implicit scheduler: Scheduler, codec: Codec[M]): PeerGroup[InetSocketAddress, M] =
)(implicit scheduler: Scheduler, codec: Codec[M]): PeerGroup[InetMultiAddress, M] =
t match {
case TcpTerminalPeerGroup => randomTCPPeerGroup
case UdpTerminalPeerGroup => randomUDPPeerGroup
@@ -115,11 +115,8 @@ object NetUtils {
val address = aRandomAddress()
val address2 = aRandomAddress()

val remoteHostConfig = Map[InetAddress, Int](address.getAddress -> address2.getPort)
val remoteHostConfig2 = Map[InetAddress, Int](address2.getAddress -> address.getPort)

val pg1 = new TCPPeerGroup(TCPPeerGroup.Config(address, remoteHostConfig))
val pg2 = new TCPPeerGroup(TCPPeerGroup.Config(address2, remoteHostConfig2))
val pg1 = new TCPPeerGroup(TCPPeerGroup.Config(address))
val pg2 = new TCPPeerGroup(TCPPeerGroup.Config(address2))

Await.result(pg1.initialize().runToFuture, 10 seconds)
Await.result(pg2.initialize().runToFuture, 10 seconds)
@@ -8,7 +8,7 @@ object TaskValues {

implicit class TaskOps[T](task: Task[T]) {

def evaluated(implicit scheduler: Scheduler): T = {
def evaluated(implicit scheduler: Scheduler, patienceConfig: PatienceConfig): T = {
task.runToFuture.futureValue
}
}
@@ -1,7 +1,5 @@
package io.iohk.scalanet.peergroup

import java.net.InetSocketAddress

import io.iohk.decco.auto._
import io.iohk.scalanet.NetUtils._
import io.iohk.scalanet.peergroup.SimplePeerGroup.ControlMessage
@@ -181,7 +179,7 @@ class SimplePeerGroupSpec extends FlatSpec {
private def withASimplePeerGroup(
underlyingTerminalGroup: SimpleTerminalPeerGroup,
a: String
)(testCode: SimplePeerGroup[String, InetSocketAddress, String] => Any): Unit = {
)(testCode: SimplePeerGroup[String, InetMultiAddress, String] => Any): Unit = {
withSimplePeerGroups(underlyingTerminalGroup, a, List.empty[String])(groups => testCode(groups(0)))
}

@@ -192,8 +190,8 @@ class SimplePeerGroupSpec extends FlatSpec {
b: String
)(
testCode: (
SimplePeerGroup[String, InetSocketAddress, String],
SimplePeerGroup[String, InetSocketAddress, String]
SimplePeerGroup[String, InetMultiAddress, String],
SimplePeerGroup[String, InetMultiAddress, String]
) => Any
): Unit = {

@@ -208,9 +206,9 @@ class SimplePeerGroupSpec extends FlatSpec {
c: String
)(
testCode: (
SimplePeerGroup[String, InetSocketAddress, String],
SimplePeerGroup[String, InetSocketAddress, String],
SimplePeerGroup[String, InetSocketAddress, String]
SimplePeerGroup[String, InetMultiAddress, String],
SimplePeerGroup[String, InetMultiAddress, String],
SimplePeerGroup[String, InetMultiAddress, String]
) => Any
): Unit = {

@@ -219,20 +217,20 @@ class SimplePeerGroupSpec extends FlatSpec {
)
}

type UnderlyingMessage = Either[ControlMessage[String, InetSocketAddress], String]
type UnderlyingMessage = Either[ControlMessage[String, InetMultiAddress], String]

private def withSimplePeerGroups(
underlyingTerminalGroup: SimpleTerminalPeerGroup,
bootstrapAddress: String,
multiCastAddresses: List[String],
addresses: String*
)(
testCode: Seq[SimplePeerGroup[String, InetSocketAddress, String]] => Any
testCode: Seq[SimplePeerGroup[String, InetMultiAddress, String]] => Any
): Unit = {

val bootStrapTerminalGroup = randomTerminalPeerGroup[UnderlyingMessage](underlyingTerminalGroup)
val bootstrap = new SimplePeerGroup(
SimplePeerGroup.Config(bootstrapAddress, List.empty[String], Map.empty[String, InetSocketAddress]),
SimplePeerGroup.Config(bootstrapAddress, List.empty[String], Map.empty[String, InetMultiAddress]),
bootStrapTerminalGroup
)
bootstrap.initialize().runToFuture.futureValue
Oops, something went wrong.

0 comments on commit b2bd25d

Please sign in to comment.
You can’t perform that action at this time.