Skip to content

Commit

Permalink
Tweaks for integration with RapidPro, introduce null library for null…
Browse files Browse the repository at this point in the history
…able db ints
  • Loading branch information
nicpottier committed Jul 25, 2017
1 parent 49f180f commit 1003787
Show file tree
Hide file tree
Showing 18 changed files with 126 additions and 79 deletions.
6 changes: 3 additions & 3 deletions backends/rapidpro/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (b *backend) WasMsgSent(msg courier.Msg) (bool, error) {
defer rc.Close()

dateKey := fmt.Sprintf(sentSetName, time.Now().In(time.UTC).Format("2006_01_02"))
found, err := redis.Bool(rc.Do("sismember", dateKey, msg.ID()))
found, err := redis.Bool(rc.Do("sismember", dateKey, msg.ID().String()))
if err != nil {
return false, err
}
Expand All @@ -108,7 +108,7 @@ func (b *backend) WasMsgSent(msg courier.Msg) (bool, error) {
}

dateKey = fmt.Sprintf(sentSetName, time.Now().Add(time.Hour*-24).In(time.UTC).Format("2006_01_02"))
found, err = redis.Bool(rc.Do("sismember", dateKey, msg.ID()))
found, err = redis.Bool(rc.Do("sismember", dateKey, msg.ID().String()))
return found, err
}

Expand All @@ -123,7 +123,7 @@ func (b *backend) MarkOutgoingMsgComplete(msg courier.Msg, status courier.MsgSta
// mark as sent in redis as well if this was actually wired or sent
if status != nil && (status.Status() == courier.MsgSent || status.Status() == courier.MsgWired) {
dateKey := fmt.Sprintf(sentSetName, time.Now().In(time.UTC).Format("2006_01_02"))
rc.Do("sadd", dateKey, msg.ID())
rc.Do("sadd", dateKey, msg.ID().String())
}
}

Expand Down
33 changes: 33 additions & 0 deletions backends/rapidpro/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,39 @@ func (ts *MsgTestSuite) getChannel(cType string, cUUID string) *DBChannel {
return channel.(*DBChannel)
}

func (ts *MsgTestSuite) TestMsgUnmarshal() {
msgJSON := `{
"status": "P",
"direction": "O",
"attachments": null,
"queued_on": null,
"text": "Test message 21",
"contact_id": 30,
"contact_urn_id": 14,
"error_count": 0,
"modified_on": "2017-07-21T19:22:23.254133Z",
"id": 204,
"channel_uuid": "f3ad3eb6-d00d-4dc3-92e9-9f34f32940ba",
"uuid": "54c893b9-b026-44fc-a490-50aed0361c3f",
"next_attempt": "2017-07-21T19:22:23.254182Z",
"urn": "telegram:3527065",
"org_id": 1,
"created_on": "2017-07-21T19:22:23.242757Z",
"sent_on": null,
"priority": 1000,
"channel_id": 11,
"response_to_id": 15,
"external_id": null
}`

msg := DBMsg{}
err := json.Unmarshal([]byte(msgJSON), &msg)
ts.NoError(err)
ts.Equal(msg.ChannelUUID_.String(), "f3ad3eb6-d00d-4dc3-92e9-9f34f32940ba")
ts.Equal(msg.ChannelID_, courier.ChannelID(11))
ts.Equal(msg.ExternalID_, "")
}

func (ts *MsgTestSuite) TestCheckMsgExists() {
knChannel := ts.getChannel("KN", "dbc126ed-66bc-4e28-b67b-81dc3327c95d")

Expand Down
2 changes: 1 addition & 1 deletion backends/rapidpro/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func getChannel(b *backend, channelType courier.ChannelType, channelUUID courier
const lookupChannelFromUUIDSQL = `
SELECT org_id, id, uuid, channel_type, scheme, address, country, config
FROM channels_channel
WHERE uuid = $1 AND is_active = true`
WHERE uuid = $1 AND is_active = true AND org_id IS NOT NULL`

// ChannelForUUID attempts to look up the channel with the passed in UUID, returning it
func loadChannelFromDB(b *backend, channel *DBChannel, channelType courier.ChannelType, uuid courier.ChannelUUID) error {
Expand Down
6 changes: 4 additions & 2 deletions backends/rapidpro/contact.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package rapidpro
import (
"time"

null "gopkg.in/guregu/null.v3"

"database/sql"

"github.com/jmoiron/sqlx"
Expand All @@ -12,11 +14,11 @@ import (

// ContactID is our representation of our database contact id
type ContactID struct {
sql.NullInt64
null.Int
}

// NilContactID represents our nil value for ContactID
var NilContactID = ContactID{sql.NullInt64{Int64: 0, Valid: false}}
var NilContactID = ContactID{null.NewInt(0, false)}

const insertContactSQL = `
INSERT INTO contacts_contact(org_id, is_active, is_blocked, is_test, is_stopped, uuid, created_on, modified_on, created_by_id, modified_by_id, name)
Expand Down
6 changes: 3 additions & 3 deletions backends/rapidpro/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
)

const insertLogSQL = `
INSERT INTO channels_channellog("channel_id", "msg_id", "description", "is_error", "url", "request", "response", "response_status", "created_on", "request_time")
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
INSERT INTO channels_channellog("channel_id", "msg_id", "description", "is_error", "method", "url", "request", "response", "response_status", "created_on", "request_time")
VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
`

// WriteChannelLog writes the passed in channel log to the database, we do not queue on errors but instead just throw away the log
Expand All @@ -26,7 +26,7 @@ func writeChannelLog(b *backend, log *courier.ChannelLog) error {
description = fmt.Sprintf("Error: %s", log.Error)
}

_, err := b.db.Exec(insertLogSQL, dbChan.ID(), log.MsgID, description, log.Error != "", log.URL,
_, err := b.db.Exec(insertLogSQL, dbChan.ID(), log.MsgID, description, log.Error != "", log.Method, log.URL,
log.Request, log.Response, log.StatusCode, log.CreatedOn, log.Elapsed/time.Millisecond)

return err
Expand Down
8 changes: 5 additions & 3 deletions backends/rapidpro/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (

func notifyRapidPro(config *config.Courier, body url.Values) error {
// build our request
req, err := http.NewRequest("POST", config.RapidproHandleURL, strings.NewReader(body.Encode()))
req, err := http.NewRequest("POST", fmt.Sprintf(config.RapidproHandleURL, body.Get("action")), strings.NewReader(body.Encode()))

// this really should never happen, but if it does we only log it
if err != nil {
Expand All @@ -24,7 +24,7 @@ func notifyRapidPro(config *config.Courier, body url.Values) error {
}

req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
req.Header.Set("AUTHORIZATION", fmt.Sprintf("Token %s", config.RapidproToken))
req.Header.Set("Authorization", fmt.Sprintf("Token %s", config.RapidproToken))
_, err = utils.MakeHTTPRequest(req)

return err
Expand All @@ -39,12 +39,14 @@ func newNotifier(config *config.Courier) *notifier {

func (n *notifier) addHandleMsgNotification(msgID courier.MsgID) {
body := url.Values{}
body.Add("action", "handle_message")
body.Add("message_id", msgID.String())
n.notifications <- body
}

func (n *notifier) addStopContactNotification(contactID ContactID) {
body := url.Values{}
body.Add("action", "stop_contact")
body.Add("contact_id", fmt.Sprintf("%d", contactID.Int64))
n.notifications <- body
}
Expand All @@ -68,7 +70,7 @@ func (n *notifier) start(backend *backend) {
// we failed, append it to our retries
if err != nil {
if !lastError {
log.WithError(err).Error("error notifying rapidpro")
log.WithError(err).WithField("body", body).Error("error notifying rapidpro")
}
n.retries = append(n.retries, body)
lastError = true
Expand Down
6 changes: 3 additions & 3 deletions backends/rapidpro/org.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package rapidpro

import "database/sql"
import null "gopkg.in/guregu/null.v3"

// OrgID is our type for database Org ids
type OrgID struct {
sql.NullInt64
null.Int
}

// NilOrgID is our nil value for OrgID
var NilOrgID = OrgID{sql.NullInt64{Int64: 0, Valid: false}}
var NilOrgID = OrgID{null.NewInt(0, false)}
6 changes: 4 additions & 2 deletions backends/rapidpro/urn.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,19 @@ import (
"database/sql"
"strings"

null "gopkg.in/guregu/null.v3"

"github.com/jmoiron/sqlx"
"github.com/nyaruka/courier"
)

// ContactURNID represents a contact urn's id
type ContactURNID struct {
sql.NullInt64
null.Int
}

// NilContactURNID is our nil value for ContactURNID
var NilContactURNID = ContactURNID{sql.NullInt64{Int64: 0, Valid: false}}
var NilContactURNID = ContactURNID{null.NewInt(0, false)}

// NewDBContactURN returns a new ContactURN object for the passed in org, contact and string urn, this is not saved to the DB yet
func newDBContactURN(org OrgID, channelID courier.ChannelID, contactID ContactID, urn courier.URN) *DBContactURN {
Expand Down
2 changes: 1 addition & 1 deletion handlers/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ func (h *handler) SendMsg(msg courier.Msg) (courier.MsgStatus, error) {

// build our request
form := map[string]string{
"id": fmt.Sprintf("%d", msg.ID()),
"id": msg.ID().String(),
"text": courier.GetTextAndAttachments(msg),
"to": msg.URN().Path(),
"to_no_plus": strings.TrimPrefix(msg.URN().Path(), "+"),
Expand Down
2 changes: 1 addition & 1 deletion handlers/kannel/kannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (h *handler) SendMsg(msg courier.Msg) (courier.MsgStatus, error) {
return nil, fmt.Errorf("no send url set for KN channel")
}

dlrURL := fmt.Sprintf("%s%s%s/?id=%d&status=%%d", h.Server().Config().BaseURL, "/c/kn/", msg.Channel().UUID(), msg.ID())
dlrURL := fmt.Sprintf("%s%s%s/?id=%s&status=%%s", h.Server().Config().BaseURL, "/c/kn/", msg.Channel().UUID(), msg.ID().String())

// build our request
form := url.Values{
Expand Down
4 changes: 3 additions & 1 deletion handlers/telegram/telegram.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func (h *handler) ReceiveMessage(channel courier.Channel, w http.ResponseWriter,
func (h *handler) sendMsgPart(msg courier.Msg, token string, path string, form url.Values) (string, *courier.ChannelLog, error) {
sendURL := fmt.Sprintf("%s/bot%s/%s", telegramAPIURL, token, path)
req, err := http.NewRequest(http.MethodPost, sendURL, strings.NewReader(form.Encode()))
req.Header.Add("Content-Type", "application/x-www-form-urlencoded")

rr, err := utils.MakeHTTPRequest(req)

// build our channel log
Expand Down Expand Up @@ -207,7 +209,7 @@ func (h *handler) SendMsg(msg courier.Msg) (courier.MsgStatus, error) {
status.AddLog(log)

default:
status.AddLog(courier.NewChannelLog(msg.Channel(), msg.ID(), "", courier.NilStatusCode,
status.AddLog(courier.NewChannelLog(msg.Channel(), msg.ID(), "", "", courier.NilStatusCode,
fmt.Errorf("unknown media type: %s", mediaType), "", "", time.Duration(0), time.Now()))
hasError = true
}
Expand Down
5 changes: 4 additions & 1 deletion log.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
const NilStatusCode int = 417

// NewChannelLog creates a new channel log for the passed in channel, id, and request and response info
func NewChannelLog(channel Channel, msgID MsgID, url string, statusCode int, err error,
func NewChannelLog(channel Channel, msgID MsgID, method string, url string, statusCode int, err error,
request string, response string, elapsed time.Duration, createdOn time.Time) *ChannelLog {

errString := ""
Expand All @@ -22,6 +22,7 @@ func NewChannelLog(channel Channel, msgID MsgID, url string, statusCode int, err
return &ChannelLog{
Channel: channel,
MsgID: msgID,
Method: method,
URL: url,
StatusCode: statusCode,
Error: errString,
Expand All @@ -37,6 +38,7 @@ func NewChannelLogFromRR(channel Channel, msgID MsgID, rr *utils.RequestResponse
return &ChannelLog{
Channel: channel,
MsgID: msgID,
Method: rr.Method,
URL: rr.URL,
StatusCode: rr.StatusCode,
Error: "",
Expand All @@ -57,6 +59,7 @@ func (l *ChannelLog) String() string {
type ChannelLog struct {
Channel Channel
MsgID MsgID
Method string
URL string
StatusCode int
Error string
Expand Down
42 changes: 12 additions & 30 deletions msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,57 +2,39 @@ package courier

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"time"

null "gopkg.in/guregu/null.v3"

uuid "github.com/satori/go.uuid"
)

// ErrMsgNotFound is returned when trying to queue the status for a Msg that doesn't exit
var ErrMsgNotFound = errors.New("message not found")

// MsgID is our typing of the db int type
type MsgID int64
type MsgID struct {
null.Int
}

// NewMsgID creates a new MsgID for the passed in int64
func NewMsgID(id int64) MsgID {
return MsgID(id)
}

// UnmarshalText satisfies text unmarshalling so ids can be decoded from forms
func (i *MsgID) UnmarshalText(text []byte) (err error) {
id, err := strconv.ParseInt(string(text), 10, 64)
*i = MsgID(id)
if err != nil {
return err
}
return err
}

// UnmarshalJSON satisfies json unmarshalling so ids can be decoded from JSON
func (i *MsgID) UnmarshalJSON(bytes []byte) (err error) {
var id int64
err = json.Unmarshal(bytes, &id)
*i = MsgID(id)
return err
}

// MarshalJSON satisfies json marshalling so ids can be encoded to JSON
func (i *MsgID) MarshalJSON() ([]byte, error) {
return json.Marshal(int64(*i))
return MsgID{null.NewInt(id, true)}
}

// String satisfies the Stringer interface
func (i *MsgID) String() string {
return fmt.Sprintf("%d", i)
func (i MsgID) String() string {
if i.Valid {
return strconv.FormatInt(i.Int64, 10)
}
return "null"
}

// NilMsgID is our nil value for MsgID
var NilMsgID = MsgID(0)
var NilMsgID = MsgID{null.NewInt(0, false)}

// MsgUUID is the UUID of a message which has been received
type MsgUUID struct {
Expand Down
Loading

0 comments on commit 1003787

Please sign in to comment.