Split msgs in iwant response if bigger than limit #944

Merged: 13 commits merged into unstable from split-msgs-iwant-response on Oct 2, 2023

Conversation

@diegomrsantos diegomrsantos (Collaborator) commented Aug 21, 2023

closes #887

codecov bot commented Aug 22, 2023

Codecov Report

Merging #944 (a4f1f60) into unstable (56599f5) will increase coverage by 0.03%.
Report is 1 commit behind head on unstable.
The diff coverage is 100.00%.

Impacted file tree graph

@@             Coverage Diff              @@
##           unstable     #944      +/-   ##
============================================
+ Coverage     83.07%   83.10%   +0.03%     
============================================
  Files            91       91              
  Lines         15290    15297       +7     
============================================
+ Hits          12702    12713      +11     
+ Misses         2588     2584       -4     
Files                                    | Coverage Δ
libp2p/protocols/pubsub/gossipsub.nim    | 85.40% <100.00%> (-0.81%) ⬇️
libp2p/protocols/pubsub/pubsubpeer.nim   | 92.50% <100.00%> (+0.89%) ⬆️
libp2p/protocols/pubsub/rpc/messages.nim | 52.85% <100.00%> (+6.08%) ⬆️

... and 15 files with indirect coverage changes

@diegomrsantos diegomrsantos force-pushed the split-msgs-iwant-response branch 2 times, most recently from 3e5a638 to 8983767 on August 22, 2023 14:48
  # Check if the encoded message size exceeds the maxMessageSize
  if encoded.len > p.maxMessageSize:
    var controlSent = false
    # Split the RPCMsg into individual messages and send them separately
Contributor:

this feels like it should be a responsibility of the caller - otherwise, we should change the send function to take a list of messages instead of a full RPCMsg - also, a single message might itself be bigger than the limit, so the utility of this change seems limited compared to the complexity it brings.

Collaborator Author:

I agree. I only did it here to use the encoded message and not add the overhead of calculating the message size.

If individual messages bigger than the size limit are a common case, we should revisit the message limit. This change allows us to send messages that are each below the size limit but too big to send as a single group.

Collaborator Author:

Now we can split the RPCMsg before calling send. But then we need to create a seq to pass it. Wdyt @arnetheduck ?

@@ -272,7 +272,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte]) {.raises: [], async.} =

   await conn.close() # This will clean up the send connection

-proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
+proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool): seq[RPCMsg] {.raises: [].} =
Contributor:

Collaborator Author:

I believe the code related to an iwant response uses send: https://github.com/status-im/nim-libp2p/blob/0cdfb0fedcb5f19199c3f3607dd6ff67740fb937/libp2p/protocols/pubsub/gossipsub.nim#L298-L300.

But I can also move this solution to sendEncoded, as it is called by send. Wdyt?

Contributor:

Good point, no preference here

Collaborator Author:

Btw in the current implementation, I split the msgs after p.sendObservers(mm) and sendMetrics(mm). Do those procs require splitting too?

Contributor:

I would say no

Contributor:

> But to do that we need to create an RPCMsg, add Messages, and calculate its size.

the size can be pre-computed - this would be the other way to write the above loop (this is what most protobuf encoders do, btw - they precompute the size, then allocate memory in a single shot).

the overhead of RPCMsg itself is static, so it can easily be accounted for - this would be the other way to implement this, i.e. precompute sizes and split based on them and the RPCMsg overhead itself.

Efficiency is indeed key in gossipsub because of its multiplicative nature - any overhead quickly explodes as message counts go up. At this point, we should be mindful of it, and indeed one of the best things we could do for the overall efficiency of gossipsub (and therefore nimbus) would be to improve the efficiency of RPCMsg encoding and decoding in general.
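
A minimal Nim sketch of that idea, assuming standard protobuf varint framing; varintLen and lenDelimitedFieldLen are illustrative helpers, not existing libp2p procs:

# Sketch only: number of bytes a uvarint encoding takes (7 payload bits per byte).
proc varintLen(n: uint64): int =
  result = 1
  var v = n shr 7
  while v > 0:
    inc result
    v = v shr 7

# Sketch only: encoded size of a length-delimited protobuf field,
# assuming a 1-byte tag (field number < 16).
proc lenDelimitedFieldLen(payloadLen: int): int =
  1 + varintLen(uint64(payloadLen)) + payloadLen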

Collaborator Author:

so how about this?

1 - We create a len proc for RPCMsg and all other types used within it that don't have one already.
2 - We use those procs to estimate the serialized size of an RPCMsg and split it based on that. Let's say len(serializedRPCMsg) = 1.1 * len(rpcMsg) - guessing a 10% Protobuf overhead.
3 - We send each RPCMsg individually.

Pseudo-code below:

Function splitRPCMsg(rpcMsg: RPCMsg, maxSize: int) -> List[RPCMsg]:
    Initialize newRPCMsgs as an empty list of RPCMsg objects
    Initialize currentRPCMsg as a new RPCMsg object with the same non-message fields. We send them only in the first RPCMsg; the following ones contain only Messages.
    Initialize currentSize = len(currentRPCMsg)  # This includes the size of control and other fields

    For each msg in rpcMsg.messages:
        msgSize = len(msg)

        # Check if adding the next message will exceed maxSize
        If (currentSize + msgSize) * 1.1 > maxSize:
            Add currentRPCMsg to newRPCMsgs
            Initialize currentRPCMsg as a new, empty RPCMsg object
            currentSize = 0

        # Add the message to the current RPCMsg
        Add msg to currentRPCMsg.messages
        currentSize += msgSize

    # Check if there is a non-empty currentRPCMsg left to be added
    If currentSize > 0:
        Add currentRPCMsg to newRPCMsgs

    Return newRPCMsgs
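
A rough Nim rendering of the pseudo-code above (illustrative only: it assumes the RPCMsg fields shown elsewhere in this PR and the len procs proposed in step 1):

proc splitRPCMsg(rpcMsg: RPCMsg, maxSize: int): seq[RPCMsg] =
  # First chunk carries the non-message fields; later chunks carry only messages.
  var currentRPCMsg = RPCMsg(subscriptions: rpcMsg.subscriptions,
                             control: rpcMsg.control)
  var currentSize = len(currentRPCMsg)

  for msg in rpcMsg.messages:
    let msgSize = len(msg)

    # Start a new chunk if adding this message would exceed maxSize
    # (the 1.1 factor is the guessed 10% protobuf overhead from step 2).
    if float(currentSize + msgSize) * 1.1 > float(maxSize):
      result.add(currentRPCMsg)
      currentRPCMsg = RPCMsg()
      currentSize = 0

    currentRPCMsg.messages.add(msg)
    currentSize += msgSize

  # Flush the last, non-empty chunk.
  if currentSize > 0:
    result.add(currentRPCMsg)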

Contributor:

this can be done deterministically, there's code in nim-protobuf-serialization showing how - also, unless there's a need for it, better to return the encoded messages (or even better, turn this into an iterator which yields the encoded messages so that there is no complex seq involved)

Contributor:

Honestly, we don't have to be super precise: even if we just split once we reach limit / 2 without counting pb overhead, each message would still be >500kb with probably <50 bytes of overhead (or 0.01% overhead, assuming a 1mb max message size).
That's why I also originally proposed to split in the handleIWant directly (or somewhere like that). Even just sending each reply in a separate message wouldn't have crazy overhead.
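
For illustration, that coarser approach could look like the sketch below (hypothetical helper over already-encoded payloads; it ignores protobuf framing entirely):

# Illustrative only: batch encoded payloads, cutting a new batch once the
# accumulated raw size reaches half of the message size limit.
proc batchByHalfLimit(payloads: seq[seq[byte]], maxSize: int): seq[seq[seq[byte]]] =
  var current = newSeq[seq[byte]]()
  var currentSize = 0
  for p in payloads:
    if current.len > 0 and currentSize + p.len > maxSize div 2:
      result.add(current)
      current = newSeq[seq[byte]]()
      currentSize = 0
    current.add(p)
    currentSize += p.len
  if current.len > 0:
    result.add(current)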

Collaborator Author:

We don't use nim-protobuf-serialization in libp2p. Tried importing it and using computeObjectSize, but got errors about lack of pragma annotations and so on.

@@ -116,3 +116,36 @@ func shortLog*(m: RPCMsg): auto =
    messages: mapIt(m.messages, it.shortLog),
    control: m.control.get(ControlMessage()).shortLog
  )

proc len(peerInfo: PeerInfoMsg): int =
  return peerInfo.peerId.len + peerInfo.signedPeerRecord.len
Contributor:

Suggested change:
-  return peerInfo.peerId.len + peerInfo.signedPeerRecord.len
+  peerInfo.peerId.len + peerInfo.signedPeerRecord.len

..

Collaborator Author:

Done

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
  trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)

iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
  var currentRPCMsg = RPCMsg(subscriptions: rpcMsg.subscriptions, control: rpcMsg.control,
Contributor:

this copy needs to be avoided when no splitting is needed

Collaborator Author:

I believe when no split is needed the iterator isn't called:

  if encoded.len > p.maxMessageSize:
    for encodedSplitMsg in splitRPCMsg(p, msg, p.maxMessageSize, anonymize):
      asyncSpawn p.sendEncoded(encodedSplitMsg)
  else:
    # If the message size is within limits, send it as is
    debug "sending msg to peer", peer = p, rpcMsg = shortLog(msg)
    asyncSpawn p.sendEncoded(encoded)

Comment on lines +24 to +44
type
  TestGossipSub* = ref object of GossipSub

proc getPubSubPeer*(p: TestGossipSub, peerId: PeerId): PubSubPeer =
  proc getConn(): Future[Connection] =
    p.switch.dial(peerId, GossipSubCodec)

  let pubSubPeer = PubSubPeer.new(peerId, getConn, nil, GossipSubCodec, 1024 * 1024)
  debug "created new pubsub peer", peerId

  p.peers[peerId] = pubSubPeer

  onNewPeer(p, pubSubPeer)
  pubSubPeer

proc randomPeerId*(): PeerId =
  try:
    PeerId.init(PrivateKey.random(ECDSA, rng[]).get()).tryGet()
  except CatchableError as exc:
    raise newException(Defect, exc.msg)

Contributor:

Why this?
If you remove it, you can also make onNewPeer private again

Collaborator Author:

This already existed, I just moved it to this file.

Collaborator Author:

I did it because I tried to use getPubSubPeer in my test, but it isn't necessary anymore. I tried to revert it, but I got multiple errors: testgossipinternal calls onNewPeer, and it worked only because the gossipsub file was included there. I can no longer do that, as I now import utils in testgossipinternal and it causes an error. Yeah, I know, messy.

proc send*(p: PubSubPeer, msg: RPCMsg, anonymize: bool) {.raises: [].} =
  trace "sending msg to peer", peer = p, rpcMsg = shortLog(msg)

iterator splitRPCMsg(peer: PubSubPeer, rpcMsg: RPCMsg, maxSize: int, anonymize: bool): seq[byte] =
  var currentRPCMsg = RPCMsg(subscriptions: rpcMsg.subscriptions, control: rpcMsg.control,
Contributor:

This code (& the len part) seems very prone to failure if we add a field to RPCMsg, update a field, etc.
We should at the very least add a comment to the RPCMsg declaration to warn about that, and ideally find a better solution.

It seems that the best solution would be to use nim-protobuf-serialization, but we can't yet, unfortunately.

As an alternative, wouldn't it be simpler to modify encodeRpcMsg? It could take a maxSize and return a seq[seq[byte]], or even just concatenate the PBs directly internally (this would require some tweaks to the pubsubpeer).
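
The alternative being floated might look roughly like this (hypothetical name and shape; it assumes the existing single-RPCMsg encoder is encodeRpcMsg(msg, anonymize) returning seq[byte], plus a splitRPCMsg helper like the one sketched earlier):

# Hypothetical wrapper: split first, then encode each chunk, so callers
# receive ready-to-send buffers that each fit under maxSize.
proc encodeRpcMsgChunked(msg: RPCMsg, anonymize: bool, maxSize: int): seq[seq[byte]] =
  for chunk in splitRPCMsg(msg, maxSize):
    result.add(encodeRpcMsg(chunk, anonymize))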

Collaborator Author:

I think adding the comment is a good and easy-to-do improvement, but I'm not sure about splitting in encodeRpcMsg - it seems like too much abstraction leakage.

Collaborator Author:

Added the comment

@Menduist Menduist mentioned this pull request Sep 20, 2023
@Menduist Menduist (Contributor) left a comment:

Great job!

@diegomrsantos diegomrsantos merged commit 7587181 into unstable Oct 2, 2023
10 checks passed
@diegomrsantos diegomrsantos deleted the split-msgs-iwant-response branch October 2, 2023 09:39
Successfully merging this pull request may close these issues:

IWANT replies can be bigger than the pubsub message limit