diff --git a/apis/task.go b/apis/task.go index 2649da3..08c2451 100644 --- a/apis/task.go +++ b/apis/task.go @@ -1,19 +1,21 @@ package apis import ( + "errors" "log/slog" "net/http" - "github.com/mss-boot-io/mss-boot-admin/center" - "github.com/gin-gonic/gin" + "github.com/mss-boot-io/mss-boot-admin/center" "github.com/mss-boot-io/mss-boot-admin/config" "github.com/mss-boot-io/mss-boot-admin/dto" "github.com/mss-boot-io/mss-boot-admin/models" + "github.com/mss-boot-io/mss-boot/core/server/task" "github.com/mss-boot-io/mss-boot/pkg/enum" "github.com/mss-boot-io/mss-boot/pkg/response" "github.com/mss-boot-io/mss-boot/pkg/response/actions" "github.com/mss-boot-io/mss-boot/pkg/response/controller" + "gorm.io/gorm" ) /* @@ -82,27 +84,37 @@ func (e *Task) Operate(c *gin.Context) { api.Err(http.StatusUnprocessableEntity) return } - var count int64 - err := center.Default.GetDB(c, &models.Task{}).Model(&models.Task{}).Where("id = ?", req.ID).Count(&count).Error + t := &models.Task{} + err := center.Default.GetDB(c, &models.Task{}). + Model(&models.Task{}). + Where("id = ?", req.ID). + First(t).Error if err != nil { - api.AddError(err).Log.Error("count task error") + if errors.Is(err, gorm.ErrRecordNotFound) { + api.Err(http.StatusNotFound) + return + } + api.AddError(err).Log.Error("get task error") api.Err(http.StatusInternalServerError) return } - if count == 0 { - api.Err(http.StatusNotFound) - return - } var status enum.Status switch req.Operate { case "start": + err = task.UpdateJob(t.ID, t.Spec, t) status = enum.Enabled case "stop": + err = task.RemoveJob(t.ID) status = enum.Disabled default: api.Err(http.StatusBadRequest, "operate not support") return } + if err != nil { + api.AddError(err).Log.Error("task operate error") + api.Err(http.StatusInternalServerError) + return + } err = center.Default.GetDB(c, &models.Task{}).Model(&models.Task{}).Where("id = ?", req.ID).Update("status", status).Error if err != nil { diff --git a/cmd/server/server.go b/cmd/server/server.go index 77ad21d..d43fbbd 100644 --- a/cmd/server/server.go +++ b/cmd/server/server.go @@ -190,7 +190,11 @@ func setup() error { // setup 06 task init if config.Cfg.Task.Enable { runnable = append(runnable, - task.New(task.WithStorage(&models.TaskStorage{DB: gormdb.DB}), task.WithSchedule("task", config.Cfg.Task.Spec, &taskE{}))) + task.New( + task.WithStorage(&models.TaskStorage{DB: gormdb.DB}), + task.WithSchedule("task", config.Cfg.Task.Spec, &taskE{}), + ), + ) } // setup 07 init virtual models @@ -250,7 +254,7 @@ func (t *taskE) Run() { } } //check - err = gormdb.DB.Where("provider = ?", models.TaskProviderDefault).Where("status = ?", enum.Enabled).Find(&tasks).Error + err = gormdb.DB.Not("provider = ?", models.TaskProviderK8S).Where("status = ?", enum.Enabled).Find(&tasks).Error if err != nil { slog.Error("task run get tasks error", slog.Any("err", err)) return diff --git a/config/application.yml b/config/application.yml index 843e22e..a36b981 100644 --- a/config/application.yml +++ b/config/application.yml @@ -66,4 +66,7 @@ cache: - '*' memory: '' # redis: -# addr: '127.0.0.1:6379' \ No newline at end of file +# addr: '127.0.0.1:6379' +queue: + memory: + poolSize: 10 \ No newline at end of file diff --git a/go.mod b/go.mod index 0fe5689..51111f9 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 github.com/grafana/pyroscope-go v1.2.2 github.com/larksuite/oapi-sdk-go/v3 v3.4.16 - github.com/mss-boot-io/mss-boot v0.5.1-0.20250427230712-554bc1183420 + github.com/mss-boot-io/mss-boot v0.5.1-0.20250503002006-869d064bbb88 github.com/nsqio/go-nsq v1.1.0 github.com/redis/go-redis/v9 v9.8.0 github.com/robfig/cron/v3 v3.0.1 diff --git a/go.sum b/go.sum index 7990241..67140e9 100644 --- a/go.sum +++ b/go.sum @@ -498,6 +498,8 @@ github.com/montanaflynn/stats v0.7.1 h1:etflOAAHORrCC44V+aR6Ftzort912ZU+YLiSTuV8 github.com/montanaflynn/stats v0.7.1/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/mss-boot-io/mss-boot v0.5.1-0.20250427230712-554bc1183420 h1:rf7+hhmLuTkuJcg24yzVQavThuZKYl4pfzLNOq4Cgfg= github.com/mss-boot-io/mss-boot v0.5.1-0.20250427230712-554bc1183420/go.mod h1:/CeSAd5sHVMzPjT5yEl8yMUWwsw/xnVBi7mQy+qoJ04= +github.com/mss-boot-io/mss-boot v0.5.1-0.20250503002006-869d064bbb88 h1:pwXYu72+eD6A11j0vWxy6XZpgt0NWH8upM0jm3sjCkw= +github.com/mss-boot-io/mss-boot v0.5.1-0.20250503002006-869d064bbb88/go.mod h1:13tAJryxeyFpAMFvIe39C8jLvQBLLAckWee0yfh3X70= github.com/mss-boot-io/redisqueue/v2 v2.0.0-20240222064111-d36e396df7f9 h1:/YgpHiqgrxz/0+mKoETXu21c6/fyDt7/j9bdD3UkbdU= github.com/mss-boot-io/redisqueue/v2 v2.0.0-20240222064111-d36e396df7f9/go.mod h1:f/sISkwvecPq37zygQzWNj5ntJoP/jcoK0kPIjFTWFI= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= diff --git a/models/task.go b/models/task.go index 6df6bb0..49386d2 100644 --- a/models/task.go +++ b/models/task.go @@ -2,6 +2,7 @@ package models import ( "bytes" + "context" "database/sql" "encoding/json" "errors" @@ -39,6 +40,13 @@ const ( TaskProviderFunc TaskProvider = "func" ) +func init() { + TaskFuncMap["test"] = func(ctx context.Context, args ...string) error { + fmt.Println("task", time.Now(), args) + return nil + } +} + // Task support http/grpc/script type Task struct { ModelGormTenant @@ -210,7 +218,7 @@ func (t *Task) AfterUpdate(tx *gorm.DB) error { func (t *Task) AfterDelete(tx *gorm.DB) error { switch t.Provider { - case TaskProviderDefault, "": + case TaskProviderDefault, "", TaskProviderFunc: return nil } clientSet := config.Cfg.Clusters.GetClientSet(t.Cluster) @@ -383,7 +391,10 @@ func (t *TaskStorage) Remove(key string) error { } tk.EntryID = 0 tk.CheckedAt = sql.NullTime{} - return t.DB.Updates(tk).Error + return t.DB.Model(tk).UpdateColumns(map[string]interface{}{ + "entry_id": 0, + "checked_at": nil, + }).Error } func (t *TaskStorage) ListKeys() ([]string, error) { @@ -391,7 +402,7 @@ func (t *TaskStorage) ListKeys() ([]string, error) { return nil, fmt.Errorf("db is nil") } var tasks []*Task - err := t.DB.Where("status = ?", enum.Enabled).Where("provider = ?", TaskProviderDefault).Find(&tasks).Error + err := t.DB.Where("status = ?", enum.Enabled).Not("provider = ?", TaskProviderK8S).Find(&tasks).Error if err != nil { return nil, err }