From ca08b8037063b158d3922434ed2a31e44492beab Mon Sep 17 00:00:00 2001 From: "Chen, Tingjie" Date: Thu, 25 Sep 2025 07:55:27 +0000 Subject: [PATCH] Add device_bucket_container and some enhancements Signed-off-by: Chen, Tingjie --- actuator/function_actuator.go | 2 +- .../device_bucket_container.go | 210 ++++++++++++++++++ example/add_example/main.go | 4 +- example/docker_example/main.go | 4 +- example/videocut_example/main_demo/main.go | 4 +- .../video_cut/example_sql_container.go | 4 +- task.go | 4 +- task_scheduler.go | 183 +++++++-------- 8 files changed, 318 insertions(+), 97 deletions(-) create mode 100644 container/memory_container/device_bucket_container.go diff --git a/actuator/function_actuator.go b/actuator/function_actuator.go index 9df8943..5986451 100644 --- a/actuator/function_actuator.go +++ b/actuator/function_actuator.go @@ -119,7 +119,7 @@ func (fc *fucntionActuator) Start(ctx context.Context, ftask *framework.Task) ( // 如果需要回调 callbackTask := *ftask callbackTask.TaskStatus = newStatus.TaskStatus - callbackTask.TaskEnbTime = time.Now() + callbackTask.TaskEndTime = time.Now() if newStatus.FailedReason != nil { callbackTask.FailedReason = newStatus.FailedReason } diff --git a/container/memory_container/device_bucket_container.go b/container/memory_container/device_bucket_container.go new file mode 100644 index 0000000..9462226 --- /dev/null +++ b/container/memory_container/device_bucket_container.go @@ -0,0 +1,210 @@ +package memeorycontainer + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + "time" + + lighttaskscheduler "github.com/memory-overflow/light-task-scheduler" +) + +// deviceBucketContainer: 按设备分桶的内存任务容器,每个设备一个等待队列(channel),支持同时多设备调度。 +// 优点:不同设备等待队列互不阻塞,避免单一设备任务把全局 FIFO 阻塞; +// 缺点:仍然是进程内结构,不支持多进程共享;没有优先级。 +type deviceBucketContainer struct { + MemeoryContainer + + // deviceId -> waiting queue + deviceQueues map[string]chan lighttaskscheduler.Task + defaultQueue chan lighttaskscheduler.Task // 未指定或未配置的设备任务进入默认队列 + + timeout time.Duration + + // 运行中任务 + runningTaskMap sync.Map // taskId -> Task + runningTaskCount int32 + + // 等待中被请求停止/删除的任务标记(只在取出时判定丢弃) + stoppedOrDeleted sync.Map // taskId -> struct{} +} + +// MakeDeviceBucketContainer 构造设备分桶容器 +// deviceQueueSize: 每个设备队列大小;defaultQueueSize: 默认队列大小;timeout: 入队/出队等待超时时间 +func MakeDeviceBucketContainer(deviceQueueSize map[string]int, defaultQueueSize int, timeout time.Duration) *deviceBucketContainer { + dq := make(map[string]chan lighttaskscheduler.Task) + for dev, sz := range deviceQueueSize { + if sz <= 0 { + sz = 1024 + } + dq[dev] = make(chan lighttaskscheduler.Task, sz) + } + if defaultQueueSize <= 0 { + defaultQueueSize = 1024 + } + return &deviceBucketContainer{ + deviceQueues: dq, + defaultQueue: make(chan lighttaskscheduler.Task, defaultQueueSize), + timeout: timeout, + } +} + +// AddTask 把任务加入对应设备队列 +func (d *deviceBucketContainer) AddTask(ctx context.Context, task lighttaskscheduler.Task) error { + // 如果之前被标记停止/删除,再次 Add 视为复活,从标记表移除 + d.stoppedOrDeleted.LoadAndDelete(task.TaskId) + task.TaskStatus = lighttaskscheduler.TASK_STATUS_WAITING + ch, ok := d.deviceQueues[task.DeviceId] + if !ok || task.DeviceId == "" { // 使用默认队列 + ch = d.defaultQueue + } + select { + case ch <- task: + return nil + case <-time.After(d.timeout): + return fmt.Errorf("add task timeout") + case <-ctx.Done(): + return ctx.Err() + } +} + +// AddRunningTask 恢复运行中任务(用于可持久化恢复场景) +func (d *deviceBucketContainer) AddRunningTask(ctx context.Context, task lighttaskscheduler.Task) error { + if _, ok := d.runningTaskMap.LoadOrStore(task.TaskId, task); !ok { + atomic.AddInt32(&d.runningTaskCount, 1) + } + return nil +} + +// GetRunningTask 返回所有运行中任务快照 +func (d *deviceBucketContainer) GetRunningTask(ctx context.Context) ([]lighttaskscheduler.Task, error) { + var tasks []lighttaskscheduler.Task + d.runningTaskMap.Range(func(_, v interface{}) bool { + tasks = append(tasks, v.(lighttaskscheduler.Task)) + return true + }) + return tasks, nil +} + +// GetRunningTaskCount 返回运行中任务总数 +func (d *deviceBucketContainer) GetRunningTaskCount(ctx context.Context) (int32, error) { + return atomic.LoadInt32(&d.runningTaskCount), nil +} + +// GetWaitingTask 轮询各设备队列(含默认队列)做近似轮询聚合,直到获取 limit 或超时。 +func (d *deviceBucketContainer) GetWaitingTask(ctx context.Context, limit int32) ([]lighttaskscheduler.Task, error) { + if limit <= 0 { + return nil, nil + } + var result []lighttaskscheduler.Task + // 构造顺序列表:deviceQueues + defaultQueue 最后 + order := make([]chan lighttaskscheduler.Task, 0, len(d.deviceQueues)+1) + for _, ch := range d.deviceQueues { + order = append(order, ch) + } + order = append(order, d.defaultQueue) + + // 轮询次数上限,防止全部空队列忙等:设备数 * limit * 2 + maxScan := len(order)*int(limit)*2 + 1 + attempts := 0 +Outer: + for int32(len(result)) < limit && attempts < maxScan { + attempts++ + for _, ch := range order { + if int32(len(result)) >= limit { + break Outer + } + select { + case task := <-ch: + if _, skipped := d.stoppedOrDeleted.LoadAndDelete(task.TaskId); skipped { + continue // 丢弃 + } + result = append(result, task) + default: + // 没有数据,继续下一个队列 + } + } + // 如果一轮什么都没取到,做一个短暂让步或等待 + if int32(len(result)) < limit { + select { + case <-time.After(d.timeout / 10): + case <-ctx.Done(): + return result, ctx.Err() + } + } + } + return result, nil +} + +// ToRunningStatus 转成运行中 +func (d *deviceBucketContainer) ToRunningStatus(ctx context.Context, task *lighttaskscheduler.Task) (*lighttaskscheduler.Task, error) { + task.TaskStartTime = time.Now() + task.TaskStatus = lighttaskscheduler.TASK_STATUS_RUNNING + if _, ok := d.runningTaskMap.LoadOrStore(task.TaskId, *task); !ok { + atomic.AddInt32(&d.runningTaskCount, 1) + } else { // 更新尝试次数 + v, _ := d.runningTaskMap.Load(task.TaskId) + t := v.(lighttaskscheduler.Task) + t.TaskAttemptsTime = task.TaskAttemptsTime + d.runningTaskMap.Store(task.TaskId, t) + } + return task, nil +} + +// ToStopStatus 停止(运行中释放;等待中标记) +func (d *deviceBucketContainer) ToStopStatus(ctx context.Context, task *lighttaskscheduler.Task) (*lighttaskscheduler.Task, error) { + if _, ok := d.runningTaskMap.LoadAndDelete(task.TaskId); ok { + atomic.AddInt32(&d.runningTaskCount, -1) + } else { + d.stoppedOrDeleted.Store(task.TaskId, struct{}{}) + } + task.TaskStatus = lighttaskscheduler.TASK_STATUS_STOPED + return task, nil +} + +// ToDeleteStatus 删除(语义与 Stop 类似但状态不同) +func (d *deviceBucketContainer) ToDeleteStatus(ctx context.Context, task *lighttaskscheduler.Task) (*lighttaskscheduler.Task, error) { + if _, ok := d.runningTaskMap.LoadAndDelete(task.TaskId); ok { + atomic.AddInt32(&d.runningTaskCount, -1) + } else { + d.stoppedOrDeleted.Store(task.TaskId, struct{}{}) + } + task.TaskStatus = lighttaskscheduler.TASK_STATUS_DELETE + return task, nil +} + +// ToFailedStatus 失败 +func (d *deviceBucketContainer) ToFailedStatus(ctx context.Context, task *lighttaskscheduler.Task, reason error) (*lighttaskscheduler.Task, error) { + if _, ok := d.runningTaskMap.LoadAndDelete(task.TaskId); ok { + atomic.AddInt32(&d.runningTaskCount, -1) + } + task.TaskStatus = lighttaskscheduler.TASK_STATUS_FAILED + task.FailedReason = reason + task.TaskEndTime = time.Now() + return task, nil +} + +// ToExportStatus 导出中 +func (d *deviceBucketContainer) ToExportStatus(ctx context.Context, task *lighttaskscheduler.Task) (*lighttaskscheduler.Task, error) { + if _, ok := d.runningTaskMap.LoadAndDelete(task.TaskId); ok { + atomic.AddInt32(&d.runningTaskCount, -1) + } + task.TaskStatus = lighttaskscheduler.TASK_STATUS_EXPORTING + return task, nil +} + +// ToSuccessStatus 成功 +func (d *deviceBucketContainer) ToSuccessStatus(ctx context.Context, task *lighttaskscheduler.Task) (*lighttaskscheduler.Task, error) { + if _, ok := d.runningTaskMap.LoadAndDelete(task.TaskId); ok { + atomic.AddInt32(&d.runningTaskCount, -1) + } + task.TaskStatus = lighttaskscheduler.TASK_STATUS_SUCCESS + task.TaskEndTime = time.Now() + return task, nil +} + +// UpdateRunningTaskStatus 这里简单忽略(可扩展进度持久化) +func (d *deviceBucketContainer) UpdateRunningTaskStatus(ctx context.Context, task *lighttaskscheduler.Task, status lighttaskscheduler.AsyncTaskStatus) error { + return nil +} diff --git a/example/add_example/main.go b/example/add_example/main.go index f902416..3d2554c 100644 --- a/example/add_example/main.go +++ b/example/add_example/main.go @@ -157,10 +157,10 @@ func main() { for task := range sch.FinshedTasks() { if task.TaskStatus == lighttaskscheduler.TASK_STATUS_FAILED { log.Printf("failed task %s, reason: %s, timecost: %dms, attempt times: %d\n", - task.TaskId, task.FailedReason, task.TaskEnbTime.Sub(task.TaskStartTime).Milliseconds(), task.TaskAttemptsTime) + task.TaskId, task.FailedReason, task.TaskEndTime.Sub(task.TaskStartTime).Milliseconds(), task.TaskAttemptsTime) } else if task.TaskStatus == lighttaskscheduler.TASK_STATUS_SUCCESS { log.Printf("success task %s, timecost: %dms, attempt times: %d\n", task.TaskId, - task.TaskEnbTime.Sub(task.TaskStartTime).Milliseconds(), task.TaskAttemptsTime) + task.TaskEndTime.Sub(task.TaskStartTime).Milliseconds(), task.TaskAttemptsTime) } } diff --git a/example/docker_example/main.go b/example/docker_example/main.go index 36c6312..9c5fb2c 100644 --- a/example/docker_example/main.go +++ b/example/docker_example/main.go @@ -48,10 +48,10 @@ func main() { for task := range sch.FinshedTasks() { if task.TaskStatus == lighttaskscheduler.TASK_STATUS_FAILED { log.Printf("failed task %s, reason: %s, timecost: %dms, attempt times: %d\n", - task.TaskId, task.FailedReason, task.TaskEnbTime.Sub(task.TaskStartTime).Milliseconds(), task.TaskAttemptsTime) + task.TaskId, task.FailedReason, task.TaskEndTime.Sub(task.TaskStartTime).Milliseconds(), task.TaskAttemptsTime) } else if task.TaskStatus == lighttaskscheduler.TASK_STATUS_SUCCESS { log.Printf("success task %s, timecost: %dms, attempt times: %d\n", task.TaskId, - task.TaskEnbTime.Sub(task.TaskStartTime).Milliseconds(), task.TaskAttemptsTime) + task.TaskEndTime.Sub(task.TaskStartTime).Milliseconds(), task.TaskAttemptsTime) } } diff --git a/example/videocut_example/main_demo/main.go b/example/videocut_example/main_demo/main.go index aad059a..16aeff5 100644 --- a/example/videocut_example/main_demo/main.go +++ b/example/videocut_example/main_demo/main.go @@ -78,10 +78,10 @@ func main() { for task := range sch.FinshedTasks() { if task.TaskStatus == lighttaskscheduler.TASK_STATUS_FAILED { log.Printf("failed task %s, reason: %s, timecost: %dms\n", - task.TaskId, task.FailedReason, task.TaskEnbTime.Sub(task.TaskStartTime).Milliseconds()) + task.TaskId, task.FailedReason, task.TaskEndTime.Sub(task.TaskStartTime).Milliseconds()) } else if task.TaskStatus == lighttaskscheduler.TASK_STATUS_SUCCESS { log.Printf("success task %s, timecost: %dms\n", task.TaskId, - task.TaskEnbTime.Sub(task.TaskStartTime).Milliseconds()) + task.TaskEndTime.Sub(task.TaskStartTime).Milliseconds()) } } } diff --git a/example/videocut_example/video_cut/example_sql_container.go b/example/videocut_example/video_cut/example_sql_container.go index 366e7b0..251a021 100644 --- a/example/videocut_example/video_cut/example_sql_container.go +++ b/example/videocut_example/video_cut/example_sql_container.go @@ -262,7 +262,7 @@ func (e *videoCutSqlContainer) ToFailedStatus(ctx context.Context, ftask *framew } task.Status, ftask.TaskStatus = framework.TASK_STATUS_FAILED, framework.TASK_STATUS_FAILED task.FailedReason, ftask.FailedReason = reason.Error(), reason - task.EndAt, ftask.TaskEnbTime = &t, t + task.EndAt, ftask.TaskEndTime = &t, t ftask.TaskItem = task return ftask, nil } @@ -319,7 +319,7 @@ func (e *videoCutSqlContainer) ToSuccessStatus(ctx context.Context, ftask *frame return ftask, fmt.Errorf("task %s not found, may status has been changed", task.TaskId) } task.Status, ftask.TaskStatus = framework.TASK_STATUS_SUCCESS, framework.TASK_STATUS_SUCCESS - task.EndAt, ftask.TaskEnbTime = &t, t + task.EndAt, ftask.TaskEndTime = &t, t ftask.TaskItem = task return ftask, nil } diff --git a/task.go b/task.go index b40c81d..1f6d26b 100644 --- a/task.go +++ b/task.go @@ -25,9 +25,11 @@ type Task struct { TaskPriority int // 任务对象,创建任务的时候赋予 TaskItem interface{} + // DeviceId 可选:用于按设备分桶的容器(deviceBucketContainer)。不需要分桶可留空。 + DeviceId string TaskStartTime time.Time // 框架赋予值 - TaskEnbTime time.Time // 框架赋予值 + TaskEndTime time.Time // 任务结束时间 (renamed from TaskEnbTime) // 任务状态,任务容器负责赋予值 TaskStatus TaskStatus // 任务容器负责赋予值 diff --git a/task_scheduler.go b/task_scheduler.go index d6c308a..33aee4d 100644 --- a/task_scheduler.go +++ b/task_scheduler.go @@ -96,8 +96,6 @@ type TaskScheduler struct { } // MakeScheduler 新建任务调度器 -// 如果不需要对任务数据此久化,persistencer 可以设置为 nil -// 调度器构建以后,自动开始任务调度 func MakeScheduler( container TaskContainer, actuator TaskActuator, @@ -137,9 +135,7 @@ func (s *TaskScheduler) AddTask(ctx context.Context, task Task) error { } // FinshedTasks 返回的完成的任务的 channel -func (s *TaskScheduler) FinshedTasks() chan *Task { - return s.finshedTask -} +func (s *TaskScheduler) FinshedTasks() chan *Task { return s.finshedTask } // StopTask 停止一个任务 func (s *TaskScheduler) StopTask(ctx context.Context, ftask *Task) error { @@ -157,10 +153,12 @@ func (s *TaskScheduler) StopTask(ctx context.Context, ftask *Task) error { // Close 停止调度 func (s *TaskScheduler) Close() { + // 先取消上下文让后续 goroutine 快速退出 + s.cancel() + // 此处不等待所有 goroutine(仅部分任务使用 s.wg 计数),保持原有轻量语义 if s.config.EnableFinshedTaskList { close(s.finshedTask) } - s.cancel() } func (s *TaskScheduler) checkProcessed(t *Task) bool { @@ -258,23 +256,15 @@ func (s *TaskScheduler) start() { func (s *TaskScheduler) schedulerTask() { if s.config.SchedulingPollInterval == 0 { - for { - select { - case <-s.ctx.Done(): - return - default: - s.scheduleOnce(s.ctx) - } - } - } else { - ticker := time.NewTicker(s.config.SchedulingPollInterval) - for { - select { - case <-s.ctx.Done(): - return - case <-ticker.C: - s.scheduleOnce(s.ctx) - } + s.config.SchedulingPollInterval = 10 * time.Millisecond + } + ticker := time.NewTicker(s.config.SchedulingPollInterval) + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + s.scheduleOnce(s.ctx) } } } @@ -323,62 +313,64 @@ func (s *TaskScheduler) scheduleOnce(ctx context.Context) { func (s *TaskScheduler) updateTaskStatus() { if s.config.StatePollInterval == 0 { - for { - select { - case <-s.ctx.Done(): - return - default: - s.updateOnce(s.ctx) - } - } - } else { - ticker := time.NewTicker(s.config.StatePollInterval) - for { - select { - case <-s.ctx.Done(): - return - case <-ticker.C: - s.updateOnce(s.ctx) - } + s.config.StatePollInterval = 50 * time.Millisecond + } + ticker := time.NewTicker(s.config.StatePollInterval) + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + s.updateOnce(s.ctx) } } } +// improved: add ctx awareness func (s *TaskScheduler) updateCallbackTask() { - for t := range s.config.CallbackReceiver.GetCallbackChannel(s.ctx) { - // 可能是轮询已经处理过,或者重复回调 - if !s.checkProcessed(&t) { - continue - } - s.wg.Add(1) - task := t - go func() { - defer s.wg.Done() - if task.TaskStatus == TASK_STATUS_FAILED { - // 失败可以重试 - if task.TaskAttemptsTime < s.config.MaxFailedAttempts { - task.TaskAttemptsTime++ - newTask, _, err := s.Actuator.Start(s.ctx, &task) - // 尝试重启失败 - if err != nil { - resaon := fmt.Errorf("任务执行失败:%v, 并且尝试重启也失败 %v", task.FailedReason, err) - s.failed(s.ctx, &task, resaon) - } else { - _, err = s.Container.ToRunningStatus(s.ctx, newTask) // 更新状态 + ch := s.config.CallbackReceiver.GetCallbackChannel(s.ctx) + for { + select { + case <-s.ctx.Done(): + return + case t, ok := <-ch: + if !ok { + return + } + // 可能是轮询已经处理过,或者重复回调 + if !s.checkProcessed(&t) { + continue + } + s.wg.Add(1) + task := t + go func() { + defer s.wg.Done() + if task.TaskStatus == TASK_STATUS_FAILED { + // 失败可以重试 + if task.TaskAttemptsTime < s.config.MaxFailedAttempts { + task.TaskAttemptsTime++ + newTask, _, err := s.Actuator.Start(s.ctx, &task) + // 尝试重启失败 if err != nil { - s.Actuator.Stop(s.ctx, newTask) resaon := fmt.Errorf("任务执行失败:%v, 并且尝试重启也失败 %v", task.FailedReason, err) s.failed(s.ctx, &task, resaon) - return + } else { + _, err = s.Container.ToRunningStatus(s.ctx, newTask) // 更新状态 + if err != nil { + s.Actuator.Stop(s.ctx, newTask) + resaon := fmt.Errorf("任务执行失败:%v, 并且尝试重启也失败 %v", task.FailedReason, err) + s.failed(s.ctx, &task, resaon) + return + } } + } else { + s.failed(s.ctx, &task, task.FailedReason) } - } else { - s.failed(s.ctx, &task, task.FailedReason) + } else if task.TaskStatus == TASK_STATUS_SUCCESS { + s.export(s.ctx, &task) } - } else if task.TaskStatus == TASK_STATUS_SUCCESS { - s.export(s.ctx, &task) - } - }() + }() + } } } @@ -453,29 +445,25 @@ func (s *TaskScheduler) updateOnce(ctx context.Context) { func (s *TaskScheduler) finshed(ctx context.Context, task *Task) { // 添加到完成的任务 channel - task.TaskEnbTime = time.Now() + task.TaskEndTime = time.Now() if s.config.EnableFinshedTaskList { - c := time.NewTimer(50 * time.Millisecond) - retryCount := 0 - select { - case s.finshedTask <- task: - return - case <-c.C: - // 因为缓存满了,导致加入不进去,chan 弹出一个元素 - // 最多超时三次 - if retryCount >= 3 { - return - } - c.Reset(0) + const maxRetry = 3 + for i := 0; i <= maxRetry; i++ { select { - case <-s.finshedTask: - break - case <-c.C: - break + case s.finshedTask <- task: + return + default: + if i == maxRetry { + return + } + // 尝试释放一个老的元素 + select { + case <-s.finshedTask: + default: + } + time.Sleep(10 * time.Millisecond) } - c.Reset(0) - retryCount++ } } } @@ -528,3 +516,24 @@ func (s *TaskScheduler) success(ctx context.Context, task *Task) (*Task, error) } return newtask, err } + +// retryOrFail 统一重试逻辑 +func (s *TaskScheduler) retryOrFail(ctx context.Context, task *Task, failedReason error) { + if task.TaskAttemptsTime < s.config.MaxFailedAttempts { + task.TaskAttemptsTime++ + newTask, _, err := s.Actuator.Start(ctx, task) + if err != nil { // 重启失败 + resaon := fmt.Errorf("任务执行失败:%v, 并且尝试重启也失败 %v", failedReason, err) + s.failed(ctx, task, resaon) + return + } + if _, err = s.Container.ToRunningStatus(ctx, newTask); err != nil { + s.Actuator.Stop(ctx, newTask) + resaon := fmt.Errorf("任务执行失败:%v, 并且尝试重启也失败 %v", failedReason, err) + s.failed(ctx, task, resaon) + return + } + return + } + s.failed(ctx, task, failedReason) +}