/
queue.go
93 lines (82 loc) · 2.14 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package queues
import (
"fmt"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sqs"
"github.com/sirupsen/logrus"
"playhead/db"
)
// Queue groups the dependencies used by the queue methods in this package:
// the queue configuration, a database handle, and a Context.
type Queue struct {
// Config is the queue configuration; New fills it from InitConfig. The
// session helpers read its CmsQueueUrl and AwsRegion fields.
Config *Config
// Database is the handle New obtains from db.New. Close closes it and
// NewContext shares it with the contexts it builds.
Database *db.Database
// Context is not assigned by New or NewContext in the visible part of this file.
// NOTE(review): presumably set by callers elsewhere — confirm.
Context *Context
}
// NewContext builds a fresh Context that logs through logrus's standard
// logger and shares the Queue's database handle.
//
// The new Context is only returned to the caller; it is not stored on
// q.Context.
func (q *Queue) NewContext() *Context {
	ctx := new(Context)
	ctx.Logger = logrus.StandardLogger()
	ctx.Database = q.Database
	return ctx
}
// New assembles a Queue in three steps: it loads the queue configuration
// with InitConfig, loads the database configuration with db.InitConfig, and
// creates the database handle with db.New.
//
// The first step that fails aborts construction; New then returns a nil
// Queue together with that step's error, unmodified.
func New() (q *Queue, err error) {
	queueCfg, err := InitConfig()
	if err != nil {
		return nil, err
	}

	databaseCfg, err := db.InitConfig()
	if err != nil {
		return nil, err
	}

	database, err := db.New(databaseCfg)
	if err != nil {
		return nil, err
	}

	// Every dependency is ready, so the Queue is built in one place.
	return &Queue{Config: queueCfg, Database: database}, nil
}
// Close closes the Queue's database handle and returns whatever error that
// close reports.
func (q *Queue) Close() error {
	closeErr := q.Database.Close()
	return closeErr
}
// deleteQMessage removes message from the SQS queue at qUrl, identifying it
// by its receipt handle.
//
// Failures are not returned: a DeleteMessage error is passed to ErrorHandler
// and logged at error level, and a successful delete is logged at debug
// level. A nil message is logged and ignored instead of causing a nil-pointer
// panic. The SQS client comes from getSQSSession, which panics when no AWS
// session can be created.
func (q *Queue) deleteQMessage(message *sqs.Message, qUrl string) {
	if message == nil {
		logrus.Error("Error removing message from queue: nil message")
		return
	}

	input := &sqs.DeleteMessageInput{
		QueueUrl:      aws.String(qUrl),
		ReceiptHandle: message.ReceiptHandle,
	}
	if _, err := q.getSQSSession().DeleteMessage(input); err != nil {
		ErrorHandler(err)
		logrus.Errorf("Error removing message from queue: %v\n", err)
		return
	}

	// Debugf defers formatting until debug logging is enabled, and %+v
	// (rather than the mistyped "+%v") prints the message with field names.
	logrus.Debugf("Deleted %+v\n", message)
}
// getSession creates a new AWS session for the region in q.Config.AwsRegion.
//
// When q.Config.CmsQueueUrl is non-empty, the queue URL is printed to stdout
// and the session is created with SSL disabled and an SDK log level of 3; a
// creation error is logged before being returned.
// NOTE(review): presumably this branch targets a local or non-TLS endpoint — confirm.
//
// Otherwise a plain regional session is created and the creation error is
// passed to ErrorHandler; when ErrorHandler reports true the error is
// returned.
//
// Returns the session on success, or nil and the error on failure. A new
// session is created on every call.
func (q *Queue) getSession() (*session.Session, error) {
	// Disabled alternative, kept for reference:
	// sess = session.Must(session.NewSessionWithOptions(session.Options{
	// AssumeRoleTokenProvider: stscreds.StdinTokenProvider,
	// SharedConfigState: session.SharedConfigEnable,
	// Config: aws.Config{
	// Region: aws.String(getAwsRegion),
	// CredentialsChainVerboseErrors: aws.Bool(true),
	// },
	// }))
	if len(q.Config.CmsQueueUrl) > 0 {
		fmt.Println("Using cms q ", q.Config.CmsQueueUrl)
		sess, err := session.NewSession(&aws.Config{
			LogLevel:   aws.LogLevel(3),
			DisableSSL: aws.Bool(true),
			Region:     aws.String(q.Config.AwsRegion),
		})
		if err != nil {
			// Pass the error as an argument: using pre-formatted text as the
			// format string would misrender any '%' in the error message.
			logrus.Errorf("Error %+v\n", err)
			return nil, err
		}
		return sess, nil
	}

	sess, err := session.NewSession(&aws.Config{Region: aws.String(q.Config.AwsRegion)})
	if ErrorHandler(err) {
		return nil, err
	}
	return sess, nil
}
// getSQSSession returns a new SQS client built on a session from getSession.
//
// It panics with the underlying error when getSession fails, because the
// signature has no error return.
func (q *Queue) getSQSSession() *sqs.SQS {
	awsSession, err := q.getSession()
	if err != nil {
		panic(err)
	}
	return sqs.New(awsSession)
}