-
Notifications
You must be signed in to change notification settings - Fork 0
/
archived_workflow_server.go
117 lines (108 loc) · 3.19 KB
/
archived_workflow_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package workflowarchive
import (
"context"
"fmt"
"sort"
"strconv"
"strings"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/argoproj/argo/persist/sqldb"
"github.com/argoproj/argo/pkg/apis/workflow"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/argoproj/argo/server/auth"
)
// archivedWorkflowServer implements the archived-workflow gRPC service on top
// of a sqldb.WorkflowArchive.
type archivedWorkflowServer struct {
	// wfArchive is the store that list, get and delete requests are served from.
	wfArchive sqldb.WorkflowArchive
}
// NewWorkflowArchiveServer returns an ArchivedWorkflowServiceServer whose
// requests are served from the given workflow archive.
func NewWorkflowArchiveServer(wfArchive sqldb.WorkflowArchive) ArchivedWorkflowServiceServer {
	server := new(archivedWorkflowServer)
	server.wfArchive = wfArchive
	return server
}
// ListArchivedWorkflows returns one page of archived workflows that the caller
// is allowed to "get".
//
// Paging is driven by req.ListOptions:
//   - Limit is the maximum number of workflows to return; 0 selects the
//     default of 10 and a negative value is rejected with InvalidArgument.
//   - Continue is a decimal row offset into the archive; empty means 0. A
//     non-integer or negative value is rejected with InvalidArgument.
//   - FieldSelector restricts the listing to one namespace when it has the
//     form "metadata.namespace=<ns>"; any other selector is ignored.
//
// Rows the caller may not "get" are skipped, so several archive pages may be
// read to fill a single response. The returned ListMeta.Continue is the offset
// of the first row that was not examined, and is left empty once the archive
// has been read to its end. The request object is never modified.
func (w *archivedWorkflowServer) ListArchivedWorkflows(ctx context.Context, req *ListArchivedWorkflowsRequest) (*wfv1.WorkflowList, error) {
	options := req.ListOptions
	if options == nil {
		options = &metav1.ListOptions{}
	}

	limit := int(options.Limit)
	if limit < 0 {
		// A negative limit would skip the read loop entirely and hand back a
		// continue token that never advances.
		return nil, status.Error(codes.InvalidArgument, "listOptions.limit must >= 0")
	}
	if limit == 0 {
		limit = 10
	}

	// Work on a local copy of the token so the caller's request is not mutated.
	continueToken := options.Continue
	if continueToken == "" {
		continueToken = "0"
	}
	offset, err := strconv.Atoi(continueToken)
	if err != nil {
		return nil, status.Error(codes.InvalidArgument, "listOptions.continue must be int")
	}
	if offset < 0 {
		return nil, status.Error(codes.InvalidArgument, "listOptions.continue must >= 0")
	}

	namespace := ""
	if strings.HasPrefix(options.FieldSelector, "metadata.namespace=") {
		namespace = strings.TrimPrefix(options.FieldSelector, "metadata.namespace=")
	}

	// Start from a non-nil slice, as the original did. It is not pre-sized to
	// limit because limit is client-controlled.
	items := make(wfv1.Workflows, 0)
	authorizer := auth.NewAuthorizer(ctx)

	// Keep reading archive pages until the response is full or the archive is
	// exhausted. offset always points at the next row that has not been examined.
	hasMore := true
	for len(items) < limit && hasMore {
		page, err := w.wfArchive.ListWorkflows(namespace, limit, offset)
		if err != nil {
			return nil, err
		}
		examined := 0
		for _, wf := range page {
			allowed, err := authorizer.CanI("get", workflow.WorkflowPlural, wf.Namespace, wf.Name)
			if err != nil {
				return nil, err
			}
			examined++
			if !allowed {
				continue
			}
			items = append(items, wf)
			if len(items) == limit {
				break
			}
		}
		offset += examined
		// More rows may exist if we stopped part-way through this page or the
		// page came back full. A fully consumed short page is the end.
		hasMore = examined < len(page) || len(page) == limit
	}

	meta := metav1.ListMeta{}
	if hasMore {
		meta.Continue = fmt.Sprintf("%v", offset)
	}
	// Ordering within the page is whatever wfv1.Workflows' sort.Interface defines.
	sort.Sort(items)
	return &wfv1.WorkflowList{ListMeta: meta, Items: items}, nil
}
// GetArchivedWorkflow looks up the archived workflow with UID req.Uid.
//
// It returns the archive's error unchanged if the lookup fails, a NotFound
// status if the archive yields no workflow, and a PermissionDenied status if
// the caller is not allowed to "get" the workflow in its namespace.
func (w *archivedWorkflowServer) GetArchivedWorkflow(ctx context.Context, req *GetArchivedWorkflowRequest) (*wfv1.Workflow, error) {
	archived, err := w.wfArchive.GetWorkflow(req.Uid)
	switch {
	case err != nil:
		return nil, err
	case archived == nil:
		return nil, status.Error(codes.NotFound, "not found")
	}

	// Authorization can only run after the lookup, because the namespace and
	// name come from the stored workflow.
	permitted, err := auth.CanI(ctx, "get", workflow.WorkflowPlural, archived.Namespace, archived.Name)
	switch {
	case err != nil:
		return nil, err
	case !permitted:
		return nil, status.Error(codes.PermissionDenied, "permission denied")
	}
	return archived, nil
}
// DeleteArchivedWorkflow removes the archived workflow with UID req.Uid.
//
// The workflow is first fetched through GetArchivedWorkflow, so the caller
// needs "get" permission and the workflow must exist. The caller must also be
// allowed to "delete" it, otherwise a PermissionDenied status is returned.
// Errors from the lookup, the authorization check and the archive delete are
// returned unchanged.
func (w *archivedWorkflowServer) DeleteArchivedWorkflow(ctx context.Context, req *DeleteArchivedWorkflowRequest) (*ArchivedWorkflowDeletedResponse, error) {
	target, err := w.GetArchivedWorkflow(ctx, &GetArchivedWorkflowRequest{Uid: req.Uid})
	if err != nil {
		return nil, err
	}

	canDelete, err := auth.CanI(ctx, "delete", workflow.WorkflowPlural, target.Namespace, target.Name)
	switch {
	case err != nil:
		return nil, err
	case !canDelete:
		return nil, status.Error(codes.PermissionDenied, "permission denied")
	}

	if err := w.wfArchive.DeleteWorkflow(req.Uid); err != nil {
		return nil, err
	}
	return &ArchivedWorkflowDeletedResponse{}, nil
}