Skip to content
Permalink
Browse files

Adding SimplestPeerGroup. This is a WIP effort to fix a concurrency

problem in SimplePeerGroup.
  • Loading branch information...
jtownson committed May 13, 2019
1 parent b2bd25d commit 8d5595e2aea0ac2d0ef1327285c6fef3e9eb3a56
@@ -58,11 +58,13 @@
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>

This file was deleted.

Oops, something went wrong.
@@ -34,20 +34,14 @@ class SimplePeerGroup[A, AA, M](

override def client(to: A): Task[Channel[A, M]] =
underLyingPeerGroup.client(routingTable(to)).map { underlyingChannel =>
println("***underlyingChannel*******" + underlyingChannel)
new ChannelImpl(to, underlyingChannel)
}

override def server(): Observable[Channel[A, M]] = {
underLyingPeerGroup.server().map { underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]] =>
// val reverseLookup: mutable.Map[AA, A] = routingTable.map(_.swap)
//
//new ChannelImpl(reverseLookup(underlyingChannel.to), underlyingChannel)
println("***processAddress*******" + processAddress)
new ChannelImpl(processAddress, underlyingChannel)

val reverseLookup: mutable.Map[AA, A] = routingTable.map(_.swap)
new ChannelImpl(reverseLookup(underlyingChannel.to), underlyingChannel)
}

}

override def shutdown(): Task[Unit] = underLyingPeerGroup.shutdown()
@@ -98,12 +92,14 @@ class SimplePeerGroup[A, AA, M](
private class ChannelImpl(val to: A, underlyingChannel: Channel[AA, Either[ControlMessage[A, AA], M]])
extends Channel[A, M] {

override def sendMessage(message: M): Task[Unit] = underlyingChannel.sendMessage(Right(message))
override def sendMessage(message: M): Task[Unit] = {
underlyingChannel.sendMessage(Right(message))
}

override def in: Observable[M] = {
underlyingChannel.in.collect {
case Right(message) =>
println(underlyingChannel + "*******In Underline channel**********" + message)
log.debug(s"Processing inbound message from remote address $to to local address $processAddress, $message")
message
}
}
@@ -0,0 +1,70 @@
package io.iohk.scalanet.peergroup

import io.iohk.decco._
import io.iohk.scalanet.peergroup.SimplestPeerGroup.Config
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.Observable
import org.slf4j.LoggerFactory

/**
* Trivial example of a higher-level peer group.
* There is no enrollment process. Instances must be configured with a static table of all known peers.
*/
class SimplestPeerGroup[A, AA, M](
val config: Config[A, AA],
underLyingPeerGroup: PeerGroup[AA, M]
)(
implicit aCodec: Codec[A],
aaCodec: Codec[AA],
scheduler: Scheduler
) extends PeerGroup[A, M] {

private val log = LoggerFactory.getLogger(getClass)

private implicit val apc: PartialCodec[A] = aCodec.partialCodec
private implicit val aapc: PartialCodec[AA] = aaCodec.partialCodec

override def processAddress: A = config.processAddress

override def client(to: A): Task[Channel[A, M]] =
underLyingPeerGroup.client(config.knownPeers(to)).map { underlyingChannel =>
new ChannelImpl(to, underlyingChannel)
}

override def server(): Observable[Channel[A, M]] = {
underLyingPeerGroup.server().map { underlyingChannel =>
val reverseLookup: Map[AA, A] = config.knownPeers.map(_.swap)
new ChannelImpl(reverseLookup(underlyingChannel.to), underlyingChannel)
}
}

override def shutdown(): Task[Unit] = underLyingPeerGroup.shutdown()

override def initialize(): Task[Unit] = {
Task.unit
}

private class ChannelImpl(val to: A, underlyingChannel: Channel[AA, M])
extends Channel[A, M] {

override def sendMessage(message: M): Task[Unit] = {
underlyingChannel.sendMessage(message)
}

override def in: Observable[M] = {
underlyingChannel.in.collect {
case message =>
log.debug(s"Processing inbound message from remote address $to to local address $processAddress, $message")
message
}
}

override def close(): Task[Unit] =
underlyingChannel.close()
}
}

object SimplestPeerGroup {
case class Config[A, AA](processAddress: A, knownPeers: Map[A, AA])
}
@@ -1,24 +1,13 @@
package io.iohk.scalanet.peergroup

import java.util.concurrent.CopyOnWriteArraySet
import monix.execution.Scheduler
import monix.reactive.subjects.{ReplaySubject}

import monix.reactive.{Observable, OverflowStrategy}
import monix.reactive.observers.Subscriber
private[scalanet] class Subscribers[T](id: String = "")(implicit scheduler: Scheduler) {

import scala.collection.mutable
import scala.collection.JavaConverters._

private[scalanet] class Subscribers[T](id: String = "") {
val subscriberSet: mutable.Set[Subscriber.Sync[T]] =
new CopyOnWriteArraySet[Subscriber.Sync[T]]().asScala

val messageStream: Observable[T] =
Observable.create(overflowStrategy = OverflowStrategy.Unbounded)((subscriber: Subscriber.Sync[T]) => {
subscriberSet.add(subscriber)
() => subscriberSet.remove(subscriber)
})
val messageStream = ReplaySubject[T]()

def notify(t: T): Unit = {
subscriberSet.foreach(_.onNext(t))
messageStream.onNext(t)
}
}
@@ -2,7 +2,7 @@ package io.iohk.scalanet.peergroup

import java.net.{InetAddress, InetSocketAddress}

import io.iohk.decco.Codec
import io.iohk.decco.{Codec, DecodeFailure}
import io.iohk.scalanet.peergroup.PeerGroup.TerminalPeerGroup
import io.iohk.scalanet.peergroup.TCPPeerGroup.Config
import io.netty.bootstrap.{Bootstrap, ServerBootstrap}
@@ -45,8 +45,7 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
override def initChannel(ch: SocketChannel): Unit = {
val newChannel = new ServerChannelImpl(ch)
channelSubscribers.notify(newChannel)
log.debug(s"$processAddress received inbound from ${ch.remoteAddress()}. " +
s"Notified ${channelSubscribers.subscriberSet.size} subscribers.")
log.debug(s"$processAddress received inbound from ${ch.remoteAddress()}.")
}
})
.option[Integer](ChannelOption.SO_BACKLOG, 128)
@@ -142,11 +141,13 @@ class TCPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:

private class MessageNotifier(val messageSubscribers: Subscribers[M]) extends ChannelInboundHandlerAdapter {
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
val messageE: Either[DecodeFailure, M] = codec.decode(msg.asInstanceOf[ByteBuf].nioBuffer().asReadOnlyBuffer())
log.debug(
s"Processing inbound message from remote address ${ctx.channel().remoteAddress()} " +
s"to local address ${ctx.channel().localAddress()}"
s"to local address ${ctx.channel().localAddress()}, ${messageE.getOrElse("decode failed")}"
)
codec.decode(msg.asInstanceOf[ByteBuf].nioBuffer().asReadOnlyBuffer()).foreach(messageSubscribers.notify)

messageE.foreach(messageSubscribers.notify)
}
}

@@ -115,8 +115,7 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
messageSubscribersF.foreach { messageSubscribers =>
log.debug(
s"Processing inbound message from remote address remote ${ctx.channel().remoteAddress()} " +
s"to local address ${ctx.channel().localAddress()} via channel id ChannelId ${ctx.channel().id()}. " +
s"Notifying ${messageSubscribers.subscriberSet.size} subscribers."
s"to local address ${ctx.channel().localAddress()} via channel id ChannelId ${ctx.channel().id()}."
)
messageSubscribers.notify(m)
}
@@ -3,7 +3,7 @@

<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%-5level - %msg%n</pattern>
<pattern>%d{HH:mm:ss.SSS} %0logger %-5level %msg%n</pattern>
</encoder>
</appender>

This file was deleted.

Oops, something went wrong.
Oops, something went wrong.

0 comments on commit 8d5595e

Please sign in to comment.
You can’t perform that action at this time.