Skip to content

Commit

Permalink
Use EventRecorder instead of emitting events manually (#36)
Browse files Browse the repository at this point in the history
  • Loading branch information
anandswaminathan committed Jun 27, 2019
1 parent 6dd5f55 commit 6d92339
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 101 deletions.
22 changes: 9 additions & 13 deletions cmd/flinkk8soperator/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/spf13/cobra"

"github.com/lyft/flinkk8soperator/pkg/controller"
controller_config "github.com/lyft/flinkk8soperator/pkg/controller/config"
controllerConfig "github.com/lyft/flinkk8soperator/pkg/controller/config"
ctrlRuntimeConfig "sigs.k8s.io/controller-runtime/pkg/client/config"

"github.com/kubernetes-sigs/controller-runtime/pkg/runtime/signals"
Expand All @@ -29,10 +29,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/manager"
)

const (
appName = "flinkk8soperator"
)

var (
cfgFile string
configAccessor = viper.NewAccessor(config.Options{})
Expand All @@ -46,27 +42,27 @@ var rootCmd = &cobra.Command{
return initConfig(cmd.Flags())
},
RunE: func(cmd *cobra.Command, args []string) error {
return executeRootCmd(controller_config.GetConfig())
return executeRootCmd(controllerConfig.GetConfig())
},
}

// Execute adds all child commands to the root command and sets flags appropriately.
// This is called by main.main(). It only needs to happen once to the rootCmd.
func Execute() {
version.LogBuildInformation(appName)
version.LogBuildInformation(controllerConfig.AppName)
if err := rootCmd.Execute(); err != nil {
fmt.Println(err)
os.Exit(1)
}
}

func Run(config *controller_config.Config) error {
if err := controller_config.SetConfig(config); err != nil {
func Run(config *controllerConfig.Config) error {
if err := controllerConfig.SetConfig(config); err != nil {
logger.Errorf(context.Background(), "Failed to set config: %v", err)
return err
}

return executeRootCmd(controller_config.GetConfig())
return executeRootCmd(controllerConfig.GetConfig())
}

func init() {
Expand Down Expand Up @@ -104,7 +100,7 @@ func logAndExit(err error) {
os.Exit(-1)
}

func executeRootCmd(controllerCfg *controller_config.Config) error {
func executeRootCmd(controllerCfg *controllerConfig.Config) error {
ctx, cancelNow := context.WithCancel(context.Background())

labeled.SetMetricKeys(common.GetValidLabelNames()...)
Expand Down Expand Up @@ -141,7 +137,7 @@ func executeRootCmd(controllerCfg *controller_config.Config) error {
}

func operatorEntryPoint(ctx context.Context, metricsScope promutils.Scope,
controllerCfg *controller_config.Config) (stopCh <-chan struct{}, err error) {
controllerCfg *controllerConfig.Config) (stopCh <-chan struct{}, err error) {

// Get a config to talk to the apiserver
cfg, err := ctrlRuntimeConfig.GetConfig()
Expand All @@ -167,7 +163,7 @@ func operatorEntryPoint(ctx context.Context, metricsScope promutils.Scope,

// Setup all Controllers
logger.Infof(ctx, "Adding controllers.")
if err := controller.AddToManager(ctx, mgr, controller_config.RuntimeConfig{
if err := controller.AddToManager(ctx, mgr, controllerConfig.RuntimeConfig{
MetricsScope: metricsScope,
}); err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions pkg/controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

//go:generate pflags Config

const AppName = "flinkK8sOperator"
const configSectionKey = "operator"

var ConfigSection = config.MustRegisterSection(configSectionKey, &Config{})
Expand Down
52 changes: 25 additions & 27 deletions pkg/controller/flink/flink.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ package flink

import (
"context"
"encoding/json"
"errors"
"fmt"
"time"

"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/manager"

"k8s.io/apimachinery/pkg/runtime"

"github.com/lyft/flinkk8soperator/pkg/controller/common"

"github.com/lyft/flinkk8soperator/pkg/controller/config"
controllerConfig "github.com/lyft/flinkk8soperator/pkg/controller/config"
"github.com/lyft/flytestdlib/logger"

"github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1"
Expand Down Expand Up @@ -82,7 +84,7 @@ type ControllerInterface interface {
FindExternalizedCheckpoint(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (string, error)

// Logs an event to the FlinkApplication resource and to the operator log
LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, fieldPath string, eventType string, message string)
LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, message string)

// Compares and updates new cluster status with current cluster status
// Returns true if there is a change in ClusterStatus
Expand All @@ -93,14 +95,15 @@ type ControllerInterface interface {
CompareAndUpdateJobStatus(ctx context.Context, app *v1alpha1.FlinkApplication, hash string) (bool, error)
}

func NewController(k8sCluster k8.ClusterInterface, config config.RuntimeConfig) ControllerInterface {
func NewController(k8sCluster k8.ClusterInterface, mgr manager.Manager, config controllerConfig.RuntimeConfig) ControllerInterface {
metrics := newControllerMetrics(config.MetricsScope)
return &Controller{
k8Cluster: k8sCluster,
jobManager: NewJobManagerController(k8sCluster, config),
taskManager: NewTaskManagerController(k8sCluster, config),
flinkClient: client.NewFlinkJobManagerClient(config),
metrics: metrics,
k8Cluster: k8sCluster,
jobManager: NewJobManagerController(k8sCluster, config),
taskManager: NewTaskManagerController(k8sCluster, config),
flinkClient: client.NewFlinkJobManagerClient(config),
metrics: metrics,
eventRecorder: mgr.GetRecorder(controllerConfig.AppName),
}
}

Expand All @@ -122,16 +125,17 @@ type controllerMetrics struct {
}

type Controller struct {
k8Cluster k8.ClusterInterface
jobManager JobManagerControllerInterface
taskManager TaskManagerControllerInterface
flinkClient client.FlinkAPIInterface
metrics *controllerMetrics
k8Cluster k8.ClusterInterface
jobManager JobManagerControllerInterface
taskManager TaskManagerControllerInterface
flinkClient client.FlinkAPIInterface
metrics *controllerMetrics
eventRecorder record.EventRecorder
}

func getURLFromApp(application *v1alpha1.FlinkApplication, hash string) string {
service := VersionedJobManagerServiceName(application, hash)
cfg := config.GetConfig()
cfg := controllerConfig.GetConfig()
if cfg.UseProxy {
return fmt.Sprintf(proxyURL, cfg.ProxyPort.Port, application.Namespace, service)
}
Expand Down Expand Up @@ -204,21 +208,21 @@ func (f *Controller) CreateCluster(ctx context.Context, application *v1alpha1.Fl
newlyCreatedJm, err := f.jobManager.CreateIfNotExist(ctx, application)
if err != nil {
logger.Errorf(ctx, "Job manager cluster creation did not succeed %v", err)
f.LogEvent(ctx, application, "", corev1.EventTypeWarning,
f.LogEvent(ctx, application, corev1.EventTypeWarning,
fmt.Sprintf("Failed to create job managers: %v", err))

return err
}
newlyCreatedTm, err := f.taskManager.CreateIfNotExist(ctx, application)
if err != nil {
logger.Errorf(ctx, "Task manager cluster creation did not succeed %v", err)
f.LogEvent(ctx, application, "", corev1.EventTypeWarning,
f.LogEvent(ctx, application, corev1.EventTypeWarning,
fmt.Sprintf("Failed to create task managers: %v", err))
return err
}

if newlyCreatedJm || newlyCreatedTm {
f.LogEvent(ctx, application, "", corev1.EventTypeNormal, "Flink cluster created")
f.LogEvent(ctx, application, corev1.EventTypeNormal, "Flink cluster created")
}
return nil
}
Expand Down Expand Up @@ -397,7 +401,7 @@ func (f *Controller) DeleteOldResourcesForApp(ctx context.Context, app *v1alpha1
}

for k := range deletedHashes {
f.LogEvent(ctx, app, "", corev1.EventTypeNormal, fmt.Sprintf("Deleted old cluster with hash %s", k))
f.LogEvent(ctx, app, corev1.EventTypeNormal, fmt.Sprintf("Deleted old cluster with hash %s", k))
}

return nil
Expand All @@ -420,7 +424,7 @@ func (f *Controller) FindExternalizedCheckpoint(ctx context.Context, application
return checkpoint.ExternalPath, nil
}

func (f *Controller) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, fieldPath string, eventType string, message string) {
func (f *Controller) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, message string) {
reason := "Create"
if app.Status.DeployHash != "" {
// this is not the first deploy
Expand All @@ -430,14 +434,8 @@ func (f *Controller) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplicatio
reason = "Delete"
}

event := k8.CreateEvent(app, fieldPath, eventType, reason, message)
f.eventRecorder.Event(app, eventType, reason, message)
logger.Infof(ctx, "Logged %s event: %s: %s", eventType, reason, message)

// TODO: switch to using EventRecorder once we switch to controller runtime
if err := f.k8Cluster.CreateK8Object(ctx, &event); err != nil {
b, _ := json.Marshal(event)
logger.Errorf(ctx, "Failed to log event %v: %v", string(b), err)
}
}

// Gets and updates the cluster status
Expand Down
17 changes: 12 additions & 5 deletions pkg/controller/flink/flink_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"context"
"testing"

"github.com/lyft/flinkk8soperator/pkg/client/clientset/versioned/scheme"
"k8s.io/client-go/tools/record"

"time"

"github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1"
Expand Down Expand Up @@ -35,12 +38,16 @@ const testFlinkVersion = "1.7"
func getTestFlinkController() Controller {
testScope := mockScope.NewTestScope()
labeled.SetMetricKeys(common.GetValidLabelNames()...)

recorderProvider := record.NewBroadcaster()

return Controller{
jobManager: &mock.JobManagerController{},
taskManager: &mock.TaskManagerController{},
k8Cluster: &k8mock.K8Cluster{},
flinkClient: &clientMock.JobManagerClient{},
metrics: newControllerMetrics(testScope),
jobManager: &mock.JobManagerController{},
taskManager: &mock.TaskManagerController{},
k8Cluster: &k8mock.K8Cluster{},
flinkClient: &clientMock.JobManagerClient{},
metrics: newControllerMetrics(testScope),
eventRecorder: recorderProvider.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "test"}),
}
}

Expand Down
14 changes: 11 additions & 3 deletions pkg/controller/flink/mock/mock_flink.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"github.com/lyft/flinkk8soperator/pkg/apis/app/v1alpha1"
"github.com/lyft/flinkk8soperator/pkg/controller/common"
"github.com/lyft/flinkk8soperator/pkg/controller/flink/client"
"github.com/lyft/flinkk8soperator/pkg/controller/k8"
corev1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -120,8 +119,17 @@ func (m *FlinkController) FindExternalizedCheckpoint(ctx context.Context, applic
return "", nil
}

func (m *FlinkController) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, fieldPath string, eventType string, message string) {
m.Events = append(m.Events, k8.CreateEvent(app, fieldPath, eventType, "Test", message))
func (m *FlinkController) LogEvent(ctx context.Context, app *v1alpha1.FlinkApplication, eventType string, message string) {
m.Events = append(m.Events, corev1.Event{
InvolvedObject: corev1.ObjectReference{
Kind: app.Kind,
Name: app.Name,
Namespace: app.Namespace,
},
Type: eventType,
Reason: "Test",
Message: message,
})
}

func (m *FlinkController) CompareAndUpdateClusterStatus(ctx context.Context, application *v1alpha1.FlinkApplication, hash string) (bool, error) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/flinkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (r *ReconcileFlinkApplication) Reconcile(request reconcile.Request) (reconc
// and Start it when the Manager is Started.
func Add(ctx context.Context, mgr manager.Manager, cfg config.RuntimeConfig) error {
k8sCluster := k8.NewK8Cluster(mgr)
flinkStateMachine := NewFlinkStateMachine(k8sCluster, cfg)
flinkStateMachine := NewFlinkStateMachine(k8sCluster, mgr, cfg)

metrics := newReconcilerMetrics(cfg.MetricsScope)
reconciler := ReconcileFlinkApplication{
Expand All @@ -127,7 +127,7 @@ func Add(ctx context.Context, mgr manager.Manager, cfg config.RuntimeConfig) err
flinkStateMachine: flinkStateMachine,
}

c, err := controller.New("flinkAppController", mgr, controller.Options{
c, err := controller.New(config.AppName, mgr, controller.Options{
MaxConcurrentReconciles: config.GetConfig().Workers,
Reconciler: &reconciler,
})
Expand Down

0 comments on commit 6d92339

Please sign in to comment.