Skip to content
Permalink
Browse files

code cleanup

  • Loading branch information...
shaileshp0110 committed May 16, 2019
1 parent b564b14 commit b3da2a0898d6979b2a1eee8df71db8ba46326ec7
@@ -101,36 +101,24 @@ class SimplePeerGroup[A, AA, M](

override def sendMessage(message: M): Task[Unit] = {
debug(
s" ++++++SimplePeerGroup sendMessage message from local address $processAddress to remote address $to , $message"
s"message from local address $processAddress to remote address $to , $message"
)
Task.sequence(underlyingChannel.map(_.sendMessage(Right(message)))).map(_ => ())
}

override def in: Observable[M] = {
debug(
s" ++++++IN++++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress"
)

Observable
.fromIterable(underlyingChannel.map {
_.in.collect {
case Right(message) =>
debug(
s" ++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress, $message"
s"Processing inbound message from remote address $to to local address $processAddress, $message"
)
message
}
})
.merge

// underlyingChannel.foldLeft(Observable.empty)(x => x)
// underlyingChannel.in.collect {
// case Right(message) =>
// debug(
// s" ++++++SimplePeerGroup Processing inbound message from remote address $to to local address $processAddress, $message"
// )
// message
// }
}

override def close(): Task[Unit] =
@@ -87,10 +87,8 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
s"New channel created with id ${nettyChannel.id()} from ${nettyChannel.localAddress()} to $remoteAddress"
)

val x = new Subscribers[M]
activeChannels.getOrElseUpdate(remoteAddress.inetSocketAddress, x)
activeChannels.getOrElseUpdate(remoteAddress.inetSocketAddress, new Subscribers[M])

// activeChannels.getOrElseUpdate(nettyChannel.id, x)
}

override def to: InetMultiAddress = Await.result(promisedRemoteAddress.future, Duration.Inf)
@@ -102,7 +100,6 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
} yield sendResult

override def in: Observable[M] = {

log.debug(
s"Processing Observable inbound message from remote address remote ${nettyChannel.remoteAddress()} " +
s"to local address ${nettyChannel.localAddress()} via channel id ChannelId ${nettyChannel.id()}."
@@ -118,15 +115,11 @@ class UDPPeerGroup[M](val config: Config)(implicit scheduler: Scheduler, codec:
override def channelRead(ctx: ChannelHandlerContext, msg: Any): Unit = {
val datagram = msg.asInstanceOf[DatagramPacket]
val remoteAddress = datagram.sender()
log.debug(
s"Processing ******channelRead message from remote address remote ${ctx.channel().remoteAddress()} " +
s"to local address ${ctx.channel().localAddress()} via channel id ChannelId ${ctx.channel().id()}."
)

if (!promisedRemoteAddress.isCompleted) {
promisedRemoteAddress.complete(Success(new InetMultiAddress(remoteAddress)))
channelSubscribers.notify(this)
}
// if (activeChannels2.contains(remoteAddress)) channelSubscribers.notify(this)

codec.decode(datagram.content().nioBuffer().asReadOnlyBuffer()).map { m =>
messageSubscribersF.foreach { messageSubscribers =>

0 comments on commit b3da2a0

Please sign in to comment.
You can’t perform that action at this time.