Skip to content

Commit

Permalink
Fix KafkaSink structured content mode
Browse files Browse the repository at this point in the history
`contentMode` wasn't passed to the contract, so the data plane
would produce binary events regardless of the provided contentMode.

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi committed Aug 4, 2022
1 parent 2412c54 commit 183bd98
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 1 deletion.
3 changes: 2 additions & 1 deletion control-plane/pkg/reconciler/sink/kafka_sink.go
Expand Up @@ -172,7 +172,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *eventing.KafkaSink)
Uid: string(ks.UID),
Topics: []string{ks.Spec.Topic},
Ingress: &contract.Ingress{
Path: receiver.PathFromObject(ks),
Path: receiver.PathFromObject(ks),
ContentMode: coreconfig.ContentModeFromString(*ks.Spec.ContentMode),
},
BootstrapServers: kafka.BootstrapServersCommaSeparated(ks.Spec.BootstrapServers),
Reference: &contract.Reference{
Expand Down
54 changes: 54 additions & 0 deletions control-plane/pkg/reconciler/sink/kafka_sink_test.go
Expand Up @@ -162,6 +162,60 @@ func sinkReconciliation(t *testing.T, format string, env config.Env) {
},
},
},
{
Name: "Reconciled normal - content mode structured",
Objects: []runtime.Object{
NewSink(
StatusControllerOwnsTopic(reconciler.ControllerTopicOwner),
SinkContentMode(v1alpha1.ModeStructured),
),
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.DataPlaneConfigMapName, nil),
SinkReceiverPod(env.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
}),
},
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
},
WantUpdates: []clientgotesting.UpdateActionImpl{
ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.DataPlaneConfigMapName, env.DataPlaneConfigFormat, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: SinkUUID,
Topics: []string{SinkTopic()},
Ingress: &contract.Ingress{ContentMode: contract.ContentMode_STRUCTURED, Path: receiver.Path(SinkNamespace, SinkName)},
BootstrapServers: bootstrapServers,
Reference: SinkReference(),
},
},
Generation: 1,
}),
SinkReceiverPodUpdate(env.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "1",
"annotation_to_preserve": "value_to_preserve",
}),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: NewSink(
StatusControllerOwnsTopic(reconciler.ControllerTopicOwner),
SinkContentMode(v1alpha1.ModeStructured),
InitSinkConditions,
StatusDataPlaneAvailable,
StatusConfigParsed,
BootstrapServers(bootstrapServersArr),
StatusConfigMapUpdatedReady(&env),
StatusTopicReadyWithOwner(SinkTopic(), sink.ControllerTopicOwner),
SinkAddressable(&env),
StatusProbeSucceeded,
),
},
},
},
{
Name: "Reconciled normal - with auth config",
Objects: []runtime.Object{
Expand Down
7 changes: 7 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_sink.go
Expand Up @@ -166,3 +166,10 @@ func SinkReceiverPodUpdate(namespace string, annotations map[string]string) clie
SinkReceiverPod(namespace, annotations),
)
}

func SinkContentMode(cm string) KRShapedOption {
return func(obj duckv1.KRShaped) {
ks := obj.(*eventing.KafkaSink)
ks.Spec.ContentMode = &cm
}
}

0 comments on commit 183bd98

Please sign in to comment.