-
Notifications
You must be signed in to change notification settings - Fork 390
/
nodeevents.go
174 lines (149 loc) · 4.93 KB
/
nodeevents.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// Copyright (C) 2019 Storj Labs, Inc.
// See LICENSE for copying information.
package satellitedb
import (
	"context"
	"errors"
	"time"

	"github.com/zeebo/errs"
	"go.uber.org/zap"

	"storj.io/common/dbutil/pgutil"
	"storj.io/common/storj"
	"storj.io/common/uuid"
	"storj.io/storj/satellite/nodeevents"
	"storj.io/storj/satellite/satellitedb/dbx"
)
// Compile-time check that *nodeEvents satisfies the nodeevents.DB interface.
var _ nodeevents.DB = (*nodeEvents)(nil)
// nodeEvents implements the node events operations in this file on top of the
// satellite database.
type nodeEvents struct {
// db is the satellite database handle used for queries and logging.
db *satelliteDB
}
// Insert stores a new node event in the node events table and returns the
// created event.
//
// A fresh UUID is generated for the row. lastIPPort is optional: it is only
// written when non-nil. After a successful insert an info-level log line is
// emitted with the event name, email and node ID.
func (ne *nodeEvents) Insert(ctx context.Context, email string, lastIPPort *string, nodeID storj.NodeID, eventType nodeevents.Type) (nodeEvent nodeevents.NodeEvent, err error) {
	defer mon.Task()(&ctx)(&err)

	eventID, err := uuid.New()
	if err != nil {
		return nodeevents.NodeEvent{}, err
	}

	// Resolve the event name up front so an unknown event type is rejected
	// before anything is written to the database.
	eventName, err := eventType.Name()
	if err != nil {
		return nodeevents.NodeEvent{}, err
	}

	extra := dbx.NodeEvent_Create_Fields{}
	if lastIPPort != nil {
		extra.LastIpPort = dbx.NodeEvent_LastIpPort(*lastIPPort)
	}

	created, err := ne.db.Create_NodeEvent(
		ctx,
		dbx.NodeEvent_Id(eventID.Bytes()),
		dbx.NodeEvent_Email(email),
		dbx.NodeEvent_NodeId(nodeID.Bytes()),
		dbx.NodeEvent_Event(int(eventType)),
		extra,
	)
	if err != nil {
		return nodeevents.NodeEvent{}, err
	}

	ne.db.log.Info("node event inserted",
		zap.String("name", eventName),
		zap.String("email", email),
		zap.String("node ID", nodeID.String()),
	)

	return fromDBX(created)
}
// GetLatestByEmailAndEvent returns the most recently created node event for
// the given email and event type.
//
// Any error from the query or from converting the row is returned as-is.
func (ne *nodeEvents) GetLatestByEmailAndEvent(ctx context.Context, email string, event nodeevents.Type) (nodeEvent nodeevents.NodeEvent, err error) {
	defer mon.Task()(&ctx)(&err)

	row, err := ne.db.First_NodeEvent_By_Email_And_Event_OrderBy_Desc_CreatedAt(
		ctx,
		dbx.NodeEvent_Email(email),
		dbx.NodeEvent_Event(int(event)),
	)
	if err != nil {
		return nodeevents.NodeEvent{}, err
	}

	return fromDBX(row)
}
// GetByID looks up a single node event by its ID.
//
// Any error from the query or from converting the row is returned as-is.
func (ne *nodeEvents) GetByID(ctx context.Context, id uuid.UUID) (nodeEvent nodeevents.NodeEvent, err error) {
	defer mon.Task()(&ctx)(&err)

	key := dbx.NodeEvent_Id(id[:])

	row, err := ne.db.Get_NodeEvent_By_Id(ctx, key)
	if err != nil {
		return nodeevents.NodeEvent{}, err
	}

	return fromDBX(row)
}
// GetNextBatch returns the next group of node events that should be combined
// into a single email.
//
// The query picks one unsent event created before firstSeenBefore, preferring
// rows that were never attempted, then the least recently attempted, then the
// oldest. It then returns every unsent event sharing that row's email and
// event type. Only ID, Email, LastIPPort, NodeID and Event are populated on
// the returned events.
func (ne *nodeEvents) GetNextBatch(ctx context.Context, firstSeenBefore time.Time) (events []nodeevents.NodeEvent, err error) {
	defer mon.Task()(&ctx)(&err)

	const query = `
SELECT node_events.id, node_events.email, node_events.last_ip_port, node_events.node_id, node_events.event
FROM node_events
INNER JOIN (
SELECT email, event
FROM node_events
WHERE created_at < $1
AND email_sent is NULL
ORDER BY last_attempted ASC NULLS FIRST, created_at ASC
LIMIT 1
) as t
ON node_events.email = t.email
AND node_events.event = t.event
WHERE node_events.email_sent IS NULL
`

	rows, err := ne.db.QueryContext(ctx, query, firstSeenBefore)
	if err != nil {
		return nil, err
	}
	// Fold a failure to close the rows into the error returned to the caller.
	defer func() { err = errs.Combine(err, rows.Close()) }()

	for rows.Next() {
		var (
			rawID     []byte
			address   string
			ipPort    *string
			rawNodeID []byte
			kind      int
		)
		if err = rows.Scan(&rawID, &address, &ipPort, &rawNodeID, &kind); err != nil {
			return nil, err
		}

		eventID, idErr := uuid.FromBytes(rawID)
		if idErr != nil {
			return nil, idErr
		}

		nodeID, nodeErr := storj.NodeIDFromBytes(rawNodeID)
		if nodeErr != nil {
			return nil, nodeErr
		}

		events = append(events, nodeevents.NodeEvent{
			ID:         eventID,
			Email:      address,
			LastIPPort: ipPort,
			NodeID:     nodeID,
			Event:      nodeevents.Type(kind),
		})
	}

	// rows.Err reports any error hit while iterating.
	return events, rows.Err()
}
// fromDBX converts a dbx node event row into a nodeevents.NodeEvent.
//
// It returns an error if dbxNE is nil, or if the stored ID or node ID bytes
// cannot be parsed. In those cases the returned event is the zero value.
func fromDBX(dbxNE *dbx.NodeEvent) (event nodeevents.NodeEvent, err error) {
	// Guard against a nil row instead of panicking on the field accesses
	// below. NOTE(review): dbx "first" reads are expected to yield (nil, nil)
	// when nothing matches, which is how a nil row can reach this point.
	if dbxNE == nil {
		return event, errors.New("node event parameter is nil")
	}

	id, err := uuid.FromBytes(dbxNE.Id)
	if err != nil {
		return event, err
	}

	nodeID, err := storj.NodeIDFromBytes(dbxNE.NodeId)
	if err != nil {
		return event, err
	}

	return nodeevents.NodeEvent{
		ID:            id,
		Email:         dbxNE.Email,
		LastIPPort:    dbxNE.LastIpPort,
		NodeID:        nodeID,
		Event:         nodeevents.Type(dbxNE.Event),
		CreatedAt:     dbxNE.CreatedAt,
		LastAttempted: dbxNE.LastAttempted,
		EmailSent:     dbxNE.EmailSent,
	}, nil
}
// UpdateEmailSent sets email_sent to timestamp on every node event whose ID
// is in ids.
func (ne *nodeEvents) UpdateEmailSent(ctx context.Context, ids []uuid.UUID, timestamp time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)

	const stmt = `
UPDATE node_events SET email_sent = $1
WHERE id = ANY($2::bytea[])
`

	// The result of the statement is not needed; only the error is reported.
	_, err = ne.db.ExecContext(ctx, stmt, timestamp, pgutil.UUIDArray(ids))
	return err
}
// UpdateLastAttempted sets last_attempted to timestamp on every node event
// whose ID is in ids.
func (ne *nodeEvents) UpdateLastAttempted(ctx context.Context, ids []uuid.UUID, timestamp time.Time) (err error) {
	defer mon.Task()(&ctx)(&err)

	const stmt = `
UPDATE node_events SET last_attempted = $1
WHERE id = ANY($2::bytea[])
`

	// The result of the statement is not needed; only the error is reported.
	_, err = ne.db.ExecContext(ctx, stmt, timestamp, pgutil.UUIDArray(ids))
	return err
}