Skip to content
This repository has been archived by the owner on Jul 23, 2023. It is now read-only.

Test Cases #2

Merged
merged 17 commits into from
Apr 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,2 +1,23 @@
# pubsub
Embeddable Lightweight Pub/Sub in Go

## Motivation

After using several Pub/Sub systems in writing production softwares for sometime, I decided to write one very simple, embeddable, light-weight Pub/Sub system using only native Go functionalities i.e. Go routines, Go channels.

I found Go channels are **MPSC** i.e. multiple producers can push onto same channel, but there's only one consumer. You're very much free to use multiple consumers on single channel, but they will start competing for messages being published on channel.

Good thing is that Go channels are concurrent-safe. So I considered extending it to make in-application communication more flexible. Below is what provided by this embeddable piece of software.

✌️ | Producer | Consumer
--- | --: | --:
Single | ✅ | ✅
Multiple | ✅ | ✅

## Design

![architecture](./sc/architecture.jpg)

## Usage

Here's an [example](./example/main.go)
79 changes: 79 additions & 0 deletions example/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package main

import (
"context"
"log"
"time"

"github.com/itzmeanjan/pubsub"
)

func main() {
broker := pubsub.New()

ctx, cancel := context.WithCancel(context.Background())
go broker.Start(ctx)
defer cancel()
// Just waiting little while to give pub/sub broker enough time to get up & running
<-time.After(time.Duration(1) * time.Millisecond)

// At max 16 messages to be kept bufferred at a time
subscriber := broker.Subscribe(16, "topic_1", "topic_2")
if subscriber == nil {
log.Printf("Failed to subscribe to topics\n")
return
}

// Publish arbitrary byte data on N-many topics, without concerning whether all
// topics having at least 1 subscriber or not
//
// During publishing if some topic doesn't have certain subscriber, it won't receive
// message later when it joins
msg := pubsub.Message{Topics: []string{"topic_1", "topic_2", "topic_3"}, Data: []byte("hello")}
published, on := broker.Publish(&msg)
if !published {
log.Printf("Failed to publish message to topics\n")
return
}

log.Printf("✅ Published `hello` to %d topics\n", on)

for {
// Non-blocking calls, returns immediately if finds nothing in buffer
msg := subscriber.Next()
if msg == nil {
break
}

log.Printf("✅ Received `%s` on topic `%s`\n", msg.Data, msg.Topic)
}

// Attempt to receive from channel if something already available
//
// If not found, wait for `duration` & reattempt
//
// This time return without blocking
if msg := subscriber.BNext(time.Duration(1) * time.Millisecond); msg != nil {
log.Printf("✅ Received `%s` on topic `%s`\n", msg.Data, msg.Topic)
} else {
log.Printf("✅ Nothing else to receive\n")
}

if subscribed, _ := subscriber.AddSubscription(broker, "topic_3"); subscribed {
log.Printf("✅ Subscribed to `topic_3`\n")
}

if unsubscribed, _ := subscriber.Unsubscribe(broker, "topic_1"); unsubscribed {
log.Printf("✅ Unsubscribed from `topic_1`\n")
}

if unsubscribed, from := subscriber.UnsubscribeAll(broker); unsubscribed {
log.Printf("✅ Unsubscribed from %d topic(s)\n", from)
}

// Calling this multiple times doesn't cause harm, but not required
if subscriber.Close() {
log.Printf("✅ Destroyed subscriber\n")
}

}
58 changes: 51 additions & 7 deletions pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type PubSub struct {
// can be routed to various topics
func New() *PubSub {
return &PubSub{
Alive: true,
Alive: false,
Index: 1,
MessageChan: make(chan *PublishRequest, 1),
SubscriberIdChan: make(chan chan uint64, 1),
Expand All @@ -32,8 +32,14 @@ func New() *PubSub {

// Start - Handles request from publishers & subscribers, so that
// message publishing can be abstracted
//
// Consider running it as a go routine
func (p *PubSub) Start(ctx context.Context) {

// Because pub/sub system is now running
// & it's ready to process requests
p.Alive = true

for {

select {
Expand All @@ -56,14 +62,27 @@ func (p *PubSub) Start(ctx context.Context) {

for _, sub := range subs {

msg := PublishedMessage{
Data: req.Message.Data,
Topic: topic,
}

// Checking whether receiver channel has enough buffer space
// to hold this message or not
if len(sub) < cap(sub) {

// As byte slices are reference types i.e. if we
// just pass it over channel to subscribers & either
// of publishers/ subscribers make any modification
// to that slice, it'll be reflected for all parties involved
//
// So it's better to give everyone their exclusive copy
copied := make([]byte, len(req.Message.Data))
n := copy(copied, req.Message.Data)
if n != len(req.Message.Data) {
continue
}

msg := PublishedMessage{
Data: copied,
Topic: topic,
}

sub <- &msg
publishedOn++
}
Expand Down Expand Up @@ -118,6 +137,10 @@ func (p *PubSub) Start(ctx context.Context) {
unsubscribedFrom++
}

if len(subs) == 0 {
delete(p.Subscribers, req.Topics[i])
}

}

}
Expand Down Expand Up @@ -159,6 +182,10 @@ func (p *PubSub) Subscribe(cap uint64, topics ...string) *Subscriber {

if p.Alive {

if len(topics) == 0 {
return nil
}

idGenChan := make(chan uint64)
p.SubscriberIdChan <- idGenChan

Expand Down Expand Up @@ -192,6 +219,10 @@ func (p *PubSub) AddSubscription(subscriber *Subscriber, topics ...string) (bool

if p.Alive {

if len(topics) == 0 {
return true, 0
}

_subscriber := &Subscriber{
Id: subscriber.Id,
Channel: subscriber.Channel,
Expand All @@ -207,6 +238,7 @@ func (p *PubSub) AddSubscription(subscriber *Subscriber, topics ...string) (bool
}

_subscriber.Topics[topics[i]] = true
subscriber.Topics[topics[i]] = true

}

Expand All @@ -232,8 +264,12 @@ func (p *PubSub) Unsubscribe(subscriber *Subscriber, topics ...string) (bool, ui

if p.Alive {

if len(topics) == 0 {
return true, 0
}

_topics := make([]string, 0, len(topics))
for i := 0; i < len(_topics); i++ {
for i := 0; i < len(topics); i++ {

if state, ok := subscriber.Topics[topics[i]]; ok {
if state {
Expand All @@ -244,6 +280,10 @@ func (p *PubSub) Unsubscribe(subscriber *Subscriber, topics ...string) (bool, ui

}

if len(_topics) == 0 {
return true, 0
}

resChan := make(chan uint64)
p.UnsubscribeChan <- &UnsubscriptionRequest{
Id: subscriber.Id,
Expand Down Expand Up @@ -276,6 +316,10 @@ func (p *PubSub) UnsubscribeAll(subscriber *Subscriber) (bool, uint64) {

}

if len(topics) == 0 {
return true, 0
}

resChan := make(chan uint64)
p.UnsubscribeChan <- &UnsubscriptionRequest{
Id: subscriber.Id,
Expand Down
Loading