Skip to content

Commit

Permalink
make sure all streams are tracked (#422)
Browse files Browse the repository at this point in the history
* make sure all streams are tracked

* revert unnecesary change
  • Loading branch information
dryajov committed Nov 5, 2020
1 parent 6040cb4 commit 3956f3f
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 79 deletions.
2 changes: 2 additions & 0 deletions libp2p/muxers/mplex/lpchannel.nim
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ const
MaxWrites = 1024 ##\
## Maximum number of in-flight writes - after this, we disconnect the peer

LPChannelTrackerName* = "LPChannel"

type
LPChannel* = ref object of BufferStream
id*: uint64 # channel id
Expand Down
3 changes: 3 additions & 0 deletions libp2p/protocols/secure/secure.nim
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ export protocol
logScope:
topics = "secure"

const
SecureConnTrackerName* = "SecureConn"

type
Secure* = ref object of LPProtocol # base type for secure managers

Expand Down
33 changes: 1 addition & 32 deletions libp2p/stream/bufferstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -22,36 +22,7 @@ logScope:
topics = "bufferstream"

const
BufferStreamTrackerName* = "libp2p.bufferstream"

type
BufferStreamTracker* = ref object of TrackerBase
opened*: uint64
closed*: uint64

proc setupBufferStreamTracker(): BufferStreamTracker {.gcsafe.}

proc getBufferStreamTracker(): BufferStreamTracker {.gcsafe.} =
result = cast[BufferStreamTracker](getTracker(BufferStreamTrackerName))
if isNil(result):
result = setupBufferStreamTracker()

proc dumpTracking(): string {.gcsafe.} =
var tracker = getBufferStreamTracker()
result = "Opened buffers: " & $tracker.opened & "\n" &
"Closed buffers: " & $tracker.closed

proc leakTransport(): bool {.gcsafe.} =
var tracker = getBufferStreamTracker()
result = (tracker.opened != tracker.closed)

proc setupBufferStreamTracker(): BufferStreamTracker =
result = new BufferStreamTracker
result.opened = 0
result.closed = 0
result.dump = dumpTracking
result.isLeaked = leakTransport
addTracker(BufferStreamTrackerName, result)
BufferStreamTrackerName* = "BufferStream"

type
BufferStream* = ref object of Connection
Expand Down Expand Up @@ -79,7 +50,6 @@ method initStream*(s: BufferStream) =
s.readQueue = newAsyncQueue[seq[byte]](1)

trace "BufferStream created", s
inc getBufferStreamTracker().opened

proc newBufferStream*(timeout: Duration = DefaultConnectionTimeout): BufferStream =
new result
Expand Down Expand Up @@ -169,7 +139,6 @@ method closeImpl*(s: BufferStream): Future[void] =
if not s.pushedEof: # Potentially wake up reader
asyncSpawn s.pushEof()

inc getBufferStreamTracker().closed
trace "Closed BufferStream", s

procCall Connection(s).closeImpl() # noraises, nocancels
1 change: 1 addition & 0 deletions libp2p/stream/chronosstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ logScope:

const
DefaultChronosStreamTimeout = 10.minutes
ChronosStreamTrackerName* = "ChronosStream"

type
ChronosStream* = ref object of Connection
Expand Down
32 changes: 1 addition & 31 deletions libp2p/stream/connection.nim
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ logScope:
topics = "connection"

const
ConnectionTrackerName* = "libp2p.connection"
ConnectionTrackerName* = "Connection"
DefaultConnectionTimeout* = 5.minutes

type
Expand All @@ -33,35 +33,8 @@ type
peerInfo*: PeerInfo
observedAddr*: Multiaddress

ConnectionTracker* = ref object of TrackerBase
opened*: uint64
closed*: uint64

proc setupConnectionTracker(): ConnectionTracker {.gcsafe.}
proc timeoutMonitor(s: Connection) {.async, gcsafe.}

proc getConnectionTracker*(): ConnectionTracker {.gcsafe.} =
result = cast[ConnectionTracker](getTracker(ConnectionTrackerName))
if isNil(result):
result = setupConnectionTracker()

proc dumpTracking(): string {.gcsafe.} =
var tracker = getConnectionTracker()
result = "Opened conns: " & $tracker.opened & "\n" &
"Closed conns: " & $tracker.closed

proc leakTransport(): bool {.gcsafe.} =
var tracker = getConnectionTracker()
result = (tracker.opened != tracker.closed)

proc setupConnectionTracker(): ConnectionTracker =
result = new ConnectionTracker
result.opened = 0
result.closed = 0
result.dump = dumpTracking
result.isLeaked = leakTransport
addTracker(ConnectionTrackerName, result)

func shortLog*(conn: Connection): string =
if conn.isNil: "Connection(nil)"
elif conn.peerInfo.isNil: $conn.oid
Expand All @@ -85,15 +58,12 @@ method initStream*(s: Connection) =
trace "Idle timeout expired, closing connection", s
s.close()

inc getConnectionTracker().opened

method closeImpl*(s: Connection): Future[void] =
# Cleanup timeout timer
trace "Closing connection", s
if not isNil(s.timerTaskFut) and not s.timerTaskFut.finished:
s.timerTaskFut.cancel()

inc getConnectionTracker().closed
trace "Closed connection", s

procCall LPStream(s).closeImpl()
Expand Down
33 changes: 33 additions & 0 deletions libp2p/stream/lpstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ export oids
logScope:
topics = "lpstream"

const
LPStreamTrackerName* = "LPStream"

type
Direction* {.pure.} = enum
In, Out
Expand All @@ -49,6 +52,34 @@ type
InvalidVarintError* = object of LPStreamError
MaxSizeError* = object of LPStreamError

StreamTracker* = ref object of TrackerBase
opened*: uint64
closed*: uint64

proc setupStreamTracker(name: string): StreamTracker =
let tracker = new StreamTracker

proc dumpTracking(): string {.gcsafe.} =
return "Opened " & tracker.id & " :" & $tracker.opened & "\n" &
"Closed " & tracker.id & " :" & $tracker.closed

proc leakTransport(): bool {.gcsafe.} =
return (tracker.opened != tracker.closed)

tracker.id = name
tracker.opened = 0
tracker.closed = 0
tracker.dump = dumpTracking
tracker.isLeaked = leakTransport
addTracker(name, tracker)

return tracker

proc getStreamTracker(name: string): StreamTracker {.gcsafe.} =
result = cast[StreamTracker](getTracker(name))
if isNil(result):
result = setupStreamTracker(name)

proc newLPStreamReadError*(p: ref CatchableError): ref CatchableError =
var w = newException(LPStreamReadError, "Read stream failed")
w.msg = w.msg & ", originated from [" & $p.name & "] " & p.msg
Expand Down Expand Up @@ -92,6 +123,7 @@ method initStream*(s: LPStream) {.base.} =
s.oid = genOid()

libp2p_open_streams.inc(labelValues = [s.objName, $s.dir])
inc getStreamTracker(s.objName).opened
trace "Stream created", s, objName = s.objName, dir = $s.dir

proc join*(s: LPStream): Future[void] =
Expand Down Expand Up @@ -220,6 +252,7 @@ method closeImpl*(s: LPStream): Future[void] {.async, base.} =
trace "Closing stream", s, objName = s.objName, dir = $s.dir
s.closeEvent.fire()
libp2p_open_streams.dec(labelValues = [s.objName, $s.dir])
inc getStreamTracker(s.objName).closed
trace "Closed stream", s, objName = s.objName, dir = $s.dir

method close*(s: LPStream): Future[void] {.base, async.} = # {.raises [Defect].}
Expand Down
5 changes: 5 additions & 0 deletions tests/helpers.nim
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ import ../libp2p/transports/tcptransport
import ../libp2p/stream/bufferstream
import ../libp2p/crypto/crypto
import ../libp2p/stream/lpstream
import ../libp2p/muxers/mplex/lpchannel
import ../libp2p/protocols/secure/secure

const
StreamTransportTrackerName = "stream.transport"
StreamServerTrackerName = "stream.server"

trackerNames = [
LPStreamTrackerName,
ConnectionTrackerName,
LPChannelTrackerName,
SecureConnTrackerName,
BufferStreamTrackerName,
TcpTransportTrackerName,
StreamTransportTrackerName,
Expand Down
4 changes: 2 additions & 2 deletions tests/testbufferstream.nim
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ import ../libp2p/stream/bufferstream,

suite "BufferStream":
teardown:
# echo getTracker("libp2p.bufferstream").dump()
check getTracker("libp2p.bufferstream").isLeaked() == false
# echo getTracker(BufferStreamTrackerName).dump()
check getTracker(BufferStreamTrackerName).isLeaked() == false

test "push data to buffer":
proc testpushData(): Future[bool] {.async.} =
Expand Down
31 changes: 17 additions & 14 deletions tests/testswitch.nim
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ import ../libp2p/[errors,
peerinfo,
crypto/crypto,
protocols/protocol,
protocols/secure/secure,
muxers/muxer,
muxers/mplex/lpchannel,
stream/lpstream]
import ./helpers

Expand Down Expand Up @@ -246,11 +248,12 @@ suite "Switch":
check not switch1.isConnected(switch2.peerInfo)
check not switch2.isConnected(switch1.peerInfo)

var bufferTracker = getTracker(BufferStreamTrackerName)
# echo bufferTracker.dump()
check bufferTracker.isLeaked() == false
var channelTracker = getTracker(LPChannelTrackerName)
# echo channelTracker.dump()
check channelTracker.isLeaked() == false

var connTracker = getTracker(ConnectionTrackerName)
var connTracker = getTracker(SecureConnTrackerName)
doAssert(not isNil(connTracker))
# echo connTracker.dump()
check connTracker.isLeaked() == false

Expand Down Expand Up @@ -305,11 +308,11 @@ suite "Switch":
check not switch1.isConnected(switch2.peerInfo)
check not switch2.isConnected(switch1.peerInfo)

var bufferTracker = getTracker(BufferStreamTrackerName)
var bufferTracker = getTracker(LPChannelTrackerName)
# echo bufferTracker.dump()
check bufferTracker.isLeaked() == false

var connTracker = getTracker(ConnectionTrackerName)
var connTracker = getTracker(SecureConnTrackerName)
# echo connTracker.dump()
check connTracker.isLeaked() == false

Expand Down Expand Up @@ -370,11 +373,11 @@ suite "Switch":
check not switch1.isConnected(switch2.peerInfo)
check not switch2.isConnected(switch1.peerInfo)

var bufferTracker = getTracker(BufferStreamTrackerName)
var bufferTracker = getTracker(LPChannelTrackerName)
# echo bufferTracker.dump()
check bufferTracker.isLeaked() == false

var connTracker = getTracker(ConnectionTrackerName)
var connTracker = getTracker(SecureConnTrackerName)
# echo connTracker.dump()
check connTracker.isLeaked() == false

Expand Down Expand Up @@ -434,11 +437,11 @@ suite "Switch":
check not switch1.isConnected(switch2.peerInfo)
check not switch2.isConnected(switch1.peerInfo)

var bufferTracker = getTracker(BufferStreamTrackerName)
var bufferTracker = getTracker(LPChannelTrackerName)
# echo bufferTracker.dump()
check bufferTracker.isLeaked() == false

var connTracker = getTracker(ConnectionTrackerName)
var connTracker = getTracker(SecureConnTrackerName)
# echo connTracker.dump()
check connTracker.isLeaked() == false

Expand Down Expand Up @@ -498,11 +501,11 @@ suite "Switch":
check not switch1.isConnected(switch2.peerInfo)
check not switch2.isConnected(switch1.peerInfo)

var bufferTracker = getTracker(BufferStreamTrackerName)
var bufferTracker = getTracker(LPChannelTrackerName)
# echo bufferTracker.dump()
check bufferTracker.isLeaked() == false

var connTracker = getTracker(ConnectionTrackerName)
var connTracker = getTracker(SecureConnTrackerName)
# echo connTracker.dump()
check connTracker.isLeaked() == false

Expand Down Expand Up @@ -577,11 +580,11 @@ suite "Switch":
check not switch2.isConnected(switch1.peerInfo)
check not switch3.isConnected(switch1.peerInfo)

var bufferTracker = getTracker(BufferStreamTrackerName)
var bufferTracker = getTracker(LPChannelTrackerName)
# echo bufferTracker.dump()
check bufferTracker.isLeaked() == false

var connTracker = getTracker(ConnectionTrackerName)
var connTracker = getTracker(SecureConnTrackerName)
# echo connTracker.dump()
check connTracker.isLeaked() == false

Expand Down

0 comments on commit 3956f3f

Please sign in to comment.