forked from nytimes/gizmo
/
sub.go
120 lines (97 loc) · 2.58 KB
/
sub.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package service
import (
"encoding/json"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"github.com/NYTimes/gizmo/config"
"github.com/NYTimes/gizmo/config/combined"
"github.com/NYTimes/gizmo/pubsub"
"github.com/NYTimes/gizmo/pubsub/aws"
"github.com/NYTimes/logrotate"
"github.com/Sirupsen/logrus"
"github.com/go-kit/kit/metrics/provider"
"github.com/golang/protobuf/proto"
"github.com/NYTimes/gizmo/examples/nyt"
)
var (
Log = logrus.New()
sub pubsub.Subscriber
metrics provider.Provider
client nyt.Client
articles []nyt.SemanticConceptArticle
)
type Config struct {
*combined.Config
MostPopularToken string
SemanticToken string
}
func Init() {
var cfg *Config
config.LoadJSONFile("./config.json", &cfg)
config.SetLogOverride(cfg.Log)
if *cfg.Log != "" {
lf, err := logrotate.NewFile(*cfg.Log)
if err != nil {
Log.Fatalf("unable to access log file: %s", err)
}
Log.Out = lf
Log.Formatter = &logrus.JSONFormatter{}
} else {
Log.Out = os.Stderr
}
pubsub.Log = Log
var err error
cfg.Metrics.Prefix = metricsNamespace()
metrics = cfg.Metrics.NewProvider()
client = nyt.NewClient(cfg.MostPopularToken, cfg.SemanticToken)
sub, err = aws.NewSubscriber(cfg.SQS)
if err != nil {
Log.Fatal("unable to init SQS: ", err)
}
}
func Run() (err error) {
stream := sub.Start()
totalMsgsConsumed := metrics.NewCounter("total-consumed")
errorCount := metrics.NewCounter("error-count")
go func() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT)
Log.Infof("received kill signal %s", <-ch)
err = sub.Stop()
}()
var article nyt.SemanticConceptArticle
for msg := range stream {
totalMsgsConsumed.Add(1)
if err = proto.Unmarshal(msg.Message(), &article); err != nil {
Log.Error("unable to unmarshal article from SQS: ", err)
errorCount.Add(1)
if err = msg.Done(); err != nil {
Log.Error("unable to delete message from SQS: ", err)
}
continue
}
// do something!
fmt.Println("Most Recent Article on 'Cats':")
out, _ := json.MarshalIndent(article, "", " ")
fmt.Fprint(os.Stdout, string(out))
articles = append(articles, article)
if err = msg.Done(); err != nil {
Log.WithFields(logrus.Fields{
"article": article,
}).Error("unable to delete message from SQS: ", err)
}
}
return err
}
func metricsNamespace() string {
// get only server base name
name, _ := os.Hostname()
name = strings.SplitN(name, ".", 2)[0]
// set it up to be paperboy.servername
name = strings.Replace(name, "-", ".", 1)
// add the 'apps' prefix to keep things neat
return "apps." + name + ".cats-subscriber"
}