Skip to content

Commit

Permalink
[standard]fix: data race of task result sent to result channel while …
Browse files Browse the repository at this point in the history
…result is being assigned when context done
  • Loading branch information
vistart committed Jun 14, 2024
1 parent 83790e9 commit ce1158a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 37 deletions.
30 changes: 10 additions & 20 deletions workflow/standard/cache/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,47 +17,37 @@ type Interface interface {
// Get retrieves the value of the specified key from the cache.
//
// Parameter
//
// - key: an object implementing the KeyGetter interface to retrieve the key to get.
// - key: an object implementing the KeyGetter interface to retrieve the key to get.
//
// Returns
//
// - any: the retrieved value, you may need to check if it is nil.
//
// - error: if an error occurs, it returns the corresponding error message.
// - any: the retrieved value, you may need to check if it is nil.
// - error: if an error occurs, it returns the corresponding error message.
Get(key KeyGetter) (any, error)

// Set sets the value of the specified key in the cache.
//
// Parameter
//
// - key: an object implementing the KeyGetter interface to retrieve the key to set.
//
// - value: the value to set, passed as an empty interface.
//
// - options: optional options to set cache items.
// - key: an object implementing the KeyGetter interface to retrieve the key to set.
// - value: the value to set, passed as an empty interface.
// - options: optional options to set cache items.
//
// Returns
//
// - error: if an error occurs, it returns the corresponding error message.
// - error: if an error occurs, it returns the corresponding error message.
Set(key KeyGetter, value any, options ...ItemOption) error

// Delete deletes the value of the specified key from the cache.
//
// Parameter
//
// - key: an object implementing the KeyGetter interface to retrieve the key to delete.
// - key: an object implementing the KeyGetter interface to retrieve the key to delete.
//
// Returns
//
// - error: if an error occurs, it returns the corresponding error message.
// - error: if an error occurs, it returns the corresponding error message.
Delete(key KeyGetter) error

// Clear clears the cache, deleting all cache items.
//
// Returns
//
// - error: if an error occurs, it returns the corresponding error message.
// - error: if an error occurs, it returns the corresponding error message.
Clear() error
}

Expand Down
34 changes: 17 additions & 17 deletions workflow/standard/worker/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (p *pool) startWorker() string {

// Execute the task asynchronously
done := make(chan struct{})
go func(done chan struct{}, channels *workerChannels) {
go func() {
for {
select {
case <-done:
Expand All @@ -197,12 +197,12 @@ func (p *pool) startWorker() string {
return
}
}
}(done, channels)
go func(done chan struct{}) {
}()
go func() {
defer close(done)
result.Result, err = taskWithArgs.task.Execute(taskWithArgs.ctx, taskWithArgs.args...)
result.Err = err
}(done)
}()

// Wait for the task to complete or the context to be cancelled
delta := metricsUpdate{deltaWorking: -1}
Expand All @@ -213,6 +213,8 @@ func (p *pool) startWorker() string {
// Task was cancelled
delta.deltaCanceled = 1
waiting = false
// Send the empty result back to the submitter
taskWithArgs.resultChan <- TaskResult{}
case <-done:
// Task completed
if err != nil {
Expand All @@ -225,12 +227,11 @@ func (p *pool) startWorker() string {
delta.deltaSuccessful = 1
}
waiting = false
// Send the task result back to the submitter
taskWithArgs.resultChan <- result
}
}
p.metricsChan <- delta

// Send the result back to the submitter
taskWithArgs.resultChan <- result
close(taskWithArgs.resultChan) // Close the resultChan after sending the result
case <-channels.cancel:
continue
Expand Down Expand Up @@ -317,9 +318,9 @@ func (p *pool) getWorkerID() string {
// the order in which tasks finish.
//
// Parameters:
// - ctx: The context used to derive a new context passed to the task's Execute method.
// - task: The task to be executed.
// - args: Additional arguments to be passed to the task.
// - ctx: The context used to derive a new context passed to the task's Execute method.
// - task: The task to be executed.
// - args: Additional arguments to be passed to the task.
//
// Returns a channel that will receive the TaskResult upon completion.
func (p *pool) Submit(ctx context.Context, task Task, args ...interface{}) <-chan TaskResult {
Expand Down Expand Up @@ -355,8 +356,8 @@ func (p *pool) Submit(ctx context.Context, task Task, args ...interface{}) <-cha
// notification will be sent.
//
// Parameters:
// - id: The ID of the worker to be exited.
// - stopWorker: A boolean flag indicating whether to cancel the task's context.
// - id: The ID of the worker to be exited.
// - stopWorker: A boolean flag indicating whether to cancel the task's context.
//
// If the specified ID exists, the worker will be exited immediately. Subsequent calls to this method
// with the same ID will return an error indicating that the worker ID does not exist, even if the
Expand Down Expand Up @@ -404,8 +405,8 @@ func (p *pool) StopWorkerByID(id string) error {
//
// This method changes the ID of a worker to a new ID.
//
// oldID: The current ID of the worker.
// newID: The new ID to assign to the worker.
// - oldID: The current ID of the worker.
// - newID: The new ID to assign to the worker.
//
// Returns an error if the old ID is not found or the new ID already exists.
func (p *pool) RenameWorker(oldID, newID string) error {
Expand Down Expand Up @@ -475,12 +476,11 @@ func (p *pool) handleMetricsUpdates() {
continue
}

p.mutex.Lock()
metrics, ok := p.metrics.(*metrics)
if !ok {
p.mutex.Unlock()
continue
}
metrics.Lock()
metrics.data.CurrentCapacity += update.deltaCapacity
metrics.data.WorkingWorkers += update.deltaWorking
metrics.data.IdleWorkers = metrics.data.CurrentCapacity - metrics.data.WorkingWorkers
Expand All @@ -490,6 +490,6 @@ func (p *pool) handleMetricsUpdates() {
metrics.data.FailedTasks += update.deltaFailed
metrics.data.CanceledTasks += update.deltaCanceled
metrics.data.TotalCompletedTasks += update.deltaSuccessful + update.deltaFailed + update.deltaCanceled
p.mutex.Unlock()
metrics.Unlock()
}
}

0 comments on commit ce1158a

Please sign in to comment.