Skip to content

Commit

Permalink
feat(pubsub): add type paramter for event field
Browse files Browse the repository at this point in the history
  • Loading branch information
bradub committed May 31, 2024
1 parent 54c5054 commit 1e34378
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 119 deletions.
28 changes: 14 additions & 14 deletions pubsub/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,23 +5,23 @@ go 1.21
toolchain go1.21.2

require (
github.com/IBM/sarama v1.41.3
github.com/IBM/sarama v1.43.2
github.com/Shopify/sarama v1.38.1
github.com/ThreeDotsLabs/watermill v1.3.5
github.com/ThreeDotsLabs/watermill-kafka/v2 v2.5.0
github.com/google/uuid v1.3.1
github.com/matryer/is v1.4.0
github.com/google/uuid v1.6.0
github.com/matryer/is v1.4.1
github.com/xdg-go/scram v1.1.2
go.uber.org/zap v1.26.0
go.uber.org/zap v1.27.0
go.uber.org/zap/exp v0.2.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/eapache/go-resiliency v1.4.0 // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
Expand All @@ -32,20 +32,20 @@ require (
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/lithammer/shortuuid/v3 v3.0.7 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
github.com/xdg-go/stringprep v1.0.4 // indirect
go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.43.0 // indirect
go.opentelemetry.io/otel v1.19.0 // indirect
go.opentelemetry.io/otel/metric v1.19.0 // indirect
go.opentelemetry.io/otel/trace v1.19.0 // indirect
go.opentelemetry.io/otel v1.27.0 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
go.opentelemetry.io/otel/trace v1.27.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/text v0.13.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/text v0.15.0 // indirect
)
72 changes: 36 additions & 36 deletions pubsub/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 13 additions & 13 deletions pubsub/inmem/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,30 @@ import (
)

// Ensure type inmem.PubSub implements interface pubsub.PublishSubscriber.
var _ pubsub.PublishSubscriber[any] = (*PubSub[any])(nil)
var _ pubsub.PublishSubscriber[string, any] = (*PubSub[string, any])(nil)

// PubSub represents a PubSub backed my an in memory storage.
type PubSub[T any] struct {
type PubSub[T, P any] struct {
mu sync.Mutex

// map having channels as keys and subscriptions as value
channelsSubs map[string]map[*Subscription[T]]struct{}
channelsSubs map[string]map[*Subscription[T, P]]struct{}

// eventBufferSize is the buffer size of the channel for each subscription.
eventBufferSize int
}

// NewPubSub returns a new instance of PubSub backed
// by an in memory storage.
func NewPubSub[T any](eventBufferSize int) *PubSub[T] {
return &PubSub[T]{
channelsSubs: make(map[string]map[*Subscription[T]]struct{}),
func NewPubSub[T, P any](eventBufferSize int) *PubSub[T, P] {
return &PubSub[T, P]{
channelsSubs: make(map[string]map[*Subscription[T, P]]struct{}),
eventBufferSize: eventBufferSize,
}
}

// Publish publishes event to all the subscriptions of the channels provided.
func (ps *PubSub[T]) Publish(event pubsub.Event[T], channels ...string) error {
func (ps *PubSub[T, P]) Publish(event pubsub.Event[T, P], channels ...string) error {
// Ensure at least one channel is provided.
if len(channels) == 0 {
return ErrNoChannel
Expand Down Expand Up @@ -75,16 +75,16 @@ func (ps *PubSub[T]) Publish(event pubsub.Event[T], channels ...string) error {
var ErrNoChannel = errors.New("no channel given")

// Subscribe creates a new subscription for the provided channels.
func (ps *PubSub[T]) Subscribe(channels ...string) (pubsub.Subscription[T], error) {
func (ps *PubSub[T, P]) Subscribe(channels ...string) (pubsub.Subscription[T, P], error) {
// Ensure at least one channel is provided.
if len(channels) == 0 {
return nil, ErrNoChannel
}

// Create a new subscription.
sub := &Subscription[T]{
sub := &Subscription[T, P]{
channels: channels,
c: make(chan pubsub.Event[T], ps.eventBufferSize),
c: make(chan pubsub.Event[T, P], ps.eventBufferSize),
pubsub: ps,
}

Expand All @@ -97,7 +97,7 @@ func (ps *PubSub[T]) Subscribe(channels ...string) (pubsub.Subscription[T], erro
subs, ok := ps.channelsSubs[c]
if !ok {
// Create the subs map if it does not exist.
subs = make(map[*Subscription[T]]struct{})
subs = make(map[*Subscription[T, P]]struct{})
ps.channelsSubs[c] = subs
}

Expand All @@ -115,7 +115,7 @@ func (ps *PubSub[T]) Subscribe(channels ...string) (pubsub.Subscription[T], erro
// This method wraps the removeSubscription method
// with the mutexes. So it's safe to be from external
// entities.
func (ps *PubSub[T]) Unsubscribe(sub *Subscription[T]) {
func (ps *PubSub[T, P]) Unsubscribe(sub *Subscription[T, P]) {
ps.mu.Lock()
defer ps.mu.Unlock()

Expand All @@ -124,7 +124,7 @@ func (ps *PubSub[T]) Unsubscribe(sub *Subscription[T]) {

// removeSubscription closes the subscriptions go channel and
// removes it from the pubsubs storage.
func (ps *PubSub[T]) removeSubscription(sub *Subscription[T]) {
func (ps *PubSub[T, P]) removeSubscription(sub *Subscription[T, P]) {
// Only close the underlying channel once.
sub.once.Do(func() {
close(sub.c)
Expand Down
8 changes: 4 additions & 4 deletions pubsub/inmem/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestPubSub_SubscribeSuccess(t *testing.T) {
channelC = "c"
)

ps := NewPubSub[string](eventBufferSize)
ps := NewPubSub[string, string](eventBufferSize)

subA, err := ps.Subscribe(channelA)
i.NoErr(err)
Expand All @@ -30,7 +30,7 @@ func TestPubSub_SubscribeSuccess(t *testing.T) {
i.NoErr(err)

// Publish event for first 2 subscriptions.
_ = ps.Publish(pubsub.Event[string]{Type: "test", Payload: "test"}, channelA)
_ = ps.Publish(pubsub.Event[string, string]{Type: "test", Payload: "test"}, channelA)

select {
case <-subA.C():
Expand Down Expand Up @@ -60,12 +60,12 @@ func TestPubSub_UnsubscribeSuccess(t *testing.T) {
channelA = "a"
)

ps := NewPubSub[string](eventBufferSize)
ps := NewPubSub[string, string](eventBufferSize)

subscription, err := ps.Subscribe(channelA)
i.NoErr(err)

err = ps.Publish(pubsub.Event[string]{Type: "test", Payload: "test"}, channelA)
err = ps.Publish(pubsub.Event[string, string]{Type: "test", Payload: "test"}, channelA)
i.NoErr(err)

err = subscription.Close()
Expand Down
Loading

0 comments on commit 1e34378

Please sign in to comment.