Nil pointer #455

Open
Davincible opened this issue Jun 3, 2023 · 5 comments

Comments

@Davincible

I'm getting a nil pointer dereference at streams.go:176. Not sure why; maybe an invalid stream or subject.

@ripienaar
Collaborator

What version?

Please show the stack trace.

@Davincible
Author

Davincible commented Jun 4, 2023

I'm sorry, I didn't save the full stack trace and can't find it anymore. I've moved on to using the new nats-io/jetstream package.

The version was the latest. I checked, and it was this line:

stream.cfg = &info.Config

info must have been nil, since stream is accessed earlier.
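To illustrate the suspected failure mode: in Go, taking the address of a field through a nil pointer panics, so a nil check before the assignment turns the crash into an ordinary error. This is only a minimal sketch; StreamConfig, StreamInfo, Stream, setConfig and ErrNoInfo are hypothetical stand-ins, not the library's actual types or its fix.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the library's types; only the shape matters here.
type StreamConfig struct{ Name string }

type StreamInfo struct{ Config StreamConfig }

type Stream struct{ cfg *StreamConfig }

// ErrNoInfo is a hypothetical sentinel error for a missing info response.
var ErrNoInfo = errors.New("stream info is nil")

// setConfig stores a pointer to info.Config only when info is non-nil.
// Without the guard, &info.Config panics with a nil pointer dereference.
func (s *Stream) setConfig(info *StreamInfo) error {
	if info == nil {
		return ErrNoInfo
	}
	s.cfg = &info.Config
	return nil
}

func main() {
	s := &Stream{}

	// A nil info is reported as an error instead of crashing.
	if err := s.setConfig(nil); err != nil {
		fmt.Println("guarded:", err)
	}

	// A populated info is stored as usual.
	if err := s.setConfig(&StreamInfo{Config: StreamConfig{Name: "ORDERS"}}); err == nil {
		fmt.Println("loaded:", s.cfg.Name)
	}
}
```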

Code I used:
func (s *StreamCacheStorage[T]) listStreamMessages(subj string) ([]T, error) {
	if len(subj) == 0 {
		return []T{}, nil
	}

	stream, err := s.jsmM.LoadStream(s.streamName)
	if err != nil {
		return nil, fmt.Errorf("load stream: %w", err)
	}

	pager, err := stream.PageContents(
		jsm.PagerFilterSubject(subj),
		jsm.PagerSize(batchLimit),
	)
	if err != nil {
		return nil, fmt.Errorf("page content: %w", err)
	}

	var msgs []*nats.Msg

	hasMore := true
	for hasMore {
		msg, last, err := pager.NextMsg(context.Background())
		if !last && err != nil {
			return nil, fmt.Errorf("fetch next stream page: %w", err)
		}

		msgs = append(msgs, msg)

		hasMore = !last
	}

	return unmarshalMsgs[T](msgs)
}

Since the consumers were not being deleted, I ended up with around 20k consumers, which might have had something to do with it. It only showed up at scale in the production system; local testing went fine.

@ripienaar
Collaborator

Why were consumers not deleted? Consumers are set to auto-remove after an hour of idle time. Are you running a particularly old server?

InactiveThreshold(time.Hour),

@ripienaar
Collaborator

ripienaar commented Jun 17, 2023

Apart from that, I would not use the stream pager for what you are doing. Make a consumer and just fetch the messages. This repo is mainly oriented toward the needs of the CLI/Terraform/GHA features; you should use nats.go instead.

@Davincible
Author

It was a new server. I initially used this lib because I couldn't get what I wanted with the main repo, and I looked at the CLI for an example. I do have it working now with the new jetstream package though, so I'm not using this code anymore.
