Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental changes for inflight issue #58

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,12 @@ func main() {
durable: durable,
messageHandler: messageHandler,
startOption: stan.StartWithLastReceived(),
maxInFlight: stan.MaxInflight(config.MaxInflight),
maxInFlight: config.MaxInflight,
ackWait: config.AckWait,
}

log.Printf("[Parameters] AckWait: %s, MaxInflight: %d\n", config.AckWait, config.MaxInflight)

if initErr := natsQueue.connect(); initErr != nil {
log.Panic(initErr)
}
Expand Down
8 changes: 7 additions & 1 deletion readconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,13 @@ type QueueWorkerConfig struct {
FunctionSuffix string
DebugPrintBody bool
WriteDebug bool
MaxInflight int

// MaxInflight is the number of inflight messages for a given
// AckWait time-window
MaxInflight int

// AckWait is the grace period in which messages must be acknowledged or
// face being re-delivered.
AckWait time.Duration
MaxReconnect int
ReconnectDelay time.Duration
Expand Down
17 changes: 11 additions & 6 deletions types.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@ type NATSQueue struct {
subject string
qgroup string
durable string
ackWait time.Duration
messageHandler func(*stan.Msg)
startOption stan.SubscriptionOption
maxInFlight stan.SubscriptionOption
subscription stan.Subscription

startOption stan.SubscriptionOption
ackWait time.Duration
maxInFlight int

subscription stan.Subscription
}

// connect creates a subscription to NATS Streaming
Expand All @@ -51,7 +53,11 @@ func (q *NATSQueue) connect() error {

q.reconnect()
}),

stan.PubAckWait(q.ackWait),
stan.MaxPubAcksInflight(q.maxInFlight),
)

if err != nil {
return fmt.Errorf("can't connect to %s: %v", q.natsURL, err)
}
Expand All @@ -62,7 +68,6 @@ func (q *NATSQueue) connect() error {
q.conn = nc

log.Printf("Subscribing to: %s at %s\n", q.subject, q.natsURL)
log.Println("Wait for ", q.ackWait)

subscription, err := q.conn.QueueSubscribe(
q.subject,
Expand All @@ -71,7 +76,7 @@ func (q *NATSQueue) connect() error {
stan.DurableName(q.durable),
stan.AckWait(q.ackWait),
q.startOption,
q.maxInFlight,
stan.MaxInflight(q.maxInFlight),
)

if err != nil {
Expand Down