Skip to content

Commit

Permalink
feat: Prevent EventBus with clients connected from being deleted (arg…
Browse files Browse the repository at this point in the history
…oproj#1066)

* feat: Prevent EventBus with clients connected from being deleted

Signed-off-by: Derek Wang <whynowy@gmail.com>
  • Loading branch information
whynowy committed Feb 13, 2021
1 parent bc75c51 commit c4d9335
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 49 deletions.
19 changes: 13 additions & 6 deletions controllers/eventbus/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"github.com/argoproj/argo-events/common/logging"
"github.com/argoproj/argo-events/controllers/eventbus"
"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
eventsourcev1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

const (
Expand Down Expand Up @@ -62,22 +64,27 @@ func main() {
}

// Readyness probe
err = mgr.AddReadyzCheck("readiness", healthz.Ping)
if err != nil {
if err := mgr.AddReadyzCheck("readiness", healthz.Ping); err != nil {
logger.Fatalw("unable add a readiness check", zap.Error(err))
}

// Liveness probe
err = mgr.AddHealthzCheck("liveness", healthz.Ping)
if err != nil {
if err := mgr.AddHealthzCheck("liveness", healthz.Ping); err != nil {
logger.Fatalw("unable add a health check", zap.Error(err))
}

err = v1alpha1.AddToScheme(mgr.GetScheme())
if err != nil {
if err := v1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add scheme", zap.Error(err))
}

if err := eventsourcev1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add EventSource scheme", zap.Error(err))
}

if err := sensorv1alpha1.AddToScheme(mgr.GetScheme()); err != nil {
logger.Fatalw("unable to add Sensor scheme", zap.Error(err))
}

// A controller with DefaultControllerRateLimiter
c, err := controller.New(eventbus.ControllerName, mgr, controller.Options{
Reconciler: eventbus.NewReconciler(mgr.GetClient(), mgr.GetScheme(), natsStreamingImage, natsMetricsImage, logger),
Expand Down
6 changes: 3 additions & 3 deletions controllers/eventbus/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ func (r *reconciler) reconcile(ctx context.Context, eventBus *v1alpha1.EventBus)
log.Info("deleting eventbus")
if controllerutil.ContainsFinalizer(eventBus, finalizerName) {
// Finalizer logic should be added here.
if err := installer.Uninstall(eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log); err != nil {
if err := installer.Uninstall(ctx, eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log); err != nil {
log.Errorw("failed to uninstall", zap.Error(err))
return nil
return err
}
controllerutil.RemoveFinalizer(eventBus, finalizerName)
}
Expand All @@ -87,7 +87,7 @@ func (r *reconciler) reconcile(ctx context.Context, eventBus *v1alpha1.EventBus)
eventBus.Status.MarkDeployFailed("InvalidSpec", err.Error())
return err
}
return installer.Install(eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log)
return installer.Install(ctx, eventBus, r.client, r.natsStreamingImage, r.natsMetricsImage, log)
}

func (r *reconciler) needsUpdate(old, new *v1alpha1.EventBus) bool {
Expand Down
5 changes: 3 additions & 2 deletions controllers/eventbus/installer/exotic_nats.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package installer

import (
"context"
"errors"

"go.uber.org/zap"
Expand All @@ -23,7 +24,7 @@ func NewExoticNATSInstaller(eventBus *v1alpha1.EventBus, logger *zap.SugaredLogg
}
}

func (i *exoticNATSInstaller) Install() (*v1alpha1.BusConfig, error) {
func (i *exoticNATSInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) {
natsObj := i.eventBus.Spec.NATS
if natsObj == nil || natsObj.Exotic == nil {
return nil, errors.New("invalid request")
Expand All @@ -37,7 +38,7 @@ func (i *exoticNATSInstaller) Install() (*v1alpha1.BusConfig, error) {
return busConfig, nil
}

func (i *exoticNATSInstaller) Uninstall() error {
func (i *exoticNATSInstaller) Uninstall(ctx context.Context) error {
i.logger.Info("nothing to uninstall")
return nil
}
7 changes: 4 additions & 3 deletions controllers/eventbus/installer/exotic_nats_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package installer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -38,17 +39,17 @@ var (
func TestInstallationExotic(t *testing.T) {
t.Run("installation with exotic nats config", func(t *testing.T) {
installer := NewExoticNATSInstaller(testExoticBus, logging.NewArgoEventsLogger())
conf, err := installer.Install()
conf, err := installer.Install(context.TODO())
assert.NoError(t, err)
assert.NotNil(t, conf.NATS)
assert.Equal(t, conf.NATS.URL, testExoticURL)
})
}

func TestUNinstallationExotic(t *testing.T) {
func TestUninstallationExotic(t *testing.T) {
t.Run("uninstallation with exotic nats config", func(t *testing.T) {
installer := NewExoticNATSInstaller(testExoticBus, logging.NewArgoEventsLogger())
err := installer.Uninstall()
err := installer.Uninstall(context.TODO())
assert.NoError(t, err)
})
}
82 changes: 73 additions & 9 deletions controllers/eventbus/installer/installer.go
Original file line number Diff line number Diff line change
@@ -1,31 +1,34 @@
package installer

import (
"errors"
"context"

"github.com/pkg/errors"
"go.uber.org/zap"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/argoproj/argo-events/common"
"github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
eventsourcev1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

// Installer is an interface for event bus installation
type Installer interface {
Install() (*v1alpha1.BusConfig, error)
Install(ctx context.Context) (*v1alpha1.BusConfig, error)
// Uninsall only needs to handle those resources not cascade deleted.
// For example, undeleted PVCs not automatically deleted when deleting a StatefulSet
Uninstall() error
Uninstall(ctx context.Context) error
}

// Install function installs the event bus
func Install(eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger *zap.SugaredLogger) error {
func Install(ctx context.Context, eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger *zap.SugaredLogger) error {
installer, err := getInstaller(eventBus, client, natsStreamingImage, natsMetricsImage, logger)
if err != nil {
logger.Desugar().Error("failed to an installer", zap.Error(err))
return err
}
busConfig, err := installer.Install()
busConfig, err := installer.Install(ctx)
if err != nil {
logger.Desugar().Error("installation error", zap.Error(err))
return err
Expand Down Expand Up @@ -54,16 +57,77 @@ func getLabels(bus *v1alpha1.EventBus) map[string]string {
}
}

// Uninstall function uninstalls the extra resources who were not cleaned up
// when an eventbus was deleted. Most of the time this is not needed as all
// Uninstall function will be run before the EventBus object is deleted,
// usually it could be used to uninstall the extra resources who would not be cleaned
// up when an EventBus is deleted. Most of the time this is not needed as all
// the dependency resources should have been deleted by owner references cascade
// deletion, but things like PVC created by StatefulSet need to be cleaned up
// separately.
func Uninstall(eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger *zap.SugaredLogger) error {
//
// It could also be used to check if the EventBus object can be safely deleted.
func Uninstall(ctx context.Context, eventBus *v1alpha1.EventBus, client client.Client, natsStreamingImage, natsMetricsImage string, logger *zap.SugaredLogger) error {
linkedEventSources, err := linkedEventSources(ctx, eventBus.Namespace, eventBus.Name, client)
if err != nil {
logger.Errorw("failed to query linked EventSources", zap.Error(err))
return errors.Wrap(err, "failed to check if there is any EventSource linked")
}
if linkedEventSources > 0 {
return errors.Errorf("Can not delete an EventBus with %v EventSources connected", linkedEventSources)
}

linkedSensors, err := linkedSensors(ctx, eventBus.Namespace, eventBus.Name, client)
if err != nil {
logger.Errorw("failed to query linked Sensors", zap.Error(err))
return errors.Wrap(err, "failed to check if there is any Sensor linked")
}
if linkedSensors > 0 {
return errors.Errorf("Can not delete an EventBus with %v Sensors connected", linkedSensors)
}

installer, err := getInstaller(eventBus, client, natsStreamingImage, natsMetricsImage, logger)
if err != nil {
logger.Desugar().Error("failed to get an installer", zap.Error(err))
return err
}
return installer.Uninstall()
return installer.Uninstall(ctx)
}

func linkedEventSources(ctx context.Context, namespace, eventBusName string, c client.Client) (int, error) {
esl := &eventsourcev1alpha1.EventSourceList{}
if err := c.List(ctx, esl, &client.ListOptions{
Namespace: namespace,
}); err != nil {
return 0, err
}
result := 0
for _, es := range esl.Items {
ebName := es.Spec.EventBusName
if ebName == "" {
ebName = common.DefaultEventBusName
}
if ebName == eventBusName {
result++
}
}
return result, nil
}

func linkedSensors(ctx context.Context, namespace, eventBusName string, c client.Client) (int, error) {
sl := &sensorv1alpha1.SensorList{}
if err := c.List(ctx, sl, &client.ListOptions{
Namespace: namespace,
}); err != nil {
return 0, err
}
result := 0
for _, s := range sl.Items {
sName := s.Spec.EventBusName
if sName == "" {
sName = common.DefaultEventBusName
}
if sName == eventBusName {
result++
}
}
return result, nil
}
90 changes: 90 additions & 0 deletions controllers/eventbus/installer/installer_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,19 @@
package installer

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"

"github.com/argoproj/argo-events/common/logging"
eventsourcev1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventsource/v1alpha1"
"github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
sensorv1alpha1 "github.com/argoproj/argo-events/pkg/apis/sensor/v1alpha1"
)

func TestGetInstaller(t *testing.T) {
Expand All @@ -23,3 +31,85 @@ func TestGetInstaller(t *testing.T) {
assert.True(t, ok)
})
}

func init() {
_ = eventsourcev1alpha1.AddToScheme(scheme.Scheme)
_ = sensorv1alpha1.AddToScheme(scheme.Scheme)
}

func TestGetLinkedEventSources(t *testing.T) {
t.Run("get linked eventsources", func(t *testing.T) {
es := fakeEmptyEventSource()
es.Spec.EventBusName = "test-sa"
es.Spec.Calendar = fakeCalendarEventSourceMap("test")
cl := fake.NewClientBuilder().Build()
ctx := context.Background()
err := cl.Create(ctx, es, &client.CreateOptions{})
assert.Nil(t, err)
n, err := linkedEventSources(ctx, testNamespace, "test-sa", cl)
assert.Nil(t, err)
assert.Equal(t, n, 1)
})
}

func TestGetLinkedSensors(t *testing.T) {
t.Run("get linked sensors", func(t *testing.T) {
s := fakeSensor()
s.Spec.EventBusName = "test-sa"
cl := fake.NewClientBuilder().Build()
ctx := context.Background()
err := cl.Create(ctx, s, &client.CreateOptions{})
assert.Nil(t, err)
n, err := linkedSensors(ctx, testNamespace, "test-sa", cl)
assert.Nil(t, err)
assert.Equal(t, n, 1)
})
}

func fakeEmptyEventSource() *eventsourcev1alpha1.EventSource {
return &eventsourcev1alpha1.EventSource{
ObjectMeta: metav1.ObjectMeta{
Namespace: testNamespace,
Name: "test-es",
},
Spec: eventsourcev1alpha1.EventSourceSpec{},
}
}

func fakeCalendarEventSourceMap(name string) map[string]eventsourcev1alpha1.CalendarEventSource {
return map[string]eventsourcev1alpha1.CalendarEventSource{name: {Schedule: "*/5 * * * *"}}
}

func fakeSensor() *sensorv1alpha1.Sensor {
return &sensorv1alpha1.Sensor{
ObjectMeta: metav1.ObjectMeta{
Name: "fake-sensor",
Namespace: testNamespace,
},
Spec: sensorv1alpha1.SensorSpec{
Triggers: []v1alpha1.Trigger{
{
Template: &v1alpha1.TriggerTemplate{
Name: "fake-trigger",
K8s: &v1alpha1.StandardK8STrigger{
GroupVersionResource: metav1.GroupVersionResource{
Group: "k8s.io",
Version: "",
Resource: "pods",
},
Operation: "create",
Source: &v1alpha1.ArtifactLocation{},
},
},
},
},
Dependencies: []v1alpha1.EventDependency{
{
Name: "fake-dep",
EventSourceName: "fake-source",
EventName: "fake-one",
},
},
},
}
}
8 changes: 3 additions & 5 deletions controllers/eventbus/installer/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,11 @@ func NewNATSInstaller(client client.Client, eventBus *v1alpha1.EventBus, streami
}

// Install creats a StatefulSet and a Service for NATS
func (i *natsInstaller) Install() (*v1alpha1.BusConfig, error) {
func (i *natsInstaller) Install(ctx context.Context) (*v1alpha1.BusConfig, error) {
natsObj := i.eventBus.Spec.NATS
if natsObj == nil || natsObj.Native == nil {
return nil, errors.New("invalid request")
}
ctx := context.Background()

svc, err := i.createStanService(ctx)
if err != nil {
Expand Down Expand Up @@ -118,13 +117,12 @@ func (i *natsInstaller) Install() (*v1alpha1.BusConfig, error) {
}

// Uninstall deletes those objects not handeled by cascade deletion.
func (i *natsInstaller) Uninstall() error {
ctx := context.Background()
func (i *natsInstaller) Uninstall(ctx context.Context) error {
return i.uninstallPVCs(ctx)
}

func (i *natsInstaller) uninstallPVCs(ctx context.Context) error {
// StatefulSet doens't clean up PVC, needs to do it separately
// StatefulSet doesn't clean up PVC, needs to do it separately
// https://github.com/kubernetes/kubernetes/issues/55045
log := i.logger
pvcs, err := i.getPVCs(ctx, i.labels)
Expand Down

0 comments on commit c4d9335

Please sign in to comment.