Skip to content

Commit

Permalink
feat: able to get message sending status now (#58)
Browse files Browse the repository at this point in the history
  • Loading branch information
songquanpeng committed May 7, 2023
1 parent 1cade7a commit 4530104
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 27 deletions.
16 changes: 9 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,17 @@ _✨ 搭建专属于你的消息推送服务,支持多种消息推送方式,
+ Telegram 机器人,
+ Discord 群机器人,
+ 群组消息,可以将多个推送通道组合成一个群组,然后向群组发送消息,可以实现一次性推送到多个渠道的功能。
2. 多种用户登录注册方式:
2. 支持在 Web 端管理发送的消息,支持自动刷新。
3. 支持异步消息。
4. 多种用户登录注册方式:
+ 邮箱登录注册以及通过邮箱进行密码重置。
+ [GitHub 开放授权](https://github.com/settings/applications/new)
+ 微信公众号授权(需要额外部署 [WeChat Server](https://github.com/songquanpeng/wechat-server))。
3. 支持 Markdown。
4. 支持用户管理。
5. Cloudflare Turnstile 用户校验。
6. 支持在线发布公告,设置关于界面以及页脚。
7. 支持在 Web 端管理发送的消息,支持自动刷新。
8. API 兼容其他消息推送服务,例如 [Server 酱](https://sct.ftqq.com/)
5. 支持 Markdown。
6. 支持用户管理。
7. Cloudflare Turnstile 用户校验。
8. 支持在线发布公告,设置关于界面以及页脚。
9. API 兼容其他消息推送服务,例如 [Server 酱](https://sct.ftqq.com/)

## 用途
1. [整合进自己的博客系统,每当有人登录时发微信消息提醒](https://github.com/songquanpeng/blog/blob/486d63e96ef7906a6c767653a20ec2d3278e9a4a/routes/user.js#L27)
Expand Down Expand Up @@ -180,6 +181,7 @@ proxy_send_timeout 300s;
7. `to`:选填,推送给指定用户,如果不填则默认推送给自己,受限于具体的消息推送方式,有些推送方式不支持此项。
1. `@all`:推送给所有用户。
2. `user1|user2|user3`:推送给多个用户,用户之间使用 `|` 分隔。
8. `async`:选填,如果设置为 `true` 则消息推送将在后台异步进行,返回结果包含 `uuid` 字段,可用于后续[获取消息发送状态](./docs/API.md#通过消息 UUID 获取消息发送状态)。
3. `POST` 请求方式:字段与上面 `GET` 请求方式保持一致。
+ 注意:请求体编码格式为 `application/json`,`v0.3.2` 版本起支持 Post Form。
Expand Down
63 changes: 63 additions & 0 deletions channel/message-queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package channel

import (
"message-pusher/common"
"message-pusher/model"
)

var AsyncMessageQueue chan int
var AsyncMessageQueueSize = 128
var AsyncMessageSenderNum = 2

func init() {
AsyncMessageQueue = make(chan int, AsyncMessageQueueSize)
for i := 0; i < AsyncMessageSenderNum; i++ {
go asyncMessageSender()
}
}

// LoadAsyncMessages loads async pending messages from database.
// We have to wait the database connection is ready.
func LoadAsyncMessages() {
ids, err := model.GetAsyncPendingMessageIds()
if err != nil {
common.FatalLog("failed to load async pending messages: " + err.Error())
}
for _, id := range ids {
AsyncMessageQueue <- id
}
}

func asyncMessageSenderHelper(message *model.Message) error {
user, err := model.GetUserById(message.UserId, false)
if err != nil {
return err
}
channel_, err := model.GetChannelByName(message.Channel, user.Id)
if err != nil {
return err
}
return SendMessage(message, user, channel_)
}

func asyncMessageSender() {
for {
id := <-AsyncMessageQueue
message, err := model.GetMessageById(id)
if err != nil {
common.SysError("async message sender error: " + err.Error())
continue
}
err = asyncMessageSenderHelper(message)
status := common.MessageSendStatusFailed
if err != nil {
common.SysError("async message sender error: " + err.Error())
} else {
status = common.MessageSendStatusSent
}
err = message.UpdateStatus(status)
if err != nil {
common.SysError("async message sender error: " + err.Error())
}
}
}
9 changes: 5 additions & 4 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,11 @@ const (
)

const (
MessageSendStatusUnknown = 0
MessageSendStatusPending = 1
MessageSendStatusSent = 2
MessageSendStatusFailed = 3
MessageSendStatusUnknown = 0
MessageSendStatusPending = 1
MessageSendStatusSent = 2
MessageSendStatusFailed = 3
MessageSendStatusAsyncPending = 4
)

const (
Expand Down
49 changes: 41 additions & 8 deletions controller/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ func GetPushMessage(c *gin.Context) {
Desp: c.Query("desp"),
Short: c.Query("short"),
OpenId: c.Query("openid"),
Async: c.Query("async") == "true",
}
keepCompatible(&message)
pushMessageHelper(c, &message)
Expand All @@ -55,6 +56,7 @@ func PostPushMessage(c *gin.Context) {
Desp: c.PostForm("desp"),
Short: c.PostForm("short"),
OpenId: c.PostForm("openid"),
Async: c.PostForm("async") == "true",
}
if message == (model.Message{}) {
// Looks like the user is using JSON
Expand Down Expand Up @@ -142,13 +144,15 @@ func pushMessageHelper(c *gin.Context, message *model.Message) {
c.JSON(http.StatusOK, gin.H{
"success": true,
"message": "",
"uuid": message.Link,
})
}

func saveAndSendMessage(user *model.User, message *model.Message, channel_ *model.Channel) error {
if channel_.Status != common.ChannelStatusEnabled {
return errors.New("该渠道已被禁用")
}
common.MessageCount += 1 // We don't need to use atomic here because it's not a critical value
message.Link = common.GetUUID()
if message.URL == "" {
message.URL = fmt.Sprintf("%s/message/%s", common.ServerAddress, message.Link)
Expand All @@ -158,25 +162,36 @@ func saveAndSendMessage(user *model.User, message *model.Message, channel_ *mode
defer func() {
// Update the status of the message
status := common.MessageSendStatusFailed
if success {
status = common.MessageSendStatusSent
if message.Async {
status = common.MessageSendStatusAsyncPending
} else {
if success {
status = common.MessageSendStatusSent
}
}
err := message.UpdateStatus(status)
if err != nil {
common.SysError("failed to update the status of the message: " + err.Error())
}
if message.Async {
channel.AsyncMessageQueue <- message.Id
}
}()
err := message.UpdateAndInsert(user.Id)
if err != nil {
return err
}
} else {
if message.Async {
return errors.New("异步发送消息需要用户具备消息持久化的权限")
}
message.Link = "unsaved" // This is for user to identify whether the message is saved
}
err := channel.SendMessage(message, user, channel_)
common.MessageCount += 1 // We don't need to use atomic here because it's not a critical value
if err != nil {
return err
if !message.Async {
err := channel.SendMessage(message, user, channel_)
if err != nil {
return err
}
}
success = true
return nil // After this line, the message status will be updated
Expand Down Expand Up @@ -258,7 +273,7 @@ func GetUserMessages(c *gin.Context) {
func GetMessage(c *gin.Context) {
messageId, _ := strconv.Atoi(c.Param("id"))
userId := c.GetInt("id")
message, err := model.GetMessageById(messageId, userId)
message, err := model.GetMessageByIds(messageId, userId)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
Expand All @@ -274,6 +289,24 @@ func GetMessage(c *gin.Context) {
return
}

func GetMessageStatus(c *gin.Context) {
link := c.Param("link")
status, err := model.GetMessageStatusByLink(link)
if err != nil {
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
})
return
}
c.JSON(http.StatusOK, gin.H{
"success": true,
"message": "",
"status": status,
})
return
}

func SearchMessages(c *gin.Context) {
keyword := c.Query("keyword")
messages, err := model.SearchMessages(keyword)
Expand All @@ -296,7 +329,7 @@ func ResendMessage(c *gin.Context) {
messageId, _ := strconv.Atoi(c.Param("id"))
userId := c.GetInt("id")
helper := func() error {
message, err := model.GetMessageById(messageId, userId)
message, err := model.GetMessageByIds(messageId, userId)
message.Id = 0
if err != nil {
return err
Expand Down
27 changes: 26 additions & 1 deletion docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,29 @@
1. 官方 WebSocket 桌面客户端实现:https://github.com/songquanpeng/personal-assistant
2. 待补充

欢迎在此提交你的客户端实现。
欢迎在此提交你的客户端实现。


## 通过消息 UUID 获取消息发送状态
1. API 端点为:`https://<domain>:<port>/api/message/status/<uuid>`
2. 由于使用的是消息的 UUID 而非 ID,因此此处不需要鉴权,
3. 返回内容示例:
```json
{
"success": true,
"message": "",
"status": 2
}
```
4. 返回内容字段:
1. `success`:本次请求是否成功
2. `message`:错误信息
3. `status`:消息状态码。
5. 消息状态码定义如下:
```
MessageSendStatusUnknown = 0
MessageSendStatusPending = 1
MessageSendStatusSent = 2
MessageSendStatusFailed = 3
MessageSendStatusAsyncPending = 4
```
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func main() {
if err != nil {
common.FatalLog(err)
}
go channel.LoadAsyncMessages()
defer func() {
err := model.CloseDB()
if err != nil {
Expand Down
36 changes: 30 additions & 6 deletions model/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,15 @@ type Message struct {
HTMLContent string `json:"html_content" gorm:"-:all"`
Timestamp int64 `json:"timestamp" gorm:"type:bigint"`
Link string `json:"link" gorm:"unique;index"`
To string `json:"to" gorm:"column:to"` // if specified, will send to this user(s)
Status int `json:"status" gorm:"default:0"` // pending, sent, failed
OpenId string `json:"openid" gorm:"-:all"` // alias for to
Desp string `json:"desp" gorm:"-:all"` // alias for content
Short string `json:"short" gorm:"-:all"` // alias for description
To string `json:"to" gorm:"column:to"` // if specified, will send to this user(s)
Status int `json:"status" gorm:"default:0;index"` // pending, sent, failed
OpenId string `json:"openid" gorm:"-:all"` // alias for to
Desp string `json:"desp" gorm:"-:all"` // alias for content
Short string `json:"short" gorm:"-:all"` // alias for description
Async bool `json:"async" gorm:"-"` // if true, will send message asynchronously
}

func GetMessageById(id int, userId int) (*Message, error) {
func GetMessageByIds(id int, userId int) (*Message, error) {
if id == 0 || userId == 0 {
return nil, errors.New("id 或 userId 为空!")
}
Expand All @@ -34,6 +35,20 @@ func GetMessageById(id int, userId int) (*Message, error) {
return &message, err
}

func GetMessageById(id int) (*Message, error) {
if id == 0 {
return nil, errors.New("id 为空!")
}
message := Message{Id: id}
err := DB.Where(message).First(&message).Error
return &message, err
}

func GetAsyncPendingMessageIds() (ids []int, err error) {
err = DB.Model(&Message{}).Where("status = ?", common.MessageSendStatusAsyncPending).Pluck("id", &ids).Error
return ids, err
}

func GetMessageByLink(link string) (*Message, error) {
if link == "" {
return nil, errors.New("link 为空!")
Expand All @@ -43,6 +58,15 @@ func GetMessageByLink(link string) (*Message, error) {
return &message, err
}

func GetMessageStatusByLink(link string) (int, error) {
if link == "" {
return common.MessageSendStatusUnknown, errors.New("link 为空!")
}
message := Message{}
err := DB.Where("link = ?", link).Select("status").First(&message).Error
return message.Status, err
}

func GetMessagesByUserId(userId int, startIdx int, num int) (messages []*Message, err error) {
err = DB.Where("user_id = ?", userId).Order("id desc").Limit(num).Offset(startIdx).Find(&messages).Error
return messages, err
Expand Down
1 change: 1 addition & 0 deletions router/api-router.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func SetApiRouter(router *gin.Engine) {
{
messageRoute.GET("/", middleware.UserAuth(), controller.GetUserMessages)
messageRoute.GET("/search", middleware.UserAuth(), controller.SearchMessages)
messageRoute.GET("/status/:link", controller.GetMessageStatus)
messageRoute.POST("/resend/:id", middleware.UserAuth(), controller.ResendMessage)
messageRoute.GET("/:id", middleware.UserAuth(), controller.GetMessage)
messageRoute.DELETE("/", middleware.RootAuth(), controller.DeleteAllMessages)
Expand Down
8 changes: 7 additions & 1 deletion web/src/components/MessagesTable.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ function renderStatus(status) {
case 1:
return (
<Label basic color='olive'>
投递中...
正在投递
</Label>
);
case 2:
Expand All @@ -32,6 +32,12 @@ function renderStatus(status) {
发送失败
</Label>
);
case 4:
return (
<Label basic color='orange'>
已在队列
</Label>
);
default:
return (
<Label basic color='grey'>
Expand Down

0 comments on commit 4530104

Please sign in to comment.