From 93d2cf6ed4ed1c6a7992b44f59990360e7cead4f Mon Sep 17 00:00:00 2001 From: lwnmengjing Date: Fri, 8 Dec 2023 17:07:29 +0800 Subject: [PATCH 1/2] :sparkles: feat: task enabled --- Makefile | 3 +- core/server/task/options.go | 28 +-- core/server/task/server.go | 51 ++--- core/server/task/storage.go | 90 +++++++++ pkg/response/api.go | 3 + pkg/search/gorms/query.go | 33 +-- proto/task.pb.go | 392 ++++++++++++++++++++++++++++++++++++ proto/task.proto | 36 ++++ proto/task_grpc.pb.go | 173 ++++++++++++++++ 9 files changed, 756 insertions(+), 53 deletions(-) create mode 100644 core/server/task/storage.go create mode 100644 proto/task.pb.go create mode 100644 proto/task.proto create mode 100644 proto/task_grpc.pb.go diff --git a/Makefile b/Makefile index 4c6f2a9..d0cd1b3 100644 --- a/Makefile +++ b/Makefile @@ -15,8 +15,7 @@ SRC_DIR=$(GOPATH)/src .PHONY: proto proto: - #protoc-go-inject-tag -I ./proto -I ${GOPATH}/src --go_out=plugins=grpc: proto/${W}/${V}/*; - find proto/ -name '*.proto' -exec protoc --proto_path=$(PROTO_PATH) $(PROTO_FLAGS) --go_out=plugins=grpc:. {} \; + protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/*.proto .PHONY: lint diff --git a/core/server/task/options.go b/core/server/task/options.go index d51e48d..209349a 100644 --- a/core/server/task/options.go +++ b/core/server/task/options.go @@ -8,8 +8,6 @@ package task */ import ( - "sync" - "github.com/robfig/cron/v3" ) @@ -23,26 +21,30 @@ type schedule struct { } type options struct { - task *cron.Cron - schedules map[string]schedule - mux sync.Mutex + task *cron.Cron + storage Storage } // WithSchedule set schedule func WithSchedule(key string, spec string, job cron.Job) Option { return func(o *options) { - o.mux.Lock() - o.schedules[key] = schedule{ - spec: spec, - job: job, - } - o.mux.Unlock() + o.storage.Set(key, 0, spec, job) + } +} + +// WithStorage set storage +func WithStorage(s Storage) Option { + return func(o *options) { + o.storage = s } + } func setDefaultOption() options { return options{ - task: cron.New(cron.WithSeconds(), cron.WithChain()), - schedules: make(map[string]schedule), + task: cron.New(cron.WithSeconds(), cron.WithChain()), + storage: &defaultStorage{ + schedules: make(map[string]*schedule), + }, } } diff --git a/core/server/task/server.go b/core/server/task/server.go index 7e4154d..5d59966 100644 --- a/core/server/task/server.go +++ b/core/server/task/server.go @@ -32,47 +32,43 @@ func New(opts ...Option) *Server { // GetJob get job func GetJob(key string) (string, cron.Job, bool) { - task.opts.mux.Lock() - defer task.opts.mux.Unlock() - s, ok := task.opts.schedules[key] + _, spec, job, ok, _ := task.opts.storage.Get(key) if !ok { return "", nil, false } - return s.spec, s.job, true + return spec, job, true +} + +// Entry get entry +func Entry(entryID cron.EntryID) cron.Entry { + return task.opts.task.Entry(entryID) + } // UpdateJob update or create job func UpdateJob(key string, spec string, job cron.Job) error { - task.opts.mux.Lock() - defer task.opts.mux.Unlock() - s, ok := task.opts.schedules[key] + var err error + entryID, _, _, ok, _ := task.opts.storage.Get(key) if ok { - task.opts.task.Remove(s.entryID) + task.opts.task.Remove(entryID) } - entryID, err := task.opts.task.AddJob(spec, job) + entryID, err = task.opts.task.AddJob(spec, job) if err != nil { slog.Error("task add job error", slog.Any("err", err)) return err } - task.opts.schedules[key] = schedule{ - spec: spec, - job: job, - entryID: entryID, - } + task.opts.storage.Update(key, entryID) return nil } // RemoveJob remove job func RemoveJob(key string) error { - task.opts.mux.Lock() - defer task.opts.mux.Unlock() - s, ok := task.opts.schedules[key] + entryID, _, _, ok, _ := task.opts.storage.Get(key) if !ok { return nil } - task.opts.task.Remove(s.entryID) - delete(task.opts.schedules, key) - return nil + task.opts.task.Remove(entryID) + return task.opts.storage.Remove(key) } // Options set options @@ -91,13 +87,22 @@ func (e *Server) String() string { func (e *Server) Start(ctx context.Context) error { var err error e.ctx = ctx - for i, s := range e.opts.schedules { - s.entryID, err = e.opts.task.AddJob(e.opts.schedules[i].spec, e.opts.schedules[i].job) + keys, _ := e.opts.storage.ListKeys() + for i := range keys { + entryID, spec, job, ok, _ := e.opts.storage.Get(keys[i]) + if !ok { + continue + } + entryID, err = e.opts.task.AddJob(spec, job) if err != nil { slog.ErrorContext(ctx, "task add job error", slog.Any("err", err)) return err } - e.opts.schedules[i] = s + err = e.opts.storage.Update(keys[i], entryID) + if err != nil { + slog.ErrorContext(ctx, "task update job error", slog.Any("err", err)) + return err + } } go func() { e.opts.task.Run() diff --git a/core/server/task/storage.go b/core/server/task/storage.go new file mode 100644 index 0000000..569a4df --- /dev/null +++ b/core/server/task/storage.go @@ -0,0 +1,90 @@ +package task + +import ( + "sync" + + "github.com/robfig/cron/v3" +) + +/* + * @Author: lwnmengjing + * @Date: 2023/12/5 16:56:16 + * @Last Modified by: lwnmengjing + * @Last Modified time: 2023/12/5 16:56:16 + */ + +// Storage storage interface +type Storage interface { + Get(key string) (entryID cron.EntryID, spec string, job cron.Job, exist bool, err error) + Set(key string, entryID cron.EntryID, spec string, job cron.Job) error + Update(key string, entryID cron.EntryID) error + Remove(key string) error + ListKeys() ([]string, error) +} + +type defaultStorage struct { + schedules map[string]*schedule + mux sync.Mutex +} + +// Get schedule +func (s *defaultStorage) Get(key string) (entryID cron.EntryID, spec string, job cron.Job, exist bool, err error) { + if s.schedules == nil { + return + } + item, ok := s.schedules[key] + if !ok { + return + } + entryID = item.entryID + spec = item.spec + job = item.job + exist = true + return +} + +// Set schedule +func (s *defaultStorage) Set(key string, entryID cron.EntryID, spec string, job cron.Job) error { + s.mux.Lock() + defer s.mux.Unlock() + if s.schedules == nil { + s.schedules = make(map[string]*schedule) + } + s.schedules[key] = &schedule{ + spec: spec, + entryID: entryID, + job: job, + } + return nil +} + +// Update schedule +func (s *defaultStorage) Update(key string, entryID cron.EntryID) error { + if s.schedules == nil { + s.schedules = make(map[string]*schedule) + return nil + } + item, ok := s.schedules[key] + if !ok { + return nil + } + item.entryID = entryID + return nil +} + +func (s *defaultStorage) Remove(key string) error { + if s.schedules == nil { + return nil + } + delete(s.schedules, key) + return nil +} + +// ListKeys list keys +func (s *defaultStorage) ListKeys() ([]string, error) { + keys := make([]string, 0, len(s.schedules)) + for k := range s.schedules { + keys = append(keys, k) + } + return keys, nil +} diff --git a/pkg/response/api.go b/pkg/response/api.go index f96870f..4257b08 100644 --- a/pkg/response/api.go +++ b/pkg/response/api.go @@ -104,6 +104,9 @@ func (e *API) AddError(err error) *API { } else if err != nil { e.Error = fmt.Errorf("%v; %w", e.Error, err) } + if e.Error != nil { + e.Log = e.Log.With("error", e.Error) + } return e } diff --git a/pkg/search/gorms/query.go b/pkg/search/gorms/query.go index c4dd073..858d86e 100644 --- a/pkg/search/gorms/query.go +++ b/pkg/search/gorms/query.go @@ -74,11 +74,14 @@ func parseSQL(driver string, searchTag *resolveSearchTag, condition Condition, q if driver == Postgres { iStr = "i" } + if searchTag.Table != "" { + searchTag.Table = fmt.Sprintf("`%s`.", searchTag.Table) + } switch searchTag.Type { case "left": //左关联 join := condition.SetJoinOn(searchTag.Type, fmt.Sprintf( - "left join %s on %s.%s = %s.%s", + "left join `%s` on `%s`.`%s` = %s.`%s`", searchTag.Join, searchTag.Join, searchTag.On[0], @@ -87,37 +90,37 @@ func parseSQL(driver string, searchTag *resolveSearchTag, condition Condition, q )) ResolveSearchQuery(driver, qValue.Field(i).Interface(), join) case "exact", "iexact": - condition.SetWhere(fmt.Sprintf("%s.%s = ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) + condition.SetWhere(fmt.Sprintf("%s`%s` = ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) case "contains": - condition.SetWhere(fmt.Sprintf("%s.%s like ?", searchTag.Table, searchTag.Column), []interface{}{"%" + qValue.Field(i).String() + "%"}) + condition.SetWhere(fmt.Sprintf("%s`%s` like ?", searchTag.Table, searchTag.Column), []interface{}{"%" + qValue.Field(i).String() + "%"}) case "icontains": - condition.SetWhere(fmt.Sprintf("%s.%s %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{"%" + qValue.Field(i).String() + "%"}) + condition.SetWhere(fmt.Sprintf("%s`%s` %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{"%" + qValue.Field(i).String() + "%"}) case "gt": - condition.SetWhere(fmt.Sprintf("%s.%s > ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) + condition.SetWhere(fmt.Sprintf("%s`%s` > ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) case "gte": - condition.SetWhere(fmt.Sprintf("%s.%s >= ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) + condition.SetWhere(fmt.Sprintf("%s`%s` >= ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) case "lt": - condition.SetWhere(fmt.Sprintf("%s.%s < ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) + condition.SetWhere(fmt.Sprintf("%s`%s` < ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) case "lte": - condition.SetWhere(fmt.Sprintf("%s.%s <= ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) + condition.SetWhere(fmt.Sprintf("%s`%s` <= ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) case "startswith": - condition.SetWhere(fmt.Sprintf("%s.%s like ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).String() + "%"}) + condition.SetWhere(fmt.Sprintf("%s`%s` like ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).String() + "%"}) case "istartswith": - condition.SetWhere(fmt.Sprintf("%s.%s %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{qValue.Field(i).String() + "%"}) + condition.SetWhere(fmt.Sprintf("%s`%s` %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{qValue.Field(i).String() + "%"}) case "endswith": - condition.SetWhere(fmt.Sprintf("%s.%s like ?", searchTag.Table, searchTag.Column), []interface{}{"%" + qValue.Field(i).String()}) + condition.SetWhere(fmt.Sprintf("%s`%s` like ?", searchTag.Table, searchTag.Column), []interface{}{"%" + qValue.Field(i).String()}) case "iendswith": - condition.SetWhere(fmt.Sprintf("%s.%s %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{"%" + qValue.Field(i).String()}) + condition.SetWhere(fmt.Sprintf("%s`%s` %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{"%" + qValue.Field(i).String()}) case "in": - condition.SetWhere(fmt.Sprintf("`%s`.`%s` in (?)", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) + condition.SetWhere(fmt.Sprintf("%s`%s` in (?)", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()}) case "isnull": if !(qValue.Field(i).IsZero() && qValue.Field(i).IsNil()) { - condition.SetWhere(fmt.Sprintf("%s.%s` isnull", searchTag.Table, searchTag.Column), make([]interface{}, 0)) + condition.SetWhere(fmt.Sprintf("%s`%s` isnull", searchTag.Table, searchTag.Column), make([]interface{}, 0)) } case "order": switch strings.ToLower(qValue.Field(i).String()) { case "desc", "asc": - condition.SetOrder(fmt.Sprintf("%s.%s %s", searchTag.Table, searchTag.Column, qValue.Field(i).String())) + condition.SetOrder(fmt.Sprintf("%s`%s` %s", searchTag.Table, searchTag.Column, qValue.Field(i).String())) } } } diff --git a/proto/task.pb.go b/proto/task.pb.go new file mode 100644 index 0000000..906c79d --- /dev/null +++ b/proto/task.pb.go @@ -0,0 +1,392 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v3.21.2 +// source: proto/task.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ExecRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Name *string `protobuf:"bytes,2,opt,name=name,proto3,oneof" json:"name,omitempty"` + Command string `protobuf:"bytes,3,opt,name=command,proto3" json:"command,omitempty"` + Args []string `protobuf:"bytes,4,rep,name=args,proto3" json:"args,omitempty"` +} + +func (x *ExecRequest) Reset() { + *x = ExecRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_task_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ExecRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExecRequest) ProtoMessage() {} + +func (x *ExecRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_task_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExecRequest.ProtoReflect.Descriptor instead. +func (*ExecRequest) Descriptor() ([]byte, []int) { + return file_proto_task_proto_rawDescGZIP(), []int{0} +} + +func (x *ExecRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *ExecRequest) GetName() string { + if x != nil && x.Name != nil { + return *x.Name + } + return "" +} + +func (x *ExecRequest) GetCommand() string { + if x != nil { + return x.Command + } + return "" +} + +func (x *ExecRequest) GetArgs() []string { + if x != nil { + return x.Args + } + return nil +} + +type ExecResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Content []byte `protobuf:"bytes,1,opt,name=content,proto3" json:"content,omitempty"` +} + +func (x *ExecResponse) Reset() { + *x = ExecResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_task_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ExecResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ExecResponse) ProtoMessage() {} + +func (x *ExecResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_task_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ExecResponse.ProtoReflect.Descriptor instead. +func (*ExecResponse) Descriptor() ([]byte, []int) { + return file_proto_task_proto_rawDescGZIP(), []int{1} +} + +func (x *ExecResponse) GetContent() []byte { + if x != nil { + return x.Content + } + return nil +} + +type StopRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Name *string `protobuf:"bytes,2,opt,name=name,proto3,oneof" json:"name,omitempty"` +} + +func (x *StopRequest) Reset() { + *x = StopRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_task_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StopRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StopRequest) ProtoMessage() {} + +func (x *StopRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_task_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StopRequest.ProtoReflect.Descriptor instead. +func (*StopRequest) Descriptor() ([]byte, []int) { + return file_proto_task_proto_rawDescGZIP(), []int{2} +} + +func (x *StopRequest) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *StopRequest) GetName() string { + if x != nil && x.Name != nil { + return *x.Name + } + return "" +} + +type StopResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` +} + +func (x *StopResponse) Reset() { + *x = StopResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_task_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StopResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StopResponse) ProtoMessage() {} + +func (x *StopResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_task_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StopResponse.ProtoReflect.Descriptor instead. +func (*StopResponse) Descriptor() ([]byte, []int) { + return file_proto_task_proto_rawDescGZIP(), []int{3} +} + +func (x *StopResponse) GetSuccess() bool { + if x != nil { + return x.Success + } + return false +} + +func (x *StopResponse) GetMessage() string { + if x != nil { + return x.Message + } + return "" +} + +var File_proto_task_proto protoreflect.FileDescriptor + +var file_proto_task_proto_rawDesc = []byte{ + 0x0a, 0x10, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x61, 0x73, 0x6b, 0x2e, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x12, 0x05, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x6d, 0x0a, 0x0b, 0x45, 0x78, 0x65, + 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x17, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x88, 0x01, + 0x01, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x18, 0x03, 0x20, 0x01, + 0x28, 0x09, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x61, 0x6e, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x61, + 0x72, 0x67, 0x73, 0x18, 0x04, 0x20, 0x03, 0x28, 0x09, 0x52, 0x04, 0x61, 0x72, 0x67, 0x73, 0x42, + 0x07, 0x0a, 0x05, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x22, 0x28, 0x0a, 0x0c, 0x45, 0x78, 0x65, 0x63, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x63, 0x6f, 0x6e, 0x74, + 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x63, 0x6f, 0x6e, 0x74, 0x65, + 0x6e, 0x74, 0x22, 0x3f, 0x0a, 0x0b, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, + 0x64, 0x12, 0x17, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, + 0x00, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x88, 0x01, 0x01, 0x42, 0x07, 0x0a, 0x05, 0x5f, 0x6e, + 0x61, 0x6d, 0x65, 0x22, 0x42, 0x0a, 0x0c, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x18, 0x0a, + 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, + 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x32, 0x6e, 0x0a, 0x04, 0x54, 0x61, 0x73, 0x6b, 0x12, + 0x33, 0x0a, 0x04, 0x45, 0x78, 0x65, 0x63, 0x12, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, + 0x45, 0x78, 0x65, 0x63, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x30, 0x01, 0x12, 0x31, 0x0a, 0x04, 0x53, 0x74, 0x6f, 0x70, 0x12, 0x12, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x13, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x74, 0x6f, 0x70, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x28, 0x0a, 0x10, 0x6d, 0x73, 0x73, 0x2d, 0x62, + 0x6f, 0x6f, 0x74, 0x2d, 0x69, 0x6f, 0x2e, 0x74, 0x61, 0x73, 0x6b, 0x42, 0x09, 0x54, 0x61, 0x73, + 0x6b, 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x50, 0x01, 0x5a, 0x07, 0x2e, 0x2f, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_task_proto_rawDescOnce sync.Once + file_proto_task_proto_rawDescData = file_proto_task_proto_rawDesc +) + +func file_proto_task_proto_rawDescGZIP() []byte { + file_proto_task_proto_rawDescOnce.Do(func() { + file_proto_task_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_task_proto_rawDescData) + }) + return file_proto_task_proto_rawDescData +} + +var file_proto_task_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_proto_task_proto_goTypes = []interface{}{ + (*ExecRequest)(nil), // 0: proto.ExecRequest + (*ExecResponse)(nil), // 1: proto.ExecResponse + (*StopRequest)(nil), // 2: proto.StopRequest + (*StopResponse)(nil), // 3: proto.StopResponse +} +var file_proto_task_proto_depIdxs = []int32{ + 0, // 0: proto.Task.Exec:input_type -> proto.ExecRequest + 2, // 1: proto.Task.Stop:input_type -> proto.StopRequest + 1, // 2: proto.Task.Exec:output_type -> proto.ExecResponse + 3, // 3: proto.Task.Stop:output_type -> proto.StopResponse + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_proto_task_proto_init() } +func file_proto_task_proto_init() { + if File_proto_task_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_task_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ExecRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_task_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ExecResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_task_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StopRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_task_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StopResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + file_proto_task_proto_msgTypes[0].OneofWrappers = []interface{}{} + file_proto_task_proto_msgTypes[2].OneofWrappers = []interface{}{} + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_task_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_proto_task_proto_goTypes, + DependencyIndexes: file_proto_task_proto_depIdxs, + MessageInfos: file_proto_task_proto_msgTypes, + }.Build() + File_proto_task_proto = out.File + file_proto_task_proto_rawDesc = nil + file_proto_task_proto_goTypes = nil + file_proto_task_proto_depIdxs = nil +} diff --git a/proto/task.proto b/proto/task.proto new file mode 100644 index 0000000..0d84067 --- /dev/null +++ b/proto/task.proto @@ -0,0 +1,36 @@ +syntax = "proto3"; + +package proto; + +option go_package = "./proto"; +option java_multiple_files = true; +option java_package = "mss-boot-io.task"; +option java_outer_classname = "TaskProto"; + +service Task{ + // Exec task stream + rpc Exec(ExecRequest) returns (stream ExecResponse) {} + // Stop task stop, optional + rpc Stop(StopRequest) returns (StopResponse) {} +} + +message ExecRequest{ + string id = 1; + optional string name = 2; + string command = 3; + repeated string args = 4; +} + +message ExecResponse { + bytes content = 1; +} + +message StopRequest { + string id = 1; + optional string name = 2; +} + +message StopResponse { + bool success = 1; + string message = 2; +} \ No newline at end of file diff --git a/proto/task_grpc.pb.go b/proto/task_grpc.pb.go new file mode 100644 index 0000000..04f685c --- /dev/null +++ b/proto/task_grpc.pb.go @@ -0,0 +1,173 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.2 +// source: proto/task.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// TaskClient is the client API for Task service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type TaskClient interface { + // Exec task stream + Exec(ctx context.Context, in *ExecRequest, opts ...grpc.CallOption) (Task_ExecClient, error) + // Stop task stop, optional + Stop(ctx context.Context, in *StopRequest, opts ...grpc.CallOption) (*StopResponse, error) +} + +type taskClient struct { + cc grpc.ClientConnInterface +} + +func NewTaskClient(cc grpc.ClientConnInterface) TaskClient { + return &taskClient{cc} +} + +func (c *taskClient) Exec(ctx context.Context, in *ExecRequest, opts ...grpc.CallOption) (Task_ExecClient, error) { + stream, err := c.cc.NewStream(ctx, &Task_ServiceDesc.Streams[0], "/proto.Task/Exec", opts...) + if err != nil { + return nil, err + } + x := &taskExecClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Task_ExecClient interface { + Recv() (*ExecResponse, error) + grpc.ClientStream +} + +type taskExecClient struct { + grpc.ClientStream +} + +func (x *taskExecClient) Recv() (*ExecResponse, error) { + m := new(ExecResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *taskClient) Stop(ctx context.Context, in *StopRequest, opts ...grpc.CallOption) (*StopResponse, error) { + out := new(StopResponse) + err := c.cc.Invoke(ctx, "/proto.Task/Stop", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// TaskServer is the server API for Task service. +// All implementations must embed UnimplementedTaskServer +// for forward compatibility +type TaskServer interface { + // Exec task stream + Exec(*ExecRequest, Task_ExecServer) error + // Stop task stop, optional + Stop(context.Context, *StopRequest) (*StopResponse, error) + mustEmbedUnimplementedTaskServer() +} + +// UnimplementedTaskServer must be embedded to have forward compatible implementations. +type UnimplementedTaskServer struct { +} + +func (UnimplementedTaskServer) Exec(*ExecRequest, Task_ExecServer) error { + return status.Errorf(codes.Unimplemented, "method Exec not implemented") +} +func (UnimplementedTaskServer) Stop(context.Context, *StopRequest) (*StopResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Stop not implemented") +} +func (UnimplementedTaskServer) mustEmbedUnimplementedTaskServer() {} + +// UnsafeTaskServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to TaskServer will +// result in compilation errors. +type UnsafeTaskServer interface { + mustEmbedUnimplementedTaskServer() +} + +func RegisterTaskServer(s grpc.ServiceRegistrar, srv TaskServer) { + s.RegisterService(&Task_ServiceDesc, srv) +} + +func _Task_Exec_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(ExecRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(TaskServer).Exec(m, &taskExecServer{stream}) +} + +type Task_ExecServer interface { + Send(*ExecResponse) error + grpc.ServerStream +} + +type taskExecServer struct { + grpc.ServerStream +} + +func (x *taskExecServer) Send(m *ExecResponse) error { + return x.ServerStream.SendMsg(m) +} + +func _Task_Stop_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(StopRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(TaskServer).Stop(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/proto.Task/Stop", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(TaskServer).Stop(ctx, req.(*StopRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Task_ServiceDesc is the grpc.ServiceDesc for Task service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Task_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "proto.Task", + HandlerType: (*TaskServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Stop", + Handler: _Task_Stop_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Exec", + Handler: _Task_Exec_Handler, + ServerStreams: true, + }, + }, + Metadata: "proto/task.proto", +} From b441910f4304bdc3f88da1350ecc49d472c896b1 Mon Sep 17 00:00:00 2001 From: lwnmengjing Date: Fri, 8 Dec 2023 17:28:16 +0800 Subject: [PATCH 2/2] :sparkles: feat: task enabled --- core/server/task/options.go | 2 +- core/server/task/server.go | 7 +++---- proto/task.pb.go | 5 +++-- proto/task_grpc.pb.go | 1 + 4 files changed, 8 insertions(+), 7 deletions(-) diff --git a/core/server/task/options.go b/core/server/task/options.go index 209349a..38f762c 100644 --- a/core/server/task/options.go +++ b/core/server/task/options.go @@ -28,7 +28,7 @@ type options struct { // WithSchedule set schedule func WithSchedule(key string, spec string, job cron.Job) Option { return func(o *options) { - o.storage.Set(key, 0, spec, job) + _ = o.storage.Set(key, 0, spec, job) } } diff --git a/core/server/task/server.go b/core/server/task/server.go index 5d59966..70d3fd8 100644 --- a/core/server/task/server.go +++ b/core/server/task/server.go @@ -57,8 +57,7 @@ func UpdateJob(key string, spec string, job cron.Job) error { slog.Error("task add job error", slog.Any("err", err)) return err } - task.opts.storage.Update(key, entryID) - return nil + return task.opts.storage.Update(key, entryID) } // RemoveJob remove job @@ -89,11 +88,11 @@ func (e *Server) Start(ctx context.Context) error { e.ctx = ctx keys, _ := e.opts.storage.ListKeys() for i := range keys { - entryID, spec, job, ok, _ := e.opts.storage.Get(keys[i]) + _, spec, job, ok, _ := e.opts.storage.Get(keys[i]) if !ok { continue } - entryID, err = e.opts.task.AddJob(spec, job) + entryID, err := e.opts.task.AddJob(spec, job) if err != nil { slog.ErrorContext(ctx, "task add job error", slog.Any("err", err)) return err diff --git a/proto/task.pb.go b/proto/task.pb.go index 906c79d..c1be8ac 100644 --- a/proto/task.pb.go +++ b/proto/task.pb.go @@ -7,10 +7,11 @@ package proto import ( - protoreflect "google.golang.org/protobuf/reflect/protoreflect" - protoimpl "google.golang.org/protobuf/runtime/protoimpl" reflect "reflect" sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" ) const ( diff --git a/proto/task_grpc.pb.go b/proto/task_grpc.pb.go index 04f685c..75f0e7c 100644 --- a/proto/task_grpc.pb.go +++ b/proto/task_grpc.pb.go @@ -8,6 +8,7 @@ package proto import ( context "context" + grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status"