Merge pull request #46 from seatgeek/sqs-sink
Added a new SQS sink
lorenzo committed Jun 25, 2019
2 parents 8a61bdc + cff38f3 commit bfb58d8
Showing 9 changed files with 165 additions and 11 deletions.
4 changes: 3 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions README.md
@@ -79,6 +79,7 @@ The sink type is configured using `$SINK_TYPE` environment variable. Valid value
- `redis`
- `kafka`
- `mongo`
- `sqs`
- `stdout`
- `syslog`

@@ -100,6 +101,8 @@ The `stdout` sink does not have any configuration, it will simply output the JSO

The `syslog` sink is configured using `$SINK_SYSLOG_PROTO` (e.g. `tcp`, `udp` - leave empty if logging to a local syslog socket), `$SINK_SYSLOG_ADDR` (e.g. `127.0.0.1:514` - leave empty if logging to a local syslog socket), and `$SINK_SYSLOG_TAG` (default: `nomad-firehose`).

The `sqs` sink is configured using `$SINK_SQS_QUEUE_URL`, which should point to the queue URL provided by AWS. The queue is expected to be a FIFO queue.
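
For illustration, a compatible FIFO queue could be created with the AWS SDK for Go roughly as follows (the queue name and region are placeholders; FIFO queue names must end in `.fifo`):

```go
package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
	sess := session.Must(session.NewSession(&aws.Config{Region: aws.String("us-east-1")}))
	svc := sqs.New(sess)

	// FIFO queues need the FifoQueue attribute and a name ending in ".fifo".
	out, err := svc.CreateQueue(&sqs.CreateQueueInput{
		QueueName: aws.String("nomad-firehose.fifo"), // placeholder name
		Attributes: map[string]*string{
			"FifoQueue": aws.String("true"),
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// The returned URL is the value to export as SINK_SQS_QUEUE_URL.
	fmt.Println(aws.StringValue(out.QueueUrl))
}
```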

### `allocations`

`nomad-firehose allocations` will monitor all allocation changes in the Nomad cluster and emit each task state as a new firehose event to the configured sink.
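
When the `sqs` sink is configured, each of these events arrives on the queue as a JSON message. A minimal consumer sketch with the AWS SDK for Go, not part of this change (the queue URL is a placeholder):

```go
package main

import (
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
)

func main() {
	svc := sqs.New(session.Must(session.NewSession()))
	queueURL := aws.String("https://sqs.us-east-1.amazonaws.com/123456789012/nomad-firehose.fifo") // placeholder

	for {
		out, err := svc.ReceiveMessage(&sqs.ReceiveMessageInput{
			QueueUrl:            queueURL,
			MaxNumberOfMessages: aws.Int64(10),
			WaitTimeSeconds:     aws.Int64(20), // long polling
		})
		if err != nil {
			log.Fatal(err)
		}

		for _, m := range out.Messages {
			fmt.Println(aws.StringValue(m.Body)) // one firehose event as JSON

			// Delete after processing so the message is not redelivered.
			if _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
				QueueUrl:      queueURL,
				ReceiptHandle: m.ReceiptHandle,
			}); err != nil {
				log.Fatal(err)
			}
		}
	}
}
```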
2 changes: 1 addition & 1 deletion command/allocations/app.go
@@ -46,7 +46,7 @@ func NewFirehose() (*Firehose, error) {
return nil, err
}

sink, err := sink.GetSink()
sink, err := sink.GetSink("allocations")
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion command/deployments/app.go
@@ -28,7 +28,7 @@ func NewFirehose() (*Firehose, error) {
return nil, err
}

sink, err := sink.GetSink()
sink, err := sink.GetSink("deployments")
if err != nil {
log.Fatal(err)
os.Exit(1)
2 changes: 1 addition & 1 deletion command/evaluations/app.go
@@ -27,7 +27,7 @@ func NewFirehose() (*Firehose, error) {
return nil, err
}

sink, err := sink.GetSink()
sink, err := sink.GetSink("evaluations")
if err != nil {
log.Fatal(err)
os.Exit(1)
4 changes: 1 addition & 3 deletions command/jobs/base.go
@@ -9,10 +9,8 @@ import (
log "github.com/sirupsen/logrus"
)


type WatchJobListFunc func(job *nomad.JobListStub)


// FirehoseBase ...
type FirehoseBase struct {
lastChangeIndex uint64
@@ -29,7 +27,7 @@ func NewFirehoseBase() (*FirehoseBase, error) {
return nil, err
}

sink, err := sink.GetSink()
sink, err := sink.GetSink("jobs")
if err != nil {
return nil, err
}
2 changes: 1 addition & 1 deletion command/nodes/app.go
@@ -26,7 +26,7 @@ func NewFirehose() (*Firehose, error) {
return nil, err
}

sink, err := sink.GetSink()
sink, err := sink.GetSink("nodes")
if err != nil {
return nil, err
}
8 changes: 5 additions & 3 deletions sink/helper.go
@@ -6,10 +6,10 @@ import (
)

// GetSink ...
func GetSink() (Sink, error) {
func GetSink(resourceName string) (Sink, error) {
sinkType := os.Getenv("SINK_TYPE")
if sinkType == "" {
return nil, fmt.Errorf("Missing SINK_TYPE: amqp, http, kafka, kinesis, mongodb, nsq, rabbitmq, redis, stdout, syslog")
return nil, fmt.Errorf("Missing SINK_TYPE: amqp, http, kafka, kinesis, mongodb, nsq, rabbitmq, redis, sqs, stdout, syslog")
}

switch sinkType {
@@ -33,7 +33,9 @@ func GetSink() (Sink, error) {
return NewStdout()
case "syslog":
return NewSyslog()
case "sqs":
return NewSQS(resourceName)
default:
return nil, fmt.Errorf("Invalid SINK_TYPE: %s, Valid values: amqp, http, kafka, kinesis, mongodb, nsq, rabbitmq, redis, stdout, syslog", sinkType)
return nil, fmt.Errorf("Invalid SINK_TYPE: %s, Valid values: amqp, http, kafka, kinesis, mongodb, nsq, rabbitmq, redis, sqs, stdout, syslog", sinkType)
}
}
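
A rough end-to-end sketch of how a command uses this helper, assuming the repository's import path and that the `Sink` interface exposes the `Start`, `Stop`, and `Put` methods implemented by the sinks in this package (the queue URL and payload are placeholders):

```go
package main

import (
	"log"
	"os"
	"time"

	"github.com/seatgeek/nomad-firehose/sink"
)

func main() {
	// Placeholder configuration; in practice these come from the environment.
	os.Setenv("SINK_TYPE", "sqs")
	os.Setenv("SINK_SQS_QUEUE_URL", "https://sqs.us-east-1.amazonaws.com/123456789012/nomad-firehose.fifo")

	s, err := sink.GetSink("allocations") // the resource name becomes the FIFO MessageGroupId
	if err != nil {
		log.Fatal(err)
	}

	go s.Start() // runs the sink's batcher and writer until Stop is called

	// Each firehose event is handed to the sink as a JSON payload.
	if err := s.Put([]byte(`{"example":"event"}`)); err != nil {
		log.Fatal(err)
	}

	// Give the batcher a moment to flush; a real command runs until it is
	// signalled and then calls Stop to drain the internal queue.
	time.Sleep(2 * time.Second)
	s.Stop()
}
```

Passing the resource name into `GetSink` is what lets the SQS sink use it as the message group, so events for different resources are ordered independently within the same FIFO queue.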
149 changes: 149 additions & 0 deletions sink/sqs.go
@@ -0,0 +1,149 @@
package sink

import (
"fmt"
"os"
"strconv"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
log "github.com/sirupsen/logrus"
)

// SQSSink delivers firehose events to an AWS SQS FIFO queue in small batches.
type SQSSink struct {
session *session.Session
sqs *sqs.SQS
	queueName string // the SQS queue URL, taken from SINK_SQS_QUEUE_URL
	groupId   string // used as the FIFO MessageGroupId (the firehose resource name)
stopCh chan interface{}
putCh chan []byte
batchCh chan [][]byte
}

// NewSQS creates an SQS sink whose FIFO message group is the given firehose resource name.
func NewSQS(groupId string) (*SQSSink, error) {
queueName := os.Getenv("SINK_SQS_QUEUE_URL")
if queueName == "" {
return nil, fmt.Errorf("[sink/sqs] Missing SINK_SQS_QUEUE_URL")
}

	// The AWS session is built from the standard SDK configuration chain
	// (environment variables, shared credentials/config, or an instance role).
	sess := session.Must(session.NewSession())
svc := sqs.New(sess)

return &SQSSink{
session: sess,
sqs: svc,
queueName: queueName,
groupId: groupId,
stopCh: make(chan interface{}),
putCh: make(chan []byte, 1000),
batchCh: make(chan [][]byte, 100),
}, nil
}

// Start ...
func (s *SQSSink) Start() error {
// Stop chan for all tasks to depend on
s.stopCh = make(chan interface{})

go s.batch()
go s.write()

	// Block until Stop() closes the stop channel.
	<-s.stopCh

return nil
}

// Stop ...
func (s *SQSSink) Stop() {
log.Infof("[sink/sqs] ensure writer queue is empty (%d messages left)", len(s.putCh))

for len(s.putCh) > 0 {
log.Infof("[sink/sqs] Waiting for queue to drain - (%d messages left)", len(s.putCh))
time.Sleep(1 * time.Second)
}

close(s.stopCh)
}

// Put enqueues a single event payload for asynchronous batch delivery.
func (s *SQSSink) Put(data []byte) error {
s.putCh <- data

return nil
}

func (s *SQSSink) batch() {
buffer := make([][]byte, 0)
ticker := time.NewTicker(1 * time.Second)

for {
select {
case data := <-s.putCh:
buffer = append(buffer, data)

			// SQS SendMessageBatch accepts at most 10 entries per request, so flush at 10.
			if len(buffer) == 10 {
s.batchCh <- buffer
buffer = make([][]byte, 0)
}

		case <-ticker.C:
// If there is anything else in the putCh, wait a little longer
if len(s.putCh) > 0 {
continue
}

if len(buffer) > 0 {
s.batchCh <- buffer
buffer = make([][]byte, 0)
}
}
}
}

func (s *SQSSink) write() {
log.Infof("[sink/sqs] Starting writer")

	// id doubles as the batch entry Id and the FIFO MessageDeduplicationId. It
	// restarts from zero when the process restarts, so SQS may treat a new
	// message as a duplicate of one sent within the 5-minute deduplication window.
	var id int64

for {
select {
case batch := <-s.batchCh:
entries := make([]*sqs.SendMessageBatchRequestEntry, 0)

for _, data := range batch {
mID := aws.String(strconv.FormatInt(id, 10))
entry := &sqs.SendMessageBatchRequestEntry{
Id: mID,
MessageBody: aws.String(string(data)),
MessageGroupId: aws.String(s.groupId),
MessageDeduplicationId: mID,
}

entries = append(entries, entry)
id = id + 1
}

_, err := s.sqs.SendMessageBatch(&sqs.SendMessageBatchInput{
Entries: entries,
QueueUrl: aws.String(s.queueName),
})

if err != nil {
log.Errorf("[sink/sqs] %s", err)
} else {
log.Infof("[sink/sqs] queued %d messages", len(batch))
}
}
}
}
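
The writer derives `MessageDeduplicationId` from an in-process counter, which restarts at zero whenever the process restarts. A minimal alternative sketch (not what this commit does) is to derive the deduplication ID from the message body instead:

```go
package sink

import (
	"crypto/sha256"
	"encoding/hex"

	"github.com/aws/aws-sdk-go/aws"
)

// deduplicationID is a hypothetical helper: it returns a stable
// MessageDeduplicationId for a payload, so a restarted process cannot
// accidentally reuse an ID that is still inside SQS's 5-minute
// deduplication window.
func deduplicationID(body []byte) *string {
	sum := sha256.Sum256(body)
	return aws.String(hex.EncodeToString(sum[:]))
}
```

The trade-off is that identical payloads sent within the 5-minute window would collapse into a single message; enabling `ContentBasedDeduplication` on the queue achieves the same behaviour without any client-side hashing.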
