forked from Workiva/go-datastructures
-
Notifications
You must be signed in to change notification settings - Fork 0
/
futures.go
105 lines (92 loc) · 2.99 KB
/
futures.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/*
Copyright 2014 Workiva, LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package futures is useful for broadcasting an identical message to a multitude
of listeners as opposed to channels which will choose a listener at random
if multiple listeners are listening to the same channel. The future will
also cache the result so any future interest will be immediately returned
to the consumer.
*/
package futures
import (
"fmt"
"sync"
"time"
)
// Completer is a channel that the future expects to receive
// a result on. The future only receives on this channel.
type Completer <-chan interface{}
// Future represents an object that can be used to perform asynchronous
// tasks. The constructor of the future will complete it, and listeners
// will block on getresult until a result is received. This is different
// from a channel in that the future is only completed once, and anyone
// listening on the future will get the result, regardless of the number
// of listeners.
type Future struct {
triggered bool // because item can technically be nil and still be valid
item interface{}
err error
lock sync.Mutex
wg sync.WaitGroup
}
// GetResult will immediately fetch the result if it exists
// or wait on the result until it is ready.
func (f *Future) GetResult() (interface{}, error) {
f.lock.Lock()
if f.triggered {
f.lock.Unlock()
return f.item, f.err
}
f.lock.Unlock()
f.wg.Wait()
return f.item, f.err
}
// HasResult will return true iff the result exists
func (f *Future) HasResult() bool {
f.lock.Lock()
hasResult := f.triggered
f.lock.Unlock()
return hasResult
}
func (f *Future) setItem(item interface{}, err error) {
f.lock.Lock()
f.triggered = true
f.item = item
f.err = err
f.lock.Unlock()
f.wg.Done()
}
func listenForResult(f *Future, ch Completer, timeout time.Duration, wg *sync.WaitGroup) {
wg.Done()
t := time.NewTimer(timeout)
select {
case item := <-ch:
f.setItem(item, nil)
t.Stop() // we want to trigger GC of this timer as soon as it's no longer needed
case <-t.C:
f.setItem(nil, fmt.Errorf(`timeout after %f seconds`, timeout.Seconds()))
}
}
// New is the constructor to generate a new future. Pass the completed
// item to the toComplete channel and any listeners will get
// notified. If timeout is hit before toComplete is called,
// any listeners will get passed an error.
func New(completer Completer, timeout time.Duration) *Future {
f := &Future{}
f.wg.Add(1)
var wg sync.WaitGroup
wg.Add(1)
go listenForResult(f, completer, timeout, &wg)
wg.Wait()
return f
}