Skip to content
This repository has been archived by the owner on Mar 26, 2019. It is now read-only.

Commit

Permalink
Address PR #52 review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
kdeme authored and zah committed Nov 28, 2018
1 parent ccea6dc commit 420298d
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 47 deletions.
Expand Up @@ -109,7 +109,7 @@ type
allowP2P: bool

bloom: Bloom # cached bloom filter of all topics of filter
handler: Option[FilterMsgHandler]
handler: FilterMsgHandler
queue: seq[ReceivedMessage]

Filters* = Table[string, Filter]
Expand Down Expand Up @@ -545,8 +545,6 @@ proc add*(self: var Queue, msg: Message): bool =
self.itemHashes.excl(last)

# check for duplicate
# NOTE: Could also track if duplicates come from the same peer and disconnect
# from that peer. Is this tracking overhead worth it though?
if self.itemHashes.containsOrIncl(msg):
return false
else:
Expand All @@ -561,15 +559,15 @@ proc newFilter*(src = none[PublicKey](), privateKey = none[PrivateKey](),
powReq: powReq, allowP2P: allowP2P, bloom: toBloom(topics))

proc subscribeFilter*(filters: var Filters, filter: Filter,
handler = none[FilterMsgHandler]()): string =
handler:FilterMsgHandler = nil): string =
# NOTE: Should we allow a filter without a key? Encryption is mandatory in v6?
# Check if asymmetric _and_ symmetric key? Now asymmetric just has precedence.
let id = generateRandomID()
var filter = filter
if handler.isSome():
filter.handler = handler
if handler.isNil():
filter.queue = newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity)
else:
filter.queue = newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity)
filter.handler = handler

filters.add(id, filter)
debug "Filter added", filter = id
Expand Down Expand Up @@ -628,16 +626,16 @@ proc notify*(filters: var Filters, msg: Message) =
pow: msg.pow,
hash: msg.hash)
# Either run callback or add to queue
if filter.handler.isSome():
filter.handler.get()(receivedMsg)
else:
if filter.handler.isNil():
filter.queue.insert(receivedMsg)
else:
filter.handler(receivedMsg)

proc getFilterMessages*(filters: var Filters, filterId: string): seq[ReceivedMessage] =
result = @[]
if filters.contains(filterId):
if filters[filterId].handler.isNone():
result = filters[filterId].queue
if filters[filterId].handler.isNil():
shallowCopy(result, filters[filterId].queue)
filters[filterId].queue =
newSeqOfCap[ReceivedMessage](defaultFilterQueueCapacity)

Expand Down Expand Up @@ -715,7 +713,9 @@ p2pProtocol Whisper(version = whisperVersion,
whisperPeer.trusted = false
whisperPeer.initialized = true

asyncCheck peer.run()
if not whisperNet.config.isLightNode:
asyncCheck peer.run()

debug "Whisper peer initialized"

onPeerDisconnected do (peer: Peer, reason: DisconnectionReason) {.gcsafe.}:
Expand Down Expand Up @@ -747,9 +747,18 @@ p2pProtocol Whisper(version = whisperVersion,
# await peer.disconnect(SubprotocolReason)
continue

# This peer send it thus should not receive it again
peer.state.received.incl(msg)
# This peer send this message thus should not receive it again.
# If this peer has the message in the `received` set already, this means
# it was either already received here from this peer or send to this peer.
# Either way it will be in our queue already (and the peer should know
# this) and this peer is sending duplicates.
if peer.state.received.containsOrIncl(msg):
warn "Peer sending duplicate messages"
# await peer.disconnect(SubprotocolReason)
continue

# This can still be a duplicate message, but from another peer than
# the peer who send the message.
if peer.networkState.queue.add(msg):
# notify filters of this message
peer.networkState.filters.notify(msg)
Expand Down Expand Up @@ -817,10 +826,7 @@ proc run(peer: Peer) {.async.} =

whisperPeer.running = true
while whisperPeer.running:
# XXX: shouldn't this be outside of the loop?
# In case we are runinng a light node, we have nothing to do here?
if not whisperNet.config.isLightNode:
peer.processQueue()
peer.processQueue()
await sleepAsync(300)

proc pruneReceived(node: EthereumNode) =
Expand Down Expand Up @@ -906,7 +912,7 @@ proc postMessage*(node: EthereumNode, pubKey = none[PublicKey](),
return false

proc subscribeFilter*(node: EthereumNode, filter: Filter,
handler = none[FilterMsgHandler]()): string =
handler:FilterMsgHandler = nil): string =
return node.protocolState(Whisper).filters.subscribeFilter(filter, handler)

proc unsubscribeFilter*(node: EthereumNode, filterId: string): bool =
Expand All @@ -922,16 +928,20 @@ proc filtersToBloom*(node: EthereumNode): Bloom =
proc setPowRequirement*(node: EthereumNode, powReq: float64) {.async.} =
# NOTE: do we need a tolerance of old PoW for some time?
node.protocolState(Whisper).config.powRequirement = powReq
var futures: seq[Future[void]] = @[]
for peer in node.peers(Whisper):
# asyncCheck peer.powRequirement(cast[uint](powReq))
await peer.powRequirement(cast[uint](powReq))
futures.add(peer.powRequirement(cast[uint](powReq)))

await all(futures)

proc setBloomFilter*(node: EthereumNode, bloom: Bloom) {.async.} =
# NOTE: do we need a tolerance of old bloom filter for some time?
node.protocolState(Whisper).config.bloom = bloom
var futures: seq[Future[void]] = @[]
for peer in node.peers(Whisper):
# asyncCheck peer.bloomFilterExchange(@bloom)
await peer.bloomFilterExchange(@bloom)
futures.add(peer.bloomFilterExchange(@bloom))

await all(futures)

proc setMaxMessageSize*(node: EthereumNode, size: uint32): bool =
if size > defaultMaxMsgSize:
Expand All @@ -946,12 +956,11 @@ proc setPeerTrusted*(node: EthereumNode, peerId: NodeId): bool =
peer.state(Whisper).trusted = true
return true

# XXX: should probably only be allowed before connection is made,
# as there exists no message to communicate to peers that it is a light node
# How to arrange that?
# NOTE: Should be run before connection is made with peers
proc setLightNode*(node: EthereumNode, isLightNode: bool) =
node.protocolState(Whisper).config.isLightNode = isLightNode

# NOTE: Should be run before connection is made with peers
proc configureWhisper*(node: EthereumNode, config: WhisperConfig) =
node.protocolState(Whisper).config = config

10 changes: 5 additions & 5 deletions tests/shh_basic_client.nim
Expand Up @@ -9,7 +9,7 @@

import
sequtils, options, strutils, parseopt, asyncdispatch2,
eth_keys, rlp, eth_p2p, eth_p2p/rlpx_protocols/[shh_protocol],
eth_keys, rlp, eth_p2p, eth_p2p/rlpx_protocols/[whisper_protocol],
eth_p2p/[discovery, enode, peer_pool]

const
Expand Down Expand Up @@ -153,21 +153,21 @@ if config.watch:
# filter encrypted asym
discard node.subscribeFilter(newFilter(privateKey = some(encPrivateKey),
topics = @[topic]),
some((FilterMsgHandler)handler))
handler)
# filter encrypted asym + signed
discard node.subscribeFilter(newFilter(some(signPublicKey),
privateKey = some(encPrivateKey),
topics = @[topic]),
some((FilterMsgHandler)handler))
handler)
# filter encrypted sym
discard node.subscribeFilter(newFilter(symKey = some(symKey),
topics = @[topic]),
some((FilterMsgHandler)handler))
handler)
# filter encrypted sym + signed
discard node.subscribeFilter(newFilter(some(signPublicKey),
symKey = some(symKey),
topics = @[topic]),
some((FilterMsgHandler)handler))
handler)

if config.post:
# encrypted asym
Expand Down
2 changes: 1 addition & 1 deletion tests/tshh.nim
Expand Up @@ -11,7 +11,7 @@ import
sequtils, options, unittest, times, tables,
nimcrypto/hash,
eth_keys, rlp,
eth_p2p/rlpx_protocols/shh_protocol as whisper
eth_p2p/rlpx_protocols/whisper_protocol as whisper

suite "Whisper payload":
test "should roundtrip without keys":
Expand Down
25 changes: 12 additions & 13 deletions tests/tshh_connect.nim
Expand Up @@ -9,7 +9,7 @@

import
sequtils, options, unittest, tables, asyncdispatch2, rlp, eth_keys,
eth_p2p, eth_p2p/rlpx_protocols/[shh_protocol], eth_p2p/[discovery, enode]
eth_p2p, eth_p2p/rlpx_protocols/[whisper_protocol], eth_p2p/[discovery, enode]

proc localAddress(port: int): Address =
let port = Port(port)
Expand Down Expand Up @@ -100,18 +100,18 @@ asyncTest "Filters with encryption and signing":
# Filters
# filter for encrypted asym
filters.add(node1.subscribeFilter(newFilter(privateKey = some(encryptKeyPair.seckey),
topics = @[topic]), some(handler1)))
topics = @[topic]), handler1))
# filter for encrypted asym + signed
filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
privateKey = some(encryptKeyPair.seckey),
topics = @[topic]), some(handler2)))
topics = @[topic]), handler2))
# filter for encrypted sym
filters.add(node1.subscribeFilter(newFilter(symKey = some(symKey),
topics = @[topic]), some(handler3)))
topics = @[topic]), handler3))
# filter for encrypted sym + signed
filters.add(node1.subscribeFilter(newFilter(some(signKeyPair.pubkey),
symKey = some(symKey),
topics = @[topic]), some(handler4)))
topics = @[topic]), handler4))
# Messages
# encrypted asym
check true == node2.postMessage(some(encryptKeyPair.pubkey), ttl = 5,
Expand Down Expand Up @@ -153,8 +153,8 @@ asyncTest "Filters with topics":
check msg.decoded.payload == payloads[1]
futures[1].complete(1)

var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), some(handler1))
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), some(handler2))
var filter1 = node1.subscribeFilter(newFilter(topics = @[topic1]), handler1)
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic2]), handler2)

check:
true == node2.postMessage(ttl = 3, topic = topic1, payload = payloads[0])
Expand Down Expand Up @@ -183,9 +183,9 @@ asyncTest "Filters with PoW":
futures[1].complete(1)

var filter1 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 0),
some(handler1))
handler1)
var filter2 = node1.subscribeFilter(newFilter(topics = @[topic], powReq = 10),
some(handler2))
handler2)

check:
true == node2.postMessage(ttl = 2, topic = topic, payload = payload)
Expand Down Expand Up @@ -228,7 +228,7 @@ asyncTest "Bloomfilter blocking":
proc handler(msg: ReceivedMessage) =
check msg.decoded.payload == payload
f.complete(1)
var filter = node1.subscribeFilter(newFilter(topics = filterTopics), some(handler))
var filter = node1.subscribeFilter(newFilter(topics = filterTopics), handler)
await node1.setBloomFilter(node1.filtersToBloom())

check true == node2.postMessage(ttl = 1, topic = sendTopic1, payload = payload)
Expand Down Expand Up @@ -298,10 +298,9 @@ asyncTest "Queue pruning":
asyncTest "Light node posting":
let topic = [byte 0, 0, 0, 0]
node1.setLightNode(true)
var result = node1.postMessage(ttl = 2, topic = topic, payload = repeat(byte 0, 10))

check:
result == false
node1.postMessage(ttl = 2, topic = topic, payload = repeat(byte 0, 10)) == false
node1.protocolState(Whisper).queue.items.len == 0

node1.setLightNode(false)
Expand All @@ -314,7 +313,7 @@ asyncTest "P2P":
f.complete(1)

var filter = node1.subscribeFilter(newFilter(topics = @[topic], allowP2P = true),
some(handler))
handler)
check:
true == node1.setPeerTrusted(toNodeId(node2.keys.pubkey))
true == node2.postMessage(ttl = 2, topic = topic,
Expand Down
2 changes: 1 addition & 1 deletion tests/tshh_connect_mocked.nim
Expand Up @@ -9,7 +9,7 @@

import
options, unittest, asyncdispatch2, rlp, eth_keys,
eth_p2p, eth_p2p/mock_peers, eth_p2p/rlpx_protocols/[shh_protocol]
eth_p2p, eth_p2p/mock_peers, eth_p2p/rlpx_protocols/[whisper_protocol]

proc localAddress(port: int): Address =
let port = Port(port)
Expand Down

0 comments on commit 420298d

Please sign in to comment.