Skip to content

Commit

Permalink
fix review issues
Browse files Browse the repository at this point in the history
  • Loading branch information
aajkl committed Jul 1, 2024
1 parent b7ec82a commit 28a6c2f
Show file tree
Hide file tree
Showing 22 changed files with 91 additions and 82 deletions.
2 changes: 1 addition & 1 deletion configs/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/cockroachdb/errors"
"github.com/dustin/go-humanize"
"github.com/mcuadros/go-defaults"
"github.com/projecteru2/yavirt/internal/utils/notify/bison"
"github.com/projecteru2/yavirt/pkg/netx"
"github.com/projecteru2/yavirt/pkg/notify/bison"
"github.com/projecteru2/yavirt/pkg/utils"
"github.com/urfave/cli/v2"

Expand Down
3 changes: 2 additions & 1 deletion internal/eru/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/patrickmn/go-cache"
"github.com/projecteru2/core/log"
corerpc "github.com/projecteru2/core/rpc"
"github.com/projecteru2/yavirt/internal/eru/common"
"github.com/projecteru2/yavirt/internal/eru/store"
corestore "github.com/projecteru2/yavirt/internal/eru/store/core"
Expand Down Expand Up @@ -89,7 +90,7 @@ func NewManager(
logger.Error(ctx, err, "failed to add node")
return
}
if e.Code() == 1031 && strings.Contains(e.Message(), "node already exists") {
if e.Code() == corerpc.AddNode && strings.Contains(e.Message(), "node already exists") {
logger.Infof(ctx, "node %s already exists", config.Hostname)
} else {
logger.Errorf(ctx, err, "failed to add node %s", config.Hostname)
Expand Down
12 changes: 6 additions & 6 deletions internal/eru/agent/guest.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,24 +82,24 @@ func (g *Guest) CheckHealth(ctx context.Context, svc service.Service, timeout ti
return true
}

var tcpChecker []string
var httpChecker []string
var tcpCheckers []string
var httpCheckers []string

healthCheck := g.HealthCheck

for _, port := range healthCheck.TCPPorts {
for _, ip := range g.IPs {
tcpChecker = append(tcpChecker, fmt.Sprintf("%s:%s", ip, port))
tcpCheckers = append(tcpCheckers, fmt.Sprintf("%s:%s", ip, port))
}
}
if healthCheck.HTTPPort != "" {
for _, ip := range g.IPs {
httpChecker = append(httpChecker, fmt.Sprintf("http://%s:%s%s", ip, healthCheck.HTTPPort, healthCheck.HTTPURL)) //nolint
httpCheckers = append(httpCheckers, fmt.Sprintf("http://%s:%s%s", ip, healthCheck.HTTPPort, healthCheck.HTTPURL)) //nolint
}
}

f1 := utils.CheckHTTP(ctx, g.ID, httpChecker, healthCheck.HTTPCode, timeout)
f2 := utils.CheckTCP(ctx, g.ID, tcpChecker, timeout)
f1 := utils.CheckHTTP(ctx, g.ID, httpCheckers, healthCheck.HTTPCode, timeout)
f2 := utils.CheckTCP(ctx, g.ID, tcpCheckers, timeout)
f3 := CheckCMD(ctx, svc, g.ID, healthCheck.Cmds, timeout)
return f1 && f2 && f3
}
Expand Down
8 changes: 7 additions & 1 deletion internal/eru/agent/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,23 @@ func (mgr *Manager) GetMetricsCollector() *MetricsCollector {

func (e *MetricsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- vmHealthyDesc
ch <- coreHealthyDesc
}

func (e *MetricsCollector) Collect(ch chan<- prometheus.Metric) {
logger := log.WithFunc("agent.MetricsCollector.Collect")
for _, v := range e.wrkStatusCache.Items() {
wrkStatus, _ := v.Object.(*types.WorkloadStatus)
if wrkStatus == nil {
logger.Warnf(context.TODO(), "[BUG] wrkStatus can't be nil here")
continue
}
if !wrkStatus.Running {
continue
}
de := vmcache.FetchDomainEntry(wrkStatus.ID)
if de == nil {
log.WithFunc("MetricsCollector.Collect").Warnf(context.TODO(), "[eru agent] failed to get domain entry %s", wrkStatus.ID)
logger.Warnf(context.TODO(), "[eru agent] failed to get domain entry %s", wrkStatus.ID)
continue
}
healthy := 0
Expand Down
1 change: 1 addition & 0 deletions internal/eru/agent/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func (m *Manager) nodeStatusReport(ctx context.Context) {
return
}
if err := m.store.CheckHealth(ctx); err != nil {
logger.Error(ctx, err, "failed to check health of core")
m.mCol.coreHealthy.Store(false)
} else {
m.mCol.coreHealthy.Store(true)
Expand Down
8 changes: 4 additions & 4 deletions internal/eru/agent/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ func (m *Manager) checkAllWorkloads(ctx context.Context) {
return
}

for _, workloadID := range workloadIDs {
ID := workloadID
_ = utils.Pool.Submit(func() { m.checkOneWorkload(ctx, ID) })
for idx := range workloadIDs {
wrkID := workloadIDs[idx]
_ = utils.Pool.Submit(func() { m.checkOneWorkload(ctx, wrkID) })
}
}

Expand All @@ -157,7 +157,7 @@ func (m *Manager) checkOneWorkload(ctx context.Context, ID string) bool {

m.wrkStatusCache.Set(workloadStatus.ID, workloadStatus, 0)

if err = m.setWorkloadStatus(ctx, workloadStatus); err != nil {
if err := m.setWorkloadStatus(ctx, workloadStatus); err != nil {
logger.Error(ctx, err, "update workload status failed")
}
return workloadStatus.Healthy
Expand Down
36 changes: 19 additions & 17 deletions internal/eru/recycle/recycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/pkg/errors"
"github.com/projecteru2/core/log"
corerpc "github.com/projecteru2/core/rpc"
virttypes "github.com/projecteru2/libyavirt/types"
"github.com/projecteru2/yavirt/configs"
"github.com/projecteru2/yavirt/internal/eru/common"
Expand All @@ -18,21 +19,21 @@ import (
"github.com/projecteru2/yavirt/internal/eru/types"
"github.com/projecteru2/yavirt/internal/service"
"github.com/projecteru2/yavirt/internal/utils"
"github.com/projecteru2/yavirt/internal/utils/notify/bison"
"github.com/projecteru2/yavirt/pkg/notify/bison"
"github.com/samber/lo"
"google.golang.org/grpc/status"
)

var (
interval = 1 * time.Minute
deleteWait = 15 * time.Second
sto store.Store
stor store.Store
)

func fetchWorkloads() ([]string, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
wrks, err := sto.ListNodeWorkloads(ctx, configs.Hostname())
wrks, err := stor.ListNodeWorkloads(ctx, configs.Hostname())
if err != nil {
return nil, err
}
Expand All @@ -54,7 +55,7 @@ func deleteGuest(svc service.Service, eruID string) error {
logger.Infof(ctx, "[recycle] start to remove dangling guest %s", eruID)
// since guest deletion is a dangerous operation here,
// so we check eru again
wrk, err := sto.GetWorkload(ctx, eruID)
wrk, err := stor.GetWorkload(ctx, eruID)
logger.Infof(ctx, "[recycle] guest %s, wrk: %v, err: %v", eruID, wrk, err)
if err == nil {
logger.Errorf(ctx, err, "[recycle] BUG: dangling guest %s is still in eru", eruID)
Expand All @@ -65,7 +66,7 @@ func deleteGuest(svc service.Service, eruID string) error {
if !ok {
return err
}
if e.Code() == 1051 && strings.Contains(e.Message(), "entity count invalid") { //nolint
if e.Code() == corerpc.GetWorkload && strings.Contains(e.Message(), "entity count invalid") { //nolint
logger.Infof(ctx, "[recycle] start to remove local guest %s", eruID)
// When creating a guest, the core first creates the workload and then creates a record in ETCD.
// Therefore, within the time window between these two operations, we may incorrectly detect dangling guests.
Expand All @@ -85,14 +86,13 @@ func deleteGuest(svc service.Service, eruID string) error {
notifier := bison.GetService()
log.Debugf(ctx, "[recycle] notifier: %v", notifier)
if notifier != nil {
msgList := []string{
"<font color=#00CC33 size=10>delete dangling successfully </font>",
"---",
"",
fmt.Sprintf("- **node:** %s", configs.Hostname()),
fmt.Sprintf("- **id:** %s", eruID),
}
text := "\n" + strings.Join(msgList, "\n")
text := fmt.Sprintf(`
<font color=#00CC33 size=10>delete dangling guest successfully </font>
---
- **node:** %s
- **id:** %s
`, configs.Hostname(), eruID)
if err := notifier.SendMarkdown(context.TODO(), "delete dangling guest", text); err != nil {
logger.Warnf(ctx, "[recycle] failed to send dingtalk message: %v", err)
}
Expand All @@ -104,8 +104,9 @@ func deleteGuest(svc service.Service, eruID string) error {
}

func startLoop(ctx context.Context, svc service.Service) {
log.WithFunc("startLoop").Infof(ctx, "[recycle] starting recycle loop")
defer log.WithFunc("startLoop").Infof(ctx, "[recycle] recycle loop stopped")
logger := log.WithFunc("startLoop")
logger.Info(ctx, "[recycle] starting recycle loop")
defer logger.Info(ctx, "[recycle] recycle loop stopped")

for {
select {
Expand All @@ -116,6 +117,7 @@ func startLoop(ctx context.Context, svc service.Service) {

coreIDs, err := fetchWorkloads()
if err != nil {
logger.Error(ctx, err, "failed to fetch workloads")
continue
}
localIDs, err := svc.GetGuestIDList(context.Background())
Expand All @@ -139,11 +141,11 @@ func startLoop(ctx context.Context, svc service.Service) {
func Setup(ctx context.Context, cfg *configs.Config, t *testing.T) (err error) {
if t == nil {
corestore.Init(ctx, &cfg.Eru)
if sto = corestore.Get(); sto == nil {
if stor = corestore.Get(); stor == nil {
return common.ErrGetStoreFailed
}
} else {
sto = storemocks.NewFakeStore()
stor = storemocks.NewFakeStore()
}
return nil
}
Expand Down
6 changes: 3 additions & 3 deletions internal/eru/recycle/recycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (

virttypes "github.com/projecteru2/libyavirt/types"
"github.com/projecteru2/yavirt/internal/service/mocks"
"github.com/projecteru2/yavirt/internal/utils/notify/bison"
"github.com/projecteru2/yavirt/pkg/notify/bison"
"github.com/projecteru2/yavirt/pkg/test/assert"
"github.com/projecteru2/yavirt/pkg/test/mock"

Expand All @@ -27,7 +27,7 @@ func TestDeleteGuest(t *testing.T) {
assert.Nil(t, err)

svc := &mocks.Service{}
mockSto := sto.(*storemocks.MockStore)
mockSto := stor.(*storemocks.MockStore)

// still in eru
mockSto.On("GetWorkload", mock.Anything, mock.Anything).Return(
Expand Down Expand Up @@ -61,7 +61,7 @@ func TestNormal(t *testing.T) {

svc := &mocks.Service{}
svc.On("GetGuestIDList", mock.Anything).Return([]string{"00033017009174384208170000000001", "00033017009174384208170000000002"}, nil)
mockSto := sto.(*storemocks.MockStore)
mockSto := stor.(*storemocks.MockStore)
mockSto.On("ListNodeWorkloads", mock.Anything, mock.Anything).Return(
[]*types.Workload{
{
Expand Down
41 changes: 20 additions & 21 deletions internal/eru/resources/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
stotypes "github.com/projecteru2/resource-storage/storage/types"
"github.com/projecteru2/yavirt/configs"
"github.com/projecteru2/yavirt/internal/eru/types"
"github.com/projecteru2/yavirt/internal/utils/notify/bison"
"github.com/projecteru2/yavirt/pkg/notify/bison"
gputypes "github.com/yuyang0/resource-gpu/gpu/types"
)

Expand All @@ -33,7 +33,7 @@ func NewCoreResourcesManager() *CoreResourcesManager {
return &CoreResourcesManager{}
}

func (cm *CoreResourcesManager) fetchResources() {
func (cm *CoreResourcesManager) fetchResourcesWithLock() {
ctx, cancel := context.WithTimeout(context.TODO(), 30*time.Second)
defer cancel()
logger := log.WithFunc("fetchResources")
Expand Down Expand Up @@ -83,7 +83,7 @@ func (cm *CoreResourcesManager) GetCpumem() (ans *cpumemtypes.NodeResource) {
if ans != nil {
return
}
cm.fetchResources()
cm.fetchResourcesWithLock()
cm.mu.Lock()
defer cm.mu.Unlock()
return cm.cpumem
Expand All @@ -96,7 +96,7 @@ func (cm *CoreResourcesManager) GetGPU() (ans *gputypes.NodeResource) {
if ans != nil {
return
}
cm.fetchResources()
cm.fetchResourcesWithLock()
cm.mu.Lock()
defer cm.mu.Unlock()
return cm.gpu
Expand All @@ -115,7 +115,7 @@ func (cm *CoreResourcesManager) UpdateGPU(nr *gputypes.NodeResource) {
remoteNR1 := remoteNR.DeepCopy()
remoteNR1.Sub(nr)
if remoteNR1.Count() == 0 {
logger.Debugf(ctx, "remote gpu config is consistent")
logger.Debug(ctx, "remote gpu config is consistent")
return
}
}
Expand All @@ -136,14 +136,14 @@ func (cm *CoreResourcesManager) UpdateGPU(nr *gputypes.NodeResource) {

notifier := bison.GetService()
if notifier != nil {
msgList := []string{
"<font color=#00CC33 size=10>update core gpu resource successfully</font>",
"---",
"",
fmt.Sprintf("- **node:** %s", configs.Hostname()),
fmt.Sprintf("- **gpu:** %v", nr),
}
text := "\n" + strings.Join(msgList, "\n")
text := fmt.Sprintf(`
<font color=#00CC33 size=10>update core gpu resource successfully</font>
---
- **node:** %s
- **gpu:** %v
`, configs.Hostname(), nr)
_ = notifier.SendMarkdown(ctx, "update core gpu resource successfully", text)
}

Expand Down Expand Up @@ -184,14 +184,13 @@ func (cm *CoreResourcesManager) UpdateCPUMem(nr *cpumemtypes.NodeResource) (err

notifier := bison.GetService()
if notifier != nil {
msgList := []string{
"<font color=#00CC33 size=10>update core cpumem resource successfully</font>",
"---",
"",
fmt.Sprintf("- **node:** %s", configs.Hostname()),
fmt.Sprintf("- **cpumem:** %+v", localNR),
}
text := "\n" + strings.Join(msgList, "\n")
text := fmt.Sprintf(`
<font color=#00CC33 size=10>update core cpumem resource successfully</font>
---
- **node:** %s
- **cpumem:** %+v
`, configs.Hostname(), localNR)
_ = notifier.SendMarkdown(ctx, "update core cpumem resource successfully", text)
}

Expand Down
6 changes: 3 additions & 3 deletions internal/network/drivers/ovn/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
)

var (
calicoHealthyDesc = prometheus.NewDesc(
ovnHealthyDesc = prometheus.NewDesc(
prometheus.BuildFQName("node", "ovn", "healthy"),
"ovn healthy status.",
[]string{"node"},
Expand All @@ -25,13 +25,13 @@ func (d *Driver) GetMetricsCollector() prometheus.Collector {
}

func (e *MetricsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- calicoHealthyDesc
ch <- ovnHealthyDesc
}

func (e *MetricsCollector) Collect(ch chan<- prometheus.Metric) {
healthy := utils.Bool2Int(e.healthy.Load())
ch <- prometheus.MustNewConstMetric(
calicoHealthyDesc,
ovnHealthyDesc,
prometheus.GaugeValue,
float64(healthy),
configs.Hostname(),
Expand Down
10 changes: 5 additions & 5 deletions internal/network/drivers/vlan/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
)

var (
calicoHealthyDesc = prometheus.NewDesc(
prometheus.BuildFQName("network", "ovn", "healthy"),
"ovn healthy status.",
vlanHealthyDesc = prometheus.NewDesc(
prometheus.BuildFQName("network", "vlan", "healthy"),
"vlan healthy status.",
[]string{"node"},
nil)
)
Expand All @@ -25,13 +25,13 @@ func (d *Handler) GetMetricsCollector() prometheus.Collector {
}

func (e *MetricsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- calicoHealthyDesc
ch <- vlanHealthyDesc
}

func (e *MetricsCollector) Collect(ch chan<- prometheus.Metric) {
healthy := utils.Bool2Int(e.healthy.Load())
ch <- prometheus.MustNewConstMetric(
calicoHealthyDesc,
vlanHealthyDesc,
prometheus.GaugeValue,
float64(healthy),
configs.Hostname(),
Expand Down
Loading

0 comments on commit 28a6c2f

Please sign in to comment.