-
Notifications
You must be signed in to change notification settings - Fork 14
/
template_future.go
98 lines (90 loc) · 2.64 KB
/
template_future.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package gengorums
// futureCallVariables is the template fragment that declares the template
// variables shared by the future call fragments in this file:
//   - $context and $opts are bound to the results of the use helper for
//     "context.Context" and "grpc.CallOption" with the current .GenFile.
//   - $futureOut is bound to the result of outType for .Method and $customOut.
//
// $customOut is not declared in this fragment; presumably it is provided by
// commonVariables, which precedes this fragment in futureCall — confirm.
var futureCallVariables = `
{{$context := use "context.Context" .GenFile}}
{{$opts := use "grpc.CallOption" .GenFile}}
{{$futureOut := outType .Method $customOut}}
`
// futureCallComment is the template fragment that emits the doc comment for
// the generated future call method. If the method has leading comments
// (.Method.Comments.Leading is non-empty) they are emitted as-is; otherwise a
// default comment is generated, with one wording for methods where
// hasPerNodeArg reports true and another for all other methods.
//
// The fragment relies on $method, $in, $out and $futureOut being declared by
// fragments that precede it in the assembled template.
//
// NOTE(review): the default per-node text reads "The provide per node
// function f"; this looks like a typo for "provided". Changing it alters the
// generated output, so it is left untouched here.
var futureCallComment = `
{{$comments := .Method.Comments.Leading}}
{{if ne $comments ""}}
{{$comments -}}
{{else}}
{{if hasPerNodeArg .Method}}
// {{$method}} asynchronously invokes a quorum call on each node in
// configuration c, with the argument returned by the provided function f
// and returns the result as a {{$futureOut}}, which can be used to inspect
// the quorum call reply and error when available.
// The provide per node function f takes the provided {{$in}} argument
// and returns an {{$out}} object to be passed to the given nodeID.
// The per node function f should be thread-safe.
{{else}}
// {{$method}} asynchronously invokes a quorum call on configuration c
// and returns a {{$futureOut}}, which can be used to inspect the quorum call
// reply and error when available.
{{end -}}
{{end -}}
`
// futureCallSignature is the template fragment for the signature line of the
// exported future call method on *Configuration. The method takes a context,
// the request message, whatever perNodeFnType emits for the method (given the
// ", f" argument), and variadic call options, and returns a pointer to the
// $futureOut type. The fragment ends with the opening brace of the method
// body and contains no newlines.
var futureCallSignature = `func (c *Configuration) {{$method}}(ctx {{$context}}, in *{{$in}}` +
	`{{perNodeFnType .GenFile .Method ", f"}}, opts ...{{$opts}}) *{{$futureOut}} {`
// futureCallBody is the template fragment for the body of the exported future
// call method. The generated code:
//   - allocates the $futureOut value with an empty NodeIDs slice of capacity
//     c.n and a struct{} channel c with a buffer of one;
//   - starts a goroutine that invokes the unexported counterpart of the method
//     (named via unexport .Method.GoName), passing the future as the resp
//     argument, and closes fut.c when that call returns;
//   - returns the future immediately, without waiting for the goroutine.
//
// The final "}" closes the method opened by futureCallSignature.
var futureCallBody = `
fut := &{{$futureOut}}{
NodeIDs: make([]uint32, 0, c.n),
c: make(chan struct{}, 1),
}
go func() {
defer close(fut.c)
c.{{unexport .Method.GoName}}(ctx, in{{perNodeArg .Method ", f"}}, fut, opts...)
}()
return fut
}
`
// futureCallUnexportedSignature is the template fragment for the signature of
// the unexported method that carries out the call on behalf of the exported
// future method. Compared with the exported signature it takes an additional
// resp parameter (a pointer to the $futureOut value to fill in) and has no
// return value. The fragment starts and ends with a newline and leaves the
// method body open.
var futureCallUnexportedSignature = "\n" +
	`func (c *Configuration) {{unexport .Method.GoName}}(ctx {{$context}}, in *{{$in}}` +
	`{{perNodeFnType .GenFile .Method ", f"}}, resp *{{$futureOut}}, opts ...{{$opts}}) {` +
	"\n"
// futureCallReply is the template fragment for the reply-collection loop of
// the unexported future call method. The generated code loops over a select
// on replyChan and ctx.Done():
//   - For every received reply the node ID is appended to resp.NodeIDs.
//   - A reply carrying an error is recorded in errs; the break leaves the
//     select (not the for loop), so the completeness check below still runs.
//   - A successful reply is stored in the replies map (keyed by node ID) and
//     the method's quorum function ($method followed by "QF") is called with
//     the request and all replies so far. If it reports a quorum, the result
//     is stored in resp with a nil error and the method returns.
//   - If the context is done, resp receives a QuorumCallError built from
//     ctx.Err() and the method returns.
//   - After handling a reply, if the number of errors plus successful replies
//     equals expected, resp receives an "incomplete call" QuorumCallError and
//     the method returns.
//
// In both error cases resp also receives the current value of reply, i.e. the
// last value returned by the quorum function, or nil if it was never called.
//
// replyChan and expected are not declared in this fragment; presumably they
// are set up by quorumCallLoop, which precedes it in futureCall — confirm.
// The last "}" presumably closes the method opened by
// futureCallUnexportedSignature.
var futureCallReply = `
var (
reply *{{$customOut}}
errs []GRPCError
quorum bool
replies = make(map[uint32]*{{$out}}, 2*c.n)
)
for {
select {
case r := <-replyChan:
resp.NodeIDs = append(resp.NodeIDs, r.nid)
if r.err != nil {
errs = append(errs, GRPCError{r.nid, r.err})
break
}
{{template "traceLazyLog"}}
replies[r.nid] = r.reply
if reply, quorum = c.qspec.{{$method}}QF(in, replies); quorum {
resp.{{$customOutField}}, resp.err = reply, nil
return
}
case <-ctx.Done():
resp.{{$customOutField}}, resp.err = reply, QuorumCallError{ctx.Err().Error(), len(replies), errs}
return
}
if len(errs)+len(replies) == expected {
resp.{{$customOutField}}, resp.err = reply, QuorumCallError{"incomplete call", len(replies), errs}
return
}
}
}
`
// futureCall is the complete template text for a future call method. It is
// the concatenation, in this order, of the shared variable declarations, the
// future-specific variables, the doc comment, the exported signature and
// body, the unexported signature, the quorum call loop, the reply handling
// and the per-node gRPC call. commonVariables, quorumCallLoop and
// nodeCallGrpc are defined outside this file.
var futureCall = commonVariables + futureCallVariables +
	futureCallComment + futureCallSignature + futureCallBody +
	futureCallUnexportedSignature + quorumCallLoop + futureCallReply +
	nodeCallGrpc