Skip to content
Permalink
Browse files

WIP looking at failing test under different monix subjects.

  • Loading branch information...
jtownson committed May 14, 2019
1 parent 8d5595e commit aa6d2b0a49e842097056a3457835348c0c89cd90
@@ -40,7 +40,10 @@ class SimplePeerGroup[A, AA, M](
override def server(): Observable[Channel[A, M]] = {
underLyingPeerGroup.server().map { underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]] =>
val reverseLookup: mutable.Map[AA, A] = routingTable.map(_.swap)
new ChannelImpl(reverseLookup(underlyingChannel.to), underlyingChannel)
val a = reverseLookup(underlyingChannel.to)
debug(s"Received new server channel from $a " +
s"with underlying id ${underlyingChannel.asInstanceOf[TCPPeerGroup.ServerChannelImpl[Either[ControlMessage[A, AA], M]]].nettyChannel.id()}")
new ChannelImpl(a, underlyingChannel)
}
}

@@ -68,7 +71,7 @@ class SimplePeerGroup[A, AA, M](
case Left(e: Enrolled[A, AA]) =>
routingTable.clear()
routingTable ++= e.routingTable
log.debug(
debug(
s"Peer address '$processAddress' enrolled into group and installed new routing table:\n${e.routingTable}"
)
}
@@ -99,7 +102,7 @@ class SimplePeerGroup[A, AA, M](
override def in: Observable[M] = {
underlyingChannel.in.collect {
case Right(message) =>
log.debug(s"Processing inbound message from remote address $to to local address $processAddress, $message")
debug(s"Processing inbound message from remote address $to to local address $processAddress, $message")
message
}
}
@@ -126,12 +129,16 @@ class SimplePeerGroup[A, AA, M](
}
}

private def notifyPeer(address: A, underlyingAddress: AA) = {
private def notifyPeer(address: A, underlyingAddress: AA): Unit = {
val enrolledReply = Enrolled(address, underlyingAddress, routingTable.toMap, multiCastTable.toMap)
underLyingPeerGroup
.client(underlyingAddress)
.foreach(channel => channel.sendMessage(Left(enrolledReply)).runToFuture)
}

private def debug(logMsg: String): Unit = {
log.debug(s"@$processAddress $logMsg")
}
}

object SimplePeerGroup {
@@ -1,19 +1,21 @@
package io.iohk.scalanet.peergroup

import io.iohk.decco._
import io.iohk.scalanet.peergroup.SimplestPeerGroup.Config
import io.iohk.scalanet.peergroup.SimplestPeerGroup.{Config, ControlMessage}
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
import org.slf4j.LoggerFactory

/**
* Trivial example of a higher-level peer group.
* There is no enrollment process. Instances must be configured with a static table of all known peers.
* Demonstrates the mapping of addresses and messages to an underlying transport
* where control messages may be sent in addition to those from the user.
* There is no enrollment process. Instances are configured with a static table of all known peers.
*/
class SimplestPeerGroup[A, AA, M](
val config: Config[A, AA],
underLyingPeerGroup: PeerGroup[AA, M]
underLyingPeerGroup: PeerGroup[AA, Either[ControlMessage[A, AA], M]]
)(
implicit aCodec: Codec[A],
aaCodec: Codec[AA],
@@ -45,16 +47,16 @@ class SimplestPeerGroup[A, AA, M](
Task.unit
}

private class ChannelImpl(val to: A, underlyingChannel: Channel[AA, M])
private class ChannelImpl(val to: A, underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]])
extends Channel[A, M] {

override def sendMessage(message: M): Task[Unit] = {
underlyingChannel.sendMessage(message)
underlyingChannel.sendMessage(Right(message))
}

override def in: Observable[M] = {
underlyingChannel.in.collect {
case message =>
case Right(message) =>
log.debug(s"Processing inbound message from remote address $to to local address $processAddress, $message")
message
}
@@ -66,5 +68,11 @@ class SimplestPeerGroup[A, AA, M](
}

object SimplestPeerGroup {

sealed trait ControlMessage[A, AA]

// Not used. Included because codec derivation does not work for empty sealed traits.
case class CM1[A, AA]() extends ControlMessage[A, AA]

case class Config[A, AA](processAddress: A, knownPeers: Map[A, AA])
}
@@ -1,13 +1,19 @@
package io.iohk.scalanet.peergroup

import monix.execution.Scheduler
import monix.reactive.subjects.{ReplaySubject}

private[scalanet] class Subscribers[T](id: String = "")(implicit scheduler: Scheduler) {

import monix.reactive.subjects.ReplaySubject
val messageStream = ReplaySubject[T]()

// import monix.reactive.subjects.PublishSubject
// val messageStream = PublishSubject[T]()

def notify(t: T): Unit = {
messageStream.onNext(t)
}

def onComplete(): Unit =
messageStream.onComplete()
}
@@ -4,7 +4,7 @@ import java.net.{InetAddress, InetSocketAddress}

import io.iohk.decco.{Codec, DecodeFailure}
import io.iohk.scalanet.peergroup.PeerGroup.TerminalPeerGroup
import io.iohk.scalanet.peergroup.TCPPeerGroup.Config
import io.iohk.scalanet.peergroup.TCPPeerGroup._
import io.netty.bootstrap.{Bootstrap, ServerBootstrap}
import io.netty.buffer.{ByteBuf, Unpooled}
import io.netty.channel._
@@ -43,7 +43,7 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
.channel(classOf[NioServerSocketChannel])
.childHandler(new ChannelInitializer[SocketChannel]() {
override def initChannel(ch: SocketChannel): Unit = {
val newChannel = new ServerChannelImpl(ch)
val newChannel = new ServerChannelImpl[M](ch)
channelSubscribers.notify(newChannel)
log.debug(s"$processAddress received inbound from ${ch.remoteAddress()}.")
}
@@ -60,26 +60,67 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
override def processAddress: InetMultiAddress = config.processAddress

override def client(to: InetMultiAddress): Task[Channel[InetMultiAddress, M]] = {
new ClientChannelImpl(to.inetSocketAddress).initialize
new ClientChannelImpl[M](to.inetSocketAddress, clientBootstrap).initialize
}

override def server(): Observable[Channel[InetMultiAddress, M]] = channelSubscribers.messageStream

override def shutdown(): Task[Unit] =
override def shutdown(): Task[Unit] = {
channelSubscribers.onComplete()
for {

_ <- toTask(serverBind.channel().close())
_ <- toTask(workerGroup.shutdownGracefully())
} yield ()
}
}

private def toTask(f: util.concurrent.Future[_]): Task[Unit] = {
val promisedCompletion = Promise[Unit]()
f.addListener((_: util.concurrent.Future[_]) => promisedCompletion.complete(Success(())))
Task.fromFuture(promisedCompletion.future)
object TCPPeerGroup {
case class Config(
bindAddress: InetSocketAddress,
processAddress: InetMultiAddress,
remoteHostConfig: Map[InetAddress, Int] = Map.empty[InetAddress, Int]
)

object Config {
def apply(bindAddress: InetSocketAddress): Config = Config(bindAddress, new InetMultiAddress(bindAddress))
}

private[scalanet] class ServerChannelImpl[M](val nettyChannel: SocketChannel)(implicit codec: Codec[M], scheduler: Scheduler)
extends Channel[InetMultiAddress, M] {

private val log = LoggerFactory.getLogger(getClass)

log.debug(s"Creating server channel from ${nettyChannel.localAddress()} to ${nettyChannel.remoteAddress()} with channel id ${nettyChannel.id}")

private val messageSubscribers = new Subscribers[M](s"Subscribers for ServerChannelImpl@${nettyChannel.id}")

nettyChannel
.pipeline()
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4))
.addLast("frameEncoder", new LengthFieldPrepender(4))
.addLast(new MessageNotifier(messageSubscribers))

override val to: InetMultiAddress = InetMultiAddress(nettyChannel.remoteAddress())

override def sendMessage(message: M): Task[Unit] = {
toTask({
nettyChannel
.writeAndFlush(Unpooled.wrappedBuffer(codec.encode(message)))
})
}

override def in: Observable[M] = messageSubscribers.messageStream

override def close(): Task[Unit] = {
messageSubscribers.onComplete()
toTask(nettyChannel.close())
}
}

private class ClientChannelImpl(inetSocketAddress: InetSocketAddress)(implicit codec: Codec[M])
extends Channel[InetMultiAddress, M] {
private class ClientChannelImpl[M](inetSocketAddress: InetSocketAddress, clientBootstrap: Bootstrap)(implicit codec: Codec[M], scheduler: Scheduler)
extends Channel[InetMultiAddress, M] {

private val log = LoggerFactory.getLogger(getClass)

val to: InetMultiAddress = InetMultiAddress(inetSocketAddress)

@@ -100,19 +141,20 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
.addLast(new ByteArrayEncoder())
.addLast(new ChannelInboundHandlerAdapter() {
override def channelActive(ctx: ChannelHandlerContext): Unit = {
log.debug(s"Creating client channel from ${ctx.channel().localAddress()} " +
s"to ${ctx.channel().remoteAddress()} with channel id ${ctx.channel().id}")
activation.complete(Success(ctx))
}

override def channelInactive(ctx: ChannelHandlerContext): Unit = {
deactivation.complete(Success(()))
}
})
.addLast(new MessageNotifier(subscribers))

.addLast(new MessageNotifier[M](subscribers))
}
})

def initialize: Task[ClientChannelImpl] = toTask(bootstrap.connect(inetSocketAddress)).map(_ => this)
def initialize: Task[ClientChannelImpl[M]] = toTask(bootstrap.connect(inetSocketAddress)).map(_ => this)

override def sendMessage(message: M): Task[Unit] = {

@@ -133,61 +175,35 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
override def in: Observable[M] = subscribers.messageStream

override def close(): Task[Unit] = {
subscribers.onComplete()
activationF.foreach(ctx => ctx.close())
Task.fromFuture(deactivationF)
}

}

private class MessageNotifier(val messageSubscribers: Subscribers[M]) extends ChannelInboundHandlerAdapter {
private class MessageNotifier[M](val messageSubscribers: Subscribers[M])(implicit codec: Codec[M]) extends ChannelInboundHandlerAdapter {

private val log = LoggerFactory.getLogger(getClass)

override def channelInactive(channelHandlerContext: ChannelHandlerContext): Unit =
messageSubscribers.onComplete()

override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
val messageE: Either[DecodeFailure, M] = codec.decode(msg.asInstanceOf[ByteBuf].nioBuffer().asReadOnlyBuffer())
log.debug(
s"Processing inbound message from remote address ${ctx.channel().remoteAddress()} " +
s"to local address ${ctx.channel().localAddress()}, ${messageE.getOrElse("decode failed")}"
)

messageE.foreach(messageSubscribers.notify)
}
}

private class ServerChannelImpl(val nettyChannel: SocketChannel)(implicit codec: Codec[M])
extends Channel[InetMultiAddress, M] {

private val messageSubscribers = new Subscribers[M]

nettyChannel
.pipeline()
.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Int.MaxValue, 0, 4, 0, 4))
.addLast("frameEncoder", new LengthFieldPrepender(4))
.addLast(new MessageNotifier(messageSubscribers))

override val to: InetMultiAddress = InetMultiAddress(nettyChannel.remoteAddress())

override def sendMessage(message: M): Task[Unit] = {
toTask({
nettyChannel
.writeAndFlush(Unpooled.wrappedBuffer(codec.encode(message)))
})
}

override def in: Observable[M] = messageSubscribers.messageStream

override def close(): Task[Unit] = {
toTask(nettyChannel.close())
messageE.foreach { message =>
messageSubscribers.notify(message)
}
}

}
}

object TCPPeerGroup {
case class Config(
bindAddress: InetSocketAddress,
processAddress: InetMultiAddress,
remoteHostConfig: Map[InetAddress, Int] = Map.empty[InetAddress, Int]
)
object Config {
def apply(bindAddress: InetSocketAddress): Config = Config(bindAddress, new InetMultiAddress(bindAddress))
private def toTask(f: util.concurrent.Future[_]): Task[Unit] = {
val promisedCompletion = Promise[Unit]()
f.addListener((_: util.concurrent.Future[_]) => promisedCompletion.complete(Success(())))
Task.fromFuture(promisedCompletion.future)
}

}
@@ -3,7 +3,7 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%d{HH:mm:ss.SSS} %0logger %-5level %msg%n</pattern>
<pattern>%t %0logger %-5level %msg%n</pattern>
</encoder>
</appender>

Oops, something went wrong.

0 comments on commit aa6d2b0

Please sign in to comment.
You can’t perform that action at this time.