/
containersource.go
155 lines (130 loc) · 6.17 KB
/
containersource.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package containersource
import (
"context"
"fmt"
"go.uber.org/zap"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
appsv1listers "k8s.io/client-go/listers/apps/v1"
"knative.dev/eventing/pkg/apis/sources/v1alpha2"
clientset "knative.dev/eventing/pkg/client/clientset/versioned"
"knative.dev/eventing/pkg/client/injection/reconciler/sources/v1alpha2/containersource"
listers "knative.dev/eventing/pkg/client/listers/sources/v1alpha2"
"knative.dev/eventing/pkg/logging"
"knative.dev/eventing/pkg/reconciler/containersource/resources"
"knative.dev/pkg/controller"
pkgreconciler "knative.dev/pkg/reconciler"
)
const (
// Reasons attached to the corev1.Events emitted from the reconciliation process.
// sourceReconciled is the reason of the Normal event returned by
// newReconciledNormal when a reconcile pass completes without error.
sourceReconciled = "ContainerSourceReconciled"
// deploymentCreated and deploymentUpdated are recorded after the
// Deployment has been created or updated, respectively.
deploymentCreated = "ContainerSourceDeploymentCreated"
deploymentUpdated = "ContainerSourceDeploymentUpdated"
// sinkBindingCreated and sinkBindingUpdated are recorded after the
// SinkBinding has been created or updated, respectively.
sinkBindingCreated = "ContainerSourceSinkBindingCreated"
sinkBindingUpdated = "ContainerSourceSinkBindingUpdated"
)
// newReconciledNormal builds the reconciler event reported after a successful
// reconcile: its type is corev1.EventTypeNormal, its reason is
// sourceReconciled, and its message names the source as "namespace/name".
func newReconciledNormal(namespace, name string) pkgreconciler.Event {
	const messageFmt = "ContainerSource reconciled: \"%s/%s\""
	return pkgreconciler.NewEvent(
		corev1.EventTypeNormal,
		sourceReconciled,
		messageFmt,
		namespace,
		name,
	)
}
// Reconciler implements controller.Reconciler for ContainerSource resources.
// For each ContainerSource it reconciles a SinkBinding and a Deployment
// (see ReconcileKind).
type Reconciler struct {
// kubeClientSet is used to create and update Deployments.
kubeClientSet kubernetes.Interface
// eventingClientSet is used to create and update SinkBindings.
eventingClientSet clientset.Interface
// listers index properties about resources
// containerSourceLister lists ContainerSources; it is not referenced by
// the methods in this part of the file.
containerSourceLister listers.ContainerSourceLister
// sinkBindingLister is used to look up the existing SinkBinding.
sinkBindingLister listers.SinkBindingLister
// deploymentLister is used to look up the existing Deployment.
deploymentLister appsv1listers.DeploymentLister
}
// Check at compile time that *Reconciler implements containersource.Interface.
var _ containersource.Interface = (*Reconciler)(nil)
// ReconcileKind implements Interface.ReconcileKind.
//
// It reconciles the SinkBinding first and the receive adapter Deployment
// second. The first failure is logged and returned as-is; when both steps
// succeed a Normal "reconciled" event for the source is returned.
func (r *Reconciler) ReconcileKind(ctx context.Context, source *v1alpha2.ContainerSource) pkgreconciler.Event {
	if _, sbErr := r.reconcileSinkBinding(ctx, source); sbErr != nil {
		logging.FromContext(ctx).Error("Error reconciling SinkBinding", zap.Error(sbErr))
		return sbErr
	}

	if _, raErr := r.reconcileReceiveAdapter(ctx, source); raErr != nil {
		logging.FromContext(ctx).Error("Error reconciling ReceiveAdapter", zap.Error(raErr))
		return raErr
	}

	return newReconciledNormal(source.Namespace, source.Name)
}
// reconcileReceiveAdapter makes sure the Deployment built by
// resources.MakeDeployment for source exists and carries the expected pod spec.
//
// The Deployment is looked up through the lister and then:
//   - created when it does not exist;
//   - rejected with an error when it exists but is not controlled by source;
//   - updated when podSpecChanged reports a difference in the pod spec;
//   - reused unchanged otherwise.
//
// On success the resulting Deployment is propagated into source.Status and
// returned. Underlying API errors are wrapped with %w so callers can still
// inspect them with errors.Is / errors.As.
func (r *Reconciler) reconcileReceiveAdapter(ctx context.Context, source *v1alpha2.ContainerSource) (*appsv1.Deployment, error) {
	expected := resources.MakeDeployment(source)

	ra, err := r.deploymentLister.Deployments(expected.Namespace).Get(expected.Name)
	switch {
	case apierrors.IsNotFound(err):
		ra, err = r.kubeClientSet.AppsV1().Deployments(expected.Namespace).Create(expected)
		if err != nil {
			return nil, fmt.Errorf("creating new Deployment: %w", err)
		}
		controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeNormal, deploymentCreated, "Deployment created %q", ra.Name)

	case err != nil:
		return nil, fmt.Errorf("getting Deployment: %w", err)

	case !metav1.IsControlledBy(ra, source):
		return nil, fmt.Errorf("Deployment %q is not owned by ContainerSource %q", ra.Name, source.Name)

	case r.podSpecChanged(&ra.Spec.Template.Spec, &expected.Spec.Template.Spec):
		// The object returned by the lister is shared with the informer
		// cache and must not be modified; mutate a deep copy instead.
		desired := ra.DeepCopy()
		desired.Spec.Template.Spec = expected.Spec.Template.Spec
		ra, err = r.kubeClientSet.AppsV1().Deployments(expected.Namespace).Update(desired)
		if err != nil {
			return nil, fmt.Errorf("updating Deployment: %w", err)
		}
		controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeNormal, deploymentUpdated, "Deployment updated %q", ra.Name)

	default:
		logging.FromContext(ctx).Debug("Reusing existing Deployment", zap.Any("Deployment", ra))
	}

	source.Status.PropagateReceiveAdapterStatus(ra)
	return ra, nil
}
// reconcileSinkBinding makes sure the SinkBinding built by
// resources.MakeSinkBinding for source exists and carries the expected spec.
//
// The SinkBinding is looked up through the lister and then:
//   - created when it does not exist;
//   - rejected with an error when it exists but is not controlled by source;
//   - updated when sinkBindingSpecChanged reports a difference in the spec;
//   - reused unchanged otherwise.
//
// On success the SinkBinding's status is propagated into source.Status and the
// SinkBinding is returned. Underlying API errors are wrapped with %w so callers
// can still inspect them with errors.Is / errors.As.
func (r *Reconciler) reconcileSinkBinding(ctx context.Context, source *v1alpha2.ContainerSource) (*v1alpha2.SinkBinding, error) {
	expected := resources.MakeSinkBinding(source)

	sb, err := r.sinkBindingLister.SinkBindings(source.Namespace).Get(expected.Name)
	switch {
	case apierrors.IsNotFound(err):
		sb, err = r.eventingClientSet.SourcesV1alpha2().SinkBindings(source.Namespace).Create(expected)
		if err != nil {
			return nil, fmt.Errorf("creating new SinkBinding: %w", err)
		}
		controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeNormal, sinkBindingCreated, "SinkBinding created %q", sb.Name)

	case err != nil:
		return nil, fmt.Errorf("getting SinkBinding: %w", err)

	case !metav1.IsControlledBy(sb, source):
		return nil, fmt.Errorf("SinkBinding %q is not owned by ContainerSource %q", sb.Name, source.Name)

	case r.sinkBindingSpecChanged(&sb.Spec, &expected.Spec):
		// The object returned by the lister is shared with the informer
		// cache and must not be modified; mutate a deep copy instead.
		desired := sb.DeepCopy()
		desired.Spec = expected.Spec
		sb, err = r.eventingClientSet.SourcesV1alpha2().SinkBindings(source.Namespace).Update(desired)
		if err != nil {
			return nil, fmt.Errorf("updating SinkBinding: %w", err)
		}
		controller.GetEventRecorder(ctx).Eventf(source, corev1.EventTypeNormal, sinkBindingUpdated, "SinkBinding updated %q", sb.Name)

	default:
		logging.FromContext(ctx).Debug("Reusing existing SinkBinding", zap.Any("SinkBinding", sb))
	}

	source.Status.PropagateSinkBindingStatus(&sb.Status)
	return sb, nil
}
// podSpecChanged reports whether the existing pod spec (have) differs from the
// desired one (want). The comparison is equality.Semantic.DeepDerivative with
// want as the first argument; per the apimachinery documentation, fields left
// unset in the first argument are not compared.
func (r *Reconciler) podSpecChanged(have *corev1.PodSpec, want *corev1.PodSpec) bool {
	// TODO this won't work, SinkBinding messes with this. n3wscott working on a fix.
	upToDate := equality.Semantic.DeepDerivative(want, have)
	return !upToDate
}
// sinkBindingSpecChanged reports whether the existing SinkBinding spec (have)
// differs from the desired one (want). The comparison is
// equality.Semantic.DeepDerivative with want as the first argument; per the
// apimachinery documentation, fields left unset in the first argument are not
// compared.
func (r *Reconciler) sinkBindingSpecChanged(have *v1alpha2.SinkBindingSpec, want *v1alpha2.SinkBindingSpec) bool {
	upToDate := equality.Semantic.DeepDerivative(want, have)
	return !upToDate
}