diff --git a/cmd/animego/main.go b/cmd/animego/main.go index a52e41fd..0624614a 100644 --- a/cmd/animego/main.go +++ b/cmd/animego/main.go @@ -203,7 +203,7 @@ func Main() { CheckTimeSecond: config.Advanced.Client.CheckTimeSecond, RetryConnectNum: config.Advanced.Client.RetryConnectNum, } - clientSrv := wire.GetClient(config.Setting.Client.Client, clientOpts) + clientSrv := wire.GetClient(config.Setting.Client.Client, clientOpts, bolt) clientSrv.Start() // =============================================================================================================== diff --git a/configs/models.go b/configs/models.go index b128d761..6b99e004 100644 --- a/configs/models.go +++ b/configs/models.go @@ -86,7 +86,7 @@ type Advanced struct { } `yaml:"default" json:"default" attr:"解析季度默认值" comment:"使用tmdb解析季度失败时,同类型默认值按优先级执行。数值越大,优先级越高"` Client struct { - SeedingTimeMinute int `yaml:"seeding_time_minute" json:"seeding_time_minute" attr:"做种时间" comment_key:"seeding_key"` + SeedingTimeMinute int `yaml:"seeding_time_minute" json:"seeding_time_minute" attr:"做种时间" comment:"0不做种,-1无限做种,其他值为做种具体分钟限制"` ConnectTimeoutSecond int `yaml:"connect_timeout_second" json:"connect_timeout_second" attr:"连接超时时间"` RetryConnectNum int `yaml:"retry_connect_num" json:"retry_connect_num" attr:"连接失败重试次数"` CheckTimeSecond int `yaml:"check_time_second" json:"check_time_second" attr:"检查连接状态间隔时间"` diff --git a/go.mod b/go.mod index 2c207355..c69c1e7c 100644 --- a/go.mod +++ b/go.mod @@ -9,6 +9,7 @@ require ( github.com/caarlos0/env/v9 v9.0.0 github.com/gin-gonic/gin v1.9.1 github.com/go-python/gpython v0.2.0 + github.com/google/uuid v1.5.0 github.com/google/wire v0.5.0 github.com/gorilla/websocket v1.5.0 github.com/hekmon/transmissionrpc/v3 v3.0.0 diff --git a/go.sum b/go.sum index cdea0886..f8d40433 100644 --- a/go.sum +++ b/go.sum @@ -159,6 +159,10 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/subcommands v1.0.1 h1:/eqq+otEXm5vhfBrbREPCSVQbvofip6kIz+mX5TUH7k= github.com/google/subcommands v1.0.1/go.mod h1:ZjhPrFU+Olkh9WazFPsl27BQ4UPiG37m3yTrtFlrHVk= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU= +github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/wire v0.5.0 h1:I7ELFeVBr3yfPIcc8+MWvrjk+3VjbcSzoXm3JVa+jD8= github.com/google/wire v0.5.0/go.mod h1:ngWDr9Qvq3yZA10YrxfyGELY/AFWGVpy9c1LTRi1EoU= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= diff --git a/internal/client/client.go b/internal/client/client.go index 6200bd37..74d29e3b 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -15,13 +15,13 @@ var Set = wire.NewSet( NewClient, ) -func NewClient(name string, opts *models.ClientOptions) api.Client { +func NewClient(name string, opts *models.ClientOptions, cache api.Cacher) api.Client { var c api.Client switch strings.ToLower(name) { case "qbittorrent": c = qbittorrent.NewQBittorrent(opts) case "transmission": - c = transmission.NewTransmission(opts) + c = transmission.NewTransmission(opts, cache) } return c } diff --git a/internal/client/transmission/const.go b/internal/client/transmission/const.go index a814ab5b..29a9fd9c 100644 --- a/internal/client/transmission/const.go +++ b/internal/client/transmission/const.go @@ -26,3 +26,8 @@ const ( IdleModeSingle // override the global settings, seeding until a certain idle time IdleModeUnlimited // override the global settings, seeding regardless of activity ) + +const ( + TimerRetryCount = 3 + TimerUpdateSecond = 5 +) diff --git a/internal/client/transmission/transmission.go b/internal/client/transmission/transmission.go index 2b1791e8..f7bbd11a 100644 --- a/internal/client/transmission/transmission.go +++ b/internal/client/transmission/transmission.go @@ -15,6 +15,7 @@ import ( "github.com/wetor/AnimeGo/internal/constant" "github.com/wetor/AnimeGo/internal/exceptions" "github.com/wetor/AnimeGo/internal/models" + "github.com/wetor/AnimeGo/internal/pkg/timer" "github.com/wetor/AnimeGo/pkg/log" "github.com/wetor/AnimeGo/pkg/utils" ) @@ -37,16 +38,25 @@ type Transmission struct { client *transmissionrpc.Client endpoint *url.URL + seedTimer *timer.Timer + *models.ClientOptions } -func NewTransmission(opts *models.ClientOptions) *Transmission { +func NewTransmission(opts *models.ClientOptions, cache api.Cacher) *Transmission { c := &Transmission{ retryChan: make(chan int, 1), retryNum: 1, connected: false, ClientOptions: opts, } + c.seedTimer = timer.NewTimer(&timer.Options{ + Name: Name, + Cache: cache, + RetryCount: TimerRetryCount, + UpdateSecond: TimerUpdateSecond, + WG: opts.WG, + }) u, _ := url.Parse(c.Url) c.endpoint, _ = url.Parse(fmt.Sprintf("%s://%s:%s@%s/transmission/rpc", u.Scheme, c.Username, c.Password, u.Host)) @@ -182,6 +192,7 @@ func (c *Transmission) Start() { } } }() + c.seedTimer.Start(c.Ctx) } func (c *Transmission) List(opt *models.ListOptions) ([]*models.TorrentItem, error) { @@ -196,6 +207,34 @@ func (c *Transmission) List(opt *models.ListOptions) ([]*models.TorrentItem, err items := make([]*models.TorrentItem, 0, len(torrents)) for _, torrent := range torrents { + // 下载完成在做种状态 + if int(*torrent.PercentDone) == 1 && torrent.Status.String() == TorrentStatusSeed && + !c.seedTimer.HasTask(*torrent.HashString) { + // 定时任务:达到做种时间,停止做种 + _, err = c.seedTimer.AddTask(&timer.AddOptions{ + Name: *torrent.HashString, + Duration: int64(c.SeedingTimeMinute * 60), + Func: func() error { + err := c.client.TorrentStopHashes(c.Ctx, []string{*torrent.HashString}) + if err != nil { + return err + } + tStatus, err := c.client.TorrentGet(c.Ctx, []string{"status"}, []int64{*torrent.ID}) + if err != nil { + return err + } + if len(tStatus) == 1 && tStatus[0].Status.String() != TorrentStatusStopped { + // 未暂停成功 + return exceptions.ErrClient{Client: Name, Message: fmt.Sprintf("%s 暂停失败", *torrent.HashString)} + } + return nil + }, + }) + if err != nil { + log.Warnf("添加 %s 做种任务 %s 失败,忽略", Name, *torrent.HashString) + } + } + if len(opt.Category) > 0 && *torrent.Group != opt.Category { continue } diff --git a/internal/exceptions/timer.go b/internal/exceptions/timer.go new file mode 100644 index 00000000..7b22d3f5 --- /dev/null +++ b/internal/exceptions/timer.go @@ -0,0 +1,39 @@ +package exceptions + +import "fmt" + +type ErrTimer struct { + Message string +} + +func (e ErrTimer) Error() string { + return e.Message +} + +type ErrTimerExistTask struct { + Name string + Message string +} + +func (e ErrTimerExistTask) Error() string { + if len(e.Message) == 0 { + return fmt.Sprintf("任务 %s 已存在", e.Name) + } + return fmt.Sprintf("任务 %s 已存在,%s", e.Name, e.Message) +} + +func (e ErrTimerExistTask) Exist() bool { + return true +} + +type ErrTimerRun struct { + Name string + Message string +} + +func (e ErrTimerRun) Error() string { + if len(e.Message) == 0 { + return fmt.Sprintf("任务 %s 执行失败", e.Name) + } + return fmt.Sprintf("任务 %s 执行失败,%s", e.Name, e.Message) +} diff --git a/internal/pkg/timer/models.go b/internal/pkg/timer/models.go new file mode 100644 index 00000000..57435094 --- /dev/null +++ b/internal/pkg/timer/models.go @@ -0,0 +1,47 @@ +package timer + +import ( + "sync" + + "github.com/google/uuid" + "github.com/wetor/AnimeGo/internal/api" +) + +const ( + DefaultUpdateSecond = 1 + DefaultRetryCount = 1 +) + +type TaskFunc func() error + +type Options struct { + Name string + + Cache api.Cacher + RetryCount int + UpdateSecond int + + WG *sync.WaitGroup +} + +func (o *Options) Default() { + if o.RetryCount == 0 { + o.RetryCount = DefaultRetryCount + } + if o.UpdateSecond == 0 { + o.UpdateSecond = DefaultUpdateSecond + } +} + +type AddOptions struct { + Name string + Duration int64 + Func TaskFunc + Loop bool +} + +func (o *AddOptions) Default() { + if len(o.Name) == 0 { + o.Name = uuid.NewString() + } +} diff --git a/internal/pkg/timer/timer.go b/internal/pkg/timer/timer.go new file mode 100644 index 00000000..8a40e7c3 --- /dev/null +++ b/internal/pkg/timer/timer.go @@ -0,0 +1,191 @@ +package timer + +import ( + "context" + "sync" + "time" + + "github.com/wetor/AnimeGo/internal/exceptions" + "github.com/wetor/AnimeGo/pkg/log" +) + +const ( + Name = "Timer" +) + +const ( + StatusStop = "stop" + StatusInit = "init" + StatusWait = "wait" + StatusExpired = "expired" +) + +type Task struct { + Name string `json:"name"` // 任务名 + Duration int64 `json:"duration"` // 执行定时,秒 + Start int64 `json:"start"` // 开始时间 + RunDuration int64 `json:"run_duration"` // 此次定时已执行时间,秒。序列化使用 + Status string `json:"status"` // 状态 + RetryCount int `json:"retry_count"` // 剩余重试次数 + Loop bool `json:"loop"` // 是否循环执行 +} + +type Timer struct { + tasks map[string]*Task + funcs map[string]TaskFunc + + sync.Mutex + + *Options +} + +func NewTimer(opts *Options) *Timer { + opts.Default() + t := &Timer{ + tasks: make(map[string]*Task), + funcs: make(map[string]TaskFunc), + Options: opts, + } + t.Cache.Add(Name) + err := t.Unmarshal() + if err != nil { + log.Warnf("[Timer] 载入缓存记录失败,可能不存在") + } + return t +} + +func (t *Timer) HasTask(name string) bool { + _, ok := t.tasks[name] + return ok +} + +func (t *Timer) AddTask(opts *AddOptions) (*Task, error) { + t.Lock() + defer t.Unlock() + opts.Default() + if t.HasTask(opts.Name) { + return nil, exceptions.ErrTimerExistTask{Name: opts.Name} + } + task := &Task{ + Name: opts.Name, + Duration: opts.Duration, + Status: StatusInit, + Loop: opts.Loop, + } + t.tasks[task.Name] = task + t.funcs[task.Name] = opts.Func + + return task, nil +} + +func (t *Timer) Start(ctx context.Context) { + t.WG.Add(1) + go func() { + defer t.WG.Done() + for { + select { + case <-ctx.Done(): + t.Marshal() + log.Debugf("[Timer] 正常退出 %s", Name) + return + default: + t.update() + time.Sleep(time.Duration(t.UpdateSecond) * time.Second) + } + } + }() +} + +func (t *Timer) update() { + t.Lock() + defer t.Unlock() + var err error + deleteTasks := make([]string, 0) + now := time.Now().Unix() + for _, task := range t.tasks { + if task.Status == StatusStop { + continue + } + + if task.Status == StatusWait && now >= task.Start+task.Duration { + // 执行任务 + log.Debugf("[Timer] 任务 %s 开始执行", task.Name) + if f, ok := t.funcs[task.Name]; ok { + err = f() + } else { + err = nil + log.Warnf("[Timer] 任务 %s 执行失败,未注册执行函数,忽略", task.Name) + } + finish := false + if err != nil { + task.RetryCount-- + log.Debugf("[Timer] 任务 %s 执行失败,第 %d 次重试", task.Name, t.RetryCount-task.RetryCount) + log.DebugErr(err) + } else { + finish = true + log.Infof("[Timer] 任务 %s 执行成功", task.Name) + } + + if task.Status != StatusExpired && task.RetryCount <= 0 { + finish = true + log.Warnf("[Timer] 任务 %s 执行失败,重试 %d 次", task.Name, t.RetryCount-task.RetryCount) + } + + if finish { + if task.Loop { + task.Status = StatusInit + } else { + task.Status = StatusExpired + } + } + } + + if task.Status == StatusExpired { + deleteTasks = append(deleteTasks, task.Name) + } + + if task.Status == StatusInit { + task.Start = now + task.Status = StatusWait + task.RetryCount = t.RetryCount + log.Debugf("[Timer] 任务 %s 已添加,下次执行: %d 秒后", task.Name, task.Duration) + } + } + + for _, id := range deleteTasks { + delete(t.tasks, id) + delete(t.funcs, id) + } +} + +func (t *Timer) Marshal() { + t.Lock() + defer t.Unlock() + now := time.Now().Unix() + for _, task := range t.tasks { + task.RunDuration = (now - task.Start) % task.Duration + } + t.Cache.Put(Name, t.Name, t.tasks, 0) +} + +func (t *Timer) Unmarshal() error { + t.Lock() + defer t.Unlock() + t.tasks = make(map[string]*Task) + err := t.Cache.Get(Name, t.Name, &t.tasks) + if err != nil { + return err + } + now := time.Now().Unix() + for _, task := range t.tasks { + task.Start = now - task.RunDuration + task.RunDuration = 0 + } + return nil +} + +func (t *Timer) RegisterTaskFuncs(funcs map[string]TaskFunc) { + for name, f := range funcs { + t.funcs[name] = f + } +} diff --git a/internal/pkg/timer/timer_test.go b/internal/pkg/timer/timer_test.go new file mode 100644 index 00000000..7f78f4bd --- /dev/null +++ b/internal/pkg/timer/timer_test.go @@ -0,0 +1,121 @@ +package timer_test + +import ( + "context" + "fmt" + "os" + "sync" + "testing" + "time" + + "github.com/wetor/AnimeGo/internal/api" + "github.com/wetor/AnimeGo/pkg/cache" + "github.com/wetor/AnimeGo/pkg/log" + + "github.com/wetor/AnimeGo/internal/pkg/timer" +) + +var ( + bolt api.Cacher + tm *timer.Timer +) + +func TestMain(m *testing.M) { + fmt.Println("begin") + log.Init(&log.Options{ + File: "data/test.log", + Debug: true, + }) + bolt = cache.NewBolt() + bolt.Open("data/bolt.db") + m.Run() + bolt.Close() + _ = log.Close() + _ = os.RemoveAll("data") + fmt.Println("end") +} + +func initTest(clean bool) (*sync.WaitGroup, func()) { + if clean { + _ = os.RemoveAll("data") + } + wg := sync.WaitGroup{} + ctx, cancel := context.WithCancel(context.Background()) + tm = timer.NewTimer(&timer.Options{ + Name: "Test", + Cache: bolt, + UpdateSecond: 1, + RetryCount: 2, + WG: &wg, + }) + tm.Start(ctx) + return &wg, cancel +} + +func TestNewTimer(t *testing.T) { + wg, cancel := initTest(false) + + go func() { + time.Sleep(8 * time.Second) + cancel() + }() + + _, _ = tm.AddTask(&timer.AddOptions{ + Name: "Failed", + Duration: 2, + Loop: true, + Func: func() error { + fmt.Println("[task] run fail 2") + return fmt.Errorf("error2") + }, + }) + _, _ = tm.AddTask(&timer.AddOptions{ + Name: "Success", + Duration: 2, + Loop: true, + Func: func() error { + fmt.Println("[task] run 2") + return nil + }, + }) + _, _ = tm.AddTask(&timer.AddOptions{ + Name: "Once", + Duration: 5, + Func: func() error { + fmt.Println("[task] run 5") + return nil + }, + }) + wg.Wait() + +} + +func TestTimerMarshal(t *testing.T) { + wg, cancel := initTest(false) + _ = wg + _, _ = tm.AddTask(&timer.AddOptions{ + Name: "Success", + Duration: 5, + Loop: true, + Func: func() error { + fmt.Println("[task] run 2") + return nil + }, + }) + log.Info("Sleep 7") + time.Sleep(7 * time.Second) + log.Info("Stop") + cancel() + log.Info("Sleep 2") + time.Sleep(2 * time.Second) + wg, cancel = initTest(false) + log.Info("Start") + tm.RegisterTaskFuncs(map[string]timer.TaskFunc{ + "Success": func() error { + fmt.Println("[task] run 2") + return nil + }, + }) + time.Sleep(9 * time.Second) + cancel() +} diff --git a/internal/wire/client.go b/internal/wire/client.go index c35ea2f0..b03a99ca 100644 --- a/internal/wire/client.go +++ b/internal/wire/client.go @@ -12,7 +12,7 @@ import ( "github.com/wetor/AnimeGo/internal/models" ) -func GetClient(name string, opts *models.ClientOptions) api.Client { +func GetClient(name string, opts *models.ClientOptions, cache api.Cacher) api.Client { wire.Build( client.Set, ) @@ -26,7 +26,7 @@ func GetQBittorrent(opts *models.ClientOptions) *qbittorrent.QBittorrent { return nil } -func GetTransmission(opts *models.ClientOptions) *transmission.Transmission { +func GetTransmission(opts *models.ClientOptions, cache api.Cacher) *transmission.Transmission { wire.Build( transmission.Set, ) diff --git a/internal/wire/wire_gen.go b/internal/wire/wire_gen.go index d853598d..af0d13c3 100644 --- a/internal/wire/wire_gen.go +++ b/internal/wire/wire_gen.go @@ -59,8 +59,8 @@ func GetBangumi(bgmOpts *bangumi.Options, tmdbOpts *themoviedb.Options) *anisour // Injectors from client.go: -func GetClient(name string, opts *models.ClientOptions) api.Client { - apiClient := client.NewClient(name, opts) +func GetClient(name string, opts *models.ClientOptions, cache api.Cacher) api.Client { + apiClient := client.NewClient(name, opts, cache) return apiClient } @@ -69,8 +69,8 @@ func GetQBittorrent(opts *models.ClientOptions) *qbittorrent.QBittorrent { return qBittorrent } -func GetTransmission(opts *models.ClientOptions) *transmission.Transmission { - transmissionTransmission := transmission.NewTransmission(opts) +func GetTransmission(opts *models.ClientOptions, cache api.Cacher) *transmission.Transmission { + transmissionTransmission := transmission.NewTransmission(opts, cache) return transmissionTransmission }