Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add table of sending message for set message status on weak net . #398

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion internal/conversation_msg/sdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,12 @@ func (c *Conversation) updateMsgStatusAndTriggerConversation(ctx context.Context
s.ServerMsgID = serverMsgID
err := c.db.UpdateMessageTimeAndStatus(ctx, lc.ConversationID, clientMsgID, serverMsgID, sendTime, status)
if err != nil {
// log.Error("", "send message update message status error", sendTime, status, clientMsgID, serverMsgID, err.Error())
log.ZWarn(ctx, "send message update message status error", err,
"sendTime", sendTime, "status", status, "clientMsgID", clientMsgID, "serverMsgID", serverMsgID)
}
err = c.db.DeleteSendingMessage(ctx, lc.ConversationID, clientMsgID)
if err != nil {
log.ZWarn(ctx, "send message delete sending message error", err)
}
lc.LatestMsg = utils.StructToJsonString(s)
lc.LatestMsgSendTime = sendTime
Expand Down Expand Up @@ -395,6 +400,13 @@ func (c *Conversation) SendMessage(ctx context.Context, s *sdk_struct.MsgStruct,
if err != nil {
return nil, err
}
err = c.db.InsertSendingMessage(ctx, &model_struct.LocalSendingMessages{
ConversationID: lc.ConversationID,
ClientMsgID: localMessage.ClientMsgID,
})
if err != nil {
return nil, err
}
} else {
if oldMessage.Status != constant.MsgStatusSendFailed {
return nil, sdkerrs.ErrMsgRepeated
Expand Down Expand Up @@ -614,6 +626,13 @@ func (c *Conversation) SendMessageNotOss(ctx context.Context, s *sdk_struct.MsgS
if err != nil {
return nil, err
}
err = c.db.InsertSendingMessage(ctx, &model_struct.LocalSendingMessages{
ConversationID: lc.ConversationID,
ClientMsgID: localMessage.ClientMsgID,
})
if err != nil {
return nil, err
}
} else {
if oldMessage.Status != constant.MsgStatusSendFailed {
return nil, sdkerrs.ErrMsgRepeated
Expand Down Expand Up @@ -685,6 +704,13 @@ func (c *Conversation) SendMessageByBuffer(ctx context.Context, s *sdk_struct.Ms
if err != nil {
return nil, err
}
err = c.db.InsertSendingMessage(ctx, &model_struct.LocalSendingMessages{
ConversationID: lc.ConversationID,
ClientMsgID: localMessage.ClientMsgID,
})
if err != nil {
return nil, err
}
} else {
if oldMessage.Status != constant.MsgStatusSendFailed {
return nil, sdkerrs.ErrMsgRepeated
Expand Down
27 changes: 27 additions & 0 deletions internal/login/init_login.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package login
import (
"context"
"fmt"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"sync"
"time"

Expand Down Expand Up @@ -253,6 +254,31 @@ func (u *LoginMgr) setLoginStatus(status int) {
defer u.w.Unlock()
u.loginStatus = status
}
func (u *LoginMgr) checkSendingMessage(ctx context.Context) {
sendingMessages, err := u.db.GetAllSendingMessages(ctx)
if err != nil {
log.ZError(ctx, "GetAllSendingMessages failed", err)
}
for _, message := range sendingMessages {
tableMessage, err := u.db.GetMessage(ctx, message.ConversationID, message.ClientMsgID)
if err != nil {
log.ZError(ctx, "GetMessage failed", err, "message", message)
continue
}
if tableMessage.Status == constant.MsgStatusSending {
err := u.db.UpdateMessage(ctx, message.ConversationID, &model_struct.LocalChatLog{ClientMsgID: message.ClientMsgID, Status: constant.MsgStatusSendFailed})
if err != nil {
log.ZError(ctx, "UpdateMessage failed", err, "tableMessage", tableMessage)
} else {
err := u.db.DeleteSendingMessage(ctx, message.ConversationID, message.ClientMsgID)
if err != nil {
log.ZError(ctx, "DeleteSendingMessage failed", err, "tableMessage", tableMessage)
}
}

}
}
}

func (u *LoginMgr) login(ctx context.Context, userID, token string) error {
if u.getLoginStatus(ctx) == Logged {
Expand All @@ -267,6 +293,7 @@ func (u *LoginMgr) login(ctx context.Context, userID, token string) error {
u.loginUserID = userID
var err error
u.db, err = db.NewDataBase(ctx, userID, u.info.DataDir, int(u.info.LogLevel))
u.checkSendingMessage(ctx)
if err != nil {
return sdkerrs.ErrSdkInternal.Wrap("init database " + err.Error())
}
Expand Down
12 changes: 0 additions & 12 deletions pkg/db/chat_log_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,18 +483,6 @@ func (d *DataBase) GetAllMessageForTest(ctx context.Context) (result []*model_st
// }
//}

func (d *DataBase) GetSendingMessageList(ctx context.Context) (result []*model_struct.LocalChatLog, err error) {
d.mRWMutex.Lock()
defer d.mRWMutex.Unlock()
var messageList []model_struct.LocalChatLog
err = utils.Wrap(d.conn.WithContext(ctx).Where("status = ?", constant.MsgStatusSending).Find(&messageList).Error, "GetMessageList failed")
for _, v := range messageList {
v1 := v
result = append(result, &v1)
}
return result, err
}

func (d *DataBase) GetTestMessage(ctx context.Context, seq uint32) (*model_struct.LocalChatLog, error) {
d.mRWMutex.Lock()
defer d.mRWMutex.Unlock()
Expand Down
46 changes: 1 addition & 45 deletions pkg/db/db_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,51 +110,9 @@ func NewDataBase(ctx context.Context, loginUserID string, dbDir string, logLevel
UserDBMap[loginUserID] = dataBase
//log.Info(operationID, "open db", loginUserID)
}
dataBase.setChatLogFailedStatus(ctx)
return dataBase, nil
}

func (d *DataBase) setChatLogFailedStatus(ctx context.Context) {
msgList, err := d.GetSendingMessageList(ctx)
if err != nil {
log.ZError(ctx, "GetSendingMessageList failed", err)
return
}
for _, v := range msgList {
v.Status = constant.MsgStatusSendFailed
//todo
err := d.UpdateMessage(ctx, "", v)
if err != nil {
log.ZError(ctx, "UpdateMessage failed", err, "msg", v)
continue
}
}
groupIDList, err := d.GetReadDiffusionGroupIDList(ctx)
if err != nil {
log.ZError(ctx, "GetReadDiffusionGroupIDList failed", err)
return
}
for _, v := range groupIDList {
msgList, err := d.SuperGroupGetSendingMessageList(ctx, v)
if err != nil {
log.ZError(ctx, "GetSendingMessageList failed", err)
return
}
if len(msgList) > 0 {
for _, v := range msgList {
v.Status = constant.MsgStatusSendFailed
err := d.SuperGroupUpdateMessage(ctx, v)
if err != nil {
log.ZError(ctx, "UpdateMessage failed", err, "msg", v)
continue
}
}
}

}

}

func (d *DataBase) initDB(ctx context.Context, logLevel int) error {
var zLogLevel logger.LogLevel
if d.loginUserID == "" {
Expand Down Expand Up @@ -213,6 +171,7 @@ func (d *DataBase) initDB(ctx context.Context, logLevel int) error {
&model_struct.LocalChatLogReactionExtensions{},
&model_struct.LocalUpload{},
&model_struct.LocalStranger{},
&model_struct.LocalSendingMessages{},
)
if err != nil {
return err
Expand Down Expand Up @@ -246,8 +205,5 @@ func (d *DataBase) initDB(ctx context.Context, logLevel int) error {
}

}
if err := d.InitWorkMomentsNotificationUnreadCount(ctx); err != nil {
log.ZError(ctx, "init InitWorkMomentsNotificationUnreadCount failed", err)
}
return nil
}
35 changes: 20 additions & 15 deletions pkg/db/db_interface/databse.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
)

type GroupDatabase interface {
type GroupModel interface {
InsertGroup(ctx context.Context, groupInfo *model_struct.LocalGroup) error
DeleteGroup(ctx context.Context, groupID string) error
UpdateGroup(ctx context.Context, groupInfo *model_struct.LocalGroup) error
Expand Down Expand Up @@ -72,7 +72,7 @@ type GroupDatabase interface {
SearchGroupMembersDB(ctx context.Context, keyword string, groupID string, isSearchMemberNickname, isSearchUserID bool, offset, count int) (result []*model_struct.LocalGroupMember, err error)
}

type MessageDatabase interface {
type MessageModel interface {
BatchInsertMessageList(ctx context.Context, conversationID string, MessageList []*model_struct.LocalChatLog) error
//BatchInsertMessageListController(ctx context.Context, MessageList []*model_struct.LocalChatLog) error
InsertMessage(ctx context.Context, conversationID string, Message *model_struct.LocalChatLog) error
Expand Down Expand Up @@ -106,7 +106,6 @@ type MessageDatabase interface {
//GetMessageListController(ctx context.Context, sourceID string, sessionType, count int, startTime int64, isReverse bool) (result []*model_struct.LocalChatLog, err error)
GetMessageListNoTime(ctx context.Context, conversationID string, count int, isReverse bool) (result []*model_struct.LocalChatLog, err error)
//GetMessageListNoTimeController(ctx context.Context, sourceID string, sessionType, count int, isReverse bool) (result []*model_struct.LocalChatLog, err error)
GetSendingMessageList(ctx context.Context) (result []*model_struct.LocalChatLog, err error)
MarkConversationMessageAsReadDB(ctx context.Context, conversationID string, msgIDs []string) (rowsAffected int64, err error)
MarkConversationMessageAsReadBySeqs(ctx context.Context, conversationID string, seqs []int64) (rowsAffected int64, err error)
GetUnreadMessage(ctx context.Context, conversationID string) (result []*model_struct.LocalChatLog, err error)
Expand Down Expand Up @@ -177,7 +176,7 @@ type MessageDatabase interface {
GetNotificationAllSeqs(ctx context.Context) ([]*model_struct.NotificationSeqs, error)
}

type ConversationDatabase interface {
type ConversationModel interface {
GetConversationByUserID(ctx context.Context, userID string) (*model_struct.LocalConversation, error)
GetAllConversationListDB(ctx context.Context) ([]*model_struct.LocalConversation, error)
GetHiddenConversationList(ctx context.Context) ([]*model_struct.LocalConversation, error)
Expand Down Expand Up @@ -211,7 +210,7 @@ type ConversationDatabase interface {
SuperGroupSearchAllMessageByContentType(ctx context.Context, superGroupID string, contentType int32) ([]*model_struct.LocalChatLog, error)
}

type UserDatabase interface {
type UserModel interface {
GetLoginUser(ctx context.Context, userID string) (*model_struct.LocalUser, error)
UpdateLoginUser(ctx context.Context, user *model_struct.LocalUser) error
UpdateLoginUserByMap(ctx context.Context, user *model_struct.LocalUser, args map[string]interface{}) error
Expand All @@ -220,7 +219,7 @@ type UserDatabase interface {
SetStrangerInfo(ctx context.Context, localStrangerList []*model_struct.LocalStranger) error
}

type FriendDatabase interface {
type FriendModel interface {
InsertFriend(ctx context.Context, friend *model_struct.LocalFriend) error
DeleteFriendDB(ctx context.Context, friendUserID string) error
UpdateFriend(ctx context.Context, friend *model_struct.LocalFriend) error
Expand All @@ -246,7 +245,7 @@ type FriendDatabase interface {
DeleteBlack(ctx context.Context, blockUserID string) error
}

type ReactionDatabase interface {
type ReactionModel interface {
GetMessageReactionExtension(ctx context.Context, msgID string) (result *model_struct.LocalChatLogReactionExtensions, err error)
InsertMessageReactionExtension(ctx context.Context, messageReactionExtension *model_struct.LocalChatLogReactionExtensions) error
UpdateMessageReactionExtension(ctx context.Context, c *model_struct.LocalChatLogReactionExtensions) error
Expand All @@ -256,22 +255,28 @@ type ReactionDatabase interface {
DeleteMessageReactionExtension(ctx context.Context, msgID string) error
}

type S3Database interface {
type S3Model interface {
GetUpload(ctx context.Context, partHash string) (*model_struct.LocalUpload, error)
InsertUpload(ctx context.Context, upload *model_struct.LocalUpload) error
DeleteUpload(ctx context.Context, partHash string) error
UpdateUpload(ctx context.Context, upload *model_struct.LocalUpload) error
DeleteExpireUpload(ctx context.Context) error
}
type SendingMessagesModel interface {
InsertSendingMessage(ctx context.Context, message *model_struct.LocalSendingMessages) error
DeleteSendingMessage(ctx context.Context, conversationID, clientMsgID string) error
GetAllSendingMessages(ctx context.Context) (friendRequests []*model_struct.LocalSendingMessages, err error)
}

type DataBase interface {
Close(ctx context.Context) error
InitDB(ctx context.Context, userID string, dataDir string) error
GroupDatabase
MessageDatabase
ConversationDatabase
UserDatabase
FriendDatabase
ReactionDatabase
S3Database
GroupModel
MessageModel
ConversationModel
UserModel
FriendModel
ReactionModel
S3Model
SendingMessagesModel
}
2 changes: 2 additions & 0 deletions pkg/db/db_js.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type IndexDB struct {
*indexdb.NotificationSeqs
*indexdb.LocalUpload
*indexdb.LocalStrangers
*indexdb.LocalSendingMessages
loginUserID string
}

Expand Down Expand Up @@ -73,6 +74,7 @@ func NewDataBase(ctx context.Context, loginUserID string, dbDir string, logLevel
NotificationSeqs: indexdb.NewNotificationSeqs(),
LocalUpload: indexdb.NewLocalUpload(),
LocalStrangers: indexdb.NewLocalStrangers(),
LocalSendingMessages: indexdb.NewLocalSendingMessages(),
loginUserID: loginUserID,
}
err := i.InitDB(ctx, loginUserID, dbDir)
Expand Down
10 changes: 10 additions & 0 deletions pkg/db/model_struct/data_model_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -511,3 +511,13 @@ type LocalStranger struct {
func (LocalStranger) TableName() string {
return "local_stranger"
}

type LocalSendingMessages struct {
ConversationID string `gorm:"column:conversation_id;primary_key;type:char(128)" json:"conversationID"`
ClientMsgID string `gorm:"column:client_msg_id;primary_key;type:char(64)" json:"clientMsgID"`
Ex string `gorm:"column:ex;type:varchar(1024)" json:"ex"`
}

func (LocalSendingMessages) TableName() string {
return "local_sending_messages"
}
43 changes: 43 additions & 0 deletions pkg/db/sending_messages_model.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright © 2023 OpenIM SDK. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !js
// +build !js

package db

import (
"context"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
)

func (d *DataBase) InsertSendingMessage(ctx context.Context, message *model_struct.LocalSendingMessages) error {
d.mRWMutex.Lock()
defer d.mRWMutex.Unlock()
return utils.Wrap(d.conn.WithContext(ctx).Create(message).Error, "InsertSendingMessage failed")
}

func (d *DataBase) DeleteSendingMessage(ctx context.Context, conversationID, clientMsgID string) error {
d.groupMtx.Lock()
defer d.groupMtx.Unlock()
localSendingMessage := model_struct.LocalSendingMessages{ConversationID: conversationID, ClientMsgID: clientMsgID}
return utils.Wrap(d.conn.WithContext(ctx).Delete(&localSendingMessage).Error, "DeleteSendingMessage failed")
}
func (d *DataBase) GetAllSendingMessages(ctx context.Context) (friendRequests []*model_struct.LocalSendingMessages, err error) {
d.friendMtx.Lock()
defer d.friendMtx.Unlock()
err = utils.Wrap(d.conn.WithContext(ctx).Find(&friendRequests).Error, "GetAllSendingMessages failed")
return friendRequests, utils.Wrap(err, "GetAllSendingMessages failed")
}
23 changes: 0 additions & 23 deletions wasm/indexdb/chat_log_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,29 +53,6 @@ func (i *LocalChatLogs) GetMessage(ctx context.Context, conversationID, clientMs
}
}

// GetSendingMessageList gets the list of messages that are being sent
func (i *LocalChatLogs) GetSendingMessageList(ctx context.Context) (result []*model_struct.LocalChatLog, err error) {
msgList, err := exec.Exec()
if err != nil {
return nil, err
} else {
if v, ok := msgList.(string); ok {
var temp []model_struct.LocalChatLog
err := utils.JsonStringToStruct(v, &temp)
if err != nil {
return nil, err
}
for _, v := range temp {
v1 := v
result = append(result, &v1)
}
return result, err
} else {
return nil, exec.ErrType
}
}
}

// UpdateMessage updates the message in the database
func (i *LocalChatLogs) UpdateMessage(ctx context.Context, conversationID string, c *model_struct.LocalChatLog) error {
if c.ClientMsgID == "" {
Expand Down
Loading
Loading