Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions apis/task.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,21 @@
package apis

import (
"errors"
"log/slog"
"net/http"

"github.com/mss-boot-io/mss-boot-admin/center"

"github.com/gin-gonic/gin"
"github.com/mss-boot-io/mss-boot-admin/center"
"github.com/mss-boot-io/mss-boot-admin/config"
"github.com/mss-boot-io/mss-boot-admin/dto"
"github.com/mss-boot-io/mss-boot-admin/models"
"github.com/mss-boot-io/mss-boot/core/server/task"
"github.com/mss-boot-io/mss-boot/pkg/enum"
"github.com/mss-boot-io/mss-boot/pkg/response"
"github.com/mss-boot-io/mss-boot/pkg/response/actions"
"github.com/mss-boot-io/mss-boot/pkg/response/controller"
"gorm.io/gorm"
)

/*
Expand Down Expand Up @@ -82,27 +84,37 @@ func (e *Task) Operate(c *gin.Context) {
api.Err(http.StatusUnprocessableEntity)
return
}
var count int64
err := center.Default.GetDB(c, &models.Task{}).Model(&models.Task{}).Where("id = ?", req.ID).Count(&count).Error
t := &models.Task{}
err := center.Default.GetDB(c, &models.Task{}).
Model(&models.Task{}).
Where("id = ?", req.ID).
First(t).Error
if err != nil {
api.AddError(err).Log.Error("count task error")
if errors.Is(err, gorm.ErrRecordNotFound) {
api.Err(http.StatusNotFound)
return
}
api.AddError(err).Log.Error("get task error")
api.Err(http.StatusInternalServerError)
return
}
if count == 0 {
api.Err(http.StatusNotFound)
return
}
var status enum.Status
switch req.Operate {
case "start":
err = task.UpdateJob(t.ID, t.Spec, t)
status = enum.Enabled
case "stop":
err = task.RemoveJob(t.ID)
status = enum.Disabled
default:
api.Err(http.StatusBadRequest, "operate not support")
return
}
if err != nil {
api.AddError(err).Log.Error("task operate error")
api.Err(http.StatusInternalServerError)
return
}

err = center.Default.GetDB(c, &models.Task{}).Model(&models.Task{}).Where("id = ?", req.ID).Update("status", status).Error
if err != nil {
Expand Down
8 changes: 6 additions & 2 deletions cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,11 @@ func setup() error {
// setup 06 task init
if config.Cfg.Task.Enable {
runnable = append(runnable,
task.New(task.WithStorage(&models.TaskStorage{DB: gormdb.DB}), task.WithSchedule("task", config.Cfg.Task.Spec, &taskE{})))
task.New(
task.WithStorage(&models.TaskStorage{DB: gormdb.DB}),
task.WithSchedule("task", config.Cfg.Task.Spec, &taskE{}),
),
)
}

// setup 07 init virtual models
Expand Down Expand Up @@ -250,7 +254,7 @@ func (t *taskE) Run() {
}
}
//check
err = gormdb.DB.Where("provider = ?", models.TaskProviderDefault).Where("status = ?", enum.Enabled).Find(&tasks).Error
err = gormdb.DB.Not("provider = ?", models.TaskProviderK8S).Where("status = ?", enum.Enabled).Find(&tasks).Error
if err != nil {
slog.Error("task run get tasks error", slog.Any("err", err))
return
Expand Down
5 changes: 4 additions & 1 deletion config/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,4 +66,7 @@ cache:
- '*'
memory: ''
# redis:
# addr: '127.0.0.1:6379'
# addr: '127.0.0.1:6379'
queue:
memory:
poolSize: 10
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674
github.com/grafana/pyroscope-go v1.2.2
github.com/larksuite/oapi-sdk-go/v3 v3.4.16
github.com/mss-boot-io/mss-boot v0.5.1-0.20250427230712-554bc1183420
github.com/mss-boot-io/mss-boot v0.5.1-0.20250503002006-869d064bbb88
github.com/nsqio/go-nsq v1.1.0
github.com/redis/go-redis/v9 v9.8.0
github.com/robfig/cron/v3 v3.0.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,8 @@ github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8
github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow=
github.com/mss-boot-io/mss-boot v0.5.1-0.20250427230712-554bc1183420 h1:rf7+hhmLuTkuJcg24yzVQavThuZKYl4pfzLNOq4Cgfg=
github.com/mss-boot-io/mss-boot v0.5.1-0.20250427230712-554bc1183420/go.mod h1:/CeSAd5sHVMzPjT5yEl8yMUWwsw/xnVBi7mQy+qoJ04=
github.com/mss-boot-io/mss-boot v0.5.1-0.20250503002006-869d064bbb88 h1:pwXYu72+eD6A11j0vWxy6XZpgt0NWH8upM0jm3sjCkw=
github.com/mss-boot-io/mss-boot v0.5.1-0.20250503002006-869d064bbb88/go.mod h1:13tAJryxeyFpAMFvIe39C8jLvQBLLAckWee0yfh3X70=
github.com/mss-boot-io/redisqueue/v2 v2.0.0-20240222064111-d36e396df7f9 h1:/YgpHiqgrxz/0+mKoETXu21c6/fyDt7/j9bdD3UkbdU=
github.com/mss-boot-io/redisqueue/v2 v2.0.0-20240222064111-d36e396df7f9/go.mod h1:f/sISkwvecPq37zygQzWNj5ntJoP/jcoK0kPIjFTWFI=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
Expand Down
17 changes: 14 additions & 3 deletions models/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package models

import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
Expand Down Expand Up @@ -39,6 +40,13 @@ const (
TaskProviderFunc TaskProvider = "func"
)

func init() {
TaskFuncMap["test"] = func(ctx context.Context, args ...string) error {
fmt.Println("task", time.Now(), args)
return nil
}
}

// Task support http/grpc/script
type Task struct {
ModelGormTenant
Expand Down Expand Up @@ -210,7 +218,7 @@ func (t *Task) AfterUpdate(tx *gorm.DB) error {

func (t *Task) AfterDelete(tx *gorm.DB) error {
switch t.Provider {
case TaskProviderDefault, "":
case TaskProviderDefault, "", TaskProviderFunc:
return nil
}
clientSet := config.Cfg.Clusters.GetClientSet(t.Cluster)
Expand Down Expand Up @@ -383,15 +391,18 @@ func (t *TaskStorage) Remove(key string) error {
}
tk.EntryID = 0
tk.CheckedAt = sql.NullTime{}
return t.DB.Updates(tk).Error
return t.DB.Model(tk).UpdateColumns(map[string]interface{}{
"entry_id": 0,
"checked_at": nil,
}).Error
}

func (t *TaskStorage) ListKeys() ([]string, error) {
if t.DB == nil {
return nil, fmt.Errorf("db is nil")
}
var tasks []*Task
err := t.DB.Where("status = ?", enum.Enabled).Where("provider = ?", TaskProviderDefault).Find(&tasks).Error
err := t.DB.Where("status = ?", enum.Enabled).Not("provider = ?", TaskProviderK8S).Find(&tasks).Error
if err != nil {
return nil, err
}
Expand Down