Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: process add errors wrap. #1862

Merged
merged 2 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 4 additions & 21 deletions cmd/openim-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"context"
"fmt"
"github.com/OpenIMSDK/tools/errs"
"net"
"net/http"
_ "net/http/pprof"
Expand All @@ -28,8 +29,6 @@ import (

"github.com/OpenIMSDK/protocol/constant"
"github.com/OpenIMSDK/tools/discoveryregistry"
"github.com/OpenIMSDK/tools/log"

"github.com/openimsdk/open-im-server/v3/internal/api"
"github.com/openimsdk/open-im-server/v3/pkg/common/cmd"
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
Expand All @@ -44,69 +43,54 @@ func main() {
apiCmd.AddPortFlag()
apiCmd.AddApi(run)
if err := apiCmd.Execute(); err != nil {
log.ZError(context.Background(), "API command execution failed", err)
panic(err.Error())
}
}

func run(port int, proPort int) error {
log.ZInfo(context.Background(), "Openim api port:", "port", port, "proPort", proPort)

if port == 0 || proPort == 0 {
err := "port or proPort is empty:" + strconv.Itoa(port) + "," + strconv.Itoa(proPort)
log.ZError(context.Background(), err, nil)
return fmt.Errorf(err)
return errs.Wrap(fmt.Errorf(err))
}

rdb, err := cache.NewRedis()
if err != nil {
log.ZError(context.Background(), "Failed to initialize Redis", err)
return err
}
log.ZInfo(context.Background(), "api start init discov client")

var client discoveryregistry.SvcDiscoveryRegistry

// Determine whether zk is passed according to whether it is a clustered deployment
client, err = kdisc.NewDiscoveryRegister(config.Config.Envs.Discovery)
if err != nil {
log.ZError(context.Background(), "Failed to initialize discovery register", err)
return err
return errs.Wrap(err, "register discovery err")
}

if err = client.CreateRpcRootNodes(config.Config.GetServiceNames()); err != nil {
log.ZError(context.Background(), "Failed to create RPC root nodes", err)
return err
return errs.Wrap(err, "create rpc root nodes error")
}

log.ZInfo(context.Background(), "api register public config to discov")
if err = client.RegisterConf2Registry(constant.OpenIMCommonConfigKey, config.Config.EncodeConfig()); err != nil {
log.ZError(context.Background(), "Failed to register public config to discov", err)
return err
}

log.ZInfo(context.Background(), "api register public config to discov success")
router := api.NewGinRouter(client, rdb)
if config.Config.Prometheus.Enable {
p := ginprom.NewPrometheus("app", prommetrics.GetGinCusMetrics("Api"))
p.SetListenAddress(fmt.Sprintf(":%d", proPort))
p.Use(router)
}
log.ZInfo(context.Background(), "api init router success")

var address string
if config.Config.Api.ListenIP != "" {
address = net.JoinHostPort(config.Config.Api.ListenIP, strconv.Itoa(port))
} else {
address = net.JoinHostPort("0.0.0.0", strconv.Itoa(port))
}
log.ZInfo(context.Background(), "start api server", "address", address, "OpenIM version", config.Version)

server := http.Server{Addr: address, Handler: router}
go func() {
err = server.ListenAndServe()
if err != nil && err != http.ErrServerClosed {
log.ZError(context.Background(), "api run failed", err, "address", address)
os.Exit(1)
}
}()
Expand All @@ -120,7 +104,6 @@ func run(port int, proPort int) error {

// graceful shutdown operation.
if err := server.Shutdown(ctx); err != nil {
log.ZError(context.Background(), "failed to api-server shutdown", err)
return err
}

Expand Down
49 changes: 29 additions & 20 deletions internal/msgtransfer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package msgtransfer

import (
"context"
"errors"
"fmt"
"github.com/OpenIMSDK/tools/errs"
"log"
"net/http"
"sync"
Expand Down Expand Up @@ -69,40 +71,47 @@ func StartTransfer(prometheusPort int) error {
client.AddOption(mw.GrpcClient(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, "round_robin")))
msgModel := cache.NewMsgCacheModel(rdb)
msgDocModel := unrelation.NewMsgMongoDriver(mongo.GetDatabase())
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, msgModel)
if err != nil {
return err
}
conversationRpcClient := rpcclient.NewConversationRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
msgTransfer := NewMsgTransfer(msgDatabase, &conversationRpcClient, &groupRpcClient)
msgTransfer, err := NewMsgTransfer(msgDatabase, &conversationRpcClient, &groupRpcClient)
if err != nil {
return err
}
return msgTransfer.Start(prometheusPort)
}

func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) *MsgTransfer {
return &MsgTransfer{
historyCH: NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient),
historyMongoCH: NewOnlineHistoryMongoConsumerHandler(msgDatabase),
func NewMsgTransfer(msgDatabase controller.CommonMsgDatabase, conversationRpcClient *rpcclient.ConversationRpcClient, groupRpcClient *rpcclient.GroupRpcClient) (*MsgTransfer, error) {
historyCH, err := NewOnlineHistoryRedisConsumerHandler(msgDatabase, conversationRpcClient, groupRpcClient)
if err != nil {
return nil, err
}
historyMongoCH, err := NewOnlineHistoryMongoConsumerHandler(msgDatabase)
if err != nil {
return nil, err
}

return &MsgTransfer{
historyCH: historyCH,
historyMongoCH: historyMongoCH,
}, nil
}

func (m *MsgTransfer) Start(prometheusPort int) error {
ctx := context.Background()
var wg sync.WaitGroup
wg.Add(1)
fmt.Println("start msg transfer", "prometheusPort:", prometheusPort)
if prometheusPort <= 0 {
return errors.New("prometheusPort not correct")
}
if config.Config.ChatPersistenceMysql {
// go m.persistentCH.persistentConsumerGroup.RegisterHandleAndConsumer(m.persistentCH)
} else {
fmt.Println("msg transfer not start mysql consumer")
return errs.Wrap(errors.New("prometheusPort not correct"))
}
go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyCH)
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(m.historyMongoCH)
// go m.modifyCH.modifyMsgConsumerGroup.RegisterHandleAndConsumer(m.modifyCH)
/*err := prome.StartPrometheusSrv(prometheusPort)
if err != nil {
return err
}*/
////////////////////////////

go m.historyCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyCH)
go m.historyMongoCH.historyConsumerGroup.RegisterHandleAndConsumer(ctx, m.historyMongoCH)

if config.Config.Prometheus.Enable {
reg := prometheus.NewRegistry()
reg.MustRegister(
Expand Down
7 changes: 4 additions & 3 deletions internal/msgtransfer/online_history_msg_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func NewOnlineHistoryRedisConsumerHandler(
database controller.CommonMsgDatabase,
conversationRpcClient *rpcclient.ConversationRpcClient,
groupRpcClient *rpcclient.GroupRpcClient,
) *OnlineHistoryRedisConsumerHandler {
) (*OnlineHistoryRedisConsumerHandler, error) {
var och OnlineHistoryRedisConsumerHandler
och.msgDatabase = database
och.msgDistributionCh = make(chan Cmd2Value) // no buffer channel
Expand All @@ -98,14 +98,15 @@ func NewOnlineHistoryRedisConsumerHandler(
}
och.conversationRpcClient = conversationRpcClient
och.groupRpcClient = groupRpcClient
och.historyConsumerGroup = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
var err error
och.historyConsumerGroup, err = kafka.NewMConsumerGroup(&kafka.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.LatestMsgToRedis.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToRedis)
// statistics.NewStatistics(&och.singleMsgSuccessCount, config.Config.ModuleName.MsgTransferName, fmt.Sprintf("%d
// second singleMsgCount insert to mongo", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
return &och
return &och, err
}

func (och *OnlineHistoryRedisConsumerHandler) Run(channelID int) {
Expand Down
21 changes: 13 additions & 8 deletions internal/msgtransfer/online_msg_to_mongo_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,21 @@ type OnlineHistoryMongoConsumerHandler struct {
msgDatabase controller.CommonMsgDatabase
}

func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) *OnlineHistoryMongoConsumerHandler {
func NewOnlineHistoryMongoConsumerHandler(database controller.CommonMsgDatabase) (*OnlineHistoryMongoConsumerHandler, error) {
historyConsumerGroup, err := kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo)
if err != nil {
return nil, err
}

mc := &OnlineHistoryMongoConsumerHandler{
historyConsumerGroup: kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.MsgToMongo.Topic},
config.Config.Kafka.Addr, config.Config.Kafka.ConsumerGroupID.MsgToMongo),
msgDatabase: database,
historyConsumerGroup: historyConsumerGroup,
msgDatabase: database,
}
return mc
return mc, nil
}

func (mc *OnlineHistoryMongoConsumerHandler) handleChatWs2Mongo(
Expand Down
17 changes: 11 additions & 6 deletions internal/push/consumer_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,24 @@

package push

import "context"

type Consumer struct {
pushCh ConsumerHandler
successCount uint64
}

func NewConsumer(pusher *Pusher) *Consumer {
return &Consumer{
pushCh: *NewConsumerHandler(pusher),
func NewConsumer(pusher *Pusher) (*Consumer, error) {
c, err := NewConsumerHandler(pusher)
if err != nil {
return nil, err
}
return &Consumer{
pushCh: *c,
}, nil
}

func (c *Consumer) Start() {
// statistics.NewStatistics(&c.successCount, config.Config.ModuleName.PushName, fmt.Sprintf("%d second push to
// msg_gateway count", constant.StatisticsTimeInterval), constant.StatisticsTimeInterval)
go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(&c.pushCh)

go c.pushCh.pushConsumerGroup.RegisterHandleAndConsumer(context.Background(), &c.pushCh)
}
10 changes: 7 additions & 3 deletions internal/push/push_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,19 @@ type ConsumerHandler struct {
pusher *Pusher
}

func NewConsumerHandler(pusher *Pusher) *ConsumerHandler {
func NewConsumerHandler(pusher *Pusher) (*ConsumerHandler, error) {
var consumerHandler ConsumerHandler
consumerHandler.pusher = pusher
consumerHandler.pushConsumerGroup = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
var err error
consumerHandler.pushConsumerGroup, err = kfk.NewMConsumerGroup(&kfk.MConsumerGroupConfig{
KafkaVersion: sarama.V2_0_0_0,
OffsetsInitial: sarama.OffsetNewest, IsReturnErr: false,
}, []string{config.Config.Kafka.MsgToPush.Topic}, config.Config.Kafka.Addr,
config.Config.Kafka.ConsumerGroupID.MsgToPush)
return &consumerHandler
if err != nil {
return nil, err
}
return &consumerHandler, nil
}

func (c *ConsumerHandler) handleMs2PsChat(ctx context.Context, msg []byte) {
Expand Down
5 changes: 4 additions & 1 deletion internal/push/push_rpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
pusher: pusher,
})
}()
consumer, err := NewConsumer(pusher)
if err != nil {
return err
}
go func() {
defer wg.Done()
consumer := NewConsumer(pusher)
consumer.Start()
}()
wg.Wait()
Expand Down
5 changes: 4 additions & 1 deletion internal/rpc/msg/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ func Start(client discoveryregistry.SvcDiscoveryRegistry, server *grpc.Server) e
userRpcClient := rpcclient.NewUserRpcClient(client)
groupRpcClient := rpcclient.NewGroupRpcClient(client)
friendRpcClient := rpcclient.NewFriendRpcClient(client)
msgDatabase := controller.NewCommonMsgDatabase(msgDocModel, cacheModel)
msgDatabase, err := controller.NewCommonMsgDatabase(msgDocModel, cacheModel)
if err != nil {
return err
}
s := &msgServer{
Conversation: &conversationClient,
User: &userRpcClient,
Expand Down
14 changes: 6 additions & 8 deletions internal/tools/cron_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package tools
import (
"context"
"fmt"
"github.com/OpenIMSDK/tools/errs"
"os"
"os/signal"
"syscall"
Expand All @@ -25,14 +26,13 @@ import (
"github.com/redis/go-redis/v9"
"github.com/robfig/cron/v3"

"github.com/OpenIMSDK/tools/log"

"github.com/openimsdk/open-im-server/v3/pkg/common/config"
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
)

func StartTask() error {
fmt.Println("cron task start, config", config.Config.ChatRecordsClearTime)

msgTool, err := InitMsgTool()
if err != nil {
return err
Expand All @@ -47,18 +47,16 @@ func StartTask() error {

// register cron tasks
var crontab = cron.New()
log.ZInfo(context.Background(), "start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime)
fmt.Println("start chatRecordsClearTime cron task", "cron config", config.Config.ChatRecordsClearTime)
_, err = crontab.AddFunc(config.Config.ChatRecordsClearTime, cronWrapFunc(rdb, "cron_clear_msg_and_fix_seq", msgTool.AllConversationClearMsgAndFixSeq))
if err != nil {
log.ZError(context.Background(), "start allConversationClearMsgAndFixSeq cron failed", err)
panic(err)
return errs.Wrap(err)
}

log.ZInfo(context.Background(), "start msgDestruct cron task", "cron config", config.Config.MsgDestructTime)
fmt.Println("start msgDestruct cron task", "cron config", config.Config.MsgDestructTime)
_, err = crontab.AddFunc(config.Config.MsgDestructTime, cronWrapFunc(rdb, "cron_conversations_destruct_msgs", msgTool.ConversationsDestructMsgs))
if err != nil {
log.ZError(context.Background(), "start conversationsDestructMsgs cron failed", err)
panic(err)
return errs.Wrap(err)
}

// start crontab
Expand Down
5 changes: 4 additions & 1 deletion internal/tools/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,10 @@ func InitMsgTool() (*MsgTool, error) {
if err != nil {
return nil, err
}
msgDatabase := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase())
msgDatabase, err := controller.InitCommonMsgDatabase(rdb, mongo.GetDatabase())
if err != nil {
return nil, err
}
userMongoDB := unrelation.NewUserMongoDriver(mongo.GetDatabase())
ctxTx := tx.NewMongo(mongo.GetClient())
userDatabase := controller.NewUserDatabase(
Expand Down
4 changes: 2 additions & 2 deletions pkg/common/db/cache/init_redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func NewRedis() (redis.UniversalClient, error) {
overrideConfigFromEnv()

if len(config.Config.Redis.Address) == 0 {
return nil, errors.New("redis address is empty")
return nil, errs.Wrap(errors.New("redis address is empty"))
}
specialerror.AddReplace(redis.Nil, errs.ErrRecordNotFound)
var rdb redis.UniversalClient
Expand Down Expand Up @@ -77,7 +77,7 @@ func NewRedis() (redis.UniversalClient, error) {
defer cancel()
err = rdb.Ping(ctx).Err()
if err != nil {
return nil, fmt.Errorf("redis ping %w", err)
return nil, errs.Wrap(fmt.Errorf("redis ping %w", err))
}

redisClient = rdb
Expand Down
Loading
Loading