Skip to content

Commit

Permalink
Add urgent flag to SendRPC in trace.proto
Browse files Browse the repository at this point in the history
  • Loading branch information
ppopth committed Mar 1, 2024
1 parent 3e3d6d7 commit dfd81e8
Show file tree
Hide file tree
Showing 11 changed files with 138 additions and 93 deletions.
2 changes: 1 addition & 1 deletion floodsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (fs *FloodSubRouter) Publish(msg *Message) {
// Drop it. The peer is too slow.
continue
}
fs.tracer.SendRPC(out, pid)
fs.tracer.SendRPC(out, pid, false)
}
}

Expand Down
22 changes: 11 additions & 11 deletions gossip_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,17 +167,17 @@ func (gt *gossipTracer) ValidateMessage(msg *Message) {
gt.fulfillPromise(msg)
}

func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {}
func (gt *gossipTracer) RemovePeer(p peer.ID) {}
func (gt *gossipTracer) Join(topic string) {}
func (gt *gossipTracer) Leave(topic string) {}
func (gt *gossipTracer) Graft(p peer.ID, topic string) {}
func (gt *gossipTracer) Prune(p peer.ID, topic string) {}
func (gt *gossipTracer) DuplicateMessage(msg *Message) {}
func (gt *gossipTracer) RecvRPC(rpc *RPC) {}
func (gt *gossipTracer) SendRPC(rpc *RPC, p peer.ID) {}
func (gt *gossipTracer) DropRPC(rpc *RPC, p peer.ID) {}
func (gt *gossipTracer) UndeliverableMessage(msg *Message) {}
func (gt *gossipTracer) AddPeer(p peer.ID, proto protocol.ID) {}
func (gt *gossipTracer) RemovePeer(p peer.ID) {}
func (gt *gossipTracer) Join(topic string) {}
func (gt *gossipTracer) Leave(topic string) {}
func (gt *gossipTracer) Graft(p peer.ID, topic string) {}
func (gt *gossipTracer) Prune(p peer.ID, topic string) {}
func (gt *gossipTracer) DuplicateMessage(msg *Message) {}
func (gt *gossipTracer) RecvRPC(rpc *RPC) {}
func (gt *gossipTracer) SendRPC(rpc *RPC, p peer.ID, urgent bool) {}
func (gt *gossipTracer) DropRPC(rpc *RPC, p peer.ID) {}
func (gt *gossipTracer) UndeliverableMessage(msg *Message) {}

func (gt *gossipTracer) ThrottlePeer(p peer.ID) {
gt.Lock()
Expand Down
2 changes: 1 addition & 1 deletion gossipsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -1224,7 +1224,7 @@ func (gs *GossipSubRouter) doSendRPC(rpc *RPC, p peer.ID, q *rpcQueue, urgent bo
gs.doDropRPC(rpc, p, "queue full")
return
}
gs.tracer.SendRPC(rpc, p)
gs.tracer.SendRPC(rpc, p, urgent)
}

func fragmentRPC(rpc *RPC, limit int) ([]*RPC, error) {
Expand Down
175 changes: 109 additions & 66 deletions pb/trace.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pb/trace.proto
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ message TraceEvent {
message SendRPC {
optional bytes sendTo = 1;
optional RPCMeta meta = 2;
optional bool urgent = 3;
}

message DropRPC {
Expand Down
2 changes: 1 addition & 1 deletion peer_gater.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ func (pg *peerGater) ThrottlePeer(p peer.ID) {}

func (pg *peerGater) RecvRPC(rpc *RPC) {}

func (pg *peerGater) SendRPC(rpc *RPC, p peer.ID) {}
func (pg *peerGater) SendRPC(rpc *RPC, p peer.ID, urgent bool) {}

func (pg *peerGater) DropRPC(rpc *RPC, p peer.ID) {}

Expand Down
4 changes: 2 additions & 2 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -925,7 +925,7 @@ func (p *PubSub) announce(topic string, sub bool) {
go p.announceRetry(pid, topic, sub)
continue
}
p.tracer.SendRPC(out, pid)
p.tracer.SendRPC(out, pid, false)
}
}

Expand Down Expand Up @@ -968,7 +968,7 @@ func (p *PubSub) doAnnounceRetry(pid peer.ID, topic string, sub bool) {
go p.announceRetry(pid, topic, sub)
return
}
p.tracer.SendRPC(out, pid)
p.tracer.SendRPC(out, pid, false)
}

// notifySubs sends a given message to all corresponding subscribers.
Expand Down
2 changes: 1 addition & 1 deletion randomsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func (rs *RandomSubRouter) Publish(msg *Message) {
rs.tracer.DropRPC(out, p)
continue
}
rs.tracer.SendRPC(out, p)
rs.tracer.SendRPC(out, p, false)
}
}

Expand Down
2 changes: 1 addition & 1 deletion score.go
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ func (ps *peerScore) ThrottlePeer(p peer.ID) {}

func (ps *peerScore) RecvRPC(rpc *RPC) {}

func (ps *peerScore) SendRPC(rpc *RPC, p peer.ID) {}
func (ps *peerScore) SendRPC(rpc *RPC, p peer.ID, urgent bool) {}

func (ps *peerScore) DropRPC(rpc *RPC, p peer.ID) {}

Expand Down
12 changes: 6 additions & 6 deletions tag_tracer.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,9 @@ func (t *tagTracer) RejectMessage(msg *Message, reason string) {
}
}

func (t *tagTracer) RemovePeer(peer.ID) {}
func (t *tagTracer) ThrottlePeer(p peer.ID) {}
func (t *tagTracer) RecvRPC(rpc *RPC) {}
func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID) {}
func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {}
func (t *tagTracer) UndeliverableMessage(msg *Message) {}
func (t *tagTracer) RemovePeer(peer.ID) {}
func (t *tagTracer) ThrottlePeer(p peer.ID) {}
func (t *tagTracer) RecvRPC(rpc *RPC) {}
func (t *tagTracer) SendRPC(rpc *RPC, p peer.ID, urgent bool) {}
func (t *tagTracer) DropRPC(rpc *RPC, p peer.ID) {}
func (t *tagTracer) UndeliverableMessage(msg *Message) {}

0 comments on commit dfd81e8

Please sign in to comment.