-
Notifications
You must be signed in to change notification settings - Fork 126
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
How can I create a Schema Registry test container and connect it to a Kafka test container? #182
Comments
All you need is this:
I don't claim this is the perfect way to use testcontainers (so if anyone can review it and point out my mistakes, that would be great). I prefer to create a generic container (for Schema Registry, Zookeeper, etc.) and pass it to a custom wrapper that provides the necessary methods.
import org.testcontainers.containers.{KafkaContainer, Network}
import org.testcontainers.utility.DockerImageName
import scala.jdk.CollectionConverters._
/** Wrapper around a testcontainers `KafkaContainer` that exposes the few
  * accessors the tests need.
  *
  * @param container the Kafka container this wrapper delegates to
  */
class KafkaNodeContainer(container: KafkaContainer) {

  /** Looks up the `KAFKA_HOST_NAME` entry in the container's environment map.
    *
    * Throws `NoSuchElementException` if that entry is not present.
    */
  def getInternalHost: String = {
    val environment = container.getEnvMap.asScala
    environment("KAFKA_HOST_NAME")
  }

  /** Bootstrap servers string as reported by the wrapped container. */
  def getBootstrap: String = {
    container.getBootstrapServers
  }

  /** Closes the wrapped container. */
  def close(): Unit = {
    container.close()
  }
}
object KafkaNodeContainer {
/** Creates, configures and starts a Kafka broker container, then wraps it.
  *
  * The broker is attached to `network` under the alias `kafka<brokerId>`; the
  * same alias is stored in the `KAFKA_HOST_NAME` environment entry.
  *
  * @param network               network the container is attached to
  * @param brokerId              value for `KAFKA_BROKER_ID`, also used to build the alias
  * @param withExternalZookeeper when true, the broker is pointed at `zookeeper:2181`
  * @return a wrapper around the started container
  */
def make(network: Network, brokerId: Int = 1, withExternalZookeeper: Boolean = false): KafkaNodeContainer = {
  val alias = s"kafka$brokerId"

  val image = DockerImageName
    .parse("confluentinc/cp-kafka:6.1.1")
    .asCompatibleSubstituteFor("confluentinc/cp-kafka")

  val brokerEnv = Map[String, String](
    "KAFKA_BROKER_ID" -> brokerId.toString,
    "KAFKA_HOST_NAME" -> alias,
    "KAFKA_AUTO_CREATE_TOPICS_ENABLE" -> "false"
  )

  val broker = new KafkaContainer(image)
    .withNetwork(network)
    .withNetworkAliases(alias)
    .withEnv(brokerEnv.asJava)

  if (withExternalZookeeper) {
    broker.withExternalZookeeper("zookeeper:2181")
  }

  broker.start()
  // Fixed pause after start-up; the author notes it is needed for initialization (#134).
  Thread.sleep(10000L)

  new KafkaNodeContainer(broker)
}
}
Note: Thread.sleep(10000L) is needed for initialization (see #134). Then, the SchemaRegistryContainer:
import com.dimafeng.testcontainers.GenericContainer
import org.testcontainers.containers.Network
import scala.jdk.CollectionConverters._
/** Wrapper around a testcontainers-scala `GenericContainer` running Schema Registry.
  *
  * @param container the generic container this wrapper delegates to
  */
class SchemaRegistryContainer(container: GenericContainer) {

  // Container-side port that is resolved to a mapped host port in `schemaUrl`.
  private val schemaPort = 8081

  /** Builds the `http://host:port` URL from the container's host and the
    * port mapped to `schemaPort`.
    */
  def schemaUrl: SchemaUrl = {
    val host = container.container.getHost
    val port = container.container.getMappedPort(schemaPort)
    SchemaUrl(s"http://$host:$port")
  }

  /** Closes the underlying container. */
  def close(): Unit = {
    container.container.close()
  }
}
object SchemaRegistryContainer {
/** Creates, configures and starts a Schema Registry container, then wraps it.
  *
  * @param network   network the container is attached to
  * @param kafkaHost host name of the Kafka broker; `:9092` is appended to form
  *                  the bootstrap servers setting
  * @return a wrapper around the started container
  */
def make(network: Network, kafkaHost: String): SchemaRegistryContainer = {
  val registry = new GenericContainer(dockerImage = "confluentinc/cp-schema-registry:6.1.1")
  registry.container.withNetwork(network)

  // The env entries are built before `setEnv` is called, so `getHost` is read first.
  val envEntries = List(
    s"SCHEMA_REGISTRY_HOST_NAME=${registry.container.getHost}",
    s"SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS=$kafkaHost:9092"
  )
  registry.container.setEnv(envEntries.asJava)

  registry.start()
  // Fixed pause after start-up before handing the container to the caller.
  Thread.sleep(3000L)

  new SchemaRegistryContainer(registry)
}
}
And, finally, combine the containers inside the test:
val network: Network = Network.newNetwork()
// Start a Kafka broker attached to `network`, using make's defaults
// (brokerId = 1, withExternalZookeeper = false).
val kfkNode: KafkaNodeContainer = KafkaNodeContainer.make(network)
val schNode: SchemaRegistryContainer = SchemaRegistryContainer.make(network, kfkNode.getInternalHost)
You can check the full example in the repo. |
@mikhailchuryakov thanks for sharing! Would you like to contribute this as a PR? |
@dimafeng Yes, sure! But I need a little time. |
No description provided.
The text was updated successfully, but these errors were encountered: