-
Notifications
You must be signed in to change notification settings - Fork 787
/
step_report_activities.go
194 lines (166 loc) · 4.96 KB
/
step_report_activities.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package cmd
import (
"io"
"time"
"fmt"
"github.com/jenkins-x/jx/pkg/apis/jenkins.io/v1"
"github.com/jenkins-x/jx/pkg/client/clientset/versioned"
"github.com/jenkins-x/jx/pkg/jx/cmd/templates"
"github.com/jenkins-x/jx/pkg/kube"
"github.com/jenkins-x/jx/pkg/log"
pe "github.com/jenkins-x/jx/pkg/pipeline_events"
"github.com/spf13/cobra"
"gopkg.in/AlecAivazis/survey.v1/terminal"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/tools/cache"
)
// StepReportActivitiesOptions contains the command line flags
type StepReportActivitiesOptions struct {
StepReportOptions
Watch bool
pe.PipelineEventsProvider
}
var (
StepReportActivitiesLong = templates.LongDesc(`
This pipeline step reports activities to pluggable backends like ElasticSearch
`)
StepReportActivitiesExample = templates.Examples(`
jx step report activities
`)
)
func NewCmdStepReportActivities(f Factory, in terminal.FileReader, out terminal.FileWriter, errOut io.Writer) *cobra.Command {
options := StepReportActivitiesOptions{
StepReportOptions: StepReportOptions{
StepOptions: StepOptions{
CommonOptions: CommonOptions{
Factory: f,
In: in,
Out: out,
Err: errOut,
},
},
},
}
cmd := &cobra.Command{
Use: "activities",
Short: "Reports activities",
Long: StepReportActivitiesLong,
Example: StepReportActivitiesExample,
Run: func(cmd *cobra.Command, args []string) {
options.Cmd = cmd
options.Args = args
err := options.Run()
CheckErr(err)
},
}
cmd.Flags().BoolVarP(&options.Watch, "watch", "w", false, "Whether to watch activities")
options.addCommonFlags(cmd)
return cmd
}
func (o *StepReportActivitiesOptions) Run() error {
// look up services that we want to send events to using a label?
// watch activities and send an event for each backend i.e elasticsearch
f := o.Factory
_, _, err := o.KubeClient()
if err != nil {
return fmt.Errorf("cannot connect to Kubernetes cluster: %v", err)
}
jxClient, _, err := o.JXClient()
if err != nil {
return fmt.Errorf("cannot create jx client: %v", err)
}
apisClient, err := f.CreateApiExtensionsClient()
if err != nil {
return err
}
err = kube.RegisterPipelineActivityCRD(apisClient)
if err != nil {
return err
}
esServiceName := kube.AddonServices[defaultPEName]
externalURL, err := o.ensureAddonServiceAvailable(esServiceName)
if err != nil {
log.Warnf("no %s service found, are you in your teams dev environment? Type `jx env` to switch.\n", esServiceName)
return fmt.Errorf("try running `jx create addon pipeline-events` in your teams dev environment: %v", err)
}
server, auth, err := o.CommonOptions.getAddonAuthByKind(kube.ValueKindPipelineEvent, externalURL)
if err != nil {
return fmt.Errorf("error getting %s auth details, %v", kube.ValueKindPipelineEvent, err)
}
o.PipelineEventsProvider, err = pe.NewElasticsearchProvider(server, auth)
if err != nil {
return fmt.Errorf("error creating elasticsearch provider, %v", err)
}
if o.Watch {
err = o.watchPipelineActivities(jxClient, o.currentNamespace)
if err != nil {
return err
}
}
err = o.getPipelineActivities(jxClient, o.currentNamespace)
if err != nil {
return err
}
return nil
}
func (o *StepReportActivitiesOptions) getPipelineActivities(jxClient versioned.Interface, ns string) error {
activities, err := jxClient.JenkinsV1().PipelineActivities(ns).List(metav1.ListOptions{})
if err != nil {
return err
}
for _, a := range activities.Items {
err := o.PipelineEventsProvider.SendActivity(&a)
if err != nil {
log.Errorf("%v\n", err)
return err
}
}
return nil
}
func (o *StepReportActivitiesOptions) watchPipelineActivities(jxClient versioned.Interface, ns string) error {
activity := &v1.PipelineActivity{}
listWatch := cache.NewListWatchFromClient(jxClient.JenkinsV1().RESTClient(), "pipelineactivities", ns, fields.Everything())
kube.SortListWatchByName(listWatch)
_, controller := cache.NewInformer(
listWatch,
activity,
time.Hour*24,
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
// send to registered backends
activity, ok := obj.(*v1.PipelineActivity)
if !ok {
log.Errorf("Object is not a PipelineActivity %#v\n", obj)
return
}
log.Infof("New activity added %s\n", activity.ObjectMeta.Name)
err := o.PipelineEventsProvider.SendActivity(activity)
if err != nil {
log.Errorf("%v\n", err)
return
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
activity, ok := newObj.(*v1.PipelineActivity)
if !ok {
log.Errorf("Object is not a PipelineActivity %#v\n", newObj)
return
}
log.Infof("Updated activity added %s\n", activity.ObjectMeta.Name)
err := o.PipelineEventsProvider.SendActivity(activity)
if err != nil {
log.Errorf("%v\n", err)
return
}
},
DeleteFunc: func(obj interface{}) {
// no need to send event
},
},
)
stop := make(chan struct{})
go controller.Run(stop)
// Wait forever
select {}
}