Skip to content

Commit

Permalink
Connect is able to force a new connection (#849)
Browse files Browse the repository at this point in the history
  • Loading branch information
diegomrsantos committed Jan 25, 2023
1 parent ca19f8f commit 4ace70d
Show file tree
Hide file tree
Showing 4 changed files with 70 additions and 23 deletions.
3 changes: 2 additions & 1 deletion libp2p/dial.nim
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ method connect*(
self: Dial,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false) {.async, base.} =
forceDial = false,
reuseConnection = true) {.async, base.} =
## connect remote peer without negotiating
## a protocol
##
Expand Down
47 changes: 27 additions & 20 deletions libp2p/dialer.nim
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,28 @@ proc dialAndUpgrade(
if not isNil(result):
return result

proc tryReusingConnection(self: Dialer, peerId: PeerId): Future[Opt[Connection]] {.async.} =
var conn = self.connManager.selectConn(peerId)
if conn == nil:
return Opt.none(Connection)

if conn.atEof or conn.closed:
# This connection should already have been removed from the connection
# manager - it's essentially a bug that we end up here - we'll fail
# for now, hoping that this will clean themselves up later...
warn "dead connection in connection manager", conn
await conn.close()
raise newException(DialFailedError, "Zombie connection encountered")

trace "Reusing existing connection", conn, direction = $conn.dir
return Opt.some(conn)

proc internalConnect(
self: Dialer,
peerId: Opt[PeerId],
addrs: seq[MultiAddress],
forceDial: bool):
forceDial: bool,
reuseConnection = true):
Future[Connection] {.async.} =
if Opt.some(self.localPeerId) == peerId:
raise newException(CatchableError, "can't dial self!")
Expand All @@ -161,24 +178,13 @@ proc internalConnect(
try:
await lock.acquire()

# Check if we have a connection already and try to reuse it
var conn =
if peerId.isSome: self.connManager.selectConn(peerId.get())
else: nil
if conn != nil:
if conn.atEof or conn.closed:
# This connection should already have been removed from the connection
# manager - it's essentially a bug that we end up here - we'll fail
# for now, hoping that this will clean themselves up later...
warn "dead connection in connection manager", conn
await conn.close()
raise newException(DialFailedError, "Zombie connection encountered")

trace "Reusing existing connection", conn, direction = $conn.dir
return conn
if peerId.isSome and reuseConnection:
let connOpt = await self.tryReusingConnection(peerId.get())
if connOpt.isSome:
return connOpt.get()

let slot = self.connManager.getOutgoingSlot(forceDial)
conn =
let conn =
try:
await self.dialAndUpgrade(peerId, addrs)
except CatchableError as exc:
Expand Down Expand Up @@ -207,15 +213,16 @@ method connect*(
self: Dialer,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false) {.async.} =
forceDial = false,
reuseConnection = true) {.async.} =
## connect remote peer without negotiating
## a protocol
##

if self.connManager.connCount(peerId) > 0:
if self.connManager.connCount(peerId) > 0 and reuseConnection:
return

discard await self.internalConnect(Opt.some(peerId), addrs, forceDial)
discard await self.internalConnect(Opt.some(peerId), addrs, forceDial, reuseConnection)

method connect*(
self: Dialer,
Expand Down
5 changes: 3 additions & 2 deletions libp2p/switch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,11 @@ method connect*(
s: Switch,
peerId: PeerId,
addrs: seq[MultiAddress],
forceDial = false): Future[void] {.public.} =
forceDial = false,
reuseConnection = true): Future[void] {.public.} =
## Connects to a peer without opening a stream to it

s.dialer.connect(peerId, addrs, forceDial)
s.dialer.connect(peerId, addrs, forceDial, reuseConnection)

method connect*(
s: Switch,
Expand Down
38 changes: 38 additions & 0 deletions tests/testdialer.nim
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Nim-LibP2P
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

import std/options
import chronos
import unittest2
import ../libp2p/[builders,
switch]
import ./helpers

suite "Dialer":
teardown:
checkTrackers()

asyncTest "Connect forces a new connection":

let
src = newStandardSwitch()
dst = newStandardSwitch()

await dst.start()

await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs)
check src.connManager.connCount(dst.peerInfo.peerId) == 1

await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs)
check src.connManager.connCount(dst.peerInfo.peerId) == 1

await src.connect(dst.peerInfo.peerId, dst.peerInfo.addrs, true, false)
check src.connManager.connCount(dst.peerInfo.peerId) == 2

await allFutures(src.stop(), dst.stop())

0 comments on commit 4ace70d

Please sign in to comment.