Skip to content

Commit

Permalink
PM-3129: Add LocalConnectionManager on top of the remote one. (#50)
Browse files Browse the repository at this point in the history
  • Loading branch information
aakoshh committed Jun 11, 2021
1 parent 415b8ee commit 21de7f1
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.iohk.metronome.networking

import cats.implicits._
import cats.effect.{Concurrent, Timer, Resource, ContextShift}
import java.net.InetSocketAddress
import monix.eval.{TaskLift, TaskLike}
import monix.tail.Iterant
import scodec.Codec

trait LocalConnectionManager[F[_], K, M] {
def isConnected: F[Boolean]
def incomingMessages: Iterant[F, M]
def sendMessage(
message: M
): F[Either[ConnectionHandler.ConnectionAlreadyClosedException[K], Unit]]
}

/** Connect to a single local process and keep the connection alive. */
object LocalConnectionManager {

def apply[
F[_]: Concurrent: TaskLift: TaskLike: Timer: ContextShift,
K: Codec,
M: Codec
](
encryptedConnectionsProvider: EncryptedConnectionProvider[F, K, M],
targetKey: K,
targetAddress: InetSocketAddress,
retryConfig: RemoteConnectionManager.RetryConfig
)(implicit
tracers: NetworkTracers[F, K, M]
): Resource[F, LocalConnectionManager[F, K, M]] = {
for {
remoteConnectionManager <- RemoteConnectionManager[F, K, M](
encryptedConnectionsProvider,
RemoteConnectionManager.ClusterConfig[K](
Set(targetKey -> targetAddress)
),
retryConfig
)
localConnectionManager = new LocalConnectionManager[F, K, M] {
override def isConnected =
remoteConnectionManager.getAcquiredConnections.map(
_.contains(targetKey)
)

override def incomingMessages =
remoteConnectionManager.incomingMessages.map {
case ConnectionHandler.MessageReceived(_, m) => m
}

override def sendMessage(message: M) =
remoteConnectionManager.sendMessage(targetKey, message)
}
} yield localConnectionManager
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -297,15 +297,14 @@ object RemoteConnectionManager {
* @param retryConfig retry configuration for outgoing connections (incoming connections are not retried)
*/
def apply[
F[_]: Concurrent: TaskLift: TaskLike: Timer,
F[_]: Concurrent: TaskLift: TaskLike: Timer: ContextShift,
K: Codec,
M: Codec
](
encryptedConnectionsProvider: EncryptedConnectionProvider[F, K, M],
clusterConfig: ClusterConfig[K],
retryConfig: RetryConfig
)(implicit
cs: ContextShift[F],
tracers: NetworkTracers[F, K, M]
): Resource[F, RemoteConnectionManager[F, K, M]] = {
for {
Expand Down

0 comments on commit 21de7f1

Please sign in to comment.