Skip to content

Commit

Permalink
Merged master and updated
Browse files Browse the repository at this point in the history
  • Loading branch information
willdot committed Jul 11, 2023
1 parent 7cb5b6b commit e67d7af
Show file tree
Hide file tree
Showing 24 changed files with 1,403 additions and 594 deletions.
34 changes: 25 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ These are just examples of when I've wanted a tool that could do this.

## Installation 🛠️

``` sh
```bash
go install github.com/willdot/sendit@latest
```

## Useage 🧭
Basic useage. See different broker sections for broker specific details.

``` sh
```bash
sendit -body="body.json" -headers="headers.json" -url="localhost:1234" -repeat=3
```
* body (required) - the path to a file containing the data you wish to send as the body of the message
Expand All @@ -33,7 +33,7 @@ sendit -body="body.json" -headers="headers.json" -url="localhost:1234" -repeat=3
### RabbitMQ
You can either send directly to a queue OR to an exchange. You will be asked to select which option when you run the tool.

``` sh
```bash
sendit -body="body.json" -destination="test"
```
* destination(required) - the name of the queue or exchange to send the message to.
Expand All @@ -47,7 +47,7 @@ Headers should be in JSON format in a key / value format. eg:
```

### NATs
``` sh
```bash
sendit -body="body.json" -subject="test"
```
* subject(required) - the subject you wish to use for the message
Expand All @@ -60,20 +60,36 @@ Headers should be in JSON format in a key / array string format. eg:
}
```

### Kafka
``` sh
sendit -body="body.json" -topic="test"
### Redis
```bash
sendit -body="body.json" -channel="test"
```
* topic(required) - the topic you wish to send the message too
* channel(required) - the channel you wish to publish the message to

Note: Redis does not support headers.

### Google Pub/Sub
```bash
sendit -body="body.json" -topic="test-topic" -project_id="your-project-id" -disable_auth=true -url="localhost:8085"
```
* topic(required) - the topic you wish to send the message to
* project_id (required) - the ID of the project to use
* disable_auth (optional) - use this if you are testing with a local emulator
* url (optional) - use this to override the default for Google Cloud Services.

Headers should be in JSON format in a key / string format. eg:
``` json
{
"header1" : "value1",
"header2" : "value2"
"header2" : "value3"
}
```

Note: If you are using a local emulator, you will need to export the following:
```bash
PUBSUB_EMULATOR_HOST=localhost:8085
```

## Contributing 🤝

Issues and PRs welcome.
Expand Down
9 changes: 5 additions & 4 deletions kafka/kafka.go → brokers/kafka.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package kafka
package brokers

import (
"encoding/json"

"github.com/Shopify/sarama"
"github.com/pkg/errors"
"github.com/willdot/sendit/config"
"github.com/willdot/sendit/service"
)

// KafkaPublisher is a publisher that can send messages to a Kafka server
Expand Down Expand Up @@ -34,15 +35,15 @@ func (p *KafkaPublisher) Shutdown() {
}

// Publish will send the provided message
func (p *KafkaPublisher) Publish(destination string, msgBody, headersData []byte) error {
headers, err := convertHeaders(headersData)
func (p *KafkaPublisher) Send(destination string, msg service.Message) error {
headers, err := convertHeaders(msg.Headers)
if err != nil {
return err
}

_, _, err = p.conn.SendMessage(&sarama.ProducerMessage{
Topic: destination,
Value: sarama.StringEncoder(msgBody),
Value: sarama.StringEncoder(msg.Body),
Headers: headers,
})
if err != nil {
Expand Down
13 changes: 7 additions & 6 deletions nats/nats.go → brokers/nats.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package nats
package brokers

import (
"encoding/json"

"github.com/nats-io/nats.go"
"github.com/pkg/errors"
"github.com/willdot/sendit/config"
"github.com/willdot/sendit/service"
)

// NatsPublisher is a publisher that can send messages to a NATs server
Expand All @@ -31,14 +32,14 @@ func (p *NatsPublisher) Shutdown() {
p.conn.Close()
}

// Publish will send the provided message
func (p *NatsPublisher) Publish(destination string, msgBody, headersData []byte) error {
headers, err := convertHeaders(headersData)
// Send will send the provided message
func (p *NatsPublisher) Send(destination string, msg service.Message) error {
headers, err := convertNatsHeaders(msg.Headers)
if err != nil {
return err
}
err = p.conn.PublishMsg(&nats.Msg{
Data: msgBody,
Data: msg.Body,
Subject: destination,
Header: headers,
})
Expand All @@ -49,7 +50,7 @@ func (p *NatsPublisher) Publish(destination string, msgBody, headersData []byte)
return nil
}

func convertHeaders(headerData []byte) (nats.Header, error) {
func convertNatsHeaders(headerData []byte) (nats.Header, error) {
if headerData == nil {
return nil, nil
}
Expand Down
78 changes: 78 additions & 0 deletions brokers/pubsub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package brokers

import (
"context"
"encoding/json"

"cloud.google.com/go/pubsub"
"github.com/pkg/errors"
"github.com/willdot/sendit/config"
"github.com/willdot/sendit/service"
"google.golang.org/api/option"
)

// GooglePubSubPublisher is a publisher that can send messages to a Google Pub/Sub server
type GooglePubSubPublisher struct {
client *pubsub.Client
}

// NewGooglePubSubPublisher will create a connection to a Google Pub/Sub server. Shutdown on the returned publisher should be called
// to close the connection once finished
func NewGooglePubSubPublisher(cfg *config.Config) (*GooglePubSubPublisher, error) {
options := make([]option.ClientOption, 0)
if cfg.GooglePubSubCfg.DisableAuth {
options = append(options, option.WithoutAuthentication())
}

if cfg.URL != "" {
options = append(options, option.WithEndpoint(cfg.URL))
}

client, err := pubsub.NewClient(context.Background(), cfg.GooglePubSubCfg.ProjectID, options...)
if err != nil {
return nil, errors.Wrap(err, "failed to connect to Google Pub/Sub")
}

return &GooglePubSubPublisher{
client: client,
}, nil
}

// Shutdown will close the Google Pub/Sub connection
func (p *GooglePubSubPublisher) Shutdown() {
_ = p.client.Close()
}

// Send will send the provided message
func (p *GooglePubSubPublisher) Send(destination string, msg service.Message) error {
headers, err := convertGooglePubSubHeaders(msg.Headers)
if err != nil {
return err
}
t := p.client.Topic(destination)
res := t.Publish(context.Background(), &pubsub.Message{
Data: msg.Body,
Attributes: headers,
})

_, err = res.Get(context.Background())
if err != nil {
return errors.Wrap(err, "failed to publish message")
}

return nil
}

func convertGooglePubSubHeaders(headerData []byte) (map[string]string, error) {
if headerData == nil {
return nil, nil
}

var headers map[string]string
err := json.Unmarshal(headerData, &headers)
if err != nil {
return nil, errors.Wrap(err, "failed to convert header data")
}

return headers, nil
}
15 changes: 8 additions & 7 deletions rabbit/rabbit.go → brokers/rabbit.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rabbit
package brokers

import (
"context"
Expand All @@ -8,6 +8,7 @@ import (
"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/willdot/sendit/config"
"github.com/willdot/sendit/service"
)

// DestinationType is where the message will be sent, either directly to a queue or to an exchange
Expand Down Expand Up @@ -56,9 +57,9 @@ func (r *RabbitPublisher) Shutdown() {
r.conn.Close()
}

// Publish will send the provided message
func (r *RabbitPublisher) Publish(destinationName string, msgBody, headersData []byte) error {
headers, err := convertHeaders(headersData)
// Send will send the provided message
func (r *RabbitPublisher) Send(destinationName string, msg service.Message) error {
headers, err := convertRabbitHeaders(msg.Headers)
if err != nil {
return err
}
Expand All @@ -72,16 +73,16 @@ func (r *RabbitPublisher) Publish(destinationName string, msgBody, headersData [

switch r.destinationType {
case DestinationTypeExchange:
return r.publishToExchange(c, destinationName, msgBody, headers)
return r.publishToExchange(c, destinationName, msg.Body, headers)
case DestinationTypeQueue:
return r.publishToQueue(c, destinationName, msgBody, headers)
return r.publishToQueue(c, destinationName, msg.Body, headers)
default:
}

return nil
}

func convertHeaders(headerData []byte) (map[string]interface{}, error) {
func convertRabbitHeaders(headerData []byte) (map[string]interface{}, error) {
if headerData == nil {
return nil, nil
}
Expand Down
47 changes: 47 additions & 0 deletions brokers/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package brokers

import (
"context"

"github.com/go-redis/redis/v8"
"github.com/pkg/errors"
"github.com/willdot/sendit/config"
"github.com/willdot/sendit/service"
)

// RedisPublisher is a publisher that can send messages to a Redis server
type RedisPublisher struct {
client *redis.Client
}

// NewRedisPublisher will create a connection to a Redis server. Shutdown on the returned publisher should be called
// to close the connection once finished
func NewRedisPublisher(cfg *config.Config) (*RedisPublisher, error) {
client := redis.NewClient(&redis.Options{
Addr: cfg.URL,
})

err := client.Ping(context.Background()).Err()
if err != nil {
return nil, errors.Wrap(err, "failed to connect to Redis")
}

return &RedisPublisher{
client: client,
}, nil
}

// Shutdown will close the Redis connection
func (p *RedisPublisher) Shutdown() {
_ = p.client.Shutdown(context.Background())
}

// Send will send the provided message
func (p *RedisPublisher) Send(destination string, msg service.Message) error {
err := p.client.Publish(context.Background(), destination, msg.Body).Err()
if err != nil {
return errors.Wrap(err, "failed to send message")
}

return nil
}
Loading

0 comments on commit e67d7af

Please sign in to comment.