forked from RichardKnop/machinery
-
Notifications
You must be signed in to change notification settings - Fork 0
/
null.go
146 lines (119 loc) · 3.87 KB
/
null.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package null
import (
"fmt"
"github.com/yukimochi/machinery-v1/v1/backends/iface"
"github.com/yukimochi/machinery-v1/v1/common"
"github.com/yukimochi/machinery-v1/v1/config"
"github.com/yukimochi/machinery-v1/v1/tasks"
)
// ErrGroupNotFound is the error value this package reports when a group UUID
// is not known to the backend. It carries the UUID that was looked up.
type ErrGroupNotFound struct {
	groupUUID string // UUID of the group that could not be found
}

// NewErrGroupNotFound builds an ErrGroupNotFound for the given group UUID.
func NewErrGroupNotFound(groupUUID string) ErrGroupNotFound {
	var notFound ErrGroupNotFound
	notFound.groupUUID = groupUUID
	return notFound
}

// Error implements the error interface. The message has the form
// "Group not found: <groupUUID>".
func (e ErrGroupNotFound) Error() string {
	message := fmt.Sprintf("Group not found: %v", e.groupUUID)
	return message
}
// ErrTasknotFound is the error value this package reports when a task UUID
// has no stored state. It carries the UUID that was looked up.
type ErrTasknotFound struct {
	taskUUID string // UUID of the task that could not be found
}

// NewErrTasknotFound builds an ErrTasknotFound for the given task UUID.
func NewErrTasknotFound(taskUUID string) ErrTasknotFound {
	var notFound ErrTasknotFound
	notFound.taskUUID = taskUUID
	return notFound
}

// Error implements the error interface. The message has the form
// "Task not found: <taskUUID>".
func (e ErrTasknotFound) Error() string {
	message := fmt.Sprintf("Task not found: %v", e.taskUUID)
	return message
}
// Backend represents a "null" result backend. It embeds common.Backend and
// otherwise only remembers which group UUIDs have been registered.
//
// NOTE(review): groups is read and written with no locking visible in this
// file — confirm a Backend is never shared across goroutines.
type Backend struct {
	common.Backend

	// groups is the set of group UUIDs registered through InitGroup.
	groups map[string]struct{}
}
// New creates a null Backend and returns it as an iface.Backend. The embedded
// common.Backend is built from a freshly allocated, zero-value config.Config,
// and the set of known groups starts out empty.
func New() iface.Backend {
	cnf := new(config.Config)

	backend := new(Backend)
	backend.Backend = common.NewBackend(cnf)
	backend.groups = make(map[string]struct{})
	return backend
}
// InitGroup registers groupUUID in the backend's set of known groups.
// taskUUIDs is not used, and the returned error is always nil.
func (b *Backend) InitGroup(groupUUID string, taskUUIDs []string) error {
	var present struct{}
	b.groups[groupUUID] = present
	return nil
}
// GroupCompleted reports true for every group that is currently registered
// with the backend; groupTaskCount is not consulted. For an unknown group it
// returns false together with an ErrGroupNotFound.
func (b *Backend) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) {
	if _, known := b.groups[groupUUID]; known {
		return true, nil
	}
	return false, NewErrGroupNotFound(groupUUID)
}
// GroupTaskStates returns the (always empty) list of task states for a group.
// For a registered group the result is a zero-length slice whose capacity is
// groupTaskCount; for an unknown group it returns nil and an ErrGroupNotFound.
func (b *Backend) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) {
	if _, known := b.groups[groupUUID]; !known {
		return nil, NewErrGroupNotFound(groupUUID)
	}
	return make([]*tasks.TaskState, 0, groupTaskCount), nil
}
// TriggerChord always answers true with a nil error; groupUUID is not
// inspected.
func (b *Backend) TriggerChord(groupUUID string) (bool, error) {
	const shouldTrigger = true
	return shouldTrigger, nil
}
// SetStatePending builds a PENDING task state for the signature and passes it
// to updateState, returning whatever updateState returns. updateState in this
// file discards the state, so nothing is stored.
func (b *Backend) SetStatePending(signature *tasks.Signature) error {
	return b.updateState(tasks.NewPendingTaskState(signature))
}
// SetStateReceived builds a RECEIVED task state for the signature and passes
// it to updateState, returning whatever updateState returns. updateState in
// this file discards the state, so nothing is stored.
func (b *Backend) SetStateReceived(signature *tasks.Signature) error {
	return b.updateState(tasks.NewReceivedTaskState(signature))
}
// SetStateStarted builds a STARTED task state for the signature and passes it
// to updateState, returning whatever updateState returns. updateState in this
// file discards the state, so nothing is stored.
func (b *Backend) SetStateStarted(signature *tasks.Signature) error {
	return b.updateState(tasks.NewStartedTaskState(signature))
}
// SetStateRetry builds a RETRY task state for the signature and passes it to
// updateState, returning whatever updateState returns. updateState in this
// file discards the state, so nothing is stored.
func (b *Backend) SetStateRetry(signature *tasks.Signature) error {
	return b.updateState(tasks.NewRetryTaskState(signature))
}
// SetStateSuccess builds a SUCCESS task state from the signature and its
// results and passes it to updateState, returning whatever updateState
// returns. updateState in this file discards the state, so nothing is stored.
func (b *Backend) SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error {
	return b.updateState(tasks.NewSuccessTaskState(signature, results))
}
// SetStateFailure builds a FAILURE task state from the signature and the error
// text and passes it to updateState, returning whatever updateState returns.
// updateState in this file discards the state, so nothing is stored.
func (b *Backend) SetStateFailure(signature *tasks.Signature, err string) error {
	return b.updateState(tasks.NewFailureTaskState(signature, err))
}
// GetState never finds a task: it always returns a nil state together with an
// ErrTasknotFound carrying taskUUID.
func (b *Backend) GetState(taskUUID string) (*tasks.TaskState, error) {
	var none *tasks.TaskState
	return none, NewErrTasknotFound(taskUUID)
}
// PurgeState has nothing to delete: it always returns an ErrTasknotFound
// carrying taskUUID.
func (b *Backend) PurgeState(taskUUID string) error {
	var err error = NewErrTasknotFound(taskUUID)
	return err
}
// PurgeGroupMeta deletes stored group meta data.
//
// It removes groupUUID from the backend's set of known groups, so later
// lookups of the same group (GroupCompleted, GroupTaskStates, or another
// PurgeGroupMeta) report ErrGroupNotFound. It returns ErrGroupNotFound if the
// group is not currently registered, and nil once the entry has been removed.
func (b *Backend) PurgeGroupMeta(groupUUID string) error {
	if _, ok := b.groups[groupUUID]; !ok {
		return NewErrGroupNotFound(groupUUID)
	}
	// Actually drop the entry. Previously the group was only looked up and
	// left in the map, so purged groups were never released.
	delete(b.groups, groupUUID)
	return nil
}
// updateState accepts a task state and discards it. The null backend stores
// nothing, so the result is always a nil error.
func (b *Backend) updateState(s *tasks.TaskState) error {
	_ = s // intentionally unused
	return nil
}