Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: unsubscribe from subscription #3557

Merged
merged 2 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion server/executor/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ type testSuiteRunGetter interface {
}

type subscriptor interface {
Subscribe(string, subscription.Subscriber)
Subscribe(string, subscription.Subscriber) subscription.Subscription
}

type queueConfigurer[T any] struct {
Expand Down
4 changes: 2 additions & 2 deletions server/executor/test_suite_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,11 +192,11 @@ func runTestSuiteRunnerTest(t *testing.T, withErrors bool, assert func(t *testin

return nil
})
subscriptionManager.Subscribe(transactionRun.ResourceID(), sf)
subscription := subscriptionManager.Subscribe(transactionRun.ResourceID(), sf)

select {
case finalRun := <-done:
subscriptionManager.Unsubscribe(transactionRun.ResourceID(), sf.ID()) //cleanup to avoid race conditions
subscription.Unsubscribe()
assert(t, finalRun)
case <-time.After(10 * time.Second):
t.Log("timeout after 10 second")
Expand Down
6 changes: 5 additions & 1 deletion server/http/websocket/unsubscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,11 @@ func (e unsubscribeCommandExecutor) Execute(conn *websocket.Conn, message []byte
return
}

e.subscriptionManager.Unsubscribe(msg.Resource, msg.SubscriptionId)
subscription := e.subscriptionManager.GetSubscription(msg.Resource, msg.SubscriptionId)
err = subscription.Unsubscribe()
if err != nil {
conn.WriteJSON(ErrorMessage(fmt.Errorf("could not unsubscribe: %w", err)))
}

conn.WriteJSON(UnsubscribeSuccess())
}
28 changes: 23 additions & 5 deletions server/subscription/in_memory_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,46 @@ import (
)

type inMemoryManager struct {
subscriptions map[string][]Subscriber
subscribers map[string][]Subscriber
subscriptions *subscriptionStorage
mutex sync.Mutex
}

type inMemorySubscription struct {
unsubscribeFn func()
}

func (s *inMemorySubscription) Unsubscribe() error {
s.unsubscribeFn()
return nil
}

func (m *inMemoryManager) getSubscribers(resourceID string) []Subscriber {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.subscriptions[resourceID]
return m.subscribers[resourceID]
}

func (m *inMemoryManager) setSubscribers(resourceID string, subscribers []Subscriber) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.subscriptions[resourceID] = subscribers
m.subscribers[resourceID] = subscribers
}

func (m *inMemoryManager) Subscribe(resourceID string, subscriber Subscriber) {
func (m *inMemoryManager) Subscribe(resourceID string, subscriber Subscriber) Subscription {
subscribers := append(m.getSubscribers(resourceID), subscriber)
m.setSubscribers(resourceID, subscribers)

return &inMemorySubscription{
unsubscribeFn: func() { m.unsubscribe(resourceID, subscriber.ID()) },
}
}

func (m *inMemoryManager) GetSubscription(resourceID string, subscriptionID string) Subscription {
return m.subscriptions.Get(resourceID, subscriptionID)
}

func (m *inMemoryManager) Unsubscribe(resourceID string, subscriptionID string) {
func (m *inMemoryManager) unsubscribe(resourceID string, subscriptionID string) {
subscribers := m.getSubscribers(resourceID)

updated := make([]Subscriber, 0, len(subscribers)-1)
Expand Down
4 changes: 2 additions & 2 deletions server/subscription/in_memory_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func TestManagerUnsubscribe(t *testing.T) {
Content: "Test was deleted",
}

manager.Subscribe("test:1", subscriber)
subscription := manager.Subscribe("test:1", subscriber)
manager.PublishUpdate(message1)

assert.Equal(t, message1.Type, receivedMessage.Type)
assert.Equal(t, message1.Content, receivedMessage.Content)

manager.Unsubscribe("test:1", subscriber.ID())
subscription.Unsubscribe()
manager.PublishUpdate(message2)

assert.Equal(t, message1.Type, receivedMessage.Type, "subscriber should not be notified after unsubscribed")
Expand Down
41 changes: 37 additions & 4 deletions server/subscription/manager.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package subscription

import (
"fmt"
"sync"

"github.com/nats-io/nats.go"
)

type Manager interface {
Subscribe(resourceID string, subscriber Subscriber)
Unsubscribe(resourceID string, subscriptionID string)
Subscribe(resourceID string, subscriber Subscriber) Subscription
GetSubscription(resourceID string, subscriptionID string) Subscription

PublishUpdate(message Message)
Publish(resourceID string, message any)
}

type Subscription interface {
Unsubscribe() error
}

type optFn func(*options)

type options struct {
Expand All @@ -34,11 +39,39 @@ func NewManager(opts ...optFn) Manager {
}

if currentOptions.conn != nil {
return &natsManager{currentOptions.conn}
return &natsManager{
currentOptions.conn,
newSubscriptionStorage(),
}
}

return &inMemoryManager{
subscriptions: make(map[string][]Subscriber),
subscribers: make(map[string][]Subscriber),
subscriptions: newSubscriptionStorage(),
mutex: sync.Mutex{},
}
}

type subscriptionStorage struct {
subscriptions map[string]Subscription
}

func newSubscriptionStorage() *subscriptionStorage {
return &subscriptionStorage{
subscriptions: make(map[string]Subscription),
}
}

func (s *subscriptionStorage) Get(resourceID, subscriberID string) Subscription {
key := s.key(resourceID, subscriberID)
return s.subscriptions[key]
}

func (s *subscriptionStorage) Set(resourceID, subscriberID string, subscription Subscription) {
key := s.key(resourceID, subscriberID)
s.subscriptions[key] = subscription
}

func (s *subscriptionStorage) key(resourceID, subscriberID string) string {
return fmt.Sprintf("%s-%s", resourceID, subscriberID)
}
15 changes: 9 additions & 6 deletions server/subscription/nats_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ import (
)

type natsManager struct {
conn *nats.Conn
conn *nats.Conn
subscriptions *subscriptionStorage
}

func (m *natsManager) Subscribe(resourceID string, subscriber Subscriber) {
_, err := m.conn.Subscribe(resourceID, func(msg *nats.Msg) {
func (m *natsManager) Subscribe(resourceID string, subscriber Subscriber) Subscription {
subscription, err := m.conn.Subscribe(resourceID, func(msg *nats.Msg) {
decoded, err := DecodeMessage(msg.Data)
if err != nil {
log.Printf("cannot unmarshall incoming nats message: %s", err.Error())
Expand All @@ -25,12 +26,14 @@ func (m *natsManager) Subscribe(resourceID string, subscriber Subscriber) {
})
if err != nil {
log.Printf("cannot subscribe to nats topic: %s", err.Error())
return
return nil
}

return subscription
}

func (m *natsManager) Unsubscribe(resourceID string, subscriptionID string) {
panic("nats unsubscribe not implemented")
func (m *natsManager) GetSubscription(resourceID string, subscriptionID string) Subscription {
return m.subscriptions.Get(resourceID, subscriptionID)
}

func (m *natsManager) PublishUpdate(message Message) {
Expand Down
7 changes: 0 additions & 7 deletions server/subscription/subscription.go

This file was deleted.

5 changes: 2 additions & 3 deletions server/testconnection/otlp.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,8 @@ func (t *OTLPConnectionTester) GetSpanCount(ctx context.Context, opts ...GetSpan
return nil
})

t.subscriptionManager.Subscribe(topicName, subscriber)
// TODO: implement subscription
// defer t.subscriptionManager.Unsubscribe(topicName, subscriber.ID())
subscription := t.subscriptionManager.Subscribe(topicName, subscriber)
defer subscription.Unsubscribe()

t.subscriptionManager.Publish(GetSpanCountTopicName(WithTenantID(tenantID)), OTLPConnectionTestRequest{})

Expand Down