Skip to content

Commit

Permalink
chore: synchronize the code to the community (#465)
Browse files Browse the repository at this point in the history
Signed-off-by: shilinlee <836160610@qq.com>
Co-authored-by: openGemini <>
  • Loading branch information
shilinlee committed Jan 19, 2024
1 parent 7455886 commit 5159f52
Show file tree
Hide file tree
Showing 185 changed files with 24,576 additions and 1,228 deletions.
2 changes: 0 additions & 2 deletions CONTRIBUTION.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ openGemini
├── engine
├── images
├── lib
├── open_src
├── python
├── scripts
├── services
Expand All @@ -38,7 +37,6 @@ openGemini
| docker | Store files related to Docker deployment, such as Dockerfile, startup scripts, etc. |
| engine | Storage engine implementation |
| lib | Implementation of various common tools and support functions |
| open_src | Dependent third-party open source component code (this directory will be considered for deletion later) |
| python | Implementation of AI-based time series data analysis platform, supporting time series data anomaly detection |
| scripts | Contains openGemini’s automatic deployment scripts, unit test scripts, etc. |
| services | openGemini's background services, such as Continue Query, Multi-level Downsample, etc. |
Expand Down
2 changes: 0 additions & 2 deletions CONTRIBUTION_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ openGemini
├── engine
├── images
├── lib
├── open_src
├── python
├── scripts
├── services
Expand All @@ -40,7 +39,6 @@ openGemini
| docker | 存放Docker部署相关的文件,比如Dockerfile,启动脚本等 |
| engine | 存储引擎实现 |
| lib | 各种通用工具和支持函数的实现 |
| open_src | 依赖的第三方开源组件代码(该目录后续会考虑删除) |
| python | 基于AI的时序数据分析平台实现,支持时序数据异常检测 |
| scripts | 包含openGemini的自动部署脚本、单元测试脚本等 |
| services | openGemini的后台服务,比如连续查询将采样(Continue Query), 多级将采样(Downsample)等 |
Expand Down
4 changes: 2 additions & 2 deletions Makefile.common
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ FAILPOINT_DISABLE := find $$PWD/ -type d | grep -vE "(\.git|\.github|\.tests)" |
PACKAGE_LIST_OPEN_GEMINI_TESTS := $(GO) list ./... | grep -vE "tests|lifted\/hashicorp"
PACKAGES_OPEN_GEMINI_TESTS ?= $$($(PACKAGE_LIST_OPEN_GEMINI_TESTS))

COPYRIGHT_EXCEPT := "open_src|lifted|tests|lib/netstorage/data/data.pb.go|lib/statisticsPusher/statistics/handler_statistics.go|app/ts-meta/meta/snapshot.go|engine/index/tsi/tag_filters.go|engine/index/tsi/tag_filter_test.go|engine/index/mergeindex/item.go|lib/config/openGemini_dir.go"
COPYRIGHT_EXCEPT := "lifted|tests|lib/netstorage/data/data.pb.go|lib/statisticsPusher/statistics/handler_statistics.go|app/ts-meta/meta/snapshot.go|engine/index/tsi/tag_filters.go|engine/index/tsi/tag_filter_test.go|engine/index/mergeindex/item.go|lib/config/openGemini_dir.go"
COPYRIGHT_GOFILE := $$(find . -name '*.go' | grep -vE $(COPYRIGHT_EXCEPT))
COPYRIGHT_HEADER := "Copyright 2022|2023|2024 Huawei Cloud Computing Technologies Co., Ltd."

STYLE_CHECK_EXCEPT := "lifted/hashicorp"
STYLE_CHECK_EXCEPT := "lifted/hashicorp|lifted/protobuf"
STYLE_CHECK_GOFILE := $$(find . -name '*.go' | grep -vE $(STYLE_CHECK_EXCEPT))

PROTOC_ZIP_LINUX=protoc-3.14.0-linux-x86_64.zip
Expand Down
2 changes: 1 addition & 1 deletion app/ts-meta/meta/assign_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ import (
"fmt"
"time"

"github.com/gogo/protobuf/proto"
"github.com/openGemini/openGemini/lib/errno"
"github.com/openGemini/openGemini/lib/logger"
"github.com/openGemini/openGemini/lib/statisticsPusher/statistics"
"github.com/openGemini/openGemini/lib/util/lifted/influx/meta"
mproto "github.com/openGemini/openGemini/lib/util/lifted/influx/meta/proto"
"github.com/openGemini/openGemini/lib/util/lifted/protobuf/proto"
)

type AssignEvent struct {
Expand Down
53 changes: 36 additions & 17 deletions app/ts-meta/meta/cluster_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package meta
import (
"math"
"math/rand"
"net"
"sort"
"strconv"
"sync"
Expand All @@ -34,8 +35,19 @@ import (
"go.uber.org/zap"
)

type eventFrom string

const (
fromGossip eventFrom = "gossip"
fromSelfCheck eventFrom = "selfCheck"
fromRetryChan eventFrom = "retryChan"
fromReopen eventFrom = "reopen"
fromLeaderChanged eventFrom = "leaderChanged"
fromTakeover eventFrom = "takeover"
)

type storeInterface interface {
updateNodeStatus(id uint64, status int32, lTime uint64, gossipAddr string) error
updateNodeStatus(id uint64, status int32, lTime uint64, gossipPort string) error
dataNodes() meta.DataNodeInfos
}

Expand Down Expand Up @@ -124,7 +136,7 @@ func (cm *ClusterManager) Start() {
cm.reOpen = make(chan struct{})
globalService.msm.waitRecovery() // wait exist pt event execute first
atomic.CompareAndSwapInt32(&cm.stop, 1, 0)
cm.resendPreviousEvent()
cm.resendPreviousEvent(fromLeaderChanged)
cm.wg.Add(1)
go cm.checkEvents()
}
Expand All @@ -147,7 +159,7 @@ func (cm *ClusterManager) isClosed() bool {
}

// resend previous event when transfer to leader to avoid some event do not handled when leader change
func (cm *ClusterManager) resendPreviousEvent() {
func (cm *ClusterManager) resendPreviousEvent(from eventFrom) {
dataNodes := globalService.store.dataNodes()
cm.mu.RLock()
for i := range dataNodes {
Expand All @@ -169,9 +181,9 @@ func (cm *ClusterManager) resendPreviousEvent() {
}
}

logger.NewLogger(errno.ModuleHA).Error("resend event", zap.String("event", e.String()), zap.Any("members", e.Members))
logger.NewLogger(errno.ModuleHA).Error("resend event", zap.String("event", e.String()), zap.Any("members", e.Members), zap.Uint64("lTime", uint64(e.EventTime)))
cm.eventWg.Add(1)
go cm.processEvent(*e)
go cm.processEvent(*e, from)
}
cm.mu.RUnlock()
}
Expand Down Expand Up @@ -202,17 +214,17 @@ func (cm *ClusterManager) checkEvents() {
for i := 0; i < len(cm.eventCh); i++ {
e := <-cm.eventCh
cm.eventWg.Add(1)
go cm.processEvent(e)
go cm.processEvent(e, fromReopen)
}
return
case <-cm.closing: // meta node is closing
return
case event := <-cm.eventCh:
cm.eventWg.Add(1)
go cm.processEvent(event)
go cm.processEvent(event, fromGossip)
case event := <-cm.retryEventCh:
cm.eventWg.Add(1)
go cm.processEvent(event)
go cm.processEvent(event, fromRetryChan)
case <-check:
if !sendFailedEvent {
sendFailedEvent = true
Expand All @@ -221,7 +233,7 @@ func (cm *ClusterManager) checkEvents() {
case takeoverEnable := <-cm.takeover:
if takeoverEnable {
cm.eventWg.Wait()
cm.resendPreviousEvent()
cm.resendPreviousEvent(fromTakeover)
}
}
}
Expand All @@ -236,35 +248,42 @@ func (cm *ClusterManager) checkFailedNode() {
}
e := cm.eventMap[strconv.FormatUint(dataNodes[i].ID, 10)]
if e == nil {
host, _, _ := net.SplitHostPort(dataNodes[i].Host)
e = &serf.MemberEvent{
Type: serf.EventMemberFailed,
EventTime: serf.LamportTime(dataNodes[i].LTime),
Members: []serf.Member{serf.Member{Name: strconv.FormatUint(dataNodes[i].ID, 10), Tags: map[string]string{"role": "store"},
Status: serf.StatusFailed}}}
Members: []serf.Member{{
Name: strconv.FormatUint(dataNodes[i].ID, 10),
Addr: net.ParseIP(host),
Tags: map[string]string{"role": "store"},
Status: serf.StatusFailed,
}},
}
logger.GetLogger().Error("check failed node", zap.String("event", e.String()),
zap.Uint64("lTime", uint64(e.EventTime)), zap.Any("host", e.Members))
zap.Uint64("lTime", uint64(e.EventTime)), zap.Uint64("nodeId", dataNodes[i].ID))
cm.eventWg.Add(1)
go cm.processEvent(*e)
go cm.processEvent(*e, fromSelfCheck)
}
}
cm.mu.RUnlock()
}

func (cm *ClusterManager) processEvent(event serf.Event) {
func (cm *ClusterManager) processEvent(event serf.Event, from eventFrom) {
defer cm.eventWg.Done()
// ignore handle update and reap event
if cm.handlerMap[event.EventType()] == nil {
return
}
e := event.(serf.MemberEvent)
me := initMemberEvent(e, cm.handlerMap[event.EventType()])
me := initMemberEvent(from, e, cm.handlerMap[event.EventType()])
err := me.handle()
if err == raft.ErrNotLeader {
logger.GetLogger().Error("not leader cannot handle event", zap.String("type", e.String()), zap.Any("members", e.Members), zap.Uint64("lTime", uint64(e.EventTime)), zap.Error(err))
return
}
if err != nil {
logger.NewLogger(errno.ModuleHA).Error("fail to handle event", zap.Error(err), zap.Any("members", e.Members))
cm.retryEventCh <- event
logger.GetLogger().Error("fail to handle event", zap.String("type", e.String()), zap.Any("members", e.Members), zap.Uint64("lTime", uint64(e.EventTime)), zap.Error(err))
cm.retryEventCh <- e
}
}

Expand Down
2 changes: 1 addition & 1 deletion app/ts-meta/meta/continuous_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"sort"
"time"

"github.com/gogo/protobuf/proto"
"github.com/hashicorp/raft"
proto2 "github.com/openGemini/openGemini/lib/util/lifted/influx/meta/proto"
"github.com/openGemini/openGemini/lib/util/lifted/protobuf/proto"
"go.uber.org/zap"
)

Expand Down
2 changes: 1 addition & 1 deletion app/ts-meta/meta/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (
"sync"
"time"

"github.com/gogo/protobuf/proto"
"github.com/openGemini/openGemini/app"
"github.com/openGemini/openGemini/lib/config"
"github.com/openGemini/openGemini/lib/errno"
Expand All @@ -40,6 +39,7 @@ import (
"github.com/openGemini/openGemini/lib/util/lifted/influx/httpd"
"github.com/openGemini/openGemini/lib/util/lifted/influx/meta"
proto2 "github.com/openGemini/openGemini/lib/util/lifted/influx/meta/proto"
"github.com/openGemini/openGemini/lib/util/lifted/protobuf/proto"
"go.uber.org/zap"
)

Expand Down
20 changes: 7 additions & 13 deletions app/ts-meta/meta/handlers_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ import (
"sync"
"time"

"github.com/gogo/protobuf/proto"
"github.com/hashicorp/raft"
"github.com/openGemini/openGemini/app/ts-meta/meta/message"
"github.com/openGemini/openGemini/engine/executor/spdy/transport"
"github.com/openGemini/openGemini/lib/errno"
"github.com/openGemini/openGemini/lib/metaclient"
"github.com/openGemini/openGemini/lib/util/lifted/influx/meta"
proto2 "github.com/openGemini/openGemini/lib/util/lifted/influx/meta/proto"
"github.com/openGemini/openGemini/lib/util/lifted/protobuf/proto"
"github.com/pingcap/failpoint"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -79,9 +79,13 @@ func (h *Snapshot) Process() (transport.Codec, error) {
return rsp, nil
}

if !h.store.IsLeader() {
rsp.Err = errno.NewError(errno.MetaIsNotLeader).Error()
return rsp, nil
}

index := h.req.Index
checkRaft := time.After(2 * time.Second)
tries := 0
checkRaft := time.After(3 * time.Second)
for {
select {
case <-h.store.afterIndex(index):
Expand All @@ -92,16 +96,6 @@ func (h *Snapshot) Process() (transport.Codec, error) {
rsp.Err = "server closed"
return rsp, nil
case <-checkRaft:
checkRaft = time.After(2 * time.Second)
if h.store.isCandidate() {
tries++
if tries >= 3 {
rsp.Err = "server closed"
return rsp, nil
}
h.logger.Info("checkRaft failed", zap.Int("tries", tries))
continue
}
return rsp, nil
}
}
Expand Down
24 changes: 23 additions & 1 deletion app/ts-meta/meta/handlers_process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/proto"
"github.com/hashicorp/raft"
"github.com/openGemini/openGemini/app/ts-meta/meta/message"
"github.com/openGemini/openGemini/lib/config"
"github.com/openGemini/openGemini/lib/errno"
"github.com/openGemini/openGemini/lib/util/lifted/hashicorp/serf/serf"
"github.com/openGemini/openGemini/lib/util/lifted/influx/meta"
"github.com/openGemini/openGemini/lib/util/lifted/protobuf/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCreateDatabase(t *testing.T) {
Expand Down Expand Up @@ -293,3 +296,22 @@ func TestSendSysCtrlToMetaProcess(t *testing.T) {
assert.Equal(t, res.(*message.SendSysCtrlToMetaResponse).Err, testcase.ExpectErr)
}
}

func TestSnapshot(t *testing.T) {
mockStore := NewMockRPCStore()
mockStore.stat = raft.Follower

msg := message.NewMetaMessage(message.SnapshotRequestMessage, &message.SnapshotRequest{
Index: 2,
})
h := New(msg.Type())
h.InitHandler(mockStore, nil, nil)
require.NoError(t, h.SetRequestMsg(msg.Data()))

respMsg, err := h.Process()
require.NoError(t, err)

resp, ok := respMsg.(*message.SnapshotResponse)
require.True(t, ok)
require.Equal(t, resp.Err, errno.NewError(errno.MetaIsNotLeader).Error())
}
4 changes: 3 additions & 1 deletion app/ts-meta/meta/member_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ package meta
import "github.com/openGemini/openGemini/lib/util/lifted/hashicorp/serf/serf"

type memberEvent struct {
from eventFrom
event serf.MemberEvent
handler memberEventHandler
}

func initMemberEvent(e serf.MemberEvent, handler memberEventHandler) *memberEvent {
func initMemberEvent(from eventFrom, e serf.MemberEvent, handler memberEventHandler) *memberEvent {
return &memberEvent{
from: from,
event: e,
handler: handler,
}
Expand Down
Loading

0 comments on commit 5159f52

Please sign in to comment.