Skip to content

Commit

Permalink
adjusting;disabled release
Browse files Browse the repository at this point in the history
  • Loading branch information
jin06 committed Apr 15, 2023
1 parent 9019533 commit 8808689
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 73 deletions.
117 changes: 45 additions & 72 deletions app/server/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import (
// Running pipeline, reporting status, etc.
// if it becomes the master node, it will run tasks such as scheduling pipeline, monitoring, event management, etc
type Node struct {
Mode *NodeMode
Mode *Mode
Options *Options
Name string
Register *register.Register
Expand All @@ -43,21 +43,21 @@ type Node struct {
raftNode *raft.RaftNode
}

type NodeMode byte
type Mode byte

const (
MODE_CLUSTER NodeMode = 1
MODE_SINGLE NodeMode = 2
ModeCluster Mode = 1
ModeSingle Mode = 2
)

// NodeMode todo
func (n NodeMode) String() string {
func (n Mode) String() string {
switch n {
case MODE_CLUSTER:
case ModeCluster:
{
return "cluster"
}
case MODE_SINGLE:
case ModeSingle:
{
return "single"
}
Expand Down Expand Up @@ -107,25 +107,9 @@ func (n *Node) Run(ctx context.Context) (err error) {
cancel()
}()

err = n.runLeaderDuty(myCtx)
if err != nil {
return
}
n.runLeaderDuty(myCtx)

err = n.runNodeDuty(myCtx)
if err != nil {
return
}
//err = n.startRaftNode(myCtx)
//if err != nil {
// return
//}
//err = n.refreshNode()
//if err != nil {
// return
//}
//nodeCtx := n._mustRun(myCtx)
//n._leaderRun(myCtx)
n.runWorkerDuty(myCtx)

select {
case <-ctx.Done():
Expand All @@ -135,32 +119,46 @@ func (n *Node) Run(ctx context.Context) (err error) {
}
}

func (n *Node) runLeaderDuty(ctx context.Context) (err error) {
subCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
}()
select {
case <-ctx.Done():
{
return
}
func (n *Node) runLeaderDuty(ctx context.Context) {
err := n.startRaftNode(ctx)
if err != nil {
panic(err)
}
return
go func() {
ch := n.raftNode.R.LeaderCh()
for {
select {
case <-time.Tick(time.Second):
{
logrus.Debug("leader tick ")
}
case <-ctx.Done():
{
return
}
case isLeader := <-ch:
{
if isLeader {
logrus.Infoln("I'm leader!")
} else {
logrus.Infoln("I'm not leader ")
}
}
}
}
}()
}

func (n *Node) runNodeDuty(ctx context.Context) (err error) {
subCtx, cancel := context.WithCancel(ctx)
defer func() {
cancel()
}()
select {
case <-ctx.Done():
{
return
func (n *Node) runWorkerDuty(ctx context.Context) {
go func() {

select {
case <-ctx.Done():
{
return
}
}
}
return
}()
}

// Role returns current role
Expand Down Expand Up @@ -292,11 +290,6 @@ func (n *Node) _mustRun(ctx context.Context) (resCtx context.Context) {
var a bool

func (n *Node) startRaftNode(ctx context.Context) error {
if a {
return nil
}
a = true
logrus.Infoln("Start Raft Node!")
var err error
defer func() {
if err != nil {
Expand All @@ -322,25 +315,5 @@ func (n *Node) startRaftNode(ctx context.Context) error {
Address: raft2.ServerAddress(myServerAddress),
}
n.raftNode, err = raft.NewRaftNode(ctx, myServer, myServerDir, nodes, viper.GetBool("raft.bootstrap"))
go func() {
for {
//et := cache.Entry{
// Menu: "pipelineß",
// Key: "test" + time.Now().String(),
// Value: []byte("111"),
//}
//b, _ := json.Marshal(et)
//af := n.raftNode.R.Apply(b,time.Second)
//fmt.Println(af.Error())

<-time.Tick(time.Second)
fmt.Println(n.raftNode.R.LastIndex())
fmt.Println(n.raftNode.R.LeaderWithID())
fmt.Println(n.raftNode.R.AppliedIndex())
//n.raftNode.R.Apply([]byte(time.Now().String()), time.Second)
//fmt.Println(n.raftNode.R.LastIndex())
//fmt.Printf("%v\n", n.raftNode.R.VerifyLeader())
}
}()
return err
}
2 changes: 1 addition & 1 deletion pkg/raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func NewRaftNode(ctx context.Context, myServer raft.Server, dir string, raftServ
logrus.Fatalln("raft grpc server error, ", err)
}
}(ctx)
go rn.doLeader(ctx)
//go rn.doLeader(ctx)

return rn, err
}
Expand Down

0 comments on commit 8808689

Please sign in to comment.