Skip to content
This repository has been archived by the owner on Nov 3, 2023. It is now read-only.

xep0198: request ack on every 25 enqueued stanzas #228

Merged
merged 1 commit into from
May 6, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions pkg/module/xep0198/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ const (
itemNotFound = "item-not-found"

nonceLength = 24

// unacknowledgedStanzaCount defines the stanza count interval at which an "r" stanza will be sent
unacknowledgedStanzaCount = 25
)

var errInvalidSMID = errors.New("xep0198: invalid stream identifier format")
Expand All @@ -69,14 +72,14 @@ type Config struct {

// RequestAckInterval defines the period of stream inactivity
// that should be waited before requesting acknowledgement.
RequestAckInterval time.Duration `fig:"request_ack_interval" default:"20s"`
RequestAckInterval time.Duration `fig:"request_ack_interval" default:"1m"`

// WaitForAckTimeout defines stanza acknowledgement timeout.
WaitForAckTimeout time.Duration `fig:"wait_for_ack_timeout" default:"30s"`

// MaxQueueSize defines maximum number of unacknowledged stanzas.
// When the limit is reached the c2s stream is terminated.
MaxQueueSize int `fig:"max_queue_size" default:"2500"`
MaxQueueSize int `fig:"max_queue_size" default:"250"`
}

// Stream represents a stream (XEP-0198) module type.
Expand Down Expand Up @@ -197,12 +200,17 @@ func (m *Stream) onElementSent(_ context.Context, execCtx *hook.ExecutionContext
}
sq.handleOut(stanza)

if sq.len() >= m.cfg.MaxQueueSize { // max queue size reached
qLen := sq.len()
switch {
case qLen >= m.cfg.MaxQueueSize:
_ = sq.stream().Disconnect(streamerror.E(streamerror.PolicyViolation))

level.Info(m.logger).Log("msg", "max queue size reached",
"id", stm.ID(), "username", stm.Username(), "resource", stm.Resource(),
)

case qLen%unacknowledgedStanzaCount == 0:
sq.requestAck()
}
return nil
}
Expand Down