Pub/Sub for Dynamic Consumers #1436

Closed
DavidKmn opened this issue Aug 10, 2020 · 1 comment
DavidKmn commented Aug 10, 2020

I am trying to understand how we could notify dynamically created subscribers about published messages. After reviewing previous issues (#785), I followed the advice and integrated a single PubSub for the whole application. (By the way, the ring client still requires a dummy channel; issue #459 only removed the dummy channel requirement for the normal Redis client.) What we have now is:


func (svc *StandardService) Start() error {
	if !svc.isRunning() {
		svc.cli = redis.NewClient(&redis.Options{
			Addr:        svc.Configuration.Address,
			PoolSize:    svc.Configuration.PoolSize,
			IdleTimeout: 240 * time.Second,
		})
		err := svc.Ping()
		if err != nil {
			return errors.Wrap(err, "Failed to start Redis ClusterService")
		}
		if svc.ps == nil {
			svc.ps = svc.cli.Subscribe(context.Background())
		}
		go svc.startListening()
		svc.setRunning(true)
	}

	return nil
}

func (svc *StandardService) startListening() {
	ch := svc.ps.Channel()
	defer svc.ps.Close()
	for msg := range ch {
		consumers := getConsumers(msg.Channel)
		for _, consume := range consumers {
			err := consume(msg.Channel, []byte(msg.Payload))
			if err != nil {
				print(err)
			}
		}
	}
}

func subscribe(pubSub *redis.PubSub, ctx context.Context, subscription SubscriptionHandler, channels ...string) error {
	err := pubSub.Subscribe(ctx, channels...)

	if err != nil {
		return err
	}

	for _, c := range channels {
		subscribers[c] = append(subscribers[c], subscription)
	}

	return nil
}

// SubscriptionHandler is called for each new message.
type SubscriptionHandler func(channel string, data []byte) error

var subscribers = make(map[string][]SubscriptionHandler)

func getConsumers(c string) []SubscriptionHandler {
	return subscribers[c]
}

However, this does not seem like a great approach, and it has issues. First, the map is prone to concurrent reads and writes; this can be fixed with locking, but that obviously has performance implications. Second, we now also have memory and GC implications because the map has reference types as both keys and values; with a large number of subscribers we would put a lot of pressure on the GC. Any suggestions on how we could solve this without the bottlenecks outlined?
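
For reference, the locking variant mentioned above could look roughly like the sketch below. It is only an illustration, not something taken from go-redis: `Registry`, `NewRegistry`, `Add`, `Get`, and `Dispatch` are hypothetical names. It assumes subscriptions are added far less often than messages arrive, so readers take only a read lock and writers copy the handler slice before appending. Whether the remaining lock overhead matters would need to be measured.

```go
package main

import "sync"

// SubscriptionHandler mirrors the handler type used in the code above.
type SubscriptionHandler func(channel string, data []byte) error

// Registry is a hypothetical replacement for the global subscribers map.
// It guards the map with an RWMutex so lookups can proceed concurrently.
type Registry struct {
	mu   sync.RWMutex
	subs map[string][]SubscriptionHandler
}

// NewRegistry returns an empty registry.
func NewRegistry() *Registry {
	return &Registry{subs: make(map[string][]SubscriptionHandler)}
}

// Add registers h for every given channel. Each slice is copied before
// appending, so a slice previously returned by Get is never mutated.
func (r *Registry) Add(h SubscriptionHandler, channels ...string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	for _, c := range channels {
		old := r.subs[c]
		next := make([]SubscriptionHandler, len(old), len(old)+1)
		copy(next, old)
		r.subs[c] = append(next, h)
	}
}

// Get returns a snapshot of the handlers registered for channel c.
// The lock is released before the caller iterates over the result.
func (r *Registry) Get(c string) []SubscriptionHandler {
	r.mu.RLock()
	defer r.mu.RUnlock()
	return r.subs[c]
}

// Dispatch calls every handler registered for the channel, outside the
// lock, and reports how many of them returned an error.
func (r *Registry) Dispatch(channel string, payload []byte) (failed int) {
	for _, h := range r.Get(channel) {
		if err := h(channel, payload); err != nil {
			failed++
		}
	}
	return failed
}
```

With something like this, `subscribe` would call `Add` after `pubSub.Subscribe` succeeds, and `startListening` would call `Dispatch(msg.Channel, []byte(msg.Payload))` for each message. This addresses only the data race; it does not change the GC concern.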

@github-actions
This issue is marked stale. It will be closed in 30 days if it is not updated.

github-actions bot added the Stale label on Sep 22, 2023
github-actions bot closed this as not planned on Dec 15, 2023