-
Notifications
You must be signed in to change notification settings - Fork 9
/
future.go
151 lines (121 loc) · 3.3 KB
/
future.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
// Copyright 2022 Namespace Labs Inc; All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package compute
import (
"context"
"sync"
"time"
"namespacelabs.dev/foundation/schema"
"namespacelabs.dev/foundation/std/tasks"
)
// hasAction is implemented by values that can expose the action event they
// are associated with.
type hasAction interface {
// Action returns the associated action event.
Action() *tasks.ActionEvent
}
// Promise holds a value of type V (or an error) that is supplied later, and
// hands out Futures through which consumers can wait for that outcome.
type Promise[V any] struct {
// actionID is copied into every Future handed out by this promise.
actionID tasks.ActionID
// c is the source of the action event this promise is tied to.
// NOTE(review): it is stored at initialization but not read in the code
// visible here — confirm its consumers elsewhere in the package.
c hasAction
// mu guards waiters, resolved and done.
mu sync.Mutex
// waiters has one channel per Future handed out while the promise was still
// pending; each receives the outcome when the promise is resolved.
waiters []chan atom[V] // We use channels, to allow for select{}ing for cancelation.
// resolved is the recorded outcome; it is only meaningful when done is true.
resolved atom[V]
// done reports whether an outcome has been recorded.
done bool
}
// Future is the consumer-side handle for the outcome of a Promise.
type Future[V any] struct {
// actionID is the ID of the action producing the value; Wait anchors its
// own "compute.wait" action to it.
actionID tasks.ActionID
// ch delivers the outcome while it is still pending. It is nil when the
// outcome is already stored in atom.
ch chan atom[V]
// atom is the outcome, valid once ch is nil.
atom atom[V]
}
// Result is a computed value of type V together with metadata describing it.
type Result[V any] struct {
// Digest identifies the result; HasDigest reports whether it is set.
// NOTE(review): presumably a content digest of Value — confirm with producers.
Digest schema.Digest
// NonDeterministic presumably marks values that may differ between
// recomputations — TODO confirm against the code that sets it.
NonDeterministic bool
// Value is the computed value itself.
Value V
}
// ResultWithTimestamp extends Result with bookkeeping about how and when the
// value was produced. It is the payload carried by Promise and Future.
type ResultWithTimestamp[V any] struct {
Result[V]
// Set presumably distinguishes a populated result from the zero value —
// TODO confirm against the code that sets it.
Set bool
// Cached presumably reports that the value came from a cache rather than a
// fresh computation — TODO confirm against the code that sets it.
Cached bool
// ActionID is the ID of an action associated with this result.
// NOTE(review): presumably the action that computed it — confirm.
ActionID tasks.ActionID
// Started is presumably when the computation began — TODO confirm.
Started time.Time
Completed time.Time // When this value was computed (if known).
revision uint64 // Used in a Continuous() flow.
}
// atom is the outcome of a computation: a result paired with the error that
// accompanied it. It is the unit sent from a Promise to its Futures.
type atom[V any] struct {
// value is the computed result.
value ResultWithTimestamp[V]
// err is the error reported alongside value, if any.
err error
}
// initializePromise records the action source c and the action ID derived
// from id on an existing promise p, and returns p so calls can be chained.
func initializePromise[V any](p *Promise[V], c hasAction, id string) *Promise[V] {
	p.actionID, p.c = tasks.ActionID(id), c
	return p
}
// makePromise allocates a new, unresolved promise bound to the action source
// c and the action ID id.
func makePromise[V any](c hasAction, id string) *Promise[V] {
	fresh := new(Promise[V])
	return initializePromise(fresh, c, id)
}
// NewPromise creates a promise whose outcome is whatever callback returns.
//
// A fresh action ID is generated and applied to action; the resulting action
// is both attached to the promise and handed to g.Detach together with a
// function that invokes callback and resolves the promise with its result.
// NOTE(review): Detach presumably runs that function in the background —
// confirm against Orch.Detach.
func NewPromise[V any](g *Orch, action *tasks.ActionEvent, callback func(context.Context) (ResultWithTimestamp[V], error)) *Promise[V] {
	actionID := tasks.NewActionID()
	tagged := action.ID(actionID)
	promise := makePromise[V](wrapHasAction{action: tagged}, actionID.String())

	run := func(ctx context.Context) error {
		// The callback's error reaches consumers through the promise, so it is
		// deliberately not reported to Detach as well.
		_ = promise.resolve(callback(ctx))
		return nil
	}
	g.Detach(tagged, run)

	return promise
}
// wrapHasAction adapts a bare *tasks.ActionEvent to the hasAction interface.
type wrapHasAction struct{ action *tasks.ActionEvent }
// Action returns the wrapped action event, satisfying hasAction.
func (h wrapHasAction) Action() *tasks.ActionEvent {
	event := h.action
	return event
}
// resolve records (v, err) as the promise's outcome, marks the promise done,
// and delivers the outcome to every Future that was handed out while the
// promise was pending. It returns err unchanged.
func (p *Promise[V]) resolve(v ResultWithTimestamp[V], err error) error {
	outcome := atom[V]{value: v, err: err}

	// Detach the waiter list under the lock; notification happens after the
	// lock is released.
	p.mu.Lock()
	p.resolved = outcome
	p.done = true
	pending := p.waiters
	p.waiters = nil
	p.mu.Unlock()

	for i := range pending {
		// Each waiter channel is created with capacity 1 by Future(), so this
		// send does not block even if nobody is receiving yet.
		pending[i] <- outcome
		close(pending[i])
	}

	return err
}
// fail resolves the promise with err and a zero-valued result, returning err.
func (p *Promise[V]) fail(err error) error {
	var empty ResultWithTimestamp[V]
	return p.resolve(empty, err)
}
// Future returns a new handle on the promise's outcome. If the promise is
// already done the handle carries the outcome directly; otherwise it carries
// a channel that is registered as a waiter and fed when the promise resolves.
func (p *Promise[V]) Future() *Future[V] {
	p.mu.Lock()
	defer p.mu.Unlock()

	fut := &Future[V]{actionID: p.actionID}
	if p.done {
		fut.atom = p.resolved
		return fut
	}

	// Capacity 1 lets the resolver deposit the outcome without waiting for
	// this future to be read.
	fut.ch = make(chan atom[V], 1)
	p.waiters = append(p.waiters, fut.ch)
	return fut
}
// HasDigest reports whether the result's Digest is set.
func (r Result[V]) HasDigest() bool {
	digest := r.Digest
	return digest.IsSet()
}
// Wait returns the outcome this future stands for.
//
// If the outcome is already known it is returned immediately. Otherwise Wait
// runs a "compute.wait" action anchored to the future's action ID and blocks
// inside it until the outcome arrives or ctx is done. When that action
// returns an error (for example ctx.Err()), Wait returns a zero result and
// that error, and the future stays pending so Wait may be called again. A
// channel that was closed without delivering a value is reported as
// context.Canceled.
//
// NOTE(review): f.ch and f.atom are written here without synchronization, so
// concurrent Wait calls on the same Future look unsafe — confirm with callers.
func (f *Future[V]) Wait(ctx context.Context) (ResultWithTimestamp[V], error) {
	if f.ch == nil {
		return f.atom.value, f.atom.err
	}

	receive := func(ctx context.Context) error {
		select {
		case <-ctx.Done():
			return ctx.Err()

		case outcome, open := <-f.ch:
			// The outcome is consumed exactly once; later calls read f.atom.
			f.ch = nil
			if open {
				f.atom = outcome
			} else {
				f.atom = atom[V]{err: context.Canceled}
			}
			return nil
		}
	}

	waitErr := tasks.Action("compute.wait").Anchor(f.actionID).Run(ctx, receive)
	if waitErr != nil {
		var zero ResultWithTimestamp[V]
		return zero, waitErr
	}

	return f.atom.value, f.atom.err
}
// valueFuture returns a promise that is already done and holds r with no
// error. The promise carries no action ID or action source.
func valueFuture[V any](r ResultWithTimestamp[V]) *Promise[V] {
	p := new(Promise[V])
	p.resolved.value = r
	p.done = true
	return p
}
// ErrPromise returns a promise that is already done and holds err together
// with a zero-valued result. The promise carries no action ID or action
// source.
func ErrPromise[V any](err error) *Promise[V] {
	p := new(Promise[V])
	p.resolved.err = err
	p.done = true
	return p
}