-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
orm.go
139 lines (120 loc) · 3.99 KB
/
orm.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package directrequestocr
import (
"fmt"
"sort"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/smartcontractkit/chainlink/core/services/pg"
)
//go:generate mockery --quiet --name ORM --output ./mocks/ --case=underscore
// ORM is the storage interface for requests handled by this package. Requests
// are addressed by their RequestID, and every method accepts optional
// pg.QOpt query options.
type ORM interface {
	// CreateRequest records a new request with the given ID, reception time
	// and (optional, pointer-typed) originating transaction hash.
	CreateRequest(requestID RequestID, receivedAt time.Time, requestTxHash *common.Hash, qopts ...pg.QOpt) error
	// SetResult attaches a computation result produced by run runID to the
	// request, together with the time the result became ready.
	SetResult(requestID RequestID, runID int64, computationResult []byte, readyAt time.Time, qopts ...pg.QOpt) error
	// SetError attaches a computation error of the given type produced by run
	// runID to the request, together with the time it became ready.
	SetError(requestID RequestID, runID int64, errorType ErrType, computationError []byte, readyAt time.Time, qopts ...pg.QOpt) error
	// SetState updates the state of the request and returns a RequestState
	// alongside the error. The in-memory implementation in this file returns
	// the state the request had before the update.
	SetState(requestID RequestID, state RequestState, qopts ...pg.QOpt) (RequestState, error)
	// FindOldestEntriesByState returns up to limit requests that are in the
	// given state, oldest first.
	FindOldestEntriesByState(state RequestState, limit uint32, qopts ...pg.QOpt) ([]Request, error)
	// FindById looks a request up by its RequestID.
	FindById(requestID RequestID, qopts ...pg.QOpt) (*Request, error)
	// TODO add jobID or contract address when moving to the DB ORM
	// TODO add state transition validation
	// https://app.shortcut.com/chainlinklabs/story/54049/database-table-in-core-node
}
// inmemoryorm is an ORM implementation that keeps every request in a Go map
// guarded by a mutex. Nothing is written to a database.
type inmemoryorm struct {
	// counter is the last sequential ID handed out by CreateRequest.
	counter int64
	// db holds the stored requests, keyed by request ID.
	db map[[32]byte]Request
	// mutex is held by every method for the duration of its access to
	// counter and db.
	mutex *sync.Mutex
}

// Compile-time assertion that *inmemoryorm implements ORM.
var _ ORM = (*inmemoryorm)(nil)
// NewInMemoryORM returns an empty in-memory ORM: the ID counter starts at
// zero, the request map is allocated and empty, and a fresh mutex is attached.
func NewInMemoryORM() *inmemoryorm {
	orm := new(inmemoryorm) // counter is left at its zero value
	orm.db = map[[32]byte]Request{}
	orm.mutex = new(sync.Mutex)
	return orm
}
// CreateRequest stores a new entry for requestID in the IN_PROGRESS state and
// assigns it the next sequential ID. If an entry with the same requestID is
// already present it returns an error and leaves both the store and the ID
// counter untouched. qopts are not used by the in-memory implementation.
func (o *inmemoryorm) CreateRequest(requestID RequestID, receivedAt time.Time, requestTxHash *common.Hash, qopts ...pg.QOpt) error {
	o.mutex.Lock()
	defer o.mutex.Unlock()

	_, exists := o.db[requestID]
	if exists {
		return fmt.Errorf("request already exists")
	}

	// Only bump the counter once we know the insert will happen.
	o.counter++
	o.db[requestID] = Request{
		ID:            o.counter,
		RequestID:     requestID,
		ReceivedAt:    receivedAt,
		RequestTxHash: requestTxHash,
		State:         IN_PROGRESS,
	}
	return nil
}
// SetResult records a successful computation for requestID: it stores the run
// ID, the result bytes and the ready time, resets the error type to NONE and
// the error payload to an empty slice, and moves the entry to RESULT_READY.
// It returns an error when no entry with that requestID exists. qopts are not
// used by the in-memory implementation.
func (o *inmemoryorm) SetResult(requestID RequestID, runID int64, computationResult []byte, readyAt time.Time, qopts ...pg.QOpt) error {
	o.mutex.Lock()
	defer o.mutex.Unlock()

	entry, found := o.db[requestID]
	if !found {
		return fmt.Errorf("can't find entry with requestID: %v", requestID)
	}

	entry.RunID = runID
	entry.Result = computationResult
	entry.ResultReadyAt = readyAt
	entry.State = RESULT_READY
	// A result supersedes any error previously stored on the entry.
	entry.ErrorType = NONE
	entry.Error = []byte{}

	// Map values are copies, so the modified entry must be written back.
	o.db[requestID] = entry
	return nil
}
// SetError records a failed computation for requestID: it stores the run ID,
// the error type, the error bytes and the ready time, resets the result to an
// empty slice, and moves the entry to RESULT_READY. It returns an error when
// no entry with that requestID exists. qopts are not used by the in-memory
// implementation.
func (o *inmemoryorm) SetError(requestID RequestID, runID int64, errorType ErrType, computationError []byte, readyAt time.Time, qopts ...pg.QOpt) error {
	o.mutex.Lock()
	defer o.mutex.Unlock()

	entry, found := o.db[requestID]
	if !found {
		return fmt.Errorf("can't find entry with requestID: %v", requestID)
	}

	entry.RunID = runID
	entry.ErrorType = errorType
	entry.Error = computationError
	entry.ResultReadyAt = readyAt
	entry.State = RESULT_READY
	// An error supersedes any result previously stored on the entry.
	entry.Result = []byte{}

	// Map values are copies, so the modified entry must be written back.
	o.db[requestID] = entry
	return nil
}
// SetState overwrites the state of the entry for requestID and returns the
// state it had before the change. When no such entry exists it returns
// IN_PROGRESS together with an error. qopts are not used by the in-memory
// implementation.
func (o *inmemoryorm) SetState(requestID RequestID, state RequestState, qopts ...pg.QOpt) (RequestState, error) {
	o.mutex.Lock()
	defer o.mutex.Unlock()

	entry, found := o.db[requestID]
	if !found {
		// IN_PROGRESS is the placeholder value returned alongside the error.
		return IN_PROGRESS, fmt.Errorf("can't find entry with requestID: %v", requestID)
	}

	previous := entry.State
	entry.State = state
	o.db[requestID] = entry
	return previous, nil
}
// FindOldestEntriesByState returns copies of the entries whose state equals
// state, ordered by ascending ReceivedAt and truncated to at most limit
// elements. Entries with equal ReceivedAt are ordered by ascending ID so that
// the result (and which entries survive the limit) does not depend on Go's
// randomized map iteration order. The returned slice is nil when nothing
// matches; the error is always nil. qopts are not used by the in-memory
// implementation.
func (o *inmemoryorm) FindOldestEntriesByState(state RequestState, limit uint32, qopts ...pg.QOpt) ([]Request, error) {
	o.mutex.Lock()
	defer o.mutex.Unlock()

	var result []Request
	// NOTE: suboptimal if limit << full result
	for _, val := range o.db {
		if val.State == state {
			result = append(result, val)
		}
	}

	sort.Slice(result, func(i, j int) bool {
		if !result[i].ReceivedAt.Equal(result[j].ReceivedAt) {
			return result[i].ReceivedAt.Before(result[j].ReceivedAt)
		}
		// sort.Slice is not stable and the input order is random, so break
		// timestamp ties on the sequential ID assigned at creation.
		return result[i].ID < result[j].ID
	})

	// Compare in uint64 so neither operand is narrowed.
	if uint64(limit) < uint64(len(result)) {
		result = result[:limit]
	}
	return result, nil
}
// FindById looks up the entry stored under requestID and returns a pointer to
// a copy of it; assigning to fields of the returned Request does not modify
// the stored entry. It returns a nil pointer and an error when no entry with
// that requestID exists. qopts are not used by the in-memory implementation.
func (o *inmemoryorm) FindById(requestID RequestID, qopts ...pg.QOpt) (*Request, error) {
	o.mutex.Lock()
	defer o.mutex.Unlock()

	val, ok := o.db[requestID]
	if !ok {
		// The lookup key is the request ID (not the sequential ID field), so
		// the message names it as such, matching the other methods.
		return nil, fmt.Errorf("can't find entry with requestID: %v", requestID)
	}
	return &val, nil
}
// TODO actual DB: https://app.shortcut.com/chainlinklabs/story/54049/database-table-in-core-node