Skip to content

Commit

Permalink
增加定时器,修复TR做种问题
Browse files Browse the repository at this point in the history
  • Loading branch information
wetor committed Feb 21, 2024
1 parent 18af3b9 commit 59f4d6b
Show file tree
Hide file tree
Showing 7 changed files with 404 additions and 1 deletion.
2 changes: 1 addition & 1 deletion configs/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ type Advanced struct {
} `yaml:"default" json:"default" attr:"解析季度默认值" comment:"使用tmdb解析季度失败时,同类型默认值按优先级执行。数值越大,优先级越高"`

Client struct {
SeedingTimeMinute int `yaml:"seeding_time_minute" json:"seeding_time_minute" attr:"做种时间" comment_key:"seeding_key"`
SeedingTimeMinute int `yaml:"seeding_time_minute" json:"seeding_time_minute" attr:"做种时间" comment:"0不做种,-1无限做种,其他值为做种具体分钟限制"`
ConnectTimeoutSecond int `yaml:"connect_timeout_second" json:"connect_timeout_second" attr:"连接超时时间"`
RetryConnectNum int `yaml:"retry_connect_num" json:"retry_connect_num" attr:"连接失败重试次数"`
CheckTimeSecond int `yaml:"check_time_second" json:"check_time_second" attr:"检查连接状态间隔时间"`
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/caarlos0/env/v9 v9.0.0
github.com/gin-gonic/gin v1.9.1
github.com/go-python/gpython v0.2.0
github.com/google/uuid v1.5.0
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.5.0
github.com/hekmon/transmissionrpc/v3 v3.0.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,10 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/subcommands v1.0.1 h1:/eqq+otEXm5vhfBrbREPCSVQbvofip6kIz+mX5TUH7k=
github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8=
github.com/google/wire v0.5.0/go.mod h1:ngWDr9Qvq3yZA10YrxfyGELY/AFWGVpy9c1LTRi1EoU=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
Expand Down
39 changes: 39 additions & 0 deletions internal/exceptions/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package exceptions

import "fmt"

type ErrTimer struct {
Message string
}

func (e ErrTimer) Error() string {
return e.Message
}

type ErrTimerExistTask struct {
Name string
Message string
}

func (e ErrTimerExistTask) Error() string {
if len(e.Message) == 0 {
return fmt.Sprintf("任务 %s 已存在", e.Name)
}
return fmt.Sprintf("任务 %s 已存在,%s", e.Name, e.Message)
}

func (e ErrTimerExistTask) Exist() bool {
return true
}

type ErrTimerRun struct {
Name string
Message string
}

func (e ErrTimerRun) Error() string {
if len(e.Message) == 0 {
return fmt.Sprintf("任务 %s 执行失败", e.Name)
}
return fmt.Sprintf("任务 %s 执行失败,%s", e.Name, e.Message)
}
47 changes: 47 additions & 0 deletions internal/pkg/timer/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package timer

import (
"sync"

"github.com/google/uuid"
"github.com/wetor/AnimeGo/internal/api"
)

const (
DefaultUpdateSecond = 1
DefaultRetryCount = 1
)

type TaskFunc func() error

type Options struct {
Name string

Cache api.Cacher
RetryCount int
UpdateSecond int

WG *sync.WaitGroup
}

func (o *Options) Default() {
if o.RetryCount == 0 {
o.RetryCount = DefaultRetryCount
}
if o.UpdateSecond == 0 {
o.UpdateSecond = DefaultUpdateSecond
}
}

type AddOptions struct {
Name string
Duration int64
Func TaskFunc
Loop bool
}

func (o *AddOptions) Default() {
if len(o.Name) == 0 {
o.Name = uuid.NewString()
}
}
191 changes: 191 additions & 0 deletions internal/pkg/timer/timer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
package timer

import (
"context"
"sync"
"time"

"github.com/wetor/AnimeGo/internal/exceptions"
"github.com/wetor/AnimeGo/pkg/log"
)

const (
Name = "Timer"
)

const (
StatusStop = "stop"
StatusInit = "init"
StatusWait = "wait"
StatusExpired = "expired"
)

type Task struct {
Name string `json:"name"` // 任务名
Duration int64 `json:"duration"` // 执行定时,秒
Start int64 `json:"start"` // 开始时间
RunDuration int64 `json:"run_duration"` // 此次定时已执行时间,秒。序列化使用
Status string `json:"status"` // 状态
RetryCount int `json:"retry_count"` // 剩余重试次数
Loop bool `json:"loop"` // 是否循环执行
}

type Timer struct {
tasks map[string]*Task
funcs map[string]TaskFunc

sync.Mutex

*Options
}

func NewTimer(opts *Options) *Timer {
opts.Default()
t := &Timer{
tasks: make(map[string]*Task),
funcs: make(map[string]TaskFunc),
Options: opts,
}
t.Cache.Add(Name)
return t
}

func (t *Timer) HasTask(name string) bool {
_, ok := t.tasks[name]
return ok
}

func (t *Timer) AddTask(opts *AddOptions) (*Task, error) {
t.Lock()
defer t.Unlock()
opts.Default()
if t.HasTask(opts.Name) {
return nil, exceptions.ErrTimerExistTask{Name: opts.Name}
}
task := &Task{
Name: opts.Name,
Duration: opts.Duration,
Status: StatusInit,
Loop: opts.Loop,
}
t.tasks[task.Name] = task
t.funcs[task.Name] = opts.Func

return task, nil
}

func (t *Timer) Start(ctx context.Context) {
t.WG.Add(1)
err := t.Unmarshal()
if err != nil {
log.Warnf("[Timer] 载入缓存记录失败,可能不存在")
}
go func() {
defer t.WG.Done()
for {
select {
case <-ctx.Done():
t.Marshal()
log.Debugf("[Timer] 正常退出 %s", Name)
return
default:
t.update()
time.Sleep(time.Duration(t.UpdateSecond) * time.Second)
}
}
}()
}

func (t *Timer) update() {
t.Lock()
defer t.Unlock()
var err error
deleteTasks := make([]string, 0)
now := time.Now().Unix()
for _, task := range t.tasks {
if task.Status == StatusStop {
continue
}

if task.Status == StatusWait && now >= task.Start+task.Duration {
// 执行任务
log.Debugf("[Timer] 任务 %s 开始执行", task.Name)
if f, ok := t.funcs[task.Name]; ok {
err = f()
} else {
err = nil
log.Warnf("[Timer] 任务 %s 执行失败,未注册执行函数,忽略", task.Name)
}
finish := false
if err != nil {
task.RetryCount--
log.Debugf("[Timer] 任务 %s 执行失败,第 %d 次重试", task.Name, t.RetryCount-task.RetryCount)
log.DebugErr(err)
} else {
finish = true
log.Infof("[Timer] 任务 %s 执行成功", task.Name)
}

if task.Status != StatusExpired && task.RetryCount <= 0 {
finish = true
log.Warnf("[Timer] 任务 %s 执行失败,重试 %d 次", task.Name, t.RetryCount-task.RetryCount)
}

if finish {
if task.Loop {
task.Status = StatusInit
} else {
task.Status = StatusExpired
}
}
}

if task.Status == StatusExpired {
deleteTasks = append(deleteTasks, task.Name)
}

if task.Status == StatusInit {
task.Start = now
task.Status = StatusWait
task.RetryCount = t.RetryCount
log.Debugf("[Timer] 任务 %s 已添加,下次执行: %d 秒后", task.Name, task.Duration)
}
}

for _, id := range deleteTasks {
delete(t.tasks, id)
delete(t.funcs, id)
}
}

func (t *Timer) Marshal() {
t.Lock()
defer t.Unlock()
now := time.Now().Unix()
for _, task := range t.tasks {
task.RunDuration = (now - task.Start) % task.Duration
}
t.Cache.Put(Name, t.Name, t.tasks, 0)
}

func (t *Timer) Unmarshal() error {
t.Lock()
defer t.Unlock()
t.tasks = make(map[string]*Task)
err := t.Cache.Get(Name, t.Name, &t.tasks)
if err != nil {
return err
}
now := time.Now().Unix()
for _, task := range t.tasks {
task.Start = now - task.RunDuration
task.RunDuration = 0
}
return nil
}

func (t *Timer) RegisterTaskFuncs(funcs map[string]TaskFunc) {
for name, f := range funcs {
t.funcs[name] = f
}
}
Loading

0 comments on commit 59f4d6b

Please sign in to comment.