Strimzi Canary does not work with topic retention policy: compact #173

Closed
stylius opened this issue May 3, 2022 · 1 comment · Fixed by #174

stylius commented May 3, 2022

When the topic is configured with the log cleanup policy (cleanup.policy) set to delete, everything works. If the topic is instead configured with compact, or with both compact and delete, Strimzi Canary fails to write to it. Nothing in the documentation states that the topic must use the delete policy, so I assume this is a defect. If it is intended behaviour, it should be documented.

I0503 12:18:36.533285       1 canary_manager.go:124] Canary manager reconcile ...
I0503 12:18:36.534801       1 topic.go:188] Going to reassign topic partitions if needed
W0503 12:18:36.930840       1 producer.go:96] Error sending message: kafka server: This record has failed the validation on broker and hence will be rejected.
W0503 12:18:37.202077       1 producer.go:96] Error sending message: kafka server: This record has failed the validation on broker and hence will be rejected.
W0503 12:18:37.477782       1 producer.go:96] Error sending message: kafka server: This record has failed the validation on broker and hence will be rejected.
I0503 12:18:37.477815       1 canary_manager.go:134] ... reconcile done

We have multiple Kafka deployments with different default cleanup policies. Strimzi Canary appears to create its topic with the broker's default policy (log.cleanup.policy) instead of setting cleanup.policy itself. As a result, on clusters whose default includes compact, Strimzi Canary fails.
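
The inheritance described above can be sketched in Go, the canary's own language. This is a simplified model, not broker code: a topic-level cleanup.policy override, when present, takes precedence; otherwise the topic gets the broker-wide default set by log.cleanup.policy (compact,delete in the Kafka manifest of this report). The function name effectiveCleanupPolicy is hypothetical.

```go
package main

import (
	"fmt"
	"strings"
)

// effectiveCleanupPolicy returns the cleanup policy a topic ends up with.
// Hypothetical helper: a topic-level "cleanup.policy" override wins;
// otherwise the topic inherits the broker default ("log.cleanup.policy").
func effectiveCleanupPolicy(topicConfig map[string]string, brokerDefault string) string {
	if p, ok := topicConfig["cleanup.policy"]; ok && strings.TrimSpace(p) != "" {
		return p
	}
	return brokerDefault
}

func main() {
	brokerDefault := "compact,delete" // log.cleanup.policy in the Kafka manifest

	// Topic created without an explicit cleanup.policy: inherits the default.
	fmt.Println(effectiveCleanupPolicy(map[string]string{}, brokerDefault))

	// Topic created with an explicit override.
	fmt.Println(effectiveCleanupPolicy(map[string]string{"cleanup.policy": "delete"}, brokerDefault))
}
```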

Complete log:

I0503 11:55:48.298592       1 main.go:48] Starting Strimzi canary tool [0.2.0] with config: {BootstrapServers:[kafka-sty-kafka-bootstrap:9092], BootstrapBackoffMaxAttempts:10, BootstrapBackoffScale:5000, Topic:strimzi_canary_8, TopicConfig:map[cleanup.policy:compact,delete], ReconcileInterval:10000 ms, ClientID:strimzi-canary-client, ConsumerGroupID:strimzi-canary-group, ProducerLatencyBuckets:[100 200 400 800 1600], EndToEndLatencyBuckets:[100 200 400 800 1600], ExpectedClusterSize:-1, KafkaVersion:2.8.0,SaramaLogEnabled:true, VerbosityLogLevel:2, TLSEnabled:false, TLSCACert:, TLSClientCert:, TLSClientKey:, TLSInsecureSkipVerify:false,SASLMechanism:, SASLUser:, SASLPassword:, ConnectionCheckInterval:120000 ms, ConnectionCheckLatencyBuckets:[100 200 400 800 1600], StatusCheckInterval:30000 ms, StatusTimeWindow:300000 ms}
I0503 11:55:48.298940       1 http_server.go:41] Starting HTTP server
[Sarama] 2022/05/03 11:55:48 [Initializing new client]
[Sarama] 2022/05/03 11:55:48 client/metadata fetching metadata for all topics from broker [kafka-sty-kafka-bootstrap:9092]
[Sarama] 2022/05/03 11:55:48 Connected to broker at [kafka-sty-kafka-bootstrap:9092] (unregistered)
[Sarama] 2022/05/03 11:55:48 client/brokers registered new broker #[0 %!d(string=kafka-sty-kafka-0.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092)] at %!s(MISSING)
[Sarama] 2022/05/03 11:55:48 client/brokers registered new broker #[2 %!d(string=kafka-sty-kafka-2.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092)] at %!s(MISSING)
[Sarama] 2022/05/03 11:55:48 client/brokers registered new broker #[1 %!d(string=kafka-sty-kafka-1.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092)] at %!s(MISSING)
[Sarama] 2022/05/03 11:55:48 [Successfully initialized new client]
I0503 11:55:48.306205       1 canary_manager.go:58] Starting canary manager
I0503 11:55:48.306229       1 connection_check.go:116] Creating Sarama cluster admin
[Sarama] 2022/05/03 11:55:48 [Initializing new client]
[Sarama] 2022/05/03 11:55:48 client/metadata fetching metadata for all topics from broker [kafka-sty-kafka-bootstrap:9092]
[Sarama] 2022/05/03 11:55:48 Connected to broker at [kafka-sty-kafka-bootstrap:9092] (unregistered)
[Sarama] 2022/05/03 11:55:48 client/brokers registered new broker #[0 %!d(string=kafka-sty-kafka-0.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092)] at %!s(MISSING)
[Sarama] 2022/05/03 11:55:48 client/brokers registered new broker #[2 %!d(string=kafka-sty-kafka-2.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092)] at %!s(MISSING)
[Sarama] 2022/05/03 11:55:48 client/brokers registered new broker #[1 %!d(string=kafka-sty-kafka-1.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092)] at %!s(MISSING)
[Sarama] 2022/05/03 11:55:48 [Successfully initialized new client]
[Sarama] 2022/05/03 11:55:48 Connected to broker at [kafka-sty-kafka-1.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092 %!s(int32=1)] (registered as #%!d(MISSING))
[Sarama] 2022/05/03 11:55:48 Connected to broker at [kafka-sty-kafka-0.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092 %!s(int32=0)] (registered as #%!d(MISSING))
I0503 11:55:48.417317       1 connection_check.go:157] Connected to broker 0 in 6 ms
[Sarama] 2022/05/03 11:55:48 Closed connection to broker [kafka-sty-kafka-0.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092]
[Sarama] 2022/05/03 11:55:48 Connected to broker at [kafka-sty-kafka-2.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092 %!s(int32=2)] (registered as #%!d(MISSING))
[Sarama] 2022/05/03 11:55:48 Closed connection to broker [kafka-sty-kafka-2.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092]
I0503 11:55:48.497001       1 connection_check.go:157] Connected to broker 2 in 79 ms
[Sarama] 2022/05/03 11:55:48 Connected to broker at [kafka-sty-kafka-1.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092 %!s(int32=1)] (registered as #%!d(MISSING))
[Sarama] 2022/05/03 11:55:48 Closed connection to broker [kafka-sty-kafka-1.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092]
I0503 11:55:48.506863       1 connection_check.go:157] Connected to broker 1 in 8 ms
[Sarama] 2022/05/03 11:55:48 [Initializing new client]
[Sarama] 2022/05/03 11:55:48 client/metadata fetching metadata for all topics from broker [kafka-sty-kafka-bootstrap:9092]
I0503 11:55:48.506991       1 topic.go:116] Creating Sarama cluster admin
[Sarama] 2022/05/03 11:55:48 Connected to broker at [kafka-sty-kafka-bootstrap:9092] (unregistered)
[Sarama] 2022/05/03 11:55:48 client/brokers registered new broker #[0 %!d(string=kafka-sty-kafka-0.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092)] at %!s(MISSING)
[Sarama] 2022/05/03 11:55:48 client/brokers registered new broker #[2 %!d(string=kafka-sty-kafka-2.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092)] at %!s(MISSING)
[Sarama] 2022/05/03 11:55:48 client/brokers registered new broker #[1 %!d(string=kafka-sty-kafka-1.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092)] at %!s(MISSING)
[Sarama] 2022/05/03 11:55:48 [Successfully initialized new client]
[Sarama] 2022/05/03 11:55:48 Connected to broker at [kafka-sty-kafka-1.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092 %!s(int32=1)] (registered as #%!d(MISSING))
I0503 11:55:48.594753       1 topic.go:169] The canary topic strimzi_canary_8 already exists
I0503 11:55:48.594791       1 topic.go:392] Metadata for strimzi_canary_8 topic
I0503 11:55:48.594798       1 topic.go:394] 	{ID:0 Leader:0 Replicas:[0 1 2] Isr:[0 1 2] OfflineReplicas:[]}
I0503 11:55:48.594811       1 topic.go:394] 	{ID:1 Leader:1 Replicas:[1 2 0] Isr:[1 2 0] OfflineReplicas:[]}
I0503 11:55:48.594817       1 topic.go:394] 	{ID:2 Leader:2 Replicas:[2 0 1] Isr:[2 0 1] OfflineReplicas:[]}
[Sarama] 2022/05/03 11:55:48 Connected to broker at [kafka-sty-kafka-0.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092 %!s(int32=0)] (registered as #%!d(MISSING))
I0503 11:55:48.615742       1 topic.go:188] Going to reassign topic partitions if needed
I0503 11:55:48.615774       1 topic.go:322] Topic strimzi_canary_8 requested partitions assignments = map[0:[0 1 2] 1:[1 2 0] 2:[2 0 1]], minISR = 2
I0503 11:55:48.649701       1 topic.go:296] Elect leader = false
I0503 11:55:48.649738       1 consumer.go:135] Waiting consumer group to be up and running
I0503 11:55:48.649757       1 consumer.go:118] Consumer group consume starting...
[Sarama] 2022/05/03 11:55:48 client/metadata fetching metadata for [[strimzi_canary_8] kafka-sty-kafka-bootstrap:9092] from broker %!s(MISSING)
[Sarama] 2022/05/03 11:55:48 client/coordinator requesting coordinator for consumergroup [strimzi-canary-group kafka-sty-kafka-bootstrap:9092] from %!s(MISSING)
[Sarama] 2022/05/03 11:55:48 client/coordinator coordinator for consumergroup [strimzi-canary-group %!s(int32=0) kafka-sty-kafka-0.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092] is #%!d(MISSING) (%!s(MISSING))
[Sarama] 2022/05/03 11:55:48 Connected to broker at [kafka-sty-kafka-0.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092 %!s(int32=0)] (registered as #%!d(MISSING))
[Sarama] 2022/05/03 11:55:51 client/coordinator requesting coordinator for consumergroup [strimzi-canary-group kafka-sty-kafka-bootstrap:9092] from %!s(MISSING)
[Sarama] 2022/05/03 11:55:51 client/coordinator coordinator for consumergroup [strimzi-canary-group %!s(int32=0) kafka-sty-kafka-0.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092] is #%!d(MISSING) (%!s(MISSING))
I0503 11:55:51.705637       1 consumer.go:191] Consumer group setup
I0503 11:55:51.705708       1 consumer.go:150] Sarama consumer group up and running
I0503 11:55:51.705858       1 producer.go:86] Sending message: value={"producerId":"strimzi-canary-client","messageId":1,"timestamp":1651578951705} on partition=0
[Sarama] 2022/05/03 11:55:51 producer/broker/0 starting up
[Sarama] 2022/05/03 11:55:51 producer/broker/0 state change to [open] on strimzi_canary_8/0
[Sarama] 2022/05/03 11:55:51 producer/broker/1 starting up
W0503 11:55:51.707600       1 producer.go:96] Error sending message: kafka server: This record has failed the validation on broker and hence will be rejected.
I0503 11:55:51.707675       1 producer.go:86] Sending message: value={"producerId":"strimzi-canary-client","messageId":2,"timestamp":1651578951707} on partition=1
I0503 11:55:51.708588       1 consumer.go:203] Consumer group consumeclaim on strimzi_canary_8 [0]
[Sarama] 2022/05/03 11:55:51 producer/broker/1 state change to [open] on strimzi_canary_8/1
[Sarama] 2022/05/03 11:55:51 consumer/broker/0 added subscription to strimzi_canary_8/0
[Sarama] 2022/05/03 11:55:51 Connected to broker at [kafka-sty-kafka-1.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092 %!s(int32=1)] (registered as #%!d(MISSING))
[Sarama] 2022/05/03 11:55:51 Connected to broker at [kafka-sty-kafka-2.kafka-sty-kafka-brokers.paas-infrastructure.svc:9092 %!s(int32=2)] (registered as #%!d(MISSING))
I0503 11:55:51.797425       1 consumer.go:203] Consumer group consumeclaim on strimzi_canary_8 [2]
[Sarama] 2022/05/03 11:55:51 consumer/broker/2 added subscription to strimzi_canary_8/2
[Sarama] 2022/05/03 11:55:51 producer/broker/2 starting up
[Sarama] 2022/05/03 11:55:51 producer/broker/2 state change to [open] on strimzi_canary_8/2
W0503 11:55:51.797701       1 producer.go:96] Error sending message: kafka server: This record has failed the validation on broker and hence will be rejected.
I0503 11:55:51.797743       1 producer.go:86] Sending message: value={"producerId":"strimzi-canary-client","messageId":3,"timestamp":1651578951797} on partition=2
I0503 11:55:51.798174       1 consumer.go:203] Consumer group consumeclaim on strimzi_canary_8 [1]
[Sarama] 2022/05/03 11:55:51 consumer/broker/1 added subscription to strimzi_canary_8/1

Strimzi Canary deployment manifest:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: strimzi-canary-sty
  labels:
    app: strimzi-canary-sty
spec:
  replicas: 1
  selector:
    matchLabels:
      app: strimzi-canary-sty
  template:
    metadata:
      labels:
        app: strimzi-canary-sty
    spec:
      serviceAccountName: strimzi-canary
      containers:
      - name: strimzi-canary-sty
        image: docker-virtual.artifactory.sbtech.com/strimzi/canary:0.2.0
        ports:
        - containerPort: 8080
          name: metrics
        env:
          - name: KAFKA_BOOTSTRAP_SERVERS
            value: kafka-sty-kafka-bootstrap:9092
          - name: RECONCILE_INTERVAL_MS
            value: "10000"
          - name: TLS_ENABLED
            value: "false"
          - name: KAFKA_VERSION
            value: "2.8.0"
          - name: TOPIC
            value: "strimzi_canary_8"
          - name: SARAMA_LOG_ENABLED
            value: "true"
          - name: VERBOSITY_LOG_LEVEL
            value: "2"
        livenessProbe:
          httpGet:
            path: /liveness
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 30
        readinessProbe:
          httpGet:
            path: /readiness
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 30
        resources:
          limits:
            memory: "64Mi"
            cpu: "100m"
          requests:
            memory: "64Mi"
            cpu: "100m"
  strategy:
    type: Recreate

Kafka deployment manifest (using Strimzi Operator):

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: kafka-sty
spec:
  kafka:
    version: 2.8.0
    replicas: 3
    resources:
      requests:
        cpu: 0.5
        memory: 1Gi
      limits:
        memory: 1Gi
    jvmOptions:
      -Xms: 512m
      -Xmx: 512m
    listeners:
    - name: plain
      port: 9092
      type: internal
      tls: false
    - name: external
      type: nodeport
      configuration:
        preferredNodePortAddressType: InternalIP
      port: 9094
      tls: false
    config:
      num.partitions: 1
      num.recovery.threads.per.data.dir: 1
      log.retention.hours: 24
      log.retention.bytes: 80530636800
      log.segment.bytes: 4194304
      log.segment.ms: 60000
      default.replication.factor: 3
      log.cleaner.min.cleanable.ratio: 0.01
      log.cleanup.policy: compact,delete
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      log.message.format.version: 2.8
      inter.broker.protocol.version: 2.8
      auto.create.topics.enable: false
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: false
      class: ssd
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics-config
          key: kafka-metrics-config.yaml
    template:
      pod:
        terminationGracePeriodSeconds: 300
  zookeeper:
    replicas: 3
    resources:
      requests:
        cpu: 0.2
        memory: 1Gi
      limits:
        memory: 1Gi
    jvmOptions:
      -Xms: 512m
      -Xmx: 512m
    storage:
      type: persistent-claim
      size: 1Gi
      deleteClaim: false
      class: ssd
    metricsConfig:
      type: jmxPrometheusExporter
      valueFrom:
        configMapKeyRef:
          name: kafka-metrics-config
          key: zookeeper-metrics-config.yml

A workaround is to set the topic configuration through the TOPIC_CONFIG environment variable in the Strimzi Canary deployment:

        env:
          - name: TOPIC_CONFIG
            value: "cleanup.policy=delete"
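
As a rough illustration of how a TOPIC_CONFIG-style value can become a topic config map (the canary logs it as TopicConfig:map[...] at startup), here is a hypothetical Go parser. How the canary itself parses this variable is not shown in this issue; the ';' separator between entries is an assumption, chosen because a value such as compact,delete itself contains a comma.

```go
package main

import (
	"fmt"
	"strings"
)

// parseTopicConfig turns a TOPIC_CONFIG-style string into a topic config map.
// Hypothetical helper: ASSUMES ';'-separated "key=value" entries, because a
// value such as "compact,delete" itself contains a comma.
func parseTopicConfig(raw string) (map[string]string, error) {
	cfg := make(map[string]string)
	if strings.TrimSpace(raw) == "" {
		return cfg, nil // no overrides: the broker defaults apply
	}
	for _, entry := range strings.Split(raw, ";") {
		parts := strings.SplitN(entry, "=", 2)
		if len(parts) != 2 || strings.TrimSpace(parts[0]) == "" {
			return nil, fmt.Errorf("invalid topic config entry %q", entry)
		}
		cfg[strings.TrimSpace(parts[0])] = strings.TrimSpace(parts[1])
	}
	return cfg, nil
}

func main() {
	// The value used in the workaround above.
	cfg, err := parseTopicConfig("cleanup.policy=delete")
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg["cleanup.policy"]) // prints: delete
}
```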

@ppatierno (Member)

Thanks for reporting this ...

The error "Error sending message: kafka server: This record has failed the validation on broker and hence will be rejected." is most likely caused by the canary sending messages without a key, because it sends to specific partitions directly. On a topic with the "compact" cleanup policy, a record without a key makes no sense, so the broker's validation rejects it.

The workaround you mentioned works, but I think it's better for the canary to enforce cleanup.policy=delete internally when it creates the topic.
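
The validation rule explained above can be modelled client-side with a small Go sketch. It is not the broker's implementation: it only captures the rule that any cleanup.policy containing compact (including compact,delete) rejects records without a key. The names requiresKey and checkRecord are hypothetical, and a nil key stands for Kafka's null key.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// requiresKey reports whether a cleanup.policy value includes "compact".
// Both "compact" and "compact,delete" do, matching the reported failures.
func requiresKey(cleanupPolicy string) bool {
	for _, p := range strings.Split(cleanupPolicy, ",") {
		if strings.TrimSpace(p) == "compact" {
			return true
		}
	}
	return false
}

var errKeylessOnCompacted = errors.New("record without key on a compacted topic")

// checkRecord is a hypothetical client-side model of the broker rule that
// rejects keyless records on compacted topics. nil models a null key.
func checkRecord(cleanupPolicy string, key []byte) error {
	if requiresKey(cleanupPolicy) && key == nil {
		return errKeylessOnCompacted
	}
	return nil
}

func main() {
	for _, policy := range []string{"delete", "compact", "compact,delete"} {
		// The canary's messages carry no key, so pass nil.
		fmt.Printf("%-15s -> %v\n", policy, checkRecord(policy, nil))
	}
}
```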
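
The fix proposed above could look roughly like the following Go sketch. The helper name canaryTopicConfigEntries and the example retention.ms value are hypothetical. The returned map[string]*string has the same type as the ConfigEntries field of Sarama's TopicDetail, which is what ClusterAdmin.CreateTopic accepts. This sketch silently overrides a user-supplied cleanup.policy; rejecting such a value with an error would be an equally valid design.

```go
package main

import "fmt"

// canaryTopicConfigEntries builds config entries for creating the canary
// topic. User settings are kept, but cleanup.policy is always forced to
// "delete" so that keyless canary messages are accepted.
// The map[string]*string type matches Sarama's TopicDetail.ConfigEntries.
func canaryTopicConfigEntries(userConfig map[string]string) map[string]*string {
	entries := make(map[string]*string, len(userConfig)+1)
	for k, v := range userConfig {
		v := v // copy so each entry gets its own pointer
		entries[k] = &v
	}
	policy := "delete"
	entries["cleanup.policy"] = &policy // overrides any user-supplied value
	return entries
}

func main() {
	entries := canaryTopicConfigEntries(map[string]string{
		"cleanup.policy": "compact,delete", // would be overridden
		"retention.ms":   "600000",         // hypothetical extra setting, kept
	})
	fmt.Println(*entries["cleanup.policy"], *entries["retention.ms"]) // prints: delete 600000
}
```

This only affects topic creation: in the log above the canary topic strimzi_canary_8 already exists, so an existing topic would keep its current policy unless its configuration is altered separately.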
