Skip to content

Go event-handler produces multiple Content-Length values #2532

@fbartnitzek

Description

@fbartnitzek

I'm trying to setup knative functions with MTChannelBasedBroker based on KafkaChannels.

A Ping-Source is producing events every minute:

apiVersion: sources.knative.dev/v1
kind: PingSource
metadata:
  name: aia-cron-ping
spec:
  schedule: "*/1 * * * *"
  contentType: "application/json"
  data: '{"ping":"load aia"}'
  ceOverrides:
    extensions:
      load: "aia"
  sink:
    ref:
      apiVersion: eventing.knative.dev/v1
      kind: Broker
      name: default
      namespace: default

My simple function has a subscription to the PingSource-Events:

specVersion: 0.36.0
name: simple-loader
runtime: go
registry: demo.jfrog.io/devops-docker-local/
image: demo.jfrog.io/devops-docker-local/simple-loader:latest
namespace: default
created: 2024-08-02T10:48:42.077191+02:00
invoke: cloudevent
build:
  builder: host
  buildEnvs:
  - name: FUNC_ENABLE_HOST_BUILDER
    value: "true"
  - name: FUNC_BUILDER
    value: host
  - name: FUNC_CONTAINER
    value: "true"
  - name: LOG_LEVEL
    value: error
deploy:
  namespace: default
  image: demo.jfrog.io/devops-docker-local/simple-loader@sha256:ad917ad5ee8e24d250306b355ed1d22ab4374101b0713e2780fb7ab3aba66b22
  options:
    scale:
      min: 1
      max: 2
  subscriptions:
  - source: default
    filters:
      load: aia
      type: dev.knative.sources.ping

And generates a new cloudEvent in it's handle-Method, with a json-Body:

package function

import (
	"context"
	"log/slog"
	"time"

	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/cloudevents/sdk-go/v2/event"
	"github.com/google/uuid"
	"github.com/rs/zerolog"
)

func (h *SimpleHandler) Handle(ctx context.Context, e event.Event) (*event.Event, error) {
	h.logger.Debug("Entering handler", slog.Group("event", "id", e.ID()))
	defer h.logger.Debug("Exiting handler", slog.Group("event", "id", e.ID()))
	h.logger.Info("Event data", slog.Group("event", "id", e.ID(), "data", e.Data()))

	data := struct {
		Simple string `json:"simple"`
		Time   string `json:"time"`
	}{
		Simple: "simple-modified",
		Time:   time.Now().Format(time.RFC3339),
	}

	outEvent := event.New()
	id := uuid.New().String()
	outEvent.SetID(id)
	outEvent.SetType("works.demo.simple")
	outEvent.SetSource("urn:demo:devops:simple:loader:1")
	outEvent.SetExtension("tenant", "some tenant")
	outEvent.SetData(cloudevents.ApplicationJSON, data)

	h.logger.Info("Outgoing event", "id", id, "data", data)
	return &outEvent, nil
}

But the knative-eventing/kafka-channel-dispatcher rejects the responded events from my function with the following message:

{"@timestamp":"2024-10-07T12:38:18.525Z","@version":"1","message":"failed to send event to subscriber context={topics=[knative-messaging-kafka.default.default-kne-trigger], consumerGroup='kafka.default.default-kne-trigger.25ea5c36-820a-4c5a-b39a-5433e198ebb9', reference=uuid: \"25ea5c36-820a-4c5a-b39a-5433e198ebb9\"\nnamespace: \"default\"\nname: \"default-simple-loader-function-656b5b9173502685d2682f8db4c22eca\"\n} target=http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/simple-loader-function-trigger-0/3b75c7f5-55a4-4187-ada2-c21b31caf13b","logger_name":"dev.knative.eventing.kafka.broker.dispatcher.impl.http.WebClientCloudEventSender","thread_name":"vert.x-worker-thread-5","level":"ERROR","level_value":40000,"stack_trace":"java.lang.IllegalArgumentException: Multiple Content-Length values found: [58, 58]\n\tat io.netty.handler.codec.http.HttpUtil.normalizeAndGetContentLength(HttpUtil.java:612)\n\tat io.netty.handler.codec.http.HttpObjectDecoder.readHeaders(HttpObjectDecoder.java:683)\n\tat io.netty.handler.codec.http.HttpObjectDecoder.decode(HttpObjectDecoder.java:285)\n\tat io.netty.handler.codec.http.HttpClientCodec$Decoder.decode(HttpClientCodec.java:238)\n\tat io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)\n\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\tat io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\n","context":{},"target":"http://broker-filter.knative-eventing.svc.cluster.local/triggers/default/simple-loader-function-trigger-0/3b75c7f5-55a4-4187-ada2-c21b31caf13b"}

In short: Multiple Content-Length values found: [58, 58].
So the function sends the json-body to the broker, but sets the HTTP-Header Content-Length twice, which isn't supported by the kafka-channel-dispatcher.

We guess, it has something to do with the used cloudevents-sdk and the Add on this line.

Versions:

  • knative 1.13.0 (serving, eventing, eventing-kafka, mt-channel-broker)
  • kafka via OLM 0.28.0
  • func v1.15.0, but also tried until v.1.12.2 with similar errors

Please have a look at my setup and what I or some part of the knative framework is doing wrong here.

Metadata

Metadata

Assignees

No one assigned

    Labels

    lifecycle/staleDenotes an issue or PR has remained open with no activity and has become stale.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions