Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 15 additions & 16 deletions docs/eventing/samples/writing-receive-adapter-source/01-theory.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Specifically, the `clientset`, `cache`, `informers`, and `listers` can all be ge
import (
// ...
sampleSourceClient "knative.dev/sample-source/pkg/client/injection/client"
samplesourceinformer knative.dev/sample-source/pkg/client/injection/informers/samples/v1alpha1/samplesource"
samplesourceinformer "knative.dev/sample-source/pkg/client/injection/informers/samples/v1alpha1/samplesource"
)
// ...
func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
Expand All @@ -83,22 +83,21 @@ func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl
// ...
}
```
Ensure that the specific source subdirectory has been added to the injection portion of the `hack/update-codegen.sh` script.

```patch

# Sources
+API_DIRS_SOURCES=(github/pkg camel/source/pkg kafka/source/pkg awssqs/pkg couchdb/source/pkg prometheus/pkg YourSourceHere/pkg)
-API_DIRS_SOURCES=(github/pkg camel/source/pkg kafka/source/pkg awssqs/pkg couchdb/source/pkg prometheus/pkg)

```
and
```patch
-i knative.dev/eventing-contrib/camel/source/pkg/apis \
- -i knative.dev/eventing-contrib/github/pkg/apis
+ -i knative.dev/eventing-contrib/github/pkg/apis \
+ -i knative.dev/eventing-contrib/YourSourceHere/pkg/apis
Comment on lines -86 to -100
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


Sample source's [`update-codegen.sh`](https://github.com/knative/sample-source/blob/master/hack/update-codegen.sh) have the configuration
to have the required things above generated and injected:
```bash
# Generation
${CODEGEN_PKG}/generate-groups.sh "deepcopy,client,informer,lister" \
knative.dev/sample-source/pkg/client knative.dev/sample-source/pkg/apis \
"samples:v1alpha1" \
--go-header-file ${REPO_ROOT}/hack/boilerplate/boilerplate.go.txt

# Injection
${KNATIVE_CODEGEN_PKG}/hack/generate-knative.sh "injection" \
knative.dev/sample-source/pkg/client knative.dev/sample-source/pkg/apis \
"samples:v1alpha1" \
--go-header-file ${REPO_ROOT}/hack/boilerplate/boilerplate.go.txt
```

File Layout & Hierarchy:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,23 +32,27 @@ type SampleSource struct {

// SampleSourceSpec holds the desired state of the SampleSource (from the client).
type SampleSourceSpec struct {
// ServiceAccountName holds the name of the Kubernetes service account
// as which the underlying K8s resources should be run. If unspecified
// this will default to the "default" service account for the namespace
// in which the SampleSource exists.
// +optional
ServiceAccountName string `json:"serviceAccountName,omitempty"`
// inherits duck/v1 SourceSpec, which currently provides:
// * Sink - a reference to an object that will resolve to a domain name or
// a URI directly to use as the sink.
// * CloudEventOverrides - defines overrides to control the output format
// and modifications of the event sent to the sink.
duckv1.SourceSpec `json:",inline"`

// Interval is the time interval between events.
//
// The string format is a sequence of decimal numbers, each with optional
// fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m". Valid time
// units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
Interval string `json:"interval"`
// ServiceAccountName holds the name of the Kubernetes service account
// as which the underlying K8s resources should be run. If unspecified
// this will default to the "default" service account for the namespace
// in which the SampleSource exists.
// +optional
ServiceAccountName string `json:"serviceAccountName,omitempty"`

// Sink is a reference to an object that will resolve to a host
// name to use as the sink.
Sink *duckv1.Destination `json:"sink"`
// Interval is the time interval between events.
//
// The string format is a sequence of decimal numbers, each with optional
// fraction and a unit suffix, such as "300ms", "-1.5h" or "2h45m". Valid time
// units are "ns", "us" (or "µs"), "ms", "s", "m", "h". If unspecified
// this will default to "10s".
Interval string `json:"interval"`
}

// SampleSourceStatus communicates the observed state of the SampleSource (from the controller).
Expand All @@ -72,7 +76,7 @@ const (

```
Define the functions that will be called from the Reconciler functions to set the lifecycle conditions. This is typically done in
`pkg/apis/samples/VERSION/sampleservice_lifecycle.go`
`pkg/apis/samples/VERSION/samplesource_lifecycle.go`

```go
// InitializeConditions sets relevant unset conditions to Unknown state.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,55 @@ Pass the new controller implementation to the shared main
```go
import (
// The set of controllers this controller process runs.
"knative.dev/sample-source/pkg/reconciler"
"knative.dev/sample-source/pkg/reconciler/sample"

// This defines the shared main for injected controllers.
"knative.dev/pkg/injection/sharedmain"
)

func main() {
sharedmain.Main("sample-source-controller",
reconciler.NewController
)
sharedmain.Main("sample-source-controller", sample.NewController)
}
```
Define the NewController implementation, it will be passed a configmap.Watcher, as well as a context which the injected listers will use for the reconciler struct arguments
Define the NewController implementation, it will be passed a `configmap.Watcher`, as well as a context which the injected listers will use for the reconciler struct arguments
```go
func NewController(
ctx context.Context,
cmw configmap.Watcher,
) *controller.Impl {
// ...
deploymentInformer := deploymentinformer.Get(ctx)
sinkBindingInformer := sinkbindinginformer.Get(ctx)
sampleSourceInformer := samplesourceinformer.Get(ctx)

r := &Reconciler{
KubeClientSet: kubeclient.Get(ctx),
EventingClientSet: eventingclient.Get(ctx),
samplesourceLister: sampleSourceInformer.Lister(),
deploymentLister: deploymentInformer.Lister(),
samplesourceClientSet: samplesourceClient.Get(ctx),
}
dr: &reconciler.DeploymentReconciler{KubeClientSet: kubeclient.Get(ctx)},
sbr: &reconciler.SinkBindingReconciler{EventingClientSet: eventingclient.Get(ctx)},
// Config accessor takes care of tracing/config/logging config propagation to the receive adapter
configAccessor: reconcilersource.WatchConfigurations(ctx, "sample-source", cmw),
}
```
The base reconciler is imported from the knative.dev/pkg dependency:
```go
import (
// ...
"knative.dev/eventing/pkg/reconciler"
reconcilersource "knative.dev/eventing/pkg/reconciler/source"
// ...
)
```
Ensure the correct informers have EventHandlers filtered to them
```go
sampleSourceInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
```
Controller for the `SampleSource` uses `Deployment` and `SinkBinding` resources to deploy and also bind the event source and the receive adapter. Also ensure the informers are set up correctly for these secondary resources
```go
deploymentInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterGroupKind(v1alpha1.Kind("SampleSource")),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

sinkBindingInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterGroupKind(v1alpha1.Kind("SampleSource")),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})
```
100 changes: 68 additions & 32 deletions docs/eventing/samples/writing-receive-adapter-source/04-reconciler.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,64 +7,100 @@ type: "docs"

## Reconciler Functionality
General steps the reconciliation process needs to cover:
1. Target the specific samplesource via the `sampleServiceClientSet`:
1. Update the `ObservedGeneration` and initialize the `Status` conditions (as defined in `samplesource_lifecycle.go` and `samplesource_types.go`)
```go
// Get the resource with this namespace/name.
original, err := r.Lister.SampleSources(namespace).Get(name)
src.Status.InitializeConditions()
src.Status.ObservedGeneration = src.Generation
```
2. Create/reconcile the Receive Adapter (detailed below)
3. If successful, update the `Status` and `MarkDeployed`
```go
src.Status.PropagateDeploymentAvailability(ra)
```
4. Create/reconcile the `SinkBinding` for the Receive Adapter targeting the `Sink` (detailed below)
5. MarkSink with the result
```go
src.Status.MarkSink(sb.Status.SinkURI)
```
6. Return a new reconciler event stating that the process is done
```go
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SampleSourceReconciled", "SampleSource reconciled: \"%s/%s\"", namespace, name)
```
2. Update the ObservedGeneration
Initialize the Status conditions (as defined in `samplesource_lifecycle.go` and `samplesource_types.go`)
3. Reconcile the Sink and MarkSink with the result
Create the Receive Adapter (detailed below)
3. If successful, update the Status and MarkDeployed
4. Reconcile the EventTypes and corresponding Status
Creation and deletion of the events is done with the inherited `EventingClientSet().EventingV1alpha1()` api
5. Update the full status field from the resulting reconcile attempt via the source’s clientset and api
`r.samplesourceClientSet.SamplesV1alpha1().SampleSources(desired.Namespace).UpdateStatus(existing)`


## Reconcile/Create The Receive Adapter
As part of the source reconciliation, we have to create and deploy
(and update if necessary) the underlying receive adapter. The two
client sets used in this process is the `kubeClientSet` for the
Deployment tracking, and the `EventingClientSet` for the event
recording.
(and update if necessary) the underlying receive adapter.

Verify the specified kubernetes resources are valid, and update the `Status` accordingly

Assemble the ReceiveAdapterArgs
```go
raArgs := resources.ReceiveAdapterArgs{
EventSource: eventSource,
Image: r.receiveAdapterImage,
Source: src,
Labels: resources.GetLabels(src.Name),
SinkURI: sinkURI,
EventSource: src.Namespace + "/" + src.Name,
Image: r.ReceiveAdapterImage,
Source: src,
Labels: resources.Labels(src.Name),
AdditionalEnvs: r.configAccessor.ToEnvVars(), // Grab config envs for tracing/logging/metrics
}
```
NB The exact arguments may change based on functional requirements
Create the underlying deployment from the arguments provided, matching pod templates, labels, owner references, etc as needed to fill out the deployment
Example: [pkg/reconciler/resources/receive_adapter.go](https://github.com/knative/sample-source/tree/master/pkg/reconciler/resources/receive_adapter.go)
Example: [pkg/reconciler/sample/resources/receive_adapter.go](https://github.com/knative/sample-source/blob/master/pkg/reconciler/sample/resources/receive_adapter.go)

1. Fetch the existing receive adapter deployment
```go
ra, err := r.KubeClientSet.AppsV1().Deployments(src.Namespace).Get(expected.Name, metav1.GetOptions{})
namespace := owner.GetObjectMeta().GetNamespace()
ra, err := r.KubeClientSet.AppsV1().Deployments(namespace).Get(expected.Name, metav1.GetOptions{})
```
2. Otherwise, create the deployment
```go
ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Create(expected)
ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Create(expected)
```
3. Check if the expected vs existing spec is different, and update the deployment if required
```go
} else if podSpecChanged(ra.Spec.Template.Spec, expected.Spec.Template.Spec) {
ra.Spec.Template.Spec = expected.Spec.Template.Spec
if ra, err = r.KubeClientSet.AppsV1().Deployments(src.Namespace).Update(ra); err != nil {
return ra, err
}
} else if r.podSpecImageSync(expected.Spec.Template.Spec, ra.Spec.Template.Spec) {
ra.Spec.Template.Spec = expected.Spec.Template.Spec
if ra, err = r.KubeClientSet.AppsV1().Deployments(namespace).Update(ra); err != nil {
return ra, err
}
```
4. If updated, record the event
```go
r.Recorder.Eventf(src, corev1.EventTypeNormal, samplesourceDeploymentUpdated, "Deployment updated")
return ra, nil
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "DeploymentUpdated", "updated deployment: \"%s/%s\"", namespace, name)
```

## Reconcile/Create The SinkBinding
Instead of directly giving the details of the sink to the receive adapter, use a `SinkBinding` to bind the receive adapter with the sink.

Steps here are almost the same with the `Deployment` reconciliation above, but it is for another resource, `SinkBinding`.

1. Create a `Reference` for the receive adapter deployment. This deployment will be `SinkBinding`'s source:
```go
tracker.Reference{
APIVersion: appsv1.SchemeGroupVersion.String(),
Kind: "Deployment",
Namespace: ra.Namespace,
Name: ra.Name,
}
```
2. Fetch the existing `SinkBinding`
```go
namespace := owner.GetObjectMeta().GetNamespace()
sb, err := r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Get(expected.Name, metav1.GetOptions{})
```
2. If it doesn't exist, create it
```go
sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Create(expected)
```
3. Check if the expected vs existing spec is different, and update the `SinkBinding` if required
```go
else if r.specChanged(sb.Spec, expected.Spec) {
sb.Spec = expected.Spec
if sb, err = r.EventingClientSet.SourcesV1alpha2().SinkBindings(namespace).Update(sb); err != nil {
return sb, err
}
```
4. If updated, record the event
```go
return pkgreconciler.NewEvent(corev1.EventTypeNormal, "SinkBindingUpdated", "updated SinkBinding: \"%s/%s\"", namespace, name)
```
Loading