Skip to content

Commit

Permalink
Merge pull request #22 from nyaruka/status-interface
Browse files Browse the repository at this point in the history
Change MsgStatusUpdate concrete class to an interface backends implem…
  • Loading branch information
nicpottier committed Jul 12, 2017
2 parents fac8dc5 + 721a681 commit 3e879ba
Show file tree
Hide file tree
Showing 18 changed files with 204 additions and 173 deletions.
8 changes: 7 additions & 1 deletion backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,14 @@ type Backend interface {
// WriteMsg writes the passed in message to our backend
WriteMsg(Msg) error

// NewMsgStatusForID creates a new Status object for the given message id
NewMsgStatusForID(Channel, MsgID, MsgStatusValue) MsgStatus

// NewMsgStatusForExternalID creates a new Status object for the given external id
NewMsgStatusForExternalID(Channel, string, MsgStatusValue) MsgStatus

// WriteMsgStatus writes the passed in status update to our backend
WriteMsgStatus(*MsgStatusUpdate) error
WriteMsgStatus(MsgStatus) error

// WriteChannelLogs writes the passed in channel logs to our backend
WriteChannelLogs([]*ChannelLog) error
Expand Down
12 changes: 11 additions & 1 deletion backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,18 @@ func (b *backend) WriteMsg(m courier.Msg) error {
return writeMsg(b, m)
}

// NewStatusUpdateForID creates a new Status object for the given message id
func (b *backend) NewMsgStatusForID(channel courier.Channel, id courier.MsgID, status courier.MsgStatusValue) courier.MsgStatus {
return newMsgStatus(channel, id, "", status)
}

// NewStatusUpdateForID creates a new Status object for the given message id
func (b *backend) NewMsgStatusForExternalID(channel courier.Channel, externalID string, status courier.MsgStatusValue) courier.MsgStatus {
return newMsgStatus(channel, courier.NilMsgID, externalID, status)
}

// WriteMsgStatus writes the passed in MsgStatus to our store
func (b *backend) WriteMsgStatus(status *courier.MsgStatusUpdate) error {
func (b *backend) WriteMsgStatus(status courier.MsgStatus) error {
return writeMsgStatus(b, status)
}

Expand Down
14 changes: 7 additions & 7 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,19 @@ func (ts *MsgTestSuite) TestCheckMsgExists() {
knChannel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d")

// check with invalid message id
err := checkMsgExists(ts.b, courier.NewStatusUpdateForID(knChannel, courier.NewMsgID(-1), courier.MsgStatus("S")))
err := checkMsgExists(ts.b, ts.b.NewMsgStatusForID(knChannel, courier.NewMsgID(-1), courier.MsgStatusValue("S")))
ts.Equal(err, courier.ErrMsgNotFound)

// check with valid message id
err = checkMsgExists(ts.b, courier.NewStatusUpdateForID(knChannel, courier.NewMsgID(10000), courier.MsgStatus("S")))
err = checkMsgExists(ts.b, ts.b.NewMsgStatusForID(knChannel, courier.NewMsgID(10000), courier.MsgStatusValue("S")))
ts.Nil(err)

// check with invalid external id
err = checkMsgExists(ts.b, courier.NewStatusUpdateForExternalID(knChannel, "ext-invalid", courier.MsgStatus("S")))
err = checkMsgExists(ts.b, ts.b.NewMsgStatusForExternalID(knChannel, "ext-invalid", courier.MsgStatusValue("S")))
ts.Equal(err, courier.ErrMsgNotFound)

// check with valid external id
status := courier.NewStatusUpdateForExternalID(knChannel, "ext1", courier.MsgStatus("S"))
status := ts.b.NewMsgStatusForExternalID(knChannel, "ext1", courier.MsgStatusValue("S"))
err = checkMsgExists(ts.b, status)
ts.Nil(err)
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func (ts *MsgTestSuite) TestStatus() {
now := time.Now().In(time.UTC)

// update by id
status := courier.NewStatusUpdateForID(channel, courier.NewMsgID(10001), courier.MsgSent)
status := ts.b.NewMsgStatusForID(channel, courier.NewMsgID(10001), courier.MsgSent)
err := ts.b.WriteMsgStatus(status)
ts.NoError(err)
m, err := readMsgFromDB(ts.b, courier.NewMsgID(10001))
Expand All @@ -154,7 +154,7 @@ func (ts *MsgTestSuite) TestStatus() {
ts.True(m.ModifiedOn_.After(now))

// update by external id
status = courier.NewStatusUpdateForExternalID(channel, "ext1", courier.MsgFailed)
status = ts.b.NewMsgStatusForExternalID(channel, "ext1", courier.MsgFailed)
err = ts.b.WriteMsgStatus(status)
ts.NoError(err)
m, err = readMsgFromDB(ts.b, courier.NewMsgID(10000))
Expand All @@ -163,7 +163,7 @@ func (ts *MsgTestSuite) TestStatus() {
ts.True(m.ModifiedOn_.After(now))

// no such external id
status = courier.NewStatusUpdateForExternalID(channel, "ext2", courier.MsgSent)
status = ts.b.NewMsgStatusForExternalID(channel, "ext2", courier.MsgSent)
err = ts.b.WriteMsgStatus(status)
ts.Error(err)
}
Expand Down
22 changes: 11 additions & 11 deletions backends/rapidpro/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,17 @@ func writeMsgSeen(b *backend, msg *DBMsg) {

// DBMsg is our base struct to represent msgs both in our JSON and db representations
type DBMsg struct {
OrgID_ OrgID `json:"org_id" db:"org_id"`
ID_ courier.MsgID `json:"id" db:"id"`
UUID_ courier.MsgUUID `json:"uuid"`
Direction_ MsgDirection `json:"direction" db:"direction"`
Status_ courier.MsgStatus `json:"status" db:"status"`
Visibility_ MsgVisibility `json:"visibility" db:"visibility"`
Priority_ MsgPriority `json:"priority" db:"priority"`
URN_ courier.URN `json:"urn"`
Text_ string `json:"text" db:"text"`
Attachments_ []string `json:"attachments"`
ExternalID_ string `json:"external_id" db:"external_id"`
OrgID_ OrgID `json:"org_id" db:"org_id"`
ID_ courier.MsgID `json:"id" db:"id"`
UUID_ courier.MsgUUID `json:"uuid"`
Direction_ MsgDirection `json:"direction" db:"direction"`
Status_ courier.MsgStatusValue `json:"status" db:"status"`
Visibility_ MsgVisibility `json:"visibility" db:"visibility"`
Priority_ MsgPriority `json:"priority" db:"priority"`
URN_ courier.URN `json:"urn"`
Text_ string `json:"text" db:"text"`
Attachments_ []string `json:"attachments"`
ExternalID_ string `json:"external_id" db:"external_id"`

ChannelID_ ChannelID `json:"channel_id" db:"channel_id"`
ContactID_ ContactID `json:"contact_id" db:"contact_id"`
Expand Down
68 changes: 41 additions & 27 deletions backends/rapidpro/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,21 @@ import (
"github.com/nyaruka/courier"
)

// WriteMsgStatus writes the passed in status to the database, queueing it to our spool in case the database is down
func writeMsgStatus(b *backend, status *courier.MsgStatusUpdate) error {
// create our msg status object
dbStatus := &DBMsgStatus{
ChannelUUID: status.Channel.UUID(),
ID: status.ID,
ExternalID: status.ExternalID,
Status: status.Status,
ModifiedOn: status.CreatedOn,
// newMsgStatus creates a new DBMsgStatus for the passed in parameters
func newMsgStatus(channel courier.Channel, id courier.MsgID, externalID string, status courier.MsgStatusValue) *DBMsgStatus {
return &DBMsgStatus{
ChannelUUID_: channel.UUID(),
ID_: id,
ExternalID_: externalID,
Status_: status,
ModifiedOn_: time.Now().In(time.UTC),
}
}

// writeMsgStatus writes the passed in status to the database, queueing it to our spool in case the database is down
func writeMsgStatus(b *backend, status courier.MsgStatus) error {
// create our msg status object
dbStatus := status.(*DBMsgStatus)

err := writeMsgStatusToDB(b, dbStatus)
if err == courier.ErrMsgNotFound {
Expand All @@ -33,11 +38,6 @@ func writeMsgStatus(b *backend, status *courier.MsgStatusUpdate) error {
err = courier.WriteToSpool(b.config.SpoolDir, "statuses", dbStatus)
}

// update our msg id on our passed in msg
if dbStatus.ID != courier.NilMsgID {
status.ID = dbStatus.ID
}

return err
}

Expand All @@ -47,13 +47,13 @@ SELECT m."id" FROM "msgs_msg" m INNER JOIN "channels_channel" c ON (m."channel_i
const selectMsgIDForExternalID = `
SELECT m."id" FROM "msgs_msg" m INNER JOIN "channels_channel" c ON (m."channel_id" = c."id") WHERE (m."external_id" = $1 AND c."uuid" = $2)`

func checkMsgExists(b *backend, status *courier.MsgStatusUpdate) (err error) {
func checkMsgExists(b *backend, status courier.MsgStatus) (err error) {
var id int64

if status.ID != courier.NilMsgID {
err = b.db.QueryRow(selectMsgIDForID, status.ID, status.Channel.UUID()).Scan(&id)
} else if status.ExternalID != "" {
err = b.db.QueryRow(selectMsgIDForExternalID, status.ExternalID, status.Channel.UUID()).Scan(&id)
if status.ID() != courier.NilMsgID {
err = b.db.QueryRow(selectMsgIDForID, status.ID(), status.ChannelUUID()).Scan(&id)
} else if status.ExternalID() != "" {
err = b.db.QueryRow(selectMsgIDForExternalID, status.ExternalID(), status.ChannelUUID()).Scan(&id)
} else {
return fmt.Errorf("no id or external id for status update")
}
Expand All @@ -80,9 +80,9 @@ WHERE (msgs_msg.external_id = :external_id AND channels_channel.uuid = :channel_
func writeMsgStatusToDB(b *backend, status *DBMsgStatus) error {
var rows *sqlx.Rows
var err error
if status.ID != courier.NilMsgID {
if status.ID() != courier.NilMsgID {
rows, err = b.db.NamedQuery(updateMsgID, status)
} else if status.ExternalID != "" {
} else if status.ExternalID() != "" {
rows, err = b.db.NamedQuery(updateMsgExternalID, status)
} else {
return fmt.Errorf("attempt to update msg status without id or external id")
Expand All @@ -93,7 +93,7 @@ func writeMsgStatusToDB(b *backend, status *DBMsgStatus) error {

// scan and read the id of the msg that was updated
if rows.Next() {
rows.Scan(&status.ID)
rows.Scan(&status.ID_)
} else {
return courier.ErrMsgNotFound
}
Expand All @@ -120,9 +120,23 @@ func (b *backend) flushStatusFile(filename string, contents []byte) error {

// DBMsgStatus represents a status update on a message
type DBMsgStatus struct {
ChannelUUID courier.ChannelUUID `json:"channel_uuid" db:"channel_uuid"`
ID courier.MsgID `json:"msg_id,omitempty" db:"msg_id"`
ExternalID string `json:"external_id,omitempty" db:"external_id"`
Status courier.MsgStatus `json:"status" db:"status"`
ModifiedOn time.Time `json:"modified_on" db:"modified_on"`
ChannelUUID_ courier.ChannelUUID `json:"channel_uuid" db:"channel_uuid"`
ID_ courier.MsgID `json:"msg_id,omitempty" db:"msg_id"`
ExternalID_ string `json:"external_id,omitempty" db:"external_id"`
Status_ courier.MsgStatusValue `json:"status" db:"status"`
ModifiedOn_ time.Time `json:"modified_on" db:"modified_on"`

logs []*courier.ChannelLog
}

func (s *DBMsgStatus) ChannelUUID() courier.ChannelUUID { return s.ChannelUUID_ }
func (s *DBMsgStatus) ID() courier.MsgID { return s.ID_ }

func (s *DBMsgStatus) ExternalID() string { return s.ExternalID_ }
func (s *DBMsgStatus) SetExternalID(id string) { s.ExternalID_ = id }

func (s *DBMsgStatus) Logs() []*courier.ChannelLog { return s.logs }
func (s *DBMsgStatus) AddLog(log *courier.ChannelLog) { s.logs = append(s.logs, log) }

func (s *DBMsgStatus) Status() courier.MsgStatusValue { return s.Status_ }
func (s *DBMsgStatus) SetStatus(status courier.MsgStatusValue) { s.Status_ = status }
4 changes: 2 additions & 2 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type ChannelReceiveMsgFunc func(Channel, http.ResponseWriter, *http.Request) ([]

// ChannelUpdateStatusFunc is the interface ChannelHandler functions must satisfy to handle incoming
// status requests. The Server will take care of looking up the channel by UUID before passing it to this function.
type ChannelUpdateStatusFunc func(Channel, http.ResponseWriter, *http.Request) ([]*MsgStatusUpdate, error)
type ChannelUpdateStatusFunc func(Channel, http.ResponseWriter, *http.Request) ([]MsgStatus, error)

// ChannelActionHandlerFunc is the interface ChannelHandler functions must satisfy to handle other types
// of requests. These generic handlers should only be used if they are not dealing with receiving messages
Expand All @@ -24,7 +24,7 @@ type ChannelHandler interface {
Initialize(Server) error
ChannelType() ChannelType
ChannelName() string
SendMsg(Msg) (*MsgStatusUpdate, error)
SendMsg(Msg) (MsgStatus, error)
}

// RegisterHandler adds a new handler for a channel type, this is called by individual handlers when they are initialized
Expand Down
16 changes: 8 additions & 8 deletions handlers/africastalking/africastalking.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type statusRequest struct {
Status string `validate:"required" name:"status"`
}

var statusMapping = map[string]courier.MsgStatus{
var statusMapping = map[string]courier.MsgStatusValue{
"Success": courier.MsgDelivered,
"Sent": courier.MsgSent,
"Buffered": courier.MsgSent,
Expand Down Expand Up @@ -94,7 +94,7 @@ func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter,
}

// StatusMessage is our HTTP handler function for status updates
func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]*courier.MsgStatusUpdate, error) {
func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]courier.MsgStatus, error) {
// get our params
atStatus := &statusRequest{}
err := handlers.DecodeAndValidateForm(atStatus, r)
Expand All @@ -108,17 +108,17 @@ func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter,
}

// write our status
status := courier.NewStatusUpdateForExternalID(channel, atStatus.ID, msgStatus)
status := h.Backend().NewMsgStatusForExternalID(channel, atStatus.ID, msgStatus)
err = h.Backend().WriteMsgStatus(status)
if err != nil {
return nil, err
}

return []*courier.MsgStatusUpdate{status}, courier.WriteStatusSuccess(w, r, status)
return []courier.MsgStatus{status}, courier.WriteStatusSuccess(w, r, status)
}

// SendMsg sends the passed in message, returning any error
func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) {
func (h *handler) SendMsg(msg courier.Msg) (courier.MsgStatus, error) {
isSharedStr := msg.Channel().ConfigForKey(configIsShared, false)
isShared, _ := isSharedStr.(bool)

Expand Down Expand Up @@ -151,7 +151,7 @@ func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) {
rr, err := utils.MakeHTTPRequest(req)

// record our status and log
status := courier.NewStatusUpdateForID(msg.Channel(), msg.ID(), courier.MsgErrored)
status := h.Backend().NewMsgStatusForID(msg.Channel(), msg.ID(), courier.MsgErrored)
status.AddLog(courier.NewChannelLogFromRR(msg.Channel(), msg.ID(), rr))
if err != nil {
return status, err
Expand All @@ -165,8 +165,8 @@ func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) {

// grab the external id if we can
externalID, _ := jsonparser.GetString([]byte(rr.Body), "SMSMessageData", "Recipients", "[0]", "messageId")
status.Status = courier.MsgWired
status.ExternalID = externalID
status.SetStatus(courier.MsgWired)
status.SetExternalID(externalID)

return status, nil
}
16 changes: 8 additions & 8 deletions handlers/blackmyna/blackmyna.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ type bmMessage struct {
From string `validate:"required" name:"from"`
}

var bmStatusMapping = map[int]courier.MsgStatus{
var bmStatusMapping = map[int]courier.MsgStatusValue{
1: courier.MsgDelivered,
2: courier.MsgFailed,
8: courier.MsgSent,
16: courier.MsgFailed,
}

// StatusMessage is our HTTP handler function for status updates
func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]*courier.MsgStatusUpdate, error) {
func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter, r *http.Request) ([]courier.MsgStatus, error) {
// get our params
bmStatus := &bmStatus{}
err := handlers.DecodeAndValidateForm(bmStatus, r)
Expand All @@ -91,17 +91,17 @@ func (h *handler) StatusMessage(channel courier.Channel, w http.ResponseWriter,
}

// write our status
status := courier.NewStatusUpdateForExternalID(channel, bmStatus.ID, msgStatus)
status := h.Backend().NewMsgStatusForExternalID(channel, bmStatus.ID, msgStatus)
err = h.Backend().WriteMsgStatus(status)
if err != nil {
return nil, err
}

return []*courier.MsgStatusUpdate{status}, courier.WriteStatusSuccess(w, r, status)
return []courier.MsgStatus{status}, courier.WriteStatusSuccess(w, r, status)
}

// SendMsg sends the passed in message, returning any error
func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) {
func (h *handler) SendMsg(msg courier.Msg) (courier.MsgStatus, error) {
username := msg.Channel().StringConfigForKey(courier.ConfigUsername, "")
if username == "" {
return nil, fmt.Errorf("no username set for BM channel")
Expand Down Expand Up @@ -130,7 +130,7 @@ func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) {
rr, err := utils.MakeHTTPRequest(req)

// record our status and log
status := courier.NewStatusUpdateForID(msg.Channel(), msg.ID(), courier.MsgErrored)
status := h.Backend().NewMsgStatusForID(msg.Channel(), msg.ID(), courier.MsgErrored)
status.AddLog(courier.NewChannelLogFromRR(msg.Channel(), msg.ID(), rr))
if err != nil {
return status, err
Expand All @@ -142,8 +142,8 @@ func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) {
return status, errors.Errorf("no external id returned in body")
}

status.Status = courier.MsgWired
status.ExternalID = externalID
status.SetStatus(courier.MsgWired)
status.SetExternalID(externalID)

return status, nil
}
Expand Down
2 changes: 1 addition & 1 deletion handlers/dummy/dummy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (h *handler) Initialize(s courier.Server) error {
}

// SendMsg sends the passed in message, returning any error
func (h *handler) SendMsg(msg courier.Msg) (*courier.MsgStatusUpdate, error) {
func (h *handler) SendMsg(msg courier.Msg) (courier.MsgStatus, error) {
time.Sleep(time.Second * 1)
return nil, nil
}
Loading

0 comments on commit 3e879ba

Please sign in to comment.