Skip to content

Commit

Permalink
Add support for publishing to an AWS SQS
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Jun 3, 2018
1 parent fe424bc commit 11b6872
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 1 deletion.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
/vendor
/dist
/private
46 changes: 45 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cmd/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,6 @@ func init() {
viper.BindPFlag("api.port", startCmd.PersistentFlags().Lookup("port"))
viper.BindPFlag("api.address", startCmd.PersistentFlags().Lookup("address"))
viper.SetDefault("api.admin-password", "changeme")
viper.SetDefault("aws.access-key-id", "")
viper.SetDefault("aws.secret-access-key", "")
}
26 changes: 26 additions & 0 deletions collector/aws/aws.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package aws

import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
)

const (
// Region is the default aws region
Region = "us-east-2"
// MaxRetries is the number of retries when connecting to aws
MaxRetries = 5
)

// Session is a global session handle
var Session *session.Session

// NewSession creates a new aws session
func NewSession(accessKeyID, secretAccessKey string) *session.Session {
return session.New(&aws.Config{
Region: aws.String(Region),
Credentials: credentials.NewStaticCredentials(accessKeyID, secretAccessKey, ""),
MaxRetries: aws.Int(MaxRetries),
})
}
35 changes: 35 additions & 0 deletions collector/aws/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package aws

import (
"errors"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
)

const (
// QueueURL is the URL of the message queue
queueURL = "https://sqs.us-east-2.amazonaws.com/082866812839/ooni-collector.fifo"
)

// SendMessage sends a message to the AWS SQS queue
func SendMessage(sess *session.Session, body string, groupID string) (*string, error) {
if sess == nil {
return nil, errors.New("invalid aws Session")
}

svc := sqs.New(sess)
sendParams := &sqs.SendMessageInput{
MessageBody: aws.String(body),
QueueUrl: aws.String(queueURL),
MessageGroupId: aws.String(groupID),
}
sendResp, err := svc.SendMessage(sendParams)

if err != nil {
return nil, err
}

return sendResp.MessageId, nil
}
15 changes: 15 additions & 0 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"

"github.com/ooni/collector/collector/api/v1"
"github.com/ooni/collector/collector/aws"
"github.com/ooni/collector/collector/middleware"
"github.com/ooni/collector/collector/paths"
"github.com/ooni/collector/collector/report"
Expand Down Expand Up @@ -39,6 +40,16 @@ func initDataRoot() error {
return nil
}

func initAWS() error {
accessKeyID := viper.GetString("aws.access-key-id")
secretAccessKey := viper.GetString("aws.secret-access-key")
if accessKeyID == "" {
return nil
}
aws.Session = aws.NewSession(accessKeyID, secretAccessKey)
return nil
}

// Start the collector server
func Start() {
var (
Expand All @@ -56,6 +67,10 @@ func Start() {
if err = initDataRoot(); err != nil {
log.WithError(err).Error("failed to init data root")
}
if err = initAWS(); err != nil {
log.WithError(err).Error("failed to init aws")
}

store := storage.New(paths.BadgerDir())
storageMw, err := middleware.InitStorageMiddleware(store)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions collector/report/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/apex/log"
"github.com/ooni/collector/collector/aws"
"github.com/ooni/collector/collector/info"
"github.com/ooni/collector/collector/paths"
"github.com/ooni/collector/collector/storage"
Expand Down Expand Up @@ -137,12 +138,23 @@ func CloseReport(store *storage.Storage, reportID string) error {
// There is no need to keep closed empty reports
os.Remove(meta.ReportFilePath)
}
meta.ReportFilePath = dstPath
meta.Closed = true
expiryTimers[reportID].Stop()

if err = store.SetReport(meta); err != nil {
return err
}
value, err := json.Marshal(meta)
if err != nil {
log.WithError(err).Error("failed to serialize meta")
return nil
}
_, err = aws.SendMessage(aws.Session, string(value), "report")
if err != nil {
log.WithError(err).Error("failed to publish to aws SQS")
return nil
}

return nil
}
Expand Down

0 comments on commit 11b6872

Please sign in to comment.