forked from nyaruka/courier
-
Notifications
You must be signed in to change notification settings - Fork 0
/
status.go
166 lines (136 loc) · 5.89 KB
/
status.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package rapidpro
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"log"
"os"
"time"
"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
)
// newMsgStatus creates a new DBMsgStatus for the passed in parameters
func newMsgStatus(channel courier.Channel, id courier.MsgID, externalID string, status courier.MsgStatusValue) *DBMsgStatus {
return &DBMsgStatus{
ChannelUUID_: channel.UUID(),
ID_: id,
ExternalID_: externalID,
Status_: status,
ModifiedOn_: time.Now().In(time.UTC),
}
}
// writeMsgStatus writes the passed in status to the database, queueing it to our spool in case the database is down
func writeMsgStatus(ctx context.Context, b *backend, status courier.MsgStatus) error {
dbStatus := status.(*DBMsgStatus)
err := writeMsgStatusToDB(ctx, b, dbStatus)
if err == courier.ErrMsgNotFound {
return err
}
// failed writing, write to our spool instead
if err != nil {
err = courier.WriteToSpool(b.config.SpoolDir, "statuses", dbStatus)
}
return err
}
const selectMsgIDForID = `
SELECT m."id" FROM "msgs_msg" m INNER JOIN "channels_channel" c ON (m."channel_id" = c."id") WHERE (m."id" = $1 AND c."uuid" = $2)`
const selectMsgIDForExternalID = `
SELECT m."id" FROM "msgs_msg" m INNER JOIN "channels_channel" c ON (m."channel_id" = c."id") WHERE (m."external_id" = $1 AND c."uuid" = $2)`
func checkMsgExists(b *backend, status courier.MsgStatus) (err error) {
var id int64
if status.ID() != courier.NilMsgID {
err = b.db.QueryRow(selectMsgIDForID, status.ID(), status.ChannelUUID()).Scan(&id)
} else if status.ExternalID() != "" {
err = b.db.QueryRow(selectMsgIDForExternalID, status.ExternalID(), status.ChannelUUID()).Scan(&id)
} else {
return fmt.Errorf("no id or external id for status update")
}
if err == sql.ErrNoRows {
return courier.ErrMsgNotFound
}
return err
}
// the craziness below lets us update our status to 'F' and schedule retries without knowing anything about the message
const updateMsgID = `
UPDATE msgs_msg SET
status = CASE WHEN :status = 'E' THEN CASE WHEN error_count >= 2 OR status = 'F' THEN 'F' ELSE 'E' END ELSE :status END,
error_count = CASE WHEN :status = 'E' THEN error_count + 1 ELSE error_count END,
next_attempt = CASE WHEN :status = 'E' THEN NOW() + (5 * (error_count+1) * interval '1 minutes') ELSE next_attempt END,
external_id = CASE WHEN :external_id != '' THEN :external_id ELSE external_id END,
sent_on = CASE WHEN :status = 'W' THEN NOW() ELSE sent_on END,
modified_on = :modified_on
WHERE msgs_msg.id IN
(SELECT msgs_msg.id
FROM msgs_msg INNER JOIN channels_channel ON (msgs_msg.channel_id = channels_channel.id)
WHERE (msgs_msg.id = :msg_id AND channels_channel.uuid = :channel_uuid))
RETURNING msgs_msg.id
`
const updateMsgExternalID = `
UPDATE msgs_msg SET
status = CASE WHEN :status = 'E' THEN CASE WHEN error_count >= 2 OR status = 'F' THEN 'F' ELSE 'E' END ELSE :status END,
error_count = CASE WHEN :status = 'E' THEN error_count + 1 ELSE error_count END,
next_attempt = CASE WHEN :status = 'E' THEN NOW() + (5 * (error_count+1) * interval '1 minutes') ELSE next_attempt END,
sent_on = CASE WHEN :status = 'W' THEN NOW() ELSE sent_on END,
modified_on = :modified_on
WHERE msgs_msg.id IN
(SELECT msgs_msg.id
FROM msgs_msg INNER JOIN channels_channel ON (msgs_msg.channel_id = channels_channel.id)
WHERE (msgs_msg.external_id = :external_id AND channels_channel.uuid = :channel_uuid))
RETURNING msgs_msg.id
`
// writeMsgStatusToDB writes the passed in msg status to our db
func writeMsgStatusToDB(ctx context.Context, b *backend, status *DBMsgStatus) error {
var rows *sqlx.Rows
var err error
if status.ID() != courier.NilMsgID {
rows, err = b.db.NamedQueryContext(ctx, updateMsgID, status)
} else if status.ExternalID() != "" {
rows, err = b.db.NamedQueryContext(ctx, updateMsgExternalID, status)
} else {
return fmt.Errorf("attempt to update msg status without id or external id")
}
if err != nil {
return err
}
defer rows.Close()
// scan and read the id of the msg that was updated
if rows.Next() {
rows.Scan(&status.ID_)
} else {
return courier.ErrMsgNotFound
}
return nil
}
func (b *backend) flushStatusFile(filename string, contents []byte) error {
status := &DBMsgStatus{}
err := json.Unmarshal(contents, status)
if err != nil {
log.Printf("ERROR unmarshalling spool file '%s', renaming: %s\n", filename, err)
os.Rename(filename, fmt.Sprintf("%s.error", filename))
return nil
}
// try to flush to our db
return writeMsgStatusToDB(context.Background(), b, status)
}
//-----------------------------------------------------------------------------
// MsgStatusUpdate implementation
//-----------------------------------------------------------------------------
// DBMsgStatus represents a status update on a message
type DBMsgStatus struct {
ChannelUUID_ courier.ChannelUUID `json:"channel_uuid" db:"channel_uuid"`
ID_ courier.MsgID `json:"msg_id,omitempty" db:"msg_id"`
ExternalID_ string `json:"external_id,omitempty" db:"external_id"`
Status_ courier.MsgStatusValue `json:"status" db:"status"`
ModifiedOn_ time.Time `json:"modified_on" db:"modified_on"`
logs []*courier.ChannelLog
}
func (s *DBMsgStatus) EventID() int64 { return s.ID_.Int64 }
func (s *DBMsgStatus) ChannelUUID() courier.ChannelUUID { return s.ChannelUUID_ }
func (s *DBMsgStatus) ID() courier.MsgID { return s.ID_ }
func (s *DBMsgStatus) ExternalID() string { return s.ExternalID_ }
func (s *DBMsgStatus) SetExternalID(id string) { s.ExternalID_ = id }
func (s *DBMsgStatus) Logs() []*courier.ChannelLog { return s.logs }
func (s *DBMsgStatus) AddLog(log *courier.ChannelLog) { s.logs = append(s.logs, log) }
func (s *DBMsgStatus) Status() courier.MsgStatusValue { return s.Status_ }
func (s *DBMsgStatus) SetStatus(status courier.MsgStatusValue) { s.Status_ = status }