-
Notifications
You must be signed in to change notification settings - Fork 111
/
reports.go
128 lines (114 loc) · 3.35 KB
/
reports.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package admin
import (
"context"
"fmt"
"time"
"github.com/rilldata/rill/admin/database"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// TriggerReport requests an ad-hoc run of the named report by creating a
// refresh trigger on the deployment's runtime instance that is restricted to
// that single report resource.
//
// It returns any error from opening the runtime client or from creating the
// trigger.
func (s *Service) TriggerReport(ctx context.Context, depl *database.Deployment, report string) (err error) {
	client, err := s.openRuntimeClientForDeployment(depl)
	if err != nil {
		return err
	}
	defer client.Close()

	// Limit the refresh to the one report resource identified by name.
	req := &runtimev1.CreateTriggerRequest{
		InstanceId: depl.RuntimeInstanceID,
		Trigger: &runtimev1.CreateTriggerRequest_RefreshTriggerSpec{
			RefreshTriggerSpec: &runtimev1.RefreshTriggerSpec{
				OnlyNames: []*runtimev1.ResourceName{
					{Kind: runtime.ResourceKindReport, Name: report},
				},
			},
		},
	}

	_, err = client.CreateTrigger(ctx, req)
	return err
}
// TriggerReconcileAndAwaitReport creates a pull trigger on the deployment's
// runtime instance and then polls once per second, for at most one minute,
// until the named report is observed to have changed.
//
// A change is any one of:
//   - the report was not found before the trigger and is found now;
//   - the report was found before the trigger and is no longer found;
//   - the report's spec version differs from the one recorded before the trigger.
//
// It returns ctx.Err() if ctx is canceled or the one-minute timeout elapses
// while waiting for the next poll, and a wrapped error if any lookup fails
// with a status other than NotFound.
func (s *Service) TriggerReconcileAndAwaitReport(ctx context.Context, depl *database.Deployment, reportName string) error {
	rt, err := s.openRuntimeClientForDeployment(depl)
	if err != nil {
		return err
	}
	defer rt.Close()

	reportReq := &runtimev1.GetResourceRequest{
		InstanceId: depl.RuntimeInstanceID,
		Name: &runtimev1.ResourceName{
			Kind: runtime.ResourceKindReport,
			Name: reportName,
		},
	}

	// Record the spec version before the reconcile. Only a NotFound status
	// means "the report does not exist yet". Any other failure is returned,
	// because treating it as "not found" would let the first successful poll
	// report success even though nothing has changed.
	var oldSpecVersion *int64
	r, err := rt.GetResource(ctx, reportReq)
	switch {
	case err == nil:
		// Copy the value so we do not keep a pointer into the response message.
		v := r.Resource.Meta.SpecVersion
		oldSpecVersion = &v
	case status.Code(err) != codes.NotFound:
		return fmt.Errorf("failed to get report before reconcile: %w", err)
	}

	// Trigger reconcile
	_, err = rt.CreateTrigger(ctx, &runtimev1.CreateTriggerRequest{
		InstanceId: depl.RuntimeInstanceID,
		Trigger: &runtimev1.CreateTriggerRequest_PullTriggerSpec{
			PullTriggerSpec: &runtimev1.PullTriggerSpec{},
		},
	})
	if err != nil {
		return err
	}

	// Poll every second until a change is observed, ctx is canceled, or one minute has elapsed.
	ctx, cancel := context.WithTimeout(ctx, time.Minute)
	defer cancel()

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}

		cur, err := rt.GetResource(ctx, reportReq)
		if err != nil {
			// status.Code yields codes.Unknown for non-gRPC errors, so those are returned as failures too.
			if status.Code(err) != codes.NotFound {
				return fmt.Errorf("failed to poll for report: %w", err)
			}
			if oldSpecVersion != nil {
				// Success - previously the report was found, now we cannot find it anymore
				return nil
			}
			// Still not found; keep polling.
			continue
		}

		if oldSpecVersion == nil {
			// Success - previously the report was not found, now we found one
			return nil
		}
		if *oldSpecVersion != cur.Resource.Meta.SpecVersion {
			// Success - the spec version has changed
			return nil
		}
	}
}
// LookupReport fetches the named report resource from the deployment's runtime
// instance and returns its spec.
//
// It returns an error if the runtime client cannot be opened, if the lookup
// fails, or if the response does not carry a report payload. The returned spec
// is whatever the runtime sent and is not otherwise validated here.
func (s *Service) LookupReport(ctx context.Context, depl *database.Deployment, reportName string) (*runtimev1.ReportSpec, error) {
	rt, err := s.openRuntimeClientForDeployment(depl)
	if err != nil {
		return nil, err
	}
	defer rt.Close()

	res, err := rt.GetResource(ctx, &runtimev1.GetResourceRequest{
		InstanceId: depl.RuntimeInstanceID,
		Name: &runtimev1.ResourceName{
			Kind: runtime.ResourceKindReport,
			Name: reportName,
		},
	})
	if err != nil {
		return nil, err
	}

	// Use a checked assertion so an unexpected or empty payload is reported as
	// an error instead of panicking the caller.
	if res.Resource == nil {
		return nil, fmt.Errorf("runtime returned no resource for report %q", reportName)
	}
	rep, ok := res.Resource.Resource.(*runtimev1.Resource_Report)
	if !ok || rep.Report == nil {
		return nil, fmt.Errorf("resource %q is not a report", reportName)
	}
	return rep.Report.Spec, nil
}