Skip to content
Permalink
Browse files

SimplePeerGroup Testcases now working with TCPPeerGroup underlying.

  • Loading branch information...
jtownson committed May 14, 2019
1 parent 09ec551 commit 3777cd5a57c5b36741af14e7781b0e02e2052322
@@ -41,8 +41,7 @@ class SimplePeerGroup[A, AA, M](
underLyingPeerGroup.server().map { underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]] =>
val reverseLookup: mutable.Map[AA, A] = routingTable.map(_.swap)
val a = reverseLookup(underlyingChannel.to)
debug(s"Received new server channel from $a " +
s"with underlying id ${underlyingChannel.asInstanceOf[TCPPeerGroup.ServerChannelImpl[Either[ControlMessage[A, AA], M]]].nettyChannel.id()}")
debug(s"Received new server channel from $a")
new ChannelImpl(a, underlyingChannel)
}
}
@@ -9,8 +9,11 @@ import org.slf4j.LoggerFactory

/**
* Trivial example of a higher-level peer group.
* Demonstrates the mapping of addresses and messages to an underlying transport
* where control messages may be sent in addition to those from the user.
* Demonstrates
* 1. the mapping of addresses to an underlying address scheme.
* 2. the mapping of channel and message notifications from an underlying peer group.
* 3. The use of Either to support both user and internal/control message protocols on
* the same underlying peer group.
* There is no enrollment process. Instances are configured with a static table of all known peers.
*/
class SimplestPeerGroup[A, AA, M](

This file was deleted.

Oops, something went wrong.
@@ -16,6 +16,7 @@ import io.netty.util
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
import monix.reactive.subjects.{PublishSubject, ReplaySubject, Subject}
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._
@@ -28,12 +29,11 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:

private val log = LoggerFactory.getLogger(getClass)

private val channelSubscribers =
new Subscribers[Channel[InetMultiAddress, M]](s"Channel Subscribers for UDPPeerGroup@'$processAddress'")
private val channelSubject = PublishSubject[Channel[InetMultiAddress, M]]()

private val workerGroup = new NioEventLoopGroup()

private val activeChannels = new ConcurrentHashMap[ChannelId, Subscribers[M]]().asScala
private val activeChannels = new ConcurrentHashMap[ChannelId, Subject[M, M]]().asScala

/**
* 64 kilobytes is the theoretical maximum size of a complete IP datagram
@@ -62,13 +62,15 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
ct.map(nettyChannel => new ChannelImpl(nettyChannel, Promise().complete(Success(to))))
}

override def server(): Observable[Channel[InetMultiAddress, M]] = channelSubscribers.messageStream
override def server(): Observable[Channel[InetMultiAddress, M]] = channelSubject

override def shutdown(): Task[Unit] =
override def shutdown(): Task[Unit] = {
channelSubject.onComplete()
for {
_ <- toTask(serverBind.channel().close())
_ <- toTask(workerGroup.shutdownGracefully())
} yield ()
}

private class ChannelImpl(
val nettyChannel: NioDatagramChannel,
@@ -79,13 +81,13 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:

nettyChannel.pipeline().addLast(this)

private val messageSubscribersF: Future[Subscribers[M]] = for {
private val messageSubjectF: Future[Subject[M, M]] = for {
remoteAddress <- promisedRemoteAddress.future
} yield {
log.debug(
s"New channel created with id ${nettyChannel.id()} from ${nettyChannel.localAddress()} to $remoteAddress"
)
activeChannels.getOrElseUpdate(nettyChannel.id, new Subscribers[M])
activeChannels.getOrElseUpdate(nettyChannel.id, ReplaySubject[M]())
}

override def to: InetMultiAddress = Await.result(promisedRemoteAddress.future, Duration.Inf)
@@ -96,7 +98,7 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
sendResult <- sendMessage(message, remoteAddress, nettyChannel)
} yield sendResult

override def in: Observable[M] = Await.result(messageSubscribersF, Duration.Inf).messageStream
override def in: Observable[M] = Await.result(messageSubjectF, Duration.Inf)

override def close(): Task[Unit] = {
activeChannels.remove(nettyChannel.id)
@@ -108,16 +110,16 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
val remoteAddress = datagram.sender()
if (!promisedRemoteAddress.isCompleted) {
promisedRemoteAddress.complete(Success(new InetMultiAddress(remoteAddress)))
channelSubscribers.notify(this)
channelSubject.onNext(this)
}

codec.decode(datagram.content().nioBuffer().asReadOnlyBuffer()).map { m =>
messageSubscribersF.foreach { messageSubscribers =>
messageSubjectF.foreach { messageSubscribers =>
log.debug(
s"Processing inbound message from remote address remote ${ctx.channel().remoteAddress()} " +
s"to local address ${ctx.channel().localAddress()} via channel id ChannelId ${ctx.channel().id()}."
)
messageSubscribers.notify(m)
messageSubscribers.onNext(m)
}
}
}

This file was deleted.

Oops, something went wrong.
@@ -166,7 +166,7 @@ class SimplePeerGroupSpec extends FlatSpec {
// }

trait SimpleTerminalPeerGroups {
val terminalPeerGroups = List(TcpTerminalPeerGroup/*, UdpTerminalPeerGroup*/)
val terminalPeerGroups = List(TcpTerminalPeerGroup, UdpTerminalPeerGroup)
}

private def withASimplePeerGroup(

0 comments on commit 3777cd5

Please sign in to comment.
You can’t perform that action at this time.