JetStream Work Queue - PullSubscribe & Fetch #833

Closed
T-J-L opened this issue Sep 27, 2021 · 5 comments

T-J-L commented Sep 27, 2021

Hello,

I’m working on a POC with NATS JetStream to implement a durable work queue. The jobs are low-throughput and long-running, and may run for as long as 20 minutes. Multiple services will publish to and receive from a single subject.

I’m trying to accept the message, mark it as in progress using an upper bound for the processing time, perform the work, then ack once complete.

The issue I’m having is with js.PullSubscribe and sub.Fetch. If I use nats.Context(ctx) with Fetch, I seem to receive zero or one messages, and the subscription then stops receiving any further messages.

I'm new to NATS, so I expect this is user error; any help would be appreciated. The example below reflects the scenario. I expect it to publish and receive messages continuously.

Versions:

  • go version go1.17.1 darwin/amd64
  • github.com/nats-io/nats.go v1.12.3
  • Docker image nats:2.6.1 (command -js -m 8222 -n nats_js_1)
package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats.go"
)

func main() {

	// Connect to NATS
	nc, err := nats.Connect(nats.DefaultURL)
	if err != nil {
		log.Fatalln(err)
	}
	defer nc.Close()

	// Create JetStream Context
	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256))
	if err != nil {
		log.Fatalln(err)
	}

	// Create WorkQueue stream
	_, err = js.AddStream(&nats.StreamConfig{
		Name:       "WORK",
		Subjects:   []string{"WORK.*"},
		Storage:    nats.FileStorage,
		Retention:  nats.WorkQueuePolicy,
		Discard:    nats.DiscardNew,
		Duplicates: 5 * time.Minute,
		MaxMsgs:    200,
		MaxBytes:   -1,
	})
	if err != nil {
		log.Fatalln(err)
	}

	// Create a pull subscription.
	sub, err := js.PullSubscribe("WORK.request", "WORK")
	if err != nil {
		log.Fatalln(err)
	}

	// Monitor sub
	go func() {
		ticker := time.NewTicker(10 * time.Second)
		for range ticker.C {
			con, err := sub.ConsumerInfo()
			if err != nil {
				log.Fatalln(err)
			}
			log.Printf("NumPending=%v NumWaiting=%v\n", con.NumPending, con.NumWaiting)
		}
	}()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Simulate consumer
	go func() {
		for {
			msgs, err := sub.Fetch(1, nats.Context(ctx))
			if err != nil {
				if errors.Is(err, context.Canceled) {
					log.Println("Cancelled")
					return
				}
				log.Println("Fetch failed", err)
				continue
			}
			for _, msg := range msgs {

				log.Println("Received message", string(msg.Data))

				// Accept message and allow 10 seconds for work
				if err := msg.InProgress(nats.AckWait(10*time.Second), nats.Context(ctx)); err != nil {
					log.Println("InProgress", err)
					continue
				}

				time.Sleep(3 * time.Second) // Simulate work

				// Acknowledge the message as completed.
				if err := msg.AckSync(); err != nil {
					log.Println("AckSync", err)
					continue
				}
			}
		}
	}()

	// Simulate publisher
	sent := 0
	ticker := time.NewTicker(20 * time.Second)
	for range ticker.C {
		msg := fmt.Sprintf("Message %v", sent)
		log.Printf("Publish: %v\n", msg)
		ack, err := js.Publish("WORK.request", []byte(msg), nats.MsgId(fmt.Sprintf("%v", sent)))
		if err != nil {
			log.Fatalln(err)
		}
		sent++

		if ack.Duplicate {
			log.Println("Not published due to last message still in flight")
		}
	}
}
2021/09/27 12:05:21 NumPending=0 NumWaiting=1
2021/09/27 12:05:31 Publish: Message 0
2021/09/27 12:05:31 NumPending=0 NumWaiting=1
2021/09/27 12:05:41 NumPending=1 NumWaiting=0
2021/09/27 12:05:51 Publish: Message 1
2021/09/27 12:05:51 Not published due to last message still in flight
2021/09/27 12:05:51 NumPending=1 NumWaiting=0
2021/09/27 12:06:01 NumPending=1 NumWaiting=0
T-J-L changed the title from "JetStream PullSubscribe" to "JetStream Work Queue - PullSubscribe & Fetch" on Sep 27, 2021

T-J-L commented Sep 27, 2021

Note: I have also tried explicitly creating the consumer and using Bind, but the results were the same, so I simplified the above example.

	// Create a Consumer
	_, err = js.AddConsumer("WORK", &nats.ConsumerConfig{
		Durable:         "WORK",
		FilterSubject:   "WORK.request",
		DeliverPolicy:   nats.DeliverAllPolicy,
		AckPolicy:       nats.AckExplicitPolicy,
		AckWait:         30 * time.Second,
		MaxDeliver:      -1,
		ReplayPolicy:    nats.ReplayInstantPolicy,
		SampleFrequency: "100",
	})
	if err != nil {
		log.Fatalln(err)
	}

Edit: The above is now working. The example was invalid due to the static MsgId. I'll try to put together a simple example that replicates the problem.


T-J-L commented Sep 27, 2021

I've been testing this further, and I wonder whether using a context without a timeout in:

msgs, err := sub.Fetch(1, nats.Context(ctx))

is the issue. Everything seems to work correctly if I use a child context per fetch with a short timeout, e.g. 10 seconds. I had assumed the call would block indefinitely until a message was received.


wallyqs commented Sep 27, 2021

Hi @T-J-L, yes, for now I would recommend using an explicit timeout. By default the timeout used is the same as that of the JetStream context, which defaults to 5*time.Second. Also, in the following call:

msg.InProgress(nats.AckWait(10*time.Second), nats.Context(ctx))

the context is given preference, and since it does not have a timeout the call could potentially block. We will update the client so that you cannot pass both options.


T-J-L commented Sep 28, 2021

Thank you @wallyqs, removing the contexts has solved the problem. I've included my current fetch loop; it picks up new messages as soon as they are available but takes up to 30 seconds to react to shutdown, which is acceptable for my application.

For information, I also tried creating a context per fetch call, and there seemed to be pauses between messages being received. I would have to replicate this again, but it looked as though, rather than waiting up to 30 seconds for a message, the call would check for a message and then pause until the timeout was reached, so messages were only picked up on the next loop iteration.

I’m happy with the current approach, so feel free to close this. Thanks again for your assistance.

	for {
		msgs, err := sub.Fetch(1, nats.MaxWait(30*time.Second))
		select {
		case <-ctx.Done():
			return
		default:
		}
		if err != nil {
			if errors.Is(err, nats.ErrTimeout) {
				continue
			}
			s.Logger.Info("fetch failed", zap.Error(err))
			continue
		}

		for _, msg := range msgs {
			// ...
		}
	}

wallyqs transferred this issue from nats-io/nats-server on Sep 28, 2021

wallyqs commented Oct 6, 2021

Thanks @T-J-L for the report. This has been fixed via #838.

wallyqs closed this as completed on Oct 6, 2021