Acking a JetStream message for the 3rd time blocks code execution #1452

Closed
ioanluca opened this issue Oct 30, 2023 · 4 comments
Labels
defect Suspected defect such as a bug or regression

Comments

@ioanluca

ioanluca commented Oct 30, 2023

Observed behavior

Acking a JetStream message for the 3rd time blocks code execution.

Expected behavior

Acking the message for the 3rd time returns an error, just as it does the 2nd time.

Server and client version

  • client: v1.28.0
  • server: v2.9.11

Host environment

~ go version
go version go1.21.3 darwin/arm64
~ hostinfo
Mach kernel version:
	 Darwin Kernel Version 23.0.0: Fri Sep 15 14:41:43 PDT 2023; root:xnu-10002.1.13~1/RELEASE_ARM64_T6000
Kernel configured for up to 10 processors.
10 processors are physically available.
10 processors are logically available.
Processor type: arm64e (ARM64E)
Processors active: 0 1 2 3 4 5 6 7 8 9
Primary memory available: 32.00 gigabytes

Steps to reproduce

package main

import (
	"context"
	"testing"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/nats-io/nats-server/v2/server"
	"github.com/nats-io/nats-server/v2/test"
)

func Test(t *testing.T) {
	opts := test.DefaultTestOptions
	opts.Port = server.RANDOM_PORT
	opts.JetStream = true
	s := test.RunServer(&opts)
	defer s.Shutdown()

	nc, err := nats.Connect(s.ClientURL())
	require.NoError(t, err)
	defer nc.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*20)
	defer cancel()

	js, err := jetstream.New(nc)
	require.NoError(t, err)

	stream, err := js.CreateStream(ctx, jetstream.StreamConfig{
		Name:     "test",
		Subjects: []string{"a"},
		Storage:  jetstream.FileStorage,
		Replicas: 1,
	})
	require.NoError(t, err)

	con, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
		Durable:       "test-con",
		AckPolicy:     jetstream.AckExplicitPolicy,
		DeliverPolicy: jetstream.DeliverAllPolicy,
		AckWait:       5 * time.Second,
	})
	require.NoError(t, err)

	_, err = js.Publish(ctx, "a", []byte("test1"))
	assert.NoError(t, err)

	_, err = js.Publish(ctx, "a", []byte("test2"))
	assert.NoError(t, err)

	batch, err := con.Fetch(1)
	require.NoError(t, err)

	msg := <-batch.Messages()

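	// 1st ack-type call: Ack succeeds
	err = msg.Ack()
	if err != nil {
		t.Error("ack err:", err)
	}

	// 2nd ack-type call: the message is already acked, so an error is expected here
	time.Sleep(time.Second * 2)
	err = msg.InProgress()
	if err != nil {
		t.Log("in progress err:", err)
	}

	// 3rd ack-type call: never returns on client v1.28.0 (fixed in v1.29.0)
	time.Sleep(time.Second * 2)
	err = msg.InProgress() // WARNING: blocks forever on v1.28.0
	if err != nil {
		t.Log(err)
	}

	// Keep the test alive until the 20s context expires
	<-ctx.Done()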
}
@ioanluca ioanluca added the defect Suspected defect such as a bug or regression label Oct 30, 2023
@piotrpio
Collaborator

Hello @ioanluca, thank you for reporting the issue.

The bug was already fixed in v1.29.0. It was caused by incorrect mutex handling: #1364

@ioanluca
Author

thanks @piotrpio

any idea why the deadlock was happening on the 3rd attempt?

@piotrpio
Collaborator

Yes - it had to do with this code block: https://github.com/nats-io/nats.go/blob/main/jetstream/message.go#L219

1st attempt: the mutex is locked, m.ackd is false, so mx is unlocked and the ack goes ahead. At the end of the function, m.ackd is set to true.
2nd attempt: the mutex is locked, m.ackd is true, so we return (with ErrMsgAlreadyAckd) without releasing the lock.
3rd attempt: blocks, because the lock was not released on the previous attempt.

So the deadlock only appeared after the first ErrMsgAlreadyAckd had been returned.

@ioanluca
Author

ioanluca commented Oct 31, 2023

makes sense, thanks for the explanation!
