-
Notifications
You must be signed in to change notification settings - Fork 3
/
auto.go
151 lines (131 loc) · 4.15 KB
/
auto.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Package occupancyemail provides an automation that creates a digest email of occupancy statistics.
// The automation periodically uses the OccupancySensorHistoryApi to fetch occupancy records, analyses them,
// formats an email using html/template, and sends it to some recipients using smtp.
package occupancyemail
// NOTE: There's an e2e test in cmd/tools/test-occupancyemail/main.go
import (
"context"
"time"
"go.uber.org/zap"
"google.golang.org/protobuf/types/known/timestamppb"
timepb "github.com/smart-core-os/sc-api/go/types/time"
"github.com/vanti-dev/sc-bos/pkg/auto"
"github.com/vanti-dev/sc-bos/pkg/auto/occupancyemail/config"
"github.com/vanti-dev/sc-bos/pkg/gen"
"github.com/vanti-dev/sc-bos/pkg/task"
"github.com/vanti-dev/sc-bos/pkg/task/service"
)
const AutoName = "occupancyemail"
var Factory auto.Factory = factory{}
type factory struct{}
func (f factory) New(services auto.Services) service.Lifecycle {
a := &autoImpl{Services: services}
a.Service = service.New(service.MonoApply(a.applyConfig), service.WithParser(config.ReadBytes))
a.Logger = a.Logger.Named(AutoName)
return a
}
type autoImpl struct {
*service.Service[config.Root]
auto.Services
}
func (a *autoImpl) applyConfig(ctx context.Context, cfg config.Root) error {
logger := a.Logger
logger = logger.With(zap.String("snmp.host", cfg.Destination.Host), zap.Int("snmp.port", cfg.Destination.Port))
var ohClient gen.OccupancySensorHistoryClient
if err := a.Node.Client(&ohClient); err != nil {
return err
}
sendTime := cfg.Destination.SendTime
now := cfg.Now
if now == nil {
now = a.Now
}
if now == nil {
now = time.Now
}
go func() {
t := now()
for {
next := sendTime.Next(t)
select {
case <-ctx.Done():
return
case <-time.After(time.Until(next)):
// Use the time we were planning on running instead of the current time.
// We do this to make output more predictable
t = next
}
attrs := Attrs{
Now: t,
Stats: []Stats{{Source: cfg.Source}},
}
stats := &attrs.Stats[0]
days := make(map[time.Time]OccupancyStats) // the time.Time key should be at 00:00 for the day
rangeStart := t.Add(-7 * 24 * time.Hour)
ohReq := &gen.ListOccupancyHistoryRequest{
Name: cfg.Source.Name,
Period: &timepb.Period{
StartTime: timestamppb.New(rangeStart),
EndTime: timestamppb.New(t),
},
}
for {
ohResp, err := retryT(ctx, func(ctx context.Context) (*gen.ListOccupancyHistoryResponse, error) {
return ohClient.ListOccupancyHistory(ctx, ohReq)
})
if err != nil {
logger.Warn("failed to fetch occupancy history", zap.Error(err))
break
}
for _, r := range ohResp.GetOccupancyRecords() {
if pc := r.GetOccupancy().GetPeopleCount(); pc > stats.Last7Days.MaxPeopleCount {
stats.Last7Days.MaxPeopleCount = pc
}
day := startOfDay(r.GetRecordTime().AsTime().In(t.Location()))
if pc := r.GetOccupancy().GetPeopleCount(); pc > days[day].MaxPeopleCount {
s := days[day]
s.MaxPeopleCount = pc
days[day] = s
}
}
ohReq.PageToken = ohResp.GetNextPageToken()
if ohReq.PageToken == "" {
break
}
}
// process days into stats
for dt := startOfDay(rangeStart); dt.Before(t); dt = startOfDay(dt.Add(30 * time.Hour)) {
stats.Days = append(stats.Days, DayStats{
Date: dt,
OccupancyStats: days[dt],
})
}
err := retry(ctx, func(ctx context.Context) error {
return sendEmail(cfg.Destination, attrs)
})
if err != nil {
logger.Warn("failed to send email", zap.Error(err))
} else {
logger.Info("email sent")
}
}
}()
return nil
}
func retry(ctx context.Context, f func(context.Context) error) error {
return task.Run(ctx, func(ctx context.Context) (task.Next, error) {
return 0, f(ctx)
}, task.WithBackoff(10*time.Second, 10*time.Minute), task.WithRetry(40))
}
func retryT[T any](ctx context.Context, f func(context.Context) (T, error)) (T, error) {
var t T
err := retry(ctx, func(ctx context.Context) error {
var err error
t, err = f(ctx)
return err
})
return t, err
}
func startOfDay(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}