Skip to content
Permalink
Browse files

reformatted

  • Loading branch information...
shaileshp0110 committed May 15, 2019
1 parent 86b272d commit 53850cc8334182b894a6de2481ecd95f5dd2fe9f
@@ -19,4 +19,4 @@ case class InetMultiAddress(private[scalanet] val inetSocketAddress: InetSocketA
val state = Seq(inetAddress)
state.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
}
}
}
@@ -96,16 +96,22 @@ class SimplePeerGroup[A, AA, M](
extends Channel[A, M] {

override def sendMessage(message: M): Task[Unit] = {
debug(s" ++++++SimplePeerGroup sendMessage message from local address $processAddress to remote address $to , $message")
debug(
s" ++++++SimplePeerGroup sendMessage message from local address $processAddress to remote address $to , $message"
)
underlyingChannel.sendMessage(Right(message))
}

override def in: Observable[M] = {
debug(s" ++++++IN++++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress")
debug(
s" ++++++IN++++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress"
)

underlyingChannel.in.collect {
case Right(message) =>
debug(s" ++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress, $message")
debug(
s" ++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress, $message"
)
message
}
}
@@ -14,13 +14,13 @@ import org.slf4j.LoggerFactory
* There is no enrollment process. Instances are configured with a static table of all known peers.
*/
class SimplestPeerGroup[A, AA, M](
val config: Config[A, AA],
underLyingPeerGroup: PeerGroup[AA, Either[ControlMessage[A, AA], M]]
)(
implicit aCodec: Codec[A],
aaCodec: Codec[AA],
scheduler: Scheduler
) extends PeerGroup[A, M] {
val config: Config[A, AA],
underLyingPeerGroup: PeerGroup[AA, Either[ControlMessage[A, AA], M]]
)(
implicit aCodec: Codec[A],
aaCodec: Codec[AA],
scheduler: Scheduler
) extends PeerGroup[A, M] {

private val log = LoggerFactory.getLogger(getClass)

@@ -48,7 +48,7 @@ class SimplestPeerGroup[A, AA, M](
}

private class ChannelImpl(val to: A, underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]])
extends Channel[A, M] {
extends Channel[A, M] {

override def sendMessage(message: M): Task[Unit] = {
underlyingChannel.sendMessage(Right(message))
@@ -78,24 +78,28 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:

object TCPPeerGroup {
case class Config(
bindAddress: InetSocketAddress,
processAddress: InetMultiAddress,
remoteHostConfig: Map[InetAddress, Int] = Map.empty[InetAddress, Int]
bindAddress: InetSocketAddress,
processAddress: InetMultiAddress,
remoteHostConfig: Map[InetAddress, Int] = Map.empty[InetAddress, Int]
)

object Config {
def apply(bindAddress: InetSocketAddress): Config = Config(bindAddress, new InetMultiAddress(bindAddress))
}

private[scalanet] class ServerChannelImpl[M](val nettyChannel: SocketChannel)(implicit codec: Codec[M], scheduler: Scheduler)
extends Channel[InetMultiAddress, M] {
private[scalanet] class ServerChannelImpl[M](val nettyChannel: SocketChannel)(
implicit codec: Codec[M],
scheduler: Scheduler
) extends Channel[InetMultiAddress, M] {

private val log = LoggerFactory.getLogger(getClass)

log.debug(s"Creating server channel from ${nettyChannel.localAddress()} to ${nettyChannel.remoteAddress()} with channel id ${nettyChannel.id}")
log.debug(
s"Creating server channel from ${nettyChannel.localAddress()} to ${nettyChannel.remoteAddress()} with channel id ${nettyChannel.id}"
)

private val messageSubject = ReplaySubject[M]()
//new Subscribers[M](s"Subscribers for ServerChannelImpl@${nettyChannel.id}")
//new Subscribers[M](s"Subscribers for ServerChannelImpl@${nettyChannel.id}")

nettyChannel
.pipeline()
@@ -120,8 +124,10 @@ object TCPPeerGroup {
}
}

private class ClientChannelImpl[M](inetSocketAddress: InetSocketAddress, clientBootstrap: Bootstrap)(implicit codec: Codec[M], scheduler: Scheduler)
extends Channel[InetMultiAddress, M] {
private class ClientChannelImpl[M](inetSocketAddress: InetSocketAddress, clientBootstrap: Bootstrap)(
implicit codec: Codec[M],
scheduler: Scheduler
) extends Channel[InetMultiAddress, M] {

private val log = LoggerFactory.getLogger(getClass)

@@ -144,8 +150,10 @@ object TCPPeerGroup {
.addLast(new ByteArrayEncoder())
.addLast(new ChannelInboundHandlerAdapter() {
override def channelActive(ctx: ChannelHandlerContext): Unit = {
log.debug(s"Creating client channel from ${ctx.channel().localAddress()} " +
s"to ${ctx.channel().remoteAddress()} with channel id ${ctx.channel().id}")
log.debug(
s"Creating client channel from ${ctx.channel().localAddress()} " +
s"to ${ctx.channel().remoteAddress()} with channel id ${ctx.channel().id}"
)
activation.complete(Success(ctx))
}

@@ -184,7 +192,8 @@ object TCPPeerGroup {
}
}

private class MessageNotifier[M](val messageSubject: Subject[M, M])(implicit codec: Codec[M]) extends ChannelInboundHandlerAdapter {
private class MessageNotifier[M](val messageSubject: Subject[M, M])(implicit codec: Codec[M])
extends ChannelInboundHandlerAdapter {

private val log = LoggerFactory.getLogger(getClass)

@@ -34,7 +34,7 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
private val workerGroup = new NioEventLoopGroup()
private val activeChannels = new ConcurrentHashMap[InetSocketAddress, Subscribers[M]]().asScala

// private val activeChannels = new ConcurrentHashMap[ChannelId, Subscribers[M]]().asScala
// private val activeChannels = new ConcurrentHashMap[ChannelId, Subscribers[M]]().asScala

/**
* 64 kilobytes is the theoretical maximum size of a complete IP datagram
@@ -71,7 +71,7 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
_ <- toTask(workerGroup.shutdownGracefully())
} yield ()

private [scalanet] class ChannelImpl(
private[scalanet] class ChannelImpl(
val nettyChannel: NioDatagramChannel,
promisedRemoteAddress: Promise[InetMultiAddress]
)(implicit codec: Codec[M])
@@ -90,7 +90,7 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
val x = new Subscribers[M]
activeChannels.getOrElseUpdate(remoteAddress.inetSocketAddress, x)

// activeChannels.getOrElseUpdate(nettyChannel.id, x)
// activeChannels.getOrElseUpdate(nettyChannel.id, x)
}

override def to: InetMultiAddress = Await.result(promisedRemoteAddress.future, Duration.Inf)
@@ -101,7 +101,7 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
sendResult <- sendMessage(message, remoteAddress, nettyChannel)
} yield sendResult

override def in: Observable[M] ={
override def in: Observable[M] = {

log.debug(
s"Processing Observable inbound message from remote address remote ${nettyChannel.remoteAddress()} " +
@@ -126,7 +126,7 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
promisedRemoteAddress.complete(Success(new InetMultiAddress(remoteAddress)))
channelSubscribers.notify(this)
}
// if (activeChannels2.contains(remoteAddress)) channelSubscribers.notify(this)
// if (activeChannels2.contains(remoteAddress)) channelSubscribers.notify(this)

codec.decode(datagram.content().nioBuffer().asReadOnlyBuffer()).map { m =>
messageSubscribersF.foreach { messageSubscribers =>
@@ -146,7 +146,10 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
s"from local address ${processAddress.inetSocketAddress} via channel id ChannelId ${nettyChannel.id()}."
)
val nettyBuffer = Unpooled.wrappedBuffer(codec.encode(message))
toTask(nettyChannel.writeAndFlush(new DatagramPacket(nettyBuffer, to.inetSocketAddress, processAddress.inetSocketAddress)))
toTask(
nettyChannel
.writeAndFlush(new DatagramPacket(nettyBuffer, to.inetSocketAddress, processAddress.inetSocketAddress))
)
}

}
@@ -32,10 +32,12 @@ class SimplePeerGroupSpec extends FlatSpec {
val alicesMessage = "hi bob, from alice"
val bobsMessage = "hi alice, from bob"

val bobReceived: Future[String] = bob.server()
val bobReceived: Future[String] = bob
.server()
.flatMap(channel => channel.in)
.filter(msg => msg == alicesMessage)
.headL.runToFuture
.headL
.runToFuture

val aliceClient = alice.client(bob.processAddress).evaluated
val aliceReceived = aliceClient.in.filter(msg => msg == bobsMessage).headL.runToFuture
@@ -167,7 +169,7 @@ class SimplePeerGroupSpec extends FlatSpec {
// }

trait SimpleTerminalPeerGroups {
val terminalPeerGroups = List(UdpTerminalPeerGroup/*, UdpTerminalPeerGroup*/)
val terminalPeerGroups = List(UdpTerminalPeerGroup /*, UdpTerminalPeerGroup*/ )
}

private def withASimplePeerGroup(
@@ -27,37 +27,48 @@ class SimplestPeerGroupSpec extends FlatSpec {
val alicesMessage = "hi bob, from alice"
val bobsMessage = "hi alice, from bob"

val bobReceived: Future[String] = bob.server()
.flatMap { channel => channel.in }
.filter { msg => msg == alicesMessage }
.headL.runToFuture
val bobReceived: Future[String] = bob
.server()
.flatMap { channel =>
channel.in
}
.filter { msg =>
msg == alicesMessage
}
.headL
.runToFuture

bob.server().foreach(channel => channel.sendMessage(bobsMessage).evaluated)

val aliceClient = alice.client(bob.processAddress).evaluated
val aliceReceived = aliceClient.in.filter { msg => msg == bobsMessage }.headL.runToFuture
val aliceReceived = aliceClient.in
.filter { msg =>
msg == bobsMessage
}
.headL
.runToFuture
aliceClient.sendMessage(alicesMessage).evaluated

aliceReceived.futureValue shouldBe bobsMessage
bobReceived.futureValue shouldBe alicesMessage
}

private def withTwoSimplestPeerGroups(
a: String,
b: String)(
testCode: (
SimplestPeerGroup[String, InetMultiAddress, String],
SimplestPeerGroup[String, InetMultiAddress, String]
) => Any
): Unit = {
private def withTwoSimplestPeerGroups(a: String, b: String)(
testCode: (
SimplestPeerGroup[String, InetMultiAddress, String],
SimplestPeerGroup[String, InetMultiAddress, String]
) => Any
): Unit = {

val underlying1 = randomTCPPeerGroup[Either[SimplestPeerGroup.ControlMessage[String, InetMultiAddress], String]]
val underlying2 = randomTCPPeerGroup[Either[SimplestPeerGroup.ControlMessage[String, InetMultiAddress], String]]

val routingTable = Map(a -> underlying1.processAddress, b -> underlying2.processAddress)

val simplest1 = new SimplestPeerGroup[String, InetMultiAddress, String](SimplestPeerGroup.Config(a, routingTable), underlying1)
val simplest2 = new SimplestPeerGroup[String, InetMultiAddress, String](SimplestPeerGroup.Config(b, routingTable), underlying2)
val simplest1 =
new SimplestPeerGroup[String, InetMultiAddress, String](SimplestPeerGroup.Config(a, routingTable), underlying1)
val simplest2 =
new SimplestPeerGroup[String, InetMultiAddress, String](SimplestPeerGroup.Config(b, routingTable), underlying2)

simplest1.initialize().evaluated
simplest2.initialize().evaluated

0 comments on commit 53850cc

Please sign in to comment.
You can’t perform that action at this time.