Skip to content

Commit

Permalink
chore: refactoring peer storage (#2243)
Browse files Browse the repository at this point in the history
  • Loading branch information
SionoiS committed Nov 27, 2023
1 parent b31c182 commit c301e88
Show file tree
Hide file tree
Showing 4 changed files with 156 additions and 148 deletions.
67 changes: 29 additions & 38 deletions tests/test_peer_storage.nim
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
{.used.}

import
std/options,
testutils/unittests,
eth/p2p/discoveryv5/enr,
libp2p/crypto/crypto
import
../../waku/common/databases/db_sqlite,
../../waku/node/peer_manager/peer_manager,
../../waku/node/peer_manager/peer_store/waku_peer_storage,
../../waku/waku_enr,
./testlib/wakucore


Expand All @@ -19,14 +22,22 @@ suite "Peer Storage":

# Test Peer
peerLoc = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet()
peerKey = generateEcdsaKey()
peerKey = generateSecp256k1Key()
peer = PeerInfo.new(peerKey, @[peerLoc])
peerProto = "/waku/2/default-waku/codec"
connectedness = Connectedness.CanConnect
disconn = 999999
stored = RemotePeerInfo(
topics = @["/waku/2/rs/2/0", "/waku/2/rs/2/1"]

# Create ENR
var enrBuilder = EnrBuilder.init(peerKey)
enrBuilder.withShardedTopics(topics).expect("Valid topics")
let record = enrBuilder.build().expect("Valid record")

let stored = RemotePeerInfo(
peerId: peer.peerId,
addrs: @[peerLoc],
enr: some(record),
protocols: @[peerProto],
publicKey: peerKey.getPublicKey().tryGet(),
connectedness: connectedness,
Expand All @@ -36,72 +47,52 @@ suite "Peer Storage":

# Test insert and retrieve

require storage.put(peer.peerId, stored, connectedness, disconn).isOk
require storage.put(stored).isOk

var responseCount = 0

# Fetched variables from callback
var resPeerId: PeerId
# Fetched variable from callback
var resStoredInfo: RemotePeerInfo
var resConnectedness: Connectedness
var resDisconnect: int64

proc data(peerId: PeerID, storedInfo: RemotePeerInfo,
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
proc data(storedInfo: RemotePeerInfo) =
responseCount += 1

# Note: cannot use `check` within `{.raises: [Defect].}` block
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
# These flags are checked outside this block.
resPeerId = peerId
resStoredInfo = storedInfo
resConnectedness = connectedness
resDisconnect = disconnectTime

let res = storage.getAll(data)

check:
res.isErr == false
responseCount == 1
resPeerId == peer.peerId
resStoredInfo.peerId == peer.peerId
resStoredInfo.addrs == @[peerLoc]
resStoredInfo.protocols == @[peerProto]
resStoredInfo.publicKey == peerKey.getPublicKey().tryGet()
# TODO: For compatibility, we don't store connectedness and disconnectTime
#resStoredInfo.connectedness == connectedness
#resStoredInfo.disconnectTime == disconn
resConnectedness == Connectedness.CanConnect
resDisconnect == disconn
resStoredInfo.connectedness == connectedness
resStoredInfo.disconnectTime == disconn

assert resStoredInfo.enr.isSome(), "The ENR info wasn't properly stored"
check: resStoredInfo.enr.get() == record

# Test replace and retrieve (update an existing entry)
require storage.put(peer.peerId, stored, Connectedness.CannotConnect, disconn + 10).isOk
stored.connectedness = CannotConnect
stored.disconnectTime = disconn + 10
stored.enr = none(Record)
require storage.put(stored).isOk

responseCount = 0
proc replacedData(peerId: PeerID, storedInfo: RemotePeerInfo,
connectedness: Connectedness, disconnectTime: int64) {.raises: [Defect].} =
proc replacedData(storedInfo: RemotePeerInfo) =
responseCount += 1

# Note: cannot use `check` within `{.raises: [Defect].}` block
# @TODO: /Nim/lib/pure/unittest.nim(577, 16) Error: can raise an unlisted exception: Exception
# These flags are checked outside this block.
resPeerId = peerId
resStoredInfo = storedInfo
resConnectedness = connectedness
resDisconnect = disconnectTime

let repRes = storage.getAll(replacedData)

check:
repRes.isErr == false
responseCount == 1
resPeerId == peer.peerId
resStoredInfo.peerId == peer.peerId
resStoredInfo.addrs == @[peerLoc]
resStoredInfo.protocols == @[peerProto]
resStoredInfo.publicKey == peerKey.getPublicKey().tryGet()
# TODO: For compatibility, we don't store connectedness and disconnectTime
#resStoredInfo.connectedness == connectedness
#resStoredInfo.disconnectTime == disconn
resConnectedness == Connectedness.CannotConnect
resDisconnect == disconn + 10
resStoredInfo.connectedness == Connectedness.CannotConnect
resStoredInfo.disconnectTime == disconn + 10
resStoredInfo.enr.isNone()
56 changes: 35 additions & 21 deletions waku/node/peer_manager/peer_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -101,19 +101,15 @@ proc calculateBackoff(initialBackoffInSec: int,
# Helper functions #
####################

proc insertOrReplace(ps: PeerStorage,
peerId: PeerID,
remotePeerInfo: RemotePeerInfo,
connectedness: Connectedness,
disconnectTime: int64 = 0) =
# Insert peer entry into persistent storage, or replace existing entry with updated info
let res = ps.put(peerId, remotePeerInfo, connectedness, disconnectTime)
if res.isErr:
warn "failed to store peers", err = res.error
proc insertOrReplace(ps: PeerStorage, remotePeerInfo: RemotePeerInfo) =
## Insert peer entry into persistent storage, or replace existing entry with updated info
ps.put(remotePeerInfo).isOkOr:
warn "failed to store peers", err = error
waku_peers_errors.inc(labelValues = ["storage_failure"])
return

proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownOrigin) =
# Adds peer to manager for the specified protocol
## Adds peer to manager for the specified protocol

if remotePeerInfo.peerId == pm.switch.peerInfo.peerId:
# Do not attempt to manage our unmanageable self
Expand All @@ -140,7 +136,9 @@ proc addPeer*(pm: PeerManager, remotePeerInfo: RemotePeerInfo, origin = UnknownO

# Add peer to storage. Entry will subsequently be updated with connectedness information
if not pm.storage.isNil:
pm.storage.insertOrReplace(remotePeerInfo.peerId, remotePeerInfo, NotConnected)
remotePeerInfo.connectedness = NotConnected

pm.storage.insertOrReplace(remotePeerInfo)

# Connects to a given node. Note that this function uses `connect` and
# does not provide a protocol. Streams for relay (gossipsub) are created
Expand Down Expand Up @@ -231,16 +229,26 @@ proc dialPeer(pm: PeerManager,
return none(Connection)

proc loadFromStorage(pm: PeerManager) =
## Load peers from storage, if available

debug "loading peers from storage"
# Load peers from storage, if available

var amount = 0
proc onData(peerId: PeerID, remotePeerInfo: RemotePeerInfo, connectedness: Connectedness, disconnectTime: int64) =
trace "loading peer", peerId=peerId, connectedness=connectedness

if peerId == pm.switch.peerInfo.peerId:
proc onData(remotePeerInfo: RemotePeerInfo) =
let peerId = remotePeerInfo.peerId

if pm.switch.peerInfo.peerId == peerId:
# Do not manage self
return

trace "loading peer",
peerId = peerId,
address = remotePeerInfo.addrs,
protocols = remotePeerInfo.protocols,
agent = remotePeerInfo.agent,
version = remotePeerInfo.protoVersion

# nim-libp2p books
pm.peerStore[AddressBook][peerId] = remotePeerInfo.addrs
pm.peerStore[ProtoBook][peerId] = remotePeerInfo.protocols
Expand All @@ -250,18 +258,20 @@ proc loadFromStorage(pm: PeerManager) =

# custom books
pm.peerStore[ConnectionBook][peerId] = NotConnected # Reset connectedness state
pm.peerStore[DisconnectBook][peerId] = disconnectTime
pm.peerStore[DisconnectBook][peerId] = remotePeerInfo.disconnectTime
pm.peerStore[SourceBook][peerId] = remotePeerInfo.origin

if remotePeerInfo.enr.isSome():
pm.peerStore[ENRBook][peerId] = remotePeerInfo.enr.get()

amount.inc()

let res = pm.storage.getAll(onData)
if res.isErr:
warn "failed to load peers from storage", err = res.error
pm.storage.getAll(onData).isOkOr:
warn "loading peers from storage failed", err = error
waku_peers_errors.inc(labelValues = ["storage_load_failure"])
return

debug "successfully queried peer storage", amount = amount
debug "recovered peers from storage", amount = amount

proc canBeConnected*(pm: PeerManager,
peerId: PeerId): bool =
Expand Down Expand Up @@ -385,8 +395,12 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =

pm.peerStore[ConnectionBook][peerId] = connectedness
pm.peerStore[DirectionBook][peerId] = direction

if not pm.storage.isNil:
pm.storage.insertOrReplace(peerId, pm.peerStore.get(peerId), connectedness, getTime().toUnix)
var remotePeerInfo = pm.peerStore.get(peerId)
remotePeerInfo.disconnectTime = getTime().toUnix

pm.storage.insertOrReplace(remotePeerInfo)

proc new*(T: type PeerManager,
switch: Switch,
Expand Down
18 changes: 9 additions & 9 deletions waku/node/peer_manager/peer_store/peer_storage.nim
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ type

PeerStorageResult*[T] = Result[T, string]

DataProc* = proc(peerId: PeerID, remotePeerInfo: RemotePeerInfo,
connectedness: Connectedness, disconnectTime: int64) {.closure, raises: [Defect].}
DataProc* = proc(remotePeerInfo: RemotePeerInfo) {.closure, raises: [Defect].}

# PeerStorage interface
method put*(db: PeerStorage,
peerId: PeerID,
remotePeerInfo: RemotePeerInfo,
connectedness: Connectedness,
disconnectTime: int64): PeerStorageResult[void] {.base.} = discard

method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[bool] {.base.} = discard
method put*(
db: PeerStorage,
remotePeerInfo: RemotePeerInfo
): PeerStorageResult[void] {.base.} =
return err("Unimplemented")

method getAll*(db: PeerStorage, onData: DataProc): PeerStorageResult[void] {.base.} =
return err("Unimplemented")

0 comments on commit c301e88

Please sign in to comment.