-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
116 lines (92 loc) · 2.55 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package main
import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"syscall"
"time"
"github.com/nats-io/nats.go"
"github.com/synternet/data-layer-sdk/pkg/options"
"github.com/synternet/price-publisher/internal/config"
"github.com/synternet/price-publisher/internal/service"
"github.com/synternet/price-publisher/pkg/cmc"
)
// main wires the price publisher together: it loads the configuration,
// connects to NATS, starts the publisher service and then, once per publish
// interval, retrieves prices through cmc.RetrievePrices and hands the USD
// quotes (keyed by symbol) to the service over allMsgChan. It returns once
// SIGINT or SIGTERM has been received.
//
// Start-up failures (config, credentials, NATS connection) panic, since the
// process cannot do anything useful without them.
func main() {
	cfg, err := config.Init()
	if err != nil {
		panic(fmt.Errorf("failed to parse config: %w", err))
	}
	slog.Info("Config", "PublishIntervalSec", cfg.PublishIntervalSec)

	// time.NewTicker panics on a non-positive interval. Reject it here, before
	// any connection is opened, with a message that names the setting.
	interval := time.Duration(cfg.PublishIntervalSec) * time.Second
	if interval <= 0 {
		panic(fmt.Errorf("invalid PublishIntervalSec %v: must be positive", cfg.PublishIntervalSec))
	}

	nkey, jwt, err := CreateUser(cfg.NatsConfig.NKey)
	if err != nil {
		panic(fmt.Errorf("failed to create JWT: %w", err))
	}

	conn, err := options.MakeNats("Price Publisher", cfg.NatsConfig.Urls, "", *nkey, *jwt, "", "", "")
	if err != nil {
		panic(fmt.Errorf("failed to connect to NATS %s: %w", cfg.NatsConfig.Urls, err))
	}
	setErrorHandlers(conn)
	slog.Info("Connected to NATS", "URLS", cfg.NatsConfig.Urls)

	// Buffered channel through which price snapshots are handed to the service.
	allMsgChan := make(service.AllMessageChannel, 1024)
	sPub := service.New(conn, context.Background(), cfg.NatsConfig.PrefixName, cfg.NatsConfig.PublisherName, allMsgChan)
	sPub.Start()

	// ctx is cancelled on the first SIGINT/SIGTERM and stops both Serve and
	// the polling loop below.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	c := make(chan os.Signal, 2)
	signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-c
		slog.Info("Service interrupted. Exiting...")
		cancel()
	}()

	go sPub.Serve(ctx)

	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	cmcConfig := config.CmcConfig{
		Ids:    cfg.CmcConfig.Ids,
		ApiKey: cfg.CmcConfig.ApiKey,
	}

	defer sPub.Close()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			quotes, err := cmc.RetrievePrices(cmcConfig)
			if err != nil {
				// A failed fetch is not fatal: log it and retry on the next tick.
				slog.Error("failed to retrieve prices", "error", err)
				continue
			}

			// Keep only the USD quote of every returned item, keyed by symbol.
			symbolQuotes := make(map[string]cmc.QuoteInfo)
			for id, dataItem := range quotes.Data {
				usdQuote, ok := dataItem.Quote["USD"]
				if !ok {
					slog.Info("USD quote not found", "ID", id)
					continue
				}
				symbolQuotes[dataItem.Symbol] = cmc.QuoteInfo{
					Price:           usdQuote.Price,
					PercentChange24: usdQuote.PercentChange24h,
					LastUpdated:     usdQuote.LastUpdated.Unix(),
				}
			}

			// Do not block forever on a full channel: give up on the send as
			// soon as shutdown has been requested.
			select {
			case allMsgChan <- symbolQuotes:
			case <-ctx.Done():
				return
			}
		}
	}
}
// setErrorHandlers installs logging callbacks on conn for asynchronous
// errors, disconnects and reconnects. A nil conn is ignored.
func setErrorHandlers(conn *nats.Conn) {
	if conn == nil {
		return
	}

	// slog takes alternating key/value pairs after the message; a bare error
	// value would be emitted under "!BADKEY", so it is logged under an
	// explicit "error" key instead.
	conn.SetErrorHandler(func(_ *nats.Conn, _ *nats.Subscription, err error) {
		slog.Error("NATS error", "error", err)
	})
	conn.SetDisconnectHandler(func(c *nats.Conn) {
		slog.Error("NATS disconnected", "error", c.LastError())
	})
	conn.SetReconnectHandler(func(_ *nats.Conn) {
		slog.Info("NATS reconnected")
	})
}