Skip to content

Commit

Permalink
Defaultcontroller (#843)
Browse files Browse the repository at this point in the history
* Creating a default channel controller that will watch every channel
and update its status in case the channel is not being watched by any controller
This could happen if the end user creates a channel but doesn't install the
provisioner.
Issue#779: #779

* Removed commetned lines

* Fixed some typos

* Changes based on comments on code review (pull request)

* Moved the code to set ChannelConditionProvisionerInstalled=True to Initialize()
function from MarkProvisioned()

* Changed all %v to %s when logging strings

* Updating some formatting issues called out in PR
  • Loading branch information
akashrv authored and knative-prow-robot committed Mar 7, 2019
1 parent 8a963fc commit 24bcdf8
Show file tree
Hide file tree
Showing 6 changed files with 409 additions and 4 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"time"

"github.com/knative/eventing/pkg/reconciler/v1alpha1/channel"
"github.com/knative/eventing/pkg/reconciler/v1alpha1/subscription"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/controller"
Expand Down Expand Up @@ -119,6 +120,7 @@ func main() {
// manager run it.
providers := []ProvideFunc{
subscription.ProvideController,
channel.ProvideController,
}
for _, provider := range providers {
if _, err := provider(mgr); err != nil {
Expand Down
21 changes: 20 additions & 1 deletion pkg/apis/eventing/v1alpha1/channel_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ type ChannelSpec struct {
Subscribable *eventingduck.Subscribable `json:"subscribable,omitempty"`
}

var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionAddressable)
var chanCondSet = duckv1alpha1.NewLivingConditionSet(ChannelConditionProvisioned, ChannelConditionAddressable, ChannelConditionProvisionerInstalled)

// ChannelStatus represents the current state of a Channel.
type ChannelStatus struct {
Expand Down Expand Up @@ -119,6 +119,10 @@ const (
// ChannelConditionAddressable has status true when this Channel meets
// the Addressable contract and has a non-empty hostname.
ChannelConditionAddressable duckv1alpha1.ConditionType = "Addressable"

// ChannelConditionProvisionerFound has status true when the channel is being watched
// by the provisioner's channel controller (in other words, the provisioner is installed)
ChannelConditionProvisionerInstalled duckv1alpha1.ConditionType = "ProvisionerInstalled"
)

// GetCondition returns the condition currently associated with the given type, or nil.
Expand All @@ -134,6 +138,11 @@ func (cs *ChannelStatus) IsReady() bool {
// InitializeConditions sets relevant unset conditions to Unknown state.
func (cs *ChannelStatus) InitializeConditions() {
chanCondSet.Manage(cs).InitializeConditions()
// Channel-default-controller sets ChannelConditionProvisionerInstalled=False, and it needs to be set to True by individual controllers
// This is done so that each individual channel controller gets it for free.
// It is also implied here that the channel-default-controller never calls InitializeConditions(), while individual channel controllers
// call InitializeConditions() as one of the first things in its reconcile loop.
cs.MarkProvisionerInstalled()
}

// MarkProvisioned sets ChannelConditionProvisioned condition to True state.
Expand All @@ -146,6 +155,16 @@ func (cs *ChannelStatus) MarkNotProvisioned(reason, messageFormat string, messag
chanCondSet.Manage(cs).MarkFalse(ChannelConditionProvisioned, reason, messageFormat, messageA...)
}

// MarkProvisionerInstalled sets ChannelConditionProvisionerInstalled condition to True state.
func (cs *ChannelStatus) MarkProvisionerInstalled() {
chanCondSet.Manage(cs).MarkTrue(ChannelConditionProvisionerInstalled)
}

// MarkProvisionerNotInstalled sets ChannelConditionProvisionerInstalled condition to False state.
func (cs *ChannelStatus) MarkProvisionerNotInstalled(reason, messageFormat string, messageA ...interface{}) {
chanCondSet.Manage(cs).MarkFalse(ChannelConditionProvisionerInstalled, reason, messageFormat, messageA...)
}

// SetAddress makes this Channel addressable by setting the hostname. It also
// sets the ChannelConditionAddressable to true.
func (cs *ChannelStatus) SetAddress(hostname string) {
Expand Down
16 changes: 13 additions & 3 deletions pkg/apis/eventing/v1alpha1/channel_types_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ func TestChannelInitializeConditions(t *testing.T) {
}, {
Type: ChannelConditionProvisioned,
Status: corev1.ConditionUnknown,
}, {
Type: ChannelConditionProvisionerInstalled,
Status: corev1.ConditionTrue,
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
Expand All @@ -121,6 +124,9 @@ func TestChannelInitializeConditions(t *testing.T) {
}, {
Type: ChannelConditionProvisioned,
Status: corev1.ConditionFalse,
}, {
Type: ChannelConditionProvisionerInstalled,
Status: corev1.ConditionTrue,
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
Expand All @@ -141,12 +147,15 @@ func TestChannelInitializeConditions(t *testing.T) {
}, {
Type: ChannelConditionProvisioned,
Status: corev1.ConditionTrue,
}, {
Type: ChannelConditionProvisionerInstalled,
Status: corev1.ConditionTrue,
}, {
Type: ChannelConditionReady,
Status: corev1.ConditionUnknown,
}}},
},
}
}},
},
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand Down Expand Up @@ -178,6 +187,7 @@ func TestChannelIsReady(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
cs := &ChannelStatus{}
cs.InitializeConditions()
if test.markProvisioned {
cs.MarkProvisioned()
} else {
Expand Down
141 changes: 141 additions & 0 deletions pkg/reconciler/v1alpha1/channel/channel.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package channel

import (
"context"

corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/source"

"github.com/golang/glog"
"github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/client-go/dynamic"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const (
// controllerAgentName is the string used by this controller to identify
// itself when creating events.
controllerAgentName = "channel-default-controller"
channelReconciled = "ChannelReconciled"
channelUpdateStatusFailed = "ChannelUpdateStatusFailed"
)

type reconciler struct {
client client.Client
restConfig *rest.Config
dynamicClient dynamic.Interface
recorder record.EventRecorder
}

// Verify the struct implements reconcile.Reconciler
var _ reconcile.Reconciler = &reconciler{}

// ProvideController returns a Channel controller.
// This Channel controller is a default controller for channels of all provisioner kinds
func ProvideController(mgr manager.Manager) (controller.Controller, error) {
// Setup a new controller to Reconcile channel
c, err := controller.New(controllerAgentName, mgr, controller.Options{
Reconciler: &reconciler{
recorder: mgr.GetRecorder(controllerAgentName),
},
})
if err != nil {
return nil, err
}

// Watch channel events
// This controller is no-op when Channels are deleted
if err := c.Watch(
&source.Kind{Type: &v1alpha1.Channel{}},
&handler.EnqueueRequestForObject{},
predicate.Funcs{
DeleteFunc: func(event.DeleteEvent) bool {
return false
},
}); err != nil {
return nil, err
}

return c, nil
}

// Reconcile will check if the channel is being watched by provisioner's channel controller
// This will improve UX. See https://github.com/knative/eventing/issues/779
func (r *reconciler) Reconcile(request reconcile.Request) (reconcile.Result, error) {
glog.Infof("Reconciling channel %s", request)
ch := &v1alpha1.Channel{}

// Controller-runtime client Get() always deep copies the object. Hence no need to again deep copy it
err := r.client.Get(context.TODO(), request.NamespacedName, ch)

if errors.IsNotFound(err) {
glog.Errorf("could not find channel %s\n", request)
return reconcile.Result{}, nil
}

if err != nil {
glog.Errorf("could not fetch channel %s: %s\n", request, err)
return reconcile.Result{}, err
}

err = r.reconcile(ch)

if err != nil {
glog.Warningf("Error reconciling channel %s: %s. Will retry.", request, err)
r.recorder.Eventf(ch, corev1.EventTypeWarning, channelUpdateStatusFailed, "Failed to update channel status: %s", request)
return reconcile.Result{Requeue: true}, err
}
glog.Infof("Successfully reconciled channel %s", request)
r.recorder.Eventf(ch, corev1.EventTypeNormal, channelReconciled, "Channel reconciled: %s", request)
return reconcile.Result{Requeue: false}, nil
}

func (r *reconciler) reconcile(ch *v1alpha1.Channel) error {
// Do not Initialize() Status in channel-default-controller. It will set ChannelConditionProvisionerInstalled=True
// Directly call GetCondition(). If the Status was never initialized then GetCondition() will return nil and
// IsUnknown() will return true
c := ch.Status.GetCondition(v1alpha1.ChannelConditionProvisionerInstalled)

if c.IsUnknown() {
ch.Status.MarkProvisionerNotInstalled(
"Provisioner not found.",
"Specified provisioner [Name:%s Kind:%s] is not installed or not controlling the channel.",
ch.Spec.Provisioner.Name,
ch.Spec.Provisioner.Kind,
)
err := r.client.Status().Update(context.TODO(), ch)
return err
}
return nil
}

func (r *reconciler) InjectClient(c client.Client) error {
r.client = c
return nil
}

0 comments on commit 24bcdf8

Please sign in to comment.