diff --git a/.circleci/config.yml b/.circleci/config.yml index 329dec4f..a690d192 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -275,6 +275,53 @@ jobs: entrypoint: *entrypoint steps: *steps + kafka-360: + working_directory: *working_directory + environment: + KAFKA_VERSION: "3.6.0" + + # Need to skip nettest to avoid these kinds of errors: + # --- FAIL: TestConn/nettest (17.56s) + # --- FAIL: TestConn/nettest/PingPong (7.40s) + # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request + # conntest.go:118: mismatching value: got 77, want 78 + # conntest.go:118: mismatching value: got 78, want 79 + # ... + # + # TODO: Figure out why these are happening and fix them (they don't appear to be new). + KAFKA_SKIP_NETTEST: "1" + docker: + - image: circleci/golang + - image: bitnami/zookeeper:latest + ports: + - 2181:2181 + environment: + ALLOW_ANONYMOUS_LOGIN: yes + - image: bitnami/kafka:3.6.0 + ports: + - 9092:9092 + - 9093:9093 + environment: + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' + KAFKA_CFG_ADVERTISED_PORT: '9092' + KAFKA_CFG_ZOOKEEPER_CONNECT: localhost:2181 + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' + KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' + KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' + KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" + ALLOW_PLAINTEXT_LISTENER: yes + entrypoint: + - "/bin/bash" + - "-c" + - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh + steps: *steps + workflows: version: 2 run: @@ -292,3 +339,4 @@ workflows: - kafka-260 - kafka-270 - kafka-281 + - kafka-360 diff --git a/reader_test.go b/reader_test.go index f413d742..a8cfc484 100644 --- a/reader_test.go +++ b/reader_test.go @@ -1559,8 +1559,8 @@ func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) { } } -func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) +func TestConsumerGroupWithGroupTopicsMultiple(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() client, shutdown := newLocalClient() @@ -1587,10 +1587,9 @@ func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) { } defer w.Close() - time.Sleep(time.Second) - msgs := make([]Message, 0, len(conf.GroupTopics)) for _, topic := range conf.GroupTopics { + waitForTopic(ctx, t, topic) msgs = append(msgs, Message{Topic: topic}) } if err := w.WriteMessages(ctx, msgs...); err != nil {