
Sometimes KAFKA_CREATE_TOPICS doesn't create the correct number of partitions #661

Open
tneilturner opened this issue May 8, 2021 · 1 comment


tneilturner commented May 8, 2021

I'm scratching my head on this one. In my test cases I'm using a library to invoke the docker-compose utility and start kafka with the following settings:

 environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_HOST_NAME: localhost
      ADVERTISED_PORT: 9029
      KAFKA_CREATE_TOPICS: "product-catalog.content-updates:${PARTITIONS}:${REPLICAS}"

However, sometimes the broker doesn't create the correct number of partitions. The only difference, as far as I can tell, is that it works correctly when I set a breakpoint in my debugger and step through the execution. The code I use to invoke docker-compose is:

kafkaContainer := testcontainers.NewLocalDockerCompose(
	[]string{"../../docker/docker-compose-kafka.yml"},
	strings.ToLower(uuid.New().String()),
)

err := kafkaContainer.
	WithEnv(map[string]string{"PARTITIONS": "2", "REPLICAS": "1"}).
	WithCommand([]string{"up", "-d"}).
	Invoke()
if err.Error != nil {
	b.Fatal(err)
}

// kafka takes some time to spin up
time.Sleep(5 * time.Second)

It's literally the same code being executed in both cases; the only difference is that it works when I set the breakpoint and doesn't when I don't. I've included some kafka logs below that show the topic partitions being created and not being created.
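
For reference, here is a minimal sketch of a readiness check that could stand in for the fixed sleep above: it polls the broker until the topic reports the expected number of partitions or a deadline passes. It is only a sketch under assumptions not in the original setup: it assumes the broker is reachable on localhost:9029 (as ADVERTISED_PORT suggests), it uses github.com/segmentio/kafka-go rather than anything in the test code, and a metadata request like this can itself auto-create the topic with the broker's default partition count when auto.create.topics.enable is true.

package kafkatest

import (
	"fmt"
	"time"

	"github.com/segmentio/kafka-go"
)

// waitForTopicPartitions polls the broker until topic reports want partitions,
// or the timeout elapses. Sketch only: the address, poll interval, and timeout
// are assumptions, and the metadata request may itself trigger auto-creation
// of the topic when auto.create.topics.enable is true on the broker.
func waitForTopicPartitions(addr, topic string, want int, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		conn, err := kafka.Dial("tcp", addr)
		if err == nil {
			parts, perr := conn.ReadPartitions(topic)
			conn.Close()
			if perr == nil && len(parts) == want {
				return nil // topic exists with the expected partition count
			}
		}
		time.Sleep(500 * time.Millisecond)
	}
	return fmt.Errorf("topic %q did not reach %d partitions within %s", topic, want, timeout)
}

Calling something like waitForTopicPartitions("localhost:9029", "product-catalog.content-updates", 2, 30*time.Second) in place of the sleep would at least surface the wrong partition count at setup time rather than later in the test.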

Successful kafka logs:

❯ docker logs kafka | grep product-catalog.content-updates
creating topics: product-catalog.content-updates:2:1
Created topic product-catalog.content-updates.
[2021-05-08 05:41:15,172] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(product-catalog.content-updates-1, product-catalog.content-updates-0) (kafka.server.ReplicaFetcherManager)
[2021-05-08 05:41:15,284] INFO [Log partition=product-catalog.content-updates-1, dir=/kafka/kafka-logs-e3fd3efe421e] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-05-08 05:41:15,297] INFO Created log for partition product-catalog.content-updates-1 in /kafka/kafka-logs-e3fd3efe421e/product-catalog.content-updates-1 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-05-08 05:41:15,299] INFO [Partition product-catalog.content-updates-1 broker=1001] No checkpointed highwatermark is found for partition product-catalog.content-updates-1 (kafka.cluster.Partition)
[2021-05-08 05:41:15,300] INFO [Partition product-catalog.content-updates-1 broker=1001] Log loaded for partition product-catalog.content-updates-1 with initial high watermark 0 (kafka.cluster.Partition)
[2021-05-08 05:41:15,323] INFO [Log partition=product-catalog.content-updates-0, dir=/kafka/kafka-logs-e3fd3efe421e] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-05-08 05:41:15,325] INFO Created log for partition product-catalog.content-updates-0 in /kafka/kafka-logs-e3fd3efe421e/product-catalog.content-updates-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-05-08 05:41:15,325] INFO [Partition product-catalog.content-updates-0 broker=1001] No checkpointed highwatermark is found for partition product-catalog.content-updates-0 (kafka.cluster.Partition)
[2021-05-08 05:41:15,325] INFO [Partition product-catalog.content-updates-0 broker=1001] Log loaded for partition product-catalog.content-updates-0 with initial high watermark 0 (kafka.cluster.Partition)

Unsuccessful kafka logs:

❯ docker logs kafka | grep product-catalog.content-updates
[2021-05-08 05:44:26,662] INFO Creating topic product-catalog.content-updates with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-05-08 05:44:26,662] INFO Creating topic product-catalog.content-updates with configuration {} and initial partition assignment HashMap(0 -> ArrayBuffer(1001)) (kafka.zk.AdminZkClient)
[2021-05-08 05:44:26,679] INFO [KafkaApi-1001] Auto creation of topic product-catalog.content-updates with 1 partitions and replication factor 1 is successful (kafka.server.KafkaApis)
[2021-05-08 05:44:26,799] INFO [ReplicaFetcherManager on broker 1001] Removed fetcher for partitions Set(product-catalog.content-updates-0) (kafka.server.ReplicaFetcherManager)
[2021-05-08 05:44:26,908] INFO [Log partition=product-catalog.content-updates-0, dir=/kafka/kafka-logs-6987259163b8] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2021-05-08 05:44:26,922] INFO Created log for partition product-catalog.content-updates-0 in /kafka/kafka-logs-6987259163b8/product-catalog.content-updates-0 with properties {compression.type -> producer, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.bytes -> 1073741824, retention.ms -> 604800000, flush.messages -> 9223372036854775807, message.format.version -> 2.7-IV2, file.delete.delay.ms -> 60000, max.compaction.lag.ms -> 9223372036854775807, max.message.bytes -> 1048588, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, segment.ms -> 604800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760}. (kafka.log.LogManager)
[2021-05-08 05:44:26,924] INFO [Partition product-catalog.content-updates-0 broker=1001] No checkpointed highwatermark is found for partition product-catalog.content-updates-0 (kafka.cluster.Partition)
[2021-05-08 05:44:26,925] INFO [Partition product-catalog.content-updates-0 broker=1001] Log loaded for partition product-catalog.content-updates-0 with initial high watermark 0 (kafka.cluster.Partition)
creating topics: product-catalog.content-updates:2:1
@jsirianni

Also seeing this
