Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Move connection management into networking layer #351

Merged
merged 15 commits into from
Apr 18, 2020
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
21 changes: 4 additions & 17 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -745,32 +745,19 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) {
func (e *Engine) PeerConnected(p peer.ID) {
e.lock.Lock()
defer e.lock.Unlock()
l, ok := e.ledgerMap[p]

_, ok := e.ledgerMap[p]
if !ok {
l = newLedger(p)
e.ledgerMap[p] = l
e.ledgerMap[p] = newLedger(p)
}

l.lk.Lock()
defer l.lk.Unlock()
l.ref++
}

// PeerDisconnected is called when a peer disconnects.
func (e *Engine) PeerDisconnected(p peer.ID) {
e.lock.Lock()
defer e.lock.Unlock()
l, ok := e.ledgerMap[p]
if !ok {
return
}

l.lk.Lock()
defer l.lk.Unlock()
l.ref--
if l.ref <= 0 {
delete(e.ledgerMap, p)
}
delete(e.ledgerMap, p)
}

// If the want is a want-have, and it's below a certain size, send the full
Expand Down
4 changes: 0 additions & 4 deletions internal/decision/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ type ledger struct {
// wantList is a (bounded, small) set of keys that Partner desires.
wantList *wl.Wantlist

// ref is the reference count for this ledger, its used to ensure we
// don't drop the reference to this ledger in multi-connection scenarios
ref int

lk sync.RWMutex
}

Expand Down
139 changes: 45 additions & 94 deletions internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@ const (
defaultRebroadcastInterval = 30 * time.Second
// maxRetries is the number of times to attempt to send a message before
// giving up
maxRetries = 10
maxRetries = 3
sendTimeout = 30 * time.Second
// maxMessageSize is the maximum message size in bytes
maxMessageSize = 1024 * 1024 * 2
// sendErrorBackoff is the time to wait before retrying to connect after
Expand All @@ -46,7 +47,7 @@ const (
// sender.
type MessageNetwork interface {
ConnectTo(context.Context, peer.ID) error
NewMessageSender(context.Context, peer.ID) (bsnet.MessageSender, error)
NewMessageSender(context.Context, peer.ID, *bsnet.MessageSenderOpts) (bsnet.MessageSender, error)
Latency(peer.ID) time.Duration
Ping(context.Context, peer.ID) ping.Result
Self() peer.ID
Expand All @@ -55,14 +56,14 @@ type MessageNetwork interface {
// MessageQueue implements queue of want messages to send to peers.
type MessageQueue struct {
ctx context.Context
shutdown func()
p peer.ID
network MessageNetwork
dhTimeoutMgr DontHaveTimeoutManager
maxMessageSize int
sendErrorBackoff time.Duration

outgoingWork chan time.Time
done chan struct{}

// Take lock whenever any of these variables are modified
wllock sync.Mutex
Expand Down Expand Up @@ -169,8 +170,10 @@ func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeo
func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
maxMsgSize int, sendErrorBackoff time.Duration, dhTimeoutMgr DontHaveTimeoutManager) *MessageQueue {

ctx, cancel := context.WithCancel(ctx)
mq := &MessageQueue{
ctx: ctx,
shutdown: cancel,
p: p,
network: network,
dhTimeoutMgr: dhTimeoutMgr,
Expand All @@ -179,7 +182,6 @@ func newMessageQueue(ctx context.Context, p peer.ID, network MessageNetwork,
peerWants: newRecallWantList(),
cancels: cid.NewSet(),
outgoingWork: make(chan time.Time, 1),
done: make(chan struct{}),
rebroadcastInterval: defaultRebroadcastInterval,
sendErrorBackoff: sendErrorBackoff,
priority: maxPriority,
Expand Down Expand Up @@ -300,12 +302,17 @@ func (mq *MessageQueue) Startup() {

// Shutdown stops the processing of messages for a message queue.
func (mq *MessageQueue) Shutdown() {
close(mq.done)
mq.shutdown()
}

// onShutdown tears down the resources owned by the message queue: it shuts
// down the DONT_HAVE timeout manager and, when mq.sender is non-nil, resets
// that sender. The error returned by Reset is explicitly discarded.
// NOTE(review): the call site is not visible in this view — presumably this
// runs when the queue's run loop exits; confirm against runQueue.
func (mq *MessageQueue) onShutdown() {
// Shut down the DONT_HAVE timeout manager
mq.dhTimeoutMgr.Shutdown()

// Reset the streamMessageSender
if mq.sender != nil {
_ = mq.sender.Reset()
}
}

func (mq *MessageQueue) runQueue() {
Expand Down Expand Up @@ -351,15 +358,7 @@ func (mq *MessageQueue) runQueue() {
// in sendMessageDebounce. Send immediately.
workScheduled = time.Time{}
mq.sendIfReady()
case <-mq.done:
if mq.sender != nil {
mq.sender.Close()
}
return
case <-mq.ctx.Done():
if mq.sender != nil {
_ = mq.sender.Reset()
}
return
}
}
Expand Down Expand Up @@ -409,12 +408,12 @@ func (mq *MessageQueue) sendIfReady() {
}

func (mq *MessageQueue) sendMessage() {
err := mq.initializeSender()
sender, err := mq.initializeSender()
if err != nil {
log.Infof("cant open message sender to peer %s: %s", mq.p, err)
// TODO: cant connect, what now?
// TODO: should we stop using this connection and clear the want list
// to avoid using up memory?
// If we fail to initialize the sender, the networking layer will
// emit a Disconnect event and the MessageQueue will get cleaned up
log.Infof("Could not open message sender to peer %s: %s", mq.p, err)
mq.Shutdown()
dirkmc marked this conversation as resolved.
Show resolved Hide resolved
return
}

Expand All @@ -435,23 +434,25 @@ func (mq *MessageQueue) sendMessage() {
wantlist := message.Wantlist()
mq.logOutgoingMessage(wantlist)

// Try to send this message repeatedly
for i := 0; i < maxRetries; i++ {
if mq.attemptSendAndRecovery(message) {
// We were able to send successfully.
onSent()
if err := sender.SendMsg(mq.ctx, message); err != nil {
// If the message couldn't be sent, the networking layer will
// emit a Disconnect event and the MessageQueue will get cleaned up
log.Infof("Could not send message to peer %s: %s", mq.p, err)
mq.Shutdown()
return
}

mq.simulateDontHaveWithTimeout(wantlist)
// We were able to send successfully.
onSent()
dirkmc marked this conversation as resolved.
Show resolved Hide resolved

// If the message was too big and only a subset of wants could be
// sent, schedule sending the rest of the wants in the next
// iteration of the event loop.
if mq.hasPendingWork() {
mq.signalWorkReady()
}
// Set a timer to wait for responses
mq.simulateDontHaveWithTimeout(wantlist)

return
}
// If the message was too big and only a subset of wants could be
// sent, schedule sending the rest of the wants in the next
// iteration of the event loop.
if mq.hasPendingWork() {
mq.signalWorkReady()
}
}

Expand Down Expand Up @@ -620,69 +621,19 @@ func (mq *MessageQueue) extractOutgoingMessage(supportsHave bool) (bsmsg.BitSwap
return mq.msg, onMessageSent
}

func (mq *MessageQueue) initializeSender() error {
if mq.sender != nil {
return nil
}
nsender, err := openSender(mq.ctx, mq.network, mq.p)
if err != nil {
return err
}
mq.sender = nsender
return nil
}

func (mq *MessageQueue) attemptSendAndRecovery(message bsmsg.BitSwapMessage) bool {
err := mq.sender.SendMsg(mq.ctx, message)
if err == nil {
return true
}

log.Infof("bitswap send error: %s", err)
_ = mq.sender.Reset()
mq.sender = nil

select {
case <-mq.done:
return true
case <-mq.ctx.Done():
return true
case <-time.After(mq.sendErrorBackoff):
// wait 100ms in case disconnect notifications are still propagating
log.Warn("SendMsg errored but neither 'done' nor context.Done() were set")
}

err = mq.initializeSender()
if err != nil {
log.Infof("couldnt open sender again after SendMsg(%s) failed: %s", mq.p, err)
return true
}

// TODO: Is this the same instance for the remote peer?
// If its not, we should resend our entire wantlist to them
/*
if mq.sender.InstanceID() != mq.lastSeenInstanceID {
wlm = mq.getFullWantlistMessage()
func (mq *MessageQueue) initializeSender() (bsnet.MessageSender, error) {
if mq.sender == nil {
opts := &bsnet.MessageSenderOpts{
MaxRetries: maxRetries,
SendTimeout: sendTimeout,
SendErrorBackoff: sendErrorBackoff,
}
nsender, err := mq.network.NewMessageSender(mq.ctx, mq.p, opts)
if err != nil {
return nil, err
}
*/
return false
}

func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
// allow ten minutes for connections this includes looking them up in the
// dht dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()

err := network.ConnectTo(conctx, p)
if err != nil {
return nil, err
}

nsender, err := network.NewMessageSender(ctx, p)
if err != nil {
return nil, err
mq.sender = nsender
}

return nsender, nil
return mq.sender, nil
}