Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ SRC_DIR=$(GOPATH)/src

.PHONY: proto
proto:
#protoc-go-inject-tag -I ./proto -I ${GOPATH}/src --go_out=plugins=grpc: proto/${W}/${V}/*;
find proto/ -name '*.proto' -exec protoc --proto_path=$(PROTO_PATH) $(PROTO_FLAGS) --go_out=plugins=grpc:. {} \;
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative proto/*.proto


.PHONY: lint
Expand Down
28 changes: 15 additions & 13 deletions core/server/task/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ package task
*/

import (
"sync"

"github.com/robfig/cron/v3"
)

Expand All @@ -23,26 +21,30 @@ type schedule struct {
}

type options struct {
task *cron.Cron
schedules map[string]schedule
mux sync.Mutex
task *cron.Cron
storage Storage
}

// WithSchedule set schedule
func WithSchedule(key string, spec string, job cron.Job) Option {
return func(o *options) {
o.mux.Lock()
o.schedules[key] = schedule{
spec: spec,
job: job,
}
o.mux.Unlock()
_ = o.storage.Set(key, 0, spec, job)
}
}

// WithStorage set storage
func WithStorage(s Storage) Option {
return func(o *options) {
o.storage = s
}

}

func setDefaultOption() options {
return options{
task: cron.New(cron.WithSeconds(), cron.WithChain()),
schedules: make(map[string]schedule),
task: cron.New(cron.WithSeconds(), cron.WithChain()),
storage: &defaultStorage{
schedules: make(map[string]*schedule),
},
}
}
52 changes: 28 additions & 24 deletions core/server/task/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,47 +32,42 @@ func New(opts ...Option) *Server {

// GetJob get job
func GetJob(key string) (string, cron.Job, bool) {
task.opts.mux.Lock()
defer task.opts.mux.Unlock()
s, ok := task.opts.schedules[key]
_, spec, job, ok, _ := task.opts.storage.Get(key)
if !ok {
return "", nil, false
}
return s.spec, s.job, true
return spec, job, true
}

// Entry get entry
func Entry(entryID cron.EntryID) cron.Entry {
return task.opts.task.Entry(entryID)

}

// UpdateJob update or create job
func UpdateJob(key string, spec string, job cron.Job) error {
task.opts.mux.Lock()
defer task.opts.mux.Unlock()
s, ok := task.opts.schedules[key]
var err error
entryID, _, _, ok, _ := task.opts.storage.Get(key)
if ok {
task.opts.task.Remove(s.entryID)
task.opts.task.Remove(entryID)
}
entryID, err := task.opts.task.AddJob(spec, job)
entryID, err = task.opts.task.AddJob(spec, job)
if err != nil {
slog.Error("task add job error", slog.Any("err", err))
return err
}
task.opts.schedules[key] = schedule{
spec: spec,
job: job,
entryID: entryID,
}
return nil
return task.opts.storage.Update(key, entryID)
}

// RemoveJob remove job
func RemoveJob(key string) error {
task.opts.mux.Lock()
defer task.opts.mux.Unlock()
s, ok := task.opts.schedules[key]
entryID, _, _, ok, _ := task.opts.storage.Get(key)
if !ok {
return nil
}
task.opts.task.Remove(s.entryID)
delete(task.opts.schedules, key)
return nil
task.opts.task.Remove(entryID)
return task.opts.storage.Remove(key)
}

// Options set options
Expand All @@ -91,13 +86,22 @@ func (e *Server) String() string {
func (e *Server) Start(ctx context.Context) error {
var err error
e.ctx = ctx
for i, s := range e.opts.schedules {
s.entryID, err = e.opts.task.AddJob(e.opts.schedules[i].spec, e.opts.schedules[i].job)
keys, _ := e.opts.storage.ListKeys()
for i := range keys {
_, spec, job, ok, _ := e.opts.storage.Get(keys[i])
if !ok {
continue
}
entryID, err := e.opts.task.AddJob(spec, job)
if err != nil {
slog.ErrorContext(ctx, "task add job error", slog.Any("err", err))
return err
}
e.opts.schedules[i] = s
err = e.opts.storage.Update(keys[i], entryID)
if err != nil {
slog.ErrorContext(ctx, "task update job error", slog.Any("err", err))
return err
}
}
go func() {
e.opts.task.Run()
Expand Down
90 changes: 90 additions & 0 deletions core/server/task/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package task

import (
"sync"

"github.com/robfig/cron/v3"
)

/*
* @Author: lwnmengjing<lwnmengjing@qq.com>
* @Date: 2023/12/5 16:56:16
* @Last Modified by: lwnmengjing<lwnmengjing@qq.com>
* @Last Modified time: 2023/12/5 16:56:16
*/

// Storage storage interface
type Storage interface {
Get(key string) (entryID cron.EntryID, spec string, job cron.Job, exist bool, err error)
Set(key string, entryID cron.EntryID, spec string, job cron.Job) error
Update(key string, entryID cron.EntryID) error
Remove(key string) error
ListKeys() ([]string, error)
}

type defaultStorage struct {
schedules map[string]*schedule
mux sync.Mutex
}

// Get schedule
func (s *defaultStorage) Get(key string) (entryID cron.EntryID, spec string, job cron.Job, exist bool, err error) {
if s.schedules == nil {
return
}
item, ok := s.schedules[key]
if !ok {
return
}
entryID = item.entryID
spec = item.spec
job = item.job
exist = true
return
}

// Set schedule
func (s *defaultStorage) Set(key string, entryID cron.EntryID, spec string, job cron.Job) error {
s.mux.Lock()
defer s.mux.Unlock()
if s.schedules == nil {
s.schedules = make(map[string]*schedule)
}
s.schedules[key] = &schedule{
spec: spec,
entryID: entryID,
job: job,
}
return nil
}

// Update schedule
func (s *defaultStorage) Update(key string, entryID cron.EntryID) error {
if s.schedules == nil {
s.schedules = make(map[string]*schedule)
return nil
}
item, ok := s.schedules[key]
if !ok {
return nil
}
item.entryID = entryID
return nil
}

func (s *defaultStorage) Remove(key string) error {
if s.schedules == nil {
return nil
}
delete(s.schedules, key)
return nil
}

// ListKeys list keys
func (s *defaultStorage) ListKeys() ([]string, error) {
keys := make([]string, 0, len(s.schedules))
for k := range s.schedules {
keys = append(keys, k)
}
return keys, nil
}
3 changes: 3 additions & 0 deletions pkg/response/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func (e *API) AddError(err error) *API {
} else if err != nil {
e.Error = fmt.Errorf("%v; %w", e.Error, err)
}
if e.Error != nil {
e.Log = e.Log.With("error", e.Error)
}
return e
}

Expand Down
33 changes: 18 additions & 15 deletions pkg/search/gorms/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,14 @@ func parseSQL(driver string, searchTag *resolveSearchTag, condition Condition, q
if driver == Postgres {
iStr = "i"
}
if searchTag.Table != "" {
searchTag.Table = fmt.Sprintf("`%s`.", searchTag.Table)
}
switch searchTag.Type {
case "left":
//左关联
join := condition.SetJoinOn(searchTag.Type, fmt.Sprintf(
"left join %s on %s.%s = %s.%s",
"left join `%s` on `%s`.`%s` = %s.`%s`",
searchTag.Join,
searchTag.Join,
searchTag.On[0],
Expand All @@ -87,37 +90,37 @@ func parseSQL(driver string, searchTag *resolveSearchTag, condition Condition, q
))
ResolveSearchQuery(driver, qValue.Field(i).Interface(), join)
case "exact", "iexact":
condition.SetWhere(fmt.Sprintf("%s.%s = ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
condition.SetWhere(fmt.Sprintf("%s`%s` = ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
case "contains":
condition.SetWhere(fmt.Sprintf("%s.%s like ?", searchTag.Table, searchTag.Column), []interface{}{"%" + qValue.Field(i).String() + "%"})
condition.SetWhere(fmt.Sprintf("%s`%s` like ?", searchTag.Table, searchTag.Column), []interface{}{"%" + qValue.Field(i).String() + "%"})
case "icontains":
condition.SetWhere(fmt.Sprintf("%s.%s %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{"%" + qValue.Field(i).String() + "%"})
condition.SetWhere(fmt.Sprintf("%s`%s` %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{"%" + qValue.Field(i).String() + "%"})
case "gt":
condition.SetWhere(fmt.Sprintf("%s.%s > ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
condition.SetWhere(fmt.Sprintf("%s`%s` > ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
case "gte":
condition.SetWhere(fmt.Sprintf("%s.%s >= ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
condition.SetWhere(fmt.Sprintf("%s`%s` >= ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
case "lt":
condition.SetWhere(fmt.Sprintf("%s.%s < ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
condition.SetWhere(fmt.Sprintf("%s`%s` < ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
case "lte":
condition.SetWhere(fmt.Sprintf("%s.%s <= ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
condition.SetWhere(fmt.Sprintf("%s`%s` <= ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
case "startswith":
condition.SetWhere(fmt.Sprintf("%s.%s like ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).String() + "%"})
condition.SetWhere(fmt.Sprintf("%s`%s` like ?", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).String() + "%"})
case "istartswith":
condition.SetWhere(fmt.Sprintf("%s.%s %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{qValue.Field(i).String() + "%"})
condition.SetWhere(fmt.Sprintf("%s`%s` %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{qValue.Field(i).String() + "%"})
case "endswith":
condition.SetWhere(fmt.Sprintf("%s.%s like ?", searchTag.Table, searchTag.Column), []interface{}{"%" + qValue.Field(i).String()})
condition.SetWhere(fmt.Sprintf("%s`%s` like ?", searchTag.Table, searchTag.Column), []interface{}{"%" + qValue.Field(i).String()})
case "iendswith":
condition.SetWhere(fmt.Sprintf("%s.%s %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{"%" + qValue.Field(i).String()})
condition.SetWhere(fmt.Sprintf("%s`%s` %slike ?", searchTag.Table, searchTag.Column, iStr), []interface{}{"%" + qValue.Field(i).String()})
case "in":
condition.SetWhere(fmt.Sprintf("`%s`.`%s` in (?)", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
condition.SetWhere(fmt.Sprintf("%s`%s` in (?)", searchTag.Table, searchTag.Column), []interface{}{qValue.Field(i).Interface()})
case "isnull":
if !(qValue.Field(i).IsZero() && qValue.Field(i).IsNil()) {
condition.SetWhere(fmt.Sprintf("%s.%s` isnull", searchTag.Table, searchTag.Column), make([]interface{}, 0))
condition.SetWhere(fmt.Sprintf("%s`%s` isnull", searchTag.Table, searchTag.Column), make([]interface{}, 0))
}
case "order":
switch strings.ToLower(qValue.Field(i).String()) {
case "desc", "asc":
condition.SetOrder(fmt.Sprintf("%s.%s %s", searchTag.Table, searchTag.Column, qValue.Field(i).String()))
condition.SetOrder(fmt.Sprintf("%s`%s` %s", searchTag.Table, searchTag.Column, qValue.Field(i).String()))
}
}
}
Loading