Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pull message unread count remove. #404

Merged
merged 4 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions internal/conversation_msg/create_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@ package conversation_msg
import (
"context"
"errors"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/log"

"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"
Expand Down Expand Up @@ -161,14 +162,17 @@ func (c *Conversation) CreateVideoMessageFromFullPath(ctx context.Context, video
//log.Error("internal", "open file failed: ", err, videoFullPath)
return nil, err
}
log.Info("internal", "videoFullPath dstFile", videoFullPath, dstFile, written)
log.ZDebug(ctx, "videoFullPath dstFile", "videoFullPath", videoFullPath,
"dstFile", dstFile, "written", written)

dstFile = utils.FileTmpPath(snapshotFullPath, c.DataDir) //a->b
sWritten, err := utils.CopyFile(snapshotFullPath, dstFile)
if err != nil {
//log.Error("internal", "open file failed: ", err, snapshotFullPath)
return nil, err
}
log.Info("internal", "snapshotFullPath dstFile", snapshotFullPath, dstFile, sWritten)
log.ZDebug(ctx, "snapshotFullPath dstFile", "snapshotFullPath", snapshotFullPath,
"dstFile", dstFile, "sWritten", sWritten)

s := sdk_struct.MsgStruct{}
err = c.initBasicInfo(ctx, &s, constant.UserMsgType, constant.Video)
Expand All @@ -194,7 +198,7 @@ func (c *Conversation) CreateVideoMessageFromFullPath(ctx context.Context, video
if snapshotFullPath != "" {
imageInfo, err := getImageInfo(s.VideoElem.SnapshotPath)
if err != nil {
log.Error("internal", "get Image Attributes error", err.Error())
log.ZError(ctx, "getImageInfo err:", err, "snapshotFullPath", snapshotFullPath)
return nil, err
}
s.VideoElem.SnapshotHeight = imageInfo.Height
Expand Down Expand Up @@ -398,7 +402,7 @@ func (c *Conversation) CreateVideoMessage(ctx context.Context, videoPath string,
}
fi, err := os.Stat(s.VideoElem.VideoPath)
if err != nil {
log.Error("internal", "get video file error", err.Error())
log.ZDebug(ctx, "get video file error", "videoPath", videoPath, "snapshotPath", snapshotPath)
return nil, err
}
s.VideoElem.VideoSize = fi.Size()
Expand Down Expand Up @@ -473,7 +477,8 @@ func (c *Conversation) CreateFaceMessage(ctx context.Context, index int, data st

func (c *Conversation) CreateForwardMessage(ctx context.Context, s *sdk_struct.MsgStruct) (*sdk_struct.MsgStruct, error) {
if s.Status != constant.MsgStatusSendSuccess {
log.Error("internal", "only send success message can be Forward")
log.ZError(ctx, "only send success message can be Forward",
errors.New("only send success message can be Forward"))
return nil, errors.New("only send success message can be Forward")
}
err := c.initBasicInfo(ctx, s, constant.UserMsgType, s.ContentType)
Expand Down
12 changes: 0 additions & 12 deletions internal/interaction/msg_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,6 @@ func (m *MsgSyncer) compareSeqsAndBatchSync(ctx context.Context, maxSeqToSync ma
_ = m.syncAndTriggerMsgs(m.ctx, needSyncSeqMap, pullNums)
}

func (m *MsgSyncer) compareSeqsAndSync(maxSeqToSync map[string]int64) {
for conversationID, maxSeq := range maxSeqToSync {
if syncedMaxSeq, ok := m.syncedMaxSeqs[conversationID]; ok {
if maxSeq > syncedMaxSeq {
_ = m.syncAndTriggerMsgs(m.ctx, map[string][2]int64{conversationID: {syncedMaxSeq, maxSeq}}, defaultPullNums)
}
} else {
_ = m.syncAndTriggerMsgs(m.ctx, map[string][2]int64{conversationID: {syncedMaxSeq, maxSeq}}, defaultPullNums)
}
}
}

func (m *MsgSyncer) doPushMsg(ctx context.Context, push *sdkws.PushMessages) {
log.ZDebug(ctx, "push msgs", "push", push, "syncedMaxSeqs", m.syncedMaxSeqs)
m.pushTriggerAndSync(ctx, push.Msgs, m.triggerConversation)
Expand Down
9 changes: 0 additions & 9 deletions open_im_sdk/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,66 +16,57 @@ package open_im_sdk

import (
"github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback"
"github.com/openimsdk/openim-sdk-core/v3/pkg/log"
)

func SetGroupListener(callback open_im_sdk_callback.OnGroupListener) {
if callback == nil || UserForSDK == nil {
log.Error("callback or UserForSDK is nil")
return
}
UserForSDK.SetGroupListener(callback)
}

func SetConversationListener(listener open_im_sdk_callback.OnConversationListener) {
if listener == nil || UserForSDK == nil {
log.Error("callback or UserForSDK is nil")
return
}
UserForSDK.SetConversationListener(listener)
}
func SetAdvancedMsgListener(listener open_im_sdk_callback.OnAdvancedMsgListener) {
if listener == nil || UserForSDK == nil {
log.Error("callback or UserForSDK is nil")
return
}
UserForSDK.SetAdvancedMsgListener(listener)
}
func SetBatchMsgListener(listener open_im_sdk_callback.OnBatchMsgListener) {
if listener == nil || UserForSDK == nil {
log.Error("callback or UserForSDK is nil")
return
}
UserForSDK.SetBatchMsgListener(listener)
}

func SetUserListener(listener open_im_sdk_callback.OnUserListener) {
if listener == nil || UserForSDK == nil {
log.Error("callback or UserForSDK is nil")
return
}
UserForSDK.SetUserListener(listener)
}

func SetFriendListener(listener open_im_sdk_callback.OnFriendshipListener) {
if listener == nil || UserForSDK == nil {
log.Error("callback or UserForSDK is nil")
return
}
UserForSDK.SetFriendListener(listener)
}

func SetCustomBusinessListener(listener open_im_sdk_callback.OnCustomBusinessListener) {
if listener == nil || UserForSDK == nil {
log.Error("callback or UserForSDK is nil")
return
}
UserForSDK.SetBusinessListener(listener)
}

func SetMessageKvInfoListener(listener open_im_sdk_callback.OnMessageKvInfoListener) {
if listener == nil || UserForSDK == nil {
log.Error("callback or UserForSDK is nil")
return
}
UserForSDK.SetMessageKvInfoListener(listener)
Expand Down
3 changes: 2 additions & 1 deletion open_im_sdk/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,8 @@ func syncCall(operationID string, fn any, args ...any) string {
v := reflect.New(tag)
if args[i].(string) != "" {
if err := json.Unmarshal([]byte(args[i].(string)), v.Interface()); err != nil {
log.ZWarn(ctx, "json.Unmarshal error", err, "function name", funcName, "arg", args[i], "v", v.Interface())
log.ZWarn(ctx, "json.Unmarshal error", err, "function name", funcName,
"arg", args[i], "v", v.Interface())
//callback.OnError(constant.ErrArgs.ErrCode, err.Error())
return ""
}
Expand Down
28 changes: 4 additions & 24 deletions pkg/common/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"github.com/mitchellh/mapstructure"
"github.com/openimsdk/openim-sdk-core/v3/open_im_sdk_callback"
"github.com/openimsdk/openim-sdk-core/v3/pkg/log"
"github.com/openimsdk/openim-sdk-core/v3/pkg/sdkerrs"
"github.com/openimsdk/openim-sdk-core/v3/pkg/server_api_params"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
Expand All @@ -35,7 +34,7 @@ import (
func CheckAnyErrCallback(callback open_im_sdk_callback.Base, errCode int32, err error, operationID string) {
if err != nil {
errInfo := "operationID[" + operationID + "], " + "info[" + err.Error() + "]"
log.NewError(operationID, "checkErr ", errInfo)
//log.NewError(operationID, "checkErr ", errInfo)
callback.OnError(errCode, errInfo)
runtime.Goexit()
}
Expand All @@ -60,25 +59,6 @@ func CheckArgsErrCallback(callback open_im_sdk_callback.Base, err error, operati
CheckAnyErrCallback(callback, sdkerrs.ArgsError, err, operationID)
}

func CheckErrAndRespCallback(callback open_im_sdk_callback.Base, err error, resp []byte, output interface{}, operationID string) {
log.Debug(operationID, utils.GetSelfFuncName(), "args: ", string(resp))
if err = CheckErrAndResp(err, resp, output, nil); err != nil {
log.Error(operationID, "CheckErrAndResp failed ", err.Error(), "input: ", string(resp))
callback.OnError(sdkerrs.SdkInternalError, err.Error())
runtime.Goexit()
}
}

func CheckErrAndRespCallbackPenetrate(callback open_im_sdk_callback.Base, err error, resp []byte, output interface{}, operationID string) {
log.Debug(operationID, utils.GetSelfFuncName(), "args: ", string(resp))
var penetrateErrCode int32
if err = CheckErrAndResp(err, resp, output, &penetrateErrCode); err != nil {
log.Error(operationID, "CheckErrAndResp failed ", err.Error(), "input: ", string(resp), penetrateErrCode)
callback.OnError(penetrateErrCode, utils.Unwrap(err).Error())
runtime.Goexit()
}
}

//
//funcation CheckErrAndResp2(err error, resp []byte, output interface{}) error {
// if err != nil {
Expand Down Expand Up @@ -164,7 +144,7 @@ one:

err = json.Unmarshal(resp, &c2)
if err != nil {
log.Error("json.Unmarshal failed ", string(resp), "err: ", err.Error())
//log.Error("json.Unmarshal failed ", string(resp), "err: ", err.Error())
return utils.Wrap(err, "")
}
if c2.ErrCode != 0 {
Expand All @@ -187,7 +167,7 @@ func JsonUnmarshalAndArgsValidate(s string, args interface{}, callback open_im_s
err := json.Unmarshal([]byte(s), args)
if err != nil {
if callback != nil {
log.NewError(operationID, "Unmarshal failed ", err.Error(), s)
//log.NewError(operationID, "Unmarshal failed ", err.Error(), s)
callback.OnError(sdkerrs.ArgsError, err.Error())
runtime.Goexit()
} else {
Expand All @@ -210,7 +190,7 @@ func JsonUnmarshalCallback(s string, args interface{}, callback open_im_sdk_call
err := json.Unmarshal([]byte(s), args)
if err != nil {
if callback != nil {
log.NewError(operationID, "Unmarshal failed ", err.Error(), s)
//log.NewError(operationID, "Unmarshal failed ", err.Error(), s)
callback.OnError(sdkerrs.ArgsError, err.Error())
runtime.Goexit()
} else {
Expand Down
5 changes: 1 addition & 4 deletions pkg/db/group_member_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/log"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
)

Expand Down Expand Up @@ -276,12 +275,10 @@ func (d *DataBase) SearchGroupMembersDB(ctx context.Context, keyword string, gro
if groupID != "" {
condition = "( " + condition + " ) "
condition += " and group_id IN ? "
log.Debug("", "subCondition SearchGroupMembers ", condition)
err = d.conn.WithContext(ctx).Where(condition, []string{groupID}).Order("role_level DESC,join_time ASC").Offset(offset).Limit(count).Find(&groupMemberList).Error
} else {
log.Debug("", "subCondition SearchGroupMembers ", condition)
err = d.conn.WithContext(ctx).Where(condition).Order("role_level DESC,join_time ASC").Offset(offset).Limit(count).Find(&groupMemberList).Error
log.Debug("", "subCondition SearchGroupMembers ", condition, len(groupMemberList))

}

for _, v := range groupMemberList {
Expand Down
2 changes: 0 additions & 2 deletions pkg/db/super_group_chat_log_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"fmt"
"github.com/openimsdk/openim-sdk-core/v3/pkg/constant"
"github.com/openimsdk/openim-sdk-core/v3/pkg/db/model_struct"
"github.com/openimsdk/openim-sdk-core/v3/pkg/log"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/openim-sdk-core/v3/sdk_struct"

Expand Down Expand Up @@ -141,7 +140,6 @@ func (d *DataBase) SuperGroupSearchMessageByContentTypeAndKeyword(ctx context.Co
}
condition = fmt.Sprintf("send_time between %d and %d AND status <=%d And content_type IN ? ", startTime, endTime, constant.MsgStatusSendFailed)
condition += subCondition
log.Info("key owrd", condition)
err = utils.Wrap(d.conn.WithContext(ctx).Table(utils.GetConversationTableName(groupID)).Where(condition, contentType).Order("send_time DESC").Find(&messageList).Error, "SearchMessage failed")
for _, v := range messageList {
v1 := v
Expand Down
3 changes: 1 addition & 2 deletions wasm/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package main

import (
"fmt"
"github.com/openimsdk/openim-sdk-core/v3/pkg/log"
"github.com/openimsdk/openim-sdk-core/v3/wasm/wasm_wrapper"
"runtime"
"runtime/debug"
Expand All @@ -30,7 +29,7 @@ import (
func main() {
defer func() {
if r := recover(); r != nil {
log.Error("MAIN", "panic info is:", r, string(debug.Stack()))
fmt.Println("MAIN", "panic info is:", r, debug.Stack())
}
}()
fmt.Println("runtime env", runtime.GOARCH, runtime.GOOS)
Expand Down
25 changes: 14 additions & 11 deletions wasm/event_listener/caller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ package event_listener

import (
"bytes"
"context"
"errors"
"github.com/openimsdk/openim-sdk-core/v3/pkg/log"
"github.com/OpenIMSDK/tools/log"
"github.com/openimsdk/openim-sdk-core/v3/pkg/utils"
"github.com/openimsdk/openim-sdk-core/v3/wasm/exec"
"reflect"
Expand Down Expand Up @@ -63,6 +64,7 @@ func (r *ReflectCall) asyncCallWithCallback() {
r.ErrHandle(rc)
}
}()
ctx := context.Background()
var funcName reflect.Value
var typeFuncName reflect.Type
var hasCallback bool
Expand All @@ -79,7 +81,7 @@ func (r *ReflectCall) asyncCallWithCallback() {
r.callback.SetOperationID(r.arguments[0].String())
values = append(values, reflect.ValueOf(r.callback))
} else {
log.Error("AsyncCallWithCallback", "not set callback")
log.ZDebug(ctx, "AsyncCallWithCallback not set callback")
panic(ErrNotSetCallback)
}
funcFieldsNum := typeFuncName.NumIn()
Expand All @@ -99,11 +101,11 @@ func (r *ReflectCall) asyncCallWithCallback() {
if !strings.HasPrefix(convertValue, "<number: ") {
values = append(values, reflect.ValueOf(convertValue))
} else {
log.Error("AsyncCallWithCallback", "input args type err index:", utils.IntToString(i))
log.ZError(ctx, "AsyncCallWithCallback", nil, "input args type err index:",
utils.IntToString(i))
panic("input args type err index:" + utils.IntToString(i))
}
case reflect.Int:
log.NewDebug("", "type is ", r.arguments[i].Int())
values = append(values, reflect.ValueOf(r.arguments[i].Int()))
case reflect.Int32:
values = append(values, reflect.ValueOf(int32(r.arguments[i].Int())))
Expand All @@ -114,7 +116,8 @@ func (r *ReflectCall) asyncCallWithCallback() {
case reflect.Ptr:
values = append(values, reflect.ValueOf(bytes.NewBuffer(exec.ExtractArrayBuffer(r.arguments[i]))))
default:
log.Error("AsyncCallWithCallback", "input args type not support:", strconv.Itoa(int(typeFuncName.In(temp).Kind())))
log.ZError(ctx, "AsyncCallWithCallback", nil,
"input args type not support:", strconv.Itoa(int(typeFuncName.In(temp).Kind())))
panic("input args type not support:" + strconv.Itoa(int(typeFuncName.In(temp).Kind())))
}
}
Expand All @@ -133,6 +136,7 @@ func (r *ReflectCall) asyncCallWithOutCallback() {
r.ErrHandle(rc)
}
}()
ctx := context.Background()
var funcName reflect.Value
var typeFuncName reflect.Type
if r.funcName == nil {
Expand All @@ -145,7 +149,7 @@ func (r *ReflectCall) asyncCallWithOutCallback() {
if r.callback == nil {
r.callback = NewBaseCallback(utils.FirstLower(utils.GetSelfFuncName()), nil)
}
log.Error("test", "asyncCallWithOutCallback", len(r.arguments))
log.ZError(ctx, "test", nil, "asyncCallWithOutCallback", len(r.arguments))
r.callback.SetOperationID(r.arguments[0].String())
//strings.SplitAfter()
for i := 0; i < len(r.arguments); i++ {
Expand All @@ -159,7 +163,6 @@ func (r *ReflectCall) asyncCallWithOutCallback() {
panic("input args type err index:" + utils.IntToString(i))
}
case reflect.Int:
log.NewDebug("", "type is ", r.arguments[i].Int())
values = append(values, reflect.ValueOf(r.arguments[i].Int()))
case reflect.Int32:
values = append(values, reflect.ValueOf(int32(r.arguments[i].Int())))
Expand Down Expand Up @@ -236,7 +239,6 @@ func (r *ReflectCall) SyncCall() (result []interface{}) {
panic("input args type err index:" + utils.IntToString(i))
}
case reflect.Int:
log.NewDebug("", "type is ", r.arguments[i].Int())
values = append(values, reflect.ValueOf(r.arguments[i].Int()))
case reflect.Int32:
values = append(values, reflect.ValueOf(int32(r.arguments[i].Int())))
Expand Down Expand Up @@ -266,18 +268,19 @@ func (r *ReflectCall) SyncCall() (result []interface{}) {

}
func (r *ReflectCall) ErrHandle(recover interface{}) []string {
ctx := context.Background()
var temp string
switch x := recover.(type) {
case string:
log.Error("STRINGERR", x)
log.ZError(ctx, "STRINGERR", nil, "r", x)
temp = utils.Wrap(errors.New(x), "").Error()
case error:
//buf := make([]byte, 1<<20)
//runtime.Stack(buf, true)
log.Error("ERR", x.Error())
log.ZError(ctx, "ERR", x, "r", x.Error())
temp = x.Error()
default:
log.Error("unknown panic")
log.ZError(ctx, "unknown panic", nil, "r", x)
temp = utils.Wrap(errors.New("unknown panic"), "").Error()
}
if r.callback != nil {
Expand Down
Loading
Loading