diff --git a/plugins/inputs/elasticsearch_query/elasticsearch_query_test.go b/plugins/inputs/elasticsearch_query/elasticsearch_query_test.go index 77998989872c2..e7f461d43a1f9 100644 --- a/plugins/inputs/elasticsearch_query/elasticsearch_query_test.go +++ b/plugins/inputs/elasticsearch_query/elasticsearch_query_test.go @@ -505,7 +505,7 @@ var testEsAggregationData = []esAggregationQueryTest{ }, } -func setupIntegrationTest(t *testing.T) (testutil.Container, error) { +func setupIntegrationTest(t *testing.T) (*testutil.Container, error) { type nginxlog struct { IPaddress string `json:"IP"` Timestamp time.Time `json:"@timestamp"` @@ -542,7 +542,7 @@ func setupIntegrationTest(t *testing.T) (testutil.Container, error) { err = e.connectToES() if err != nil { - return container, err + return &container, err } bulkRequest := e.esClient.Bulk() @@ -550,7 +550,7 @@ func setupIntegrationTest(t *testing.T) (testutil.Container, error) { // populate elasticsearch with nginx_logs test data file file, err := os.Open("testdata/nginx_logs") if err != nil { - return container, err + return &container, err } defer file.Close() @@ -579,22 +579,22 @@ func setupIntegrationTest(t *testing.T) (testutil.Container, error) { Doc(logline)) } if scanner.Err() != nil { - return container, err + return &container, err } _, err = bulkRequest.Do(context.Background()) if err != nil { - return container, err + return &container, err } // force elastic to refresh indexes to get new batch data ctx := context.Background() _, err = e.esClient.Refresh().Do(ctx) if err != nil { - return container, err + return &container, err } - return container, nil + return &container, nil } func TestElasticsearchQuery(t *testing.T) { diff --git a/plugins/inputs/mysql/mysql_test.go b/plugins/inputs/mysql/mysql_test.go index 6e0b0b0af6fe7..c79c4b672d374 100644 --- a/plugins/inputs/mysql/mysql_test.go +++ b/plugins/inputs/mysql/mysql_test.go @@ -25,7 +25,10 @@ func TestMysqlDefaultsToLocalIntegration(t *testing.T) { "MYSQL_ALLOW_EMPTY_PASSWORD": "yes", }, ExposedPorts: []string{servicePort}, - WaitingFor: wait.ForListeningPort(nat.Port(servicePort)), + WaitingFor: wait.ForAll( + wait.ForLog("/usr/sbin/mysqld: ready for connections"), + wait.ForListeningPort(nat.Port(servicePort)), + ), } err := container.Start() @@ -59,7 +62,10 @@ func TestMysqlMultipleInstancesIntegration(t *testing.T) { "MYSQL_ALLOW_EMPTY_PASSWORD": "yes", }, ExposedPorts: []string{servicePort}, - WaitingFor: wait.ForListeningPort(nat.Port(servicePort)), + WaitingFor: wait.ForAll( + wait.ForLog("/usr/sbin/mysqld: ready for connections"), + wait.ForListeningPort(nat.Port(servicePort)), + ), } err := container.Start() diff --git a/plugins/inputs/openldap/openldap_test.go b/plugins/inputs/openldap/openldap_test.go index 2eafac5308017..c821fe9a5b2a6 100644 --- a/plugins/inputs/openldap/openldap_test.go +++ b/plugins/inputs/openldap/openldap_test.go @@ -71,7 +71,7 @@ func TestOpenldapGeneratesMetricsIntegration(t *testing.T) { "LDAP_ADMIN_PASSWORD": "secret", }, WaitingFor: wait.ForAll( - wait.ForLog("Starting slapd"), + wait.ForLog("slapd starting"), wait.ForListeningPort(nat.Port(servicePort)), ), } @@ -128,7 +128,7 @@ func TestOpenldapStartTLSIntegration(t *testing.T) { "/server.key": tlsKey, }, WaitingFor: wait.ForAll( - wait.ForLog("Starting slapd"), + wait.ForLog("slapd starting"), wait.ForListeningPort(nat.Port(servicePort)), ), } @@ -191,7 +191,7 @@ func TestOpenldapLDAPSIntegration(t *testing.T) { "/server.key": tlsKey, }, WaitingFor: wait.ForAll( - wait.ForLog("Starting slapd"), + wait.ForLog("slapd starting"), wait.ForListeningPort(nat.Port(servicePortSecure)), ), } @@ -249,7 +249,7 @@ func TestOpenldapInvalidSSLIntegration(t *testing.T) { "/server.key": tlsKey, }, WaitingFor: wait.ForAll( - wait.ForLog("Starting slapd"), + wait.ForLog("slapd starting"), wait.ForListeningPort(nat.Port(servicePortSecure)), ), } @@ -289,7 +289,7 @@ func TestOpenldapBindIntegration(t *testing.T) { "LDAP_ADMIN_PASSWORD": "secret", }, WaitingFor: wait.ForAll( - wait.ForLog("Starting slapd"), + wait.ForLog("slapd starting"), wait.ForListeningPort(nat.Port(servicePort)), ), } @@ -341,7 +341,7 @@ func TestOpenldapReverseMetricsIntegration(t *testing.T) { "LDAP_ADMIN_PASSWORD": "secret", }, WaitingFor: wait.ForAll( - wait.ForLog("Starting slapd"), + wait.ForLog("slapd starting"), wait.ForListeningPort(nat.Port(servicePort)), ), } diff --git a/plugins/inputs/pgbouncer/pgbouncer_test.go b/plugins/inputs/pgbouncer/pgbouncer_test.go index 13c9e3a9d7c17..8d6d344f3a24c 100644 --- a/plugins/inputs/pgbouncer/pgbouncer_test.go +++ b/plugins/inputs/pgbouncer/pgbouncer_test.go @@ -41,7 +41,10 @@ func TestPgBouncerGeneratesMetricsIntegration(t *testing.T) { "PG_ENV_POSTGRESQL_USER": "pgbouncer", "PG_ENV_POSTGRESQL_PASS": "pgbouncer", }, - WaitingFor: wait.ForListeningPort(nat.Port(pgBouncerServicePort)), + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(pgBouncerServicePort)), + wait.ForLog("LOG process up"), + ), } err = container.Start() require.NoError(t, err, "failed to start container") diff --git a/plugins/inputs/sql/sql_test.go b/plugins/inputs/sql/sql_test.go index 21261e7bb8944..22796159fc3e6 100644 --- a/plugins/inputs/sql/sql_test.go +++ b/plugins/inputs/sql/sql_test.go @@ -1,7 +1,6 @@ package sql import ( - "context" "fmt" "testing" "time" @@ -9,8 +8,8 @@ import ( "math/rand" "path/filepath" + "github.com/docker/go-connections/nat" "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" "github.com/influxdata/telegraf" @@ -37,50 +36,35 @@ func TestMariaDB(t *testing.T) { logger := testutil.Logger{} - addr := "127.0.0.1" port := "3306" - passwd := "" + passwd := pwgen(32) database := "foo" - logger.Infof("Spinning up container...") - - // Generate a random password - passwd = pwgen(32) - // Determine the test-data mountpoint testdata, err := filepath.Abs("testdata/mariadb") require.NoError(t, err, "determining absolute path of test-data failed") - // Spin-up the container - ctx := context.Background() - req := testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "mariadb", - Env: map[string]string{ - "MYSQL_ROOT_PASSWORD": passwd, - "MYSQL_DATABASE": database, - }, - BindMounts: map[string]string{ - "/docker-entrypoint-initdb.d": testdata, - }, - ExposedPorts: []string{"3306/tcp"}, - WaitingFor: wait.ForListeningPort("3306/tcp"), + container := testutil.Container{ + Image: "mariadb", + ExposedPorts: []string{port}, + Env: map[string]string{ + "MYSQL_ROOT_PASSWORD": passwd, + "MYSQL_DATABASE": database, + }, + BindMounts: map[string]string{ + "/docker-entrypoint-initdb.d": testdata, }, - Started: true, + WaitingFor: wait.ForAll( + wait.ForLog("Buffer pool(s) load completed at"), + wait.ForListeningPort(nat.Port(port)), + ), } - container, err := testcontainers.GenericContainer(ctx, req) - require.NoError(t, err, "starting container failed") + err = container.Start() + require.NoError(t, err, "failed to start container") defer func() { - require.NoError(t, container.Terminate(ctx), "terminating container failed") + require.NoError(t, container.Terminate(), "terminating container failed") }() - // Get the connection details from the container - addr, err = container.Host(ctx) - require.NoError(t, err, "getting container host address failed") - p, err := container.MappedPort(ctx, "3306/tcp") - require.NoError(t, err, "getting container host port failed") - port = p.Port() - // Define the testset var testset = []struct { name string @@ -119,8 +103,13 @@ func TestMariaDB(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Setup the plugin-under-test plugin := &SQL{ - Driver: "maria", - Dsn: fmt.Sprintf("root:%s@tcp(%s:%s)/%s", passwd, addr, port, database), + Driver: "maria", + Dsn: fmt.Sprintf("root:%s@tcp(%s:%s)/%s", + passwd, + container.Address, + container.Ports[port], + database, + ), Queries: tt.queries, Log: logger, } @@ -154,50 +143,35 @@ func TestPostgreSQL(t *testing.T) { logger := testutil.Logger{} - addr := "127.0.0.1" port := "5432" - passwd := "" + passwd := pwgen(32) database := "foo" - logger.Infof("Spinning up container...") - - // Generate a random password - passwd = pwgen(32) - // Determine the test-data mountpoint testdata, err := filepath.Abs("testdata/postgres") require.NoError(t, err, "determining absolute path of test-data failed") - // Spin-up the container - ctx := context.Background() - req := testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "postgres", - Env: map[string]string{ - "POSTGRES_PASSWORD": passwd, - "POSTGRES_DB": database, - }, - BindMounts: map[string]string{ - "/docker-entrypoint-initdb.d": testdata, - }, - ExposedPorts: []string{"5432/tcp"}, - WaitingFor: wait.ForListeningPort("5432/tcp"), + container := testutil.Container{ + Image: "postgres", + ExposedPorts: []string{port}, + Env: map[string]string{ + "POSTGRES_PASSWORD": passwd, + "POSTGRES_DB": database, + }, + BindMounts: map[string]string{ + "/docker-entrypoint-initdb.d": testdata, }, - Started: true, + WaitingFor: wait.ForAll( + wait.ForLog("database system is ready to accept connections"), + wait.ForListeningPort(nat.Port(port)), + ), } - container, err := testcontainers.GenericContainer(ctx, req) - require.NoError(t, err, "starting container failed") + err = container.Start() + require.NoError(t, err, "failed to start container") defer func() { - require.NoError(t, container.Terminate(ctx), "terminating container failed") + require.NoError(t, container.Terminate(), "terminating container failed") }() - // Get the connection details from the container - addr, err = container.Host(ctx) - require.NoError(t, err, "getting container host address failed") - p, err := container.MappedPort(ctx, "5432/tcp") - require.NoError(t, err, "getting container host port failed") - port = p.Port() - // Define the testset var testset = []struct { name string @@ -236,8 +210,13 @@ func TestPostgreSQL(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Setup the plugin-under-test plugin := &SQL{ - Driver: "pgx", - Dsn: fmt.Sprintf("postgres://postgres:%v@%v:%v/%v", passwd, addr, port, database), + Driver: "pgx", + Dsn: fmt.Sprintf("postgres://postgres:%v@%v:%v/%v", + passwd, + container.Address, + container.Ports[port], + database, + ), Queries: tt.queries, Log: logger, } @@ -271,42 +250,31 @@ func TestClickHouse(t *testing.T) { logger := testutil.Logger{} - addr := "127.0.0.1" port := "9000" user := "default" - logger.Infof("Spinning up container...") - // Determine the test-data mountpoint testdata, err := filepath.Abs("testdata/clickhouse") require.NoError(t, err, "determining absolute path of test-data failed") - // Spin-up the container - ctx := context.Background() - req := testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "yandex/clickhouse-server", - BindMounts: map[string]string{ - "/docker-entrypoint-initdb.d": testdata, - }, - ExposedPorts: []string{"9000/tcp", "8123/tcp"}, - WaitingFor: wait.NewHTTPStrategy("/").WithPort("8123/tcp"), + container := testutil.Container{ + Image: "yandex/clickhouse-server", + ExposedPorts: []string{port, "8123"}, + BindMounts: map[string]string{ + "/docker-entrypoint-initdb.d": testdata, }, - Started: true, + WaitingFor: wait.ForAll( + wait.NewHTTPStrategy("/").WithPort(nat.Port("8123")), + wait.ForListeningPort(nat.Port(port)), + wait.ForLog("Saved preprocessed configuration to '/var/lib/clickhouse/preprocessed_configs/users.xml'"), + ), } - container, err := testcontainers.GenericContainer(ctx, req) - require.NoError(t, err, "starting container failed") + err = container.Start() + require.NoError(t, err, "failed to start container") defer func() { - require.NoError(t, container.Terminate(ctx), "terminating container failed") + require.NoError(t, container.Terminate(), "terminating container failed") }() - // Get the connection details from the container - addr, err = container.Host(ctx) - require.NoError(t, err, "getting container host address failed") - p, err := container.MappedPort(ctx, "9000/tcp") - require.NoError(t, err, "getting container host port failed") - port = p.Port() - // Define the testset var testset = []struct { name string @@ -345,8 +313,12 @@ func TestClickHouse(t *testing.T) { t.Run(tt.name, func(t *testing.T) { // Setup the plugin-under-test plugin := &SQL{ - Driver: "clickhouse", - Dsn: fmt.Sprintf("tcp://%v:%v?username=%v", addr, port, user), + Driver: "clickhouse", + Dsn: fmt.Sprintf("tcp://%v:%v?username=%v", + container.Address, + container.Ports[port], + user, + ), Queries: tt.queries, Log: logger, } diff --git a/plugins/inputs/zookeeper/zookeeper_test.go b/plugins/inputs/zookeeper/zookeeper_test.go index 809aa8f2ea045..994735358eaa2 100644 --- a/plugins/inputs/zookeeper/zookeeper_test.go +++ b/plugins/inputs/zookeeper/zookeeper_test.go @@ -23,7 +23,10 @@ func TestZookeeperGeneratesMetricsIntegration(t *testing.T) { Env: map[string]string{ "ZOO_4LW_COMMANDS_WHITELIST": "mntr", }, - WaitingFor: wait.ForListeningPort(nat.Port(servicePort)), + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("ZooKeeper audit is disabled."), + ), } err := container.Start() require.NoError(t, err, "failed to start container") diff --git a/plugins/outputs/mongodb/mongodb_test.go b/plugins/outputs/mongodb/mongodb_test.go index baba8ab6f6a7f..190ed432d8293 100644 --- a/plugins/outputs/mongodb/mongodb_test.go +++ b/plugins/outputs/mongodb/mongodb_test.go @@ -1,17 +1,16 @@ package mongodb import ( - "context" "fmt" "path/filepath" "testing" "time" + "github.com/docker/go-connections/nat" "github.com/influxdata/telegraf/config" "github.com/influxdata/telegraf/plugins/common/tls" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" ) @@ -20,34 +19,27 @@ func TestConnectAndWriteIntegrationNoAuth(t *testing.T) { t.Skip("Skipping integration test in short mode") } - req := testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "mongo", - ExposedPorts: []string{"27017/tcp"}, - WaitingFor: wait.NewHTTPStrategy("/").WithPort("27017"), - }, - Started: true, + servicePort := "27017" + container := testutil.Container{ + Image: "mongo", + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.NewHTTPStrategy("/").WithPort(nat.Port(servicePort)), + wait.ForLog("Waiting for connections"), + ), } - - ctx := context.Background() - container, err := testcontainers.GenericContainer(ctx, req) - require.NoError(t, err, "starting container failed") + err := container.Start() + require.NoError(t, err, "failed to start container") defer func() { - require.NoError(t, container.Terminate(ctx), "terminating container failed") + require.NoError(t, container.Terminate(), "terminating container failed") }() - host, err := container.Host(ctx) - require.NoError(t, err, "getting container host address failed") - require.NotEmpty(t, host) - - natPort, err := container.MappedPort(ctx, "27017/tcp") - require.NoError(t, err, "getting container host port failed") - port := natPort.Port() - require.NotEmpty(t, port) - // Run test plugin := &MongoDB{ - Dsn: fmt.Sprintf("mongodb://localhost:%s", port), + Dsn: fmt.Sprintf("mongodb://%s:%s", + container.Address, + container.Ports[servicePort], + ), AuthenticationType: "NONE", MetricDatabase: "telegraf_test", MetricGranularity: "seconds", @@ -68,34 +60,24 @@ func TestConnectAndWriteIntegrationSCRAMAuth(t *testing.T) { initdb, err := filepath.Abs("testdata/auth_scram") require.NoError(t, err) - req := testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "mongo", - BindMounts: map[string]string{ - "/docker-entrypoint-initdb.d": initdb, - }, - ExposedPorts: []string{"27017/tcp"}, - WaitingFor: wait.NewHTTPStrategy("/").WithPort("27017"), + servicePort := "27017" + container := testutil.Container{ + Image: "mongo", + ExposedPorts: []string{servicePort}, + BindMounts: map[string]string{ + "/docker-entrypoint-initdb.d": initdb, }, - Started: true, + WaitingFor: wait.ForAll( + wait.NewHTTPStrategy("/").WithPort(nat.Port(servicePort)), + wait.ForLog("Waiting for connections"), + ), } - - ctx := context.Background() - container, err := testcontainers.GenericContainer(ctx, req) - require.NoError(t, err, "starting container failed") + err = container.Start() + require.NoError(t, err, "failed to start container") defer func() { - require.NoError(t, container.Terminate(ctx), "terminating container failed") + require.NoError(t, container.Terminate(), "terminating container failed") }() - host, err := container.Host(ctx) - require.NoError(t, err, "getting container host address failed") - require.NotEmpty(t, host) - - natPort, err := container.MappedPort(ctx, "27017/tcp") - require.NoError(t, err, "getting container host port failed") - port := natPort.Port() - require.NotEmpty(t, port) - tests := []struct { name string plugin *MongoDB @@ -104,7 +86,8 @@ func TestConnectAndWriteIntegrationSCRAMAuth(t *testing.T) { { name: "success with scram authentication", plugin: &MongoDB{ - Dsn: fmt.Sprintf("mongodb://localhost:%s/admin", port), + Dsn: fmt.Sprintf("mongodb://%s:%s/admin", + container.Address, container.Ports[servicePort]), AuthenticationType: "SCRAM", Username: "root", Password: "changeme", @@ -118,7 +101,8 @@ func TestConnectAndWriteIntegrationSCRAMAuth(t *testing.T) { { name: "fail with scram authentication bad password", plugin: &MongoDB{ - Dsn: fmt.Sprintf("mongodb://localhost:%s/admin", port), + Dsn: fmt.Sprintf("mongodb://%s:%s/admin", + container.Address, container.Ports[servicePort]), AuthenticationType: "SCRAM", Username: "root", Password: "root", @@ -172,43 +156,33 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) { serverpem, err := filepath.Abs(pki.ServerCertAndKeyPath()) require.NoError(t, err) - req := testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "mongo", - BindMounts: map[string]string{ - "/docker-entrypoint-initdb.d": initdb, - "/cacert.pem": cacert, - "/server.pem": serverpem, - }, - ExposedPorts: []string{"27017/tcp"}, - Entrypoint: []string{ - "docker-entrypoint.sh", - "--auth", "--setParameter", "authenticationMechanisms=MONGODB-X509", - "--tlsMode", "preferTLS", - "--tlsCAFile", "/cacert.pem", - "--tlsCertificateKeyFile", "/server.pem", - }, - WaitingFor: wait.NewHTTPStrategy("/").WithPort("27017"), + servicePort := "27017" + container := testutil.Container{ + Image: "mongo", + ExposedPorts: []string{servicePort}, + BindMounts: map[string]string{ + "/docker-entrypoint-initdb.d": initdb, + "/cacert.pem": cacert, + "/server.pem": serverpem, + }, + Entrypoint: []string{ + "docker-entrypoint.sh", + "--auth", "--setParameter", "authenticationMechanisms=MONGODB-X509", + "--tlsMode", "preferTLS", + "--tlsCAFile", "/cacert.pem", + "--tlsCertificateKeyFile", "/server.pem", }, - Started: true, + WaitingFor: wait.ForAll( + wait.NewHTTPStrategy("/").WithPort(nat.Port(servicePort)), + wait.ForLog("Waiting for connections"), + ), } - - ctx := context.Background() - cont, err := testcontainers.GenericContainer(ctx, req) - require.NoError(t, err, "starting container failed") + err = container.Start() + require.NoError(t, err, "failed to start container") defer func() { - require.NoError(t, cont.Terminate(ctx), "terminating container failed") + require.NoError(t, container.Terminate(), "terminating container failed") }() - host, err := cont.Host(ctx) - require.NoError(t, err, "getting container host address failed") - require.NotEmpty(t, host) - - natPort, err := cont.MappedPort(ctx, "27017/tcp") - require.NoError(t, err, "getting container host port failed") - port := natPort.Port() - require.NotEmpty(t, port) - tests := []struct { name string plugin *MongoDB @@ -217,7 +191,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) { { name: "success with x509 authentication", plugin: &MongoDB{ - Dsn: fmt.Sprintf("mongodb://localhost:%s", port), + Dsn: fmt.Sprintf("mongodb://%s:%s", + container.Address, container.Ports[servicePort]), AuthenticationType: "X509", MetricDatabase: "telegraf_test", MetricGranularity: "seconds", @@ -236,7 +211,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) { { name: "success with x509 authentication using encrypted key file", plugin: &MongoDB{ - Dsn: fmt.Sprintf("mongodb://localhost:%s", port), + Dsn: fmt.Sprintf("mongodb://%s:%s", + container.Address, container.Ports[servicePort]), AuthenticationType: "X509", MetricDatabase: "telegraf_test", MetricGranularity: "seconds", @@ -256,7 +232,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) { { name: "success with x509 authentication missing ca and using insceure tls", plugin: &MongoDB{ - Dsn: fmt.Sprintf("mongodb://localhost:%s", port), + Dsn: fmt.Sprintf("mongodb://%s:%s", + container.Address, container.Ports[servicePort]), AuthenticationType: "X509", MetricDatabase: "telegraf_test", MetricGranularity: "seconds", @@ -274,7 +251,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) { { name: "fail with x509 authentication missing ca", plugin: &MongoDB{ - Dsn: fmt.Sprintf("mongodb://localhost:%s", port), + Dsn: fmt.Sprintf("mongodb://%s:%s", + container.Address, container.Ports[servicePort]), AuthenticationType: "X509", MetricDatabase: "telegraf_test", MetricGranularity: "seconds", @@ -292,7 +270,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) { { name: "fail with x509 authentication using encrypted key file", plugin: &MongoDB{ - Dsn: fmt.Sprintf("mongodb://localhost:%s", port), + Dsn: fmt.Sprintf("mongodb://%s:%s", + container.Address, container.Ports[servicePort]), AuthenticationType: "X509", MetricDatabase: "telegraf_test", MetricGranularity: "seconds", @@ -312,7 +291,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) { { name: "fail with x509 authentication using invalid ca", plugin: &MongoDB{ - Dsn: fmt.Sprintf("mongodb://localhost:%s", port), + Dsn: fmt.Sprintf("mongodb://%s:%s", + container.Address, container.Ports[servicePort]), AuthenticationType: "X509", MetricDatabase: "telegraf_test", MetricGranularity: "seconds", @@ -331,7 +311,8 @@ func TestConnectAndWriteIntegrationX509Auth(t *testing.T) { { name: "fail with x509 authentication using invalid key", plugin: &MongoDB{ - Dsn: fmt.Sprintf("mongodb://localhost:%s", port), + Dsn: fmt.Sprintf("mongodb://%s:%s", + container.Address, container.Ports[servicePort]), AuthenticationType: "X509", MetricDatabase: "telegraf_test", MetricGranularity: "seconds", diff --git a/plugins/outputs/sql/sql_test.go b/plugins/outputs/sql/sql_test.go index 54b92ad3bdd6b..b54826ab97246 100644 --- a/plugins/outputs/sql/sql_test.go +++ b/plugins/outputs/sql/sql_test.go @@ -1,7 +1,6 @@ package sql import ( - "context" "fmt" "math/rand" "os" @@ -9,11 +8,11 @@ import ( "testing" "time" + "github.com/docker/go-connections/nat" "github.com/influxdata/telegraf" "github.com/influxdata/telegraf/metric" "github.com/influxdata/telegraf/testutil" "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" ) @@ -171,40 +170,31 @@ func TestMysqlIntegration(t *testing.T) { password := pwgen(32) outDir := t.TempDir() - ctx := context.Background() - req := testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "mariadb", - Env: map[string]string{ - "MARIADB_ROOT_PASSWORD": password, - }, - BindMounts: map[string]string{ - "/docker-entrypoint-initdb.d": initdb, - "/out": outDir, - }, - ExposedPorts: []string{"3306/tcp"}, - WaitingFor: wait.ForListeningPort("3306/tcp"), + servicePort := "3306" + container := testutil.Container{ + Image: "mariadb", + Env: map[string]string{ + "MARIADB_ROOT_PASSWORD": password, }, - Started: true, + BindMounts: map[string]string{ + "/docker-entrypoint-initdb.d": initdb, + "/out": outDir, + }, + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("Buffer pool(s) load completed at"), + ), } - mariadbContainer, err := testcontainers.GenericContainer(ctx, req) - require.NoError(t, err, "starting container failed") + err = container.Start() + require.NoError(t, err, "failed to start container") defer func() { - require.NoError(t, mariadbContainer.Terminate(ctx), "terminating container failed") + require.NoError(t, container.Terminate(), "terminating container failed") }() - // Get the connection details from the container - host, err := mariadbContainer.Host(ctx) - require.NoError(t, err, "getting container host address failed") - require.NotEmpty(t, host) - natPort, err := mariadbContainer.MappedPort(ctx, "3306/tcp") - require.NoError(t, err, "getting container host port failed") - port := natPort.Port() - require.NotEmpty(t, port) - //use the plugin to write to the database address := fmt.Sprintf("%v:%v@tcp(%v:%v)/%v", - username, password, host, port, dbname, + username, password, container.Address, container.Ports[servicePort], dbname, ) p := newSQL() p.Log = testutil.Logger{} @@ -220,7 +210,7 @@ func TestMysqlIntegration(t *testing.T) { //dump the database var rc int - rc, err = mariadbContainer.Exec(ctx, []string{ + rc, err = container.Exec([]string{ "bash", "-c", "mariadb-dump --user=" + username + @@ -259,41 +249,32 @@ func TestPostgresIntegration(t *testing.T) { password := pwgen(32) outDir := t.TempDir() - ctx := context.Background() - req := testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "postgres", - Env: map[string]string{ - "POSTGRES_PASSWORD": password, - }, - BindMounts: map[string]string{ - "/docker-entrypoint-initdb.d": initdb, - "/out": outDir, - }, - ExposedPorts: []string{"5432/tcp"}, - WaitingFor: wait.ForListeningPort("5432/tcp"), + servicePort := "5432" + container := testutil.Container{ + Image: "postgres", + Env: map[string]string{ + "POSTGRES_PASSWORD": password, }, - Started: true, + BindMounts: map[string]string{ + "/docker-entrypoint-initdb.d": initdb, + "/out": outDir, + }, + ExposedPorts: []string{servicePort}, + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("database system is ready to accept connections"), + ), } - cont, err := testcontainers.GenericContainer(ctx, req) - require.NoError(t, err, "starting container failed") + err = container.Start() + require.NoError(t, err, "failed to start container") defer func() { - require.NoError(t, cont.Terminate(ctx), "terminating container failed") + require.NoError(t, container.Terminate(), "terminating container failed") }() - // Get the connection details from the container - host, err := cont.Host(ctx) - require.NoError(t, err, "getting container host address failed") - require.NotEmpty(t, host) - natPort, err := cont.MappedPort(ctx, "5432/tcp") - require.NoError(t, err, "getting container host port failed") - port := natPort.Port() - require.NotEmpty(t, port) - //use the plugin to write to the database // host, port, username, password, dbname address := fmt.Sprintf("postgres://%v:%v@%v:%v/%v", - username, password, host, port, dbname, + username, password, container.Address, container.Ports[servicePort], dbname, ) p := newSQL() p.Log = testutil.Logger{} @@ -311,7 +292,7 @@ func TestPostgresIntegration(t *testing.T) { //dump the database //psql -u postgres var rc int - rc, err = cont.Exec(ctx, []string{ + rc, err = container.Exec([]string{ "bash", "-c", "pg_dump" + @@ -359,37 +340,30 @@ func TestClickHouseIntegration(t *testing.T) { outDir := t.TempDir() - ctx := context.Background() - req := testcontainers.GenericContainerRequest{ - ContainerRequest: testcontainers.ContainerRequest{ - Image: "yandex/clickhouse-server", - BindMounts: map[string]string{ - "/docker-entrypoint-initdb.d": initdb, - "/out": outDir, - }, - ExposedPorts: []string{"9000/tcp", "8123/tcp"}, - WaitingFor: wait.NewHTTPStrategy("/").WithPort("8123/tcp"), + servicePort := "9000" + container := testutil.Container{ + Image: "yandex/clickhouse-server", + ExposedPorts: []string{servicePort, "8123"}, + BindMounts: map[string]string{ + "/docker-entrypoint-initdb.d": initdb, + "/out": outDir, }, - Started: true, + WaitingFor: wait.ForAll( + wait.NewHTTPStrategy("/").WithPort(nat.Port("8123")), + wait.ForListeningPort(nat.Port(servicePort)), + wait.ForLog("Saved preprocessed configuration to '/var/lib/clickhouse/preprocessed_configs/users.xml'"), + ), } - cont, err := testcontainers.GenericContainer(ctx, req) - require.NoError(t, err, "starting container failed") + err = container.Start() + require.NoError(t, err, "failed to start container") defer func() { - require.NoError(t, cont.Terminate(ctx), "terminating container failed") + require.NoError(t, container.Terminate(), "terminating container failed") }() - // Get the connection details from the container - host, err := cont.Host(ctx) - require.NoError(t, err, "getting container host address failed") - require.NotEmpty(t, host) - natPort, err := cont.MappedPort(ctx, "9000/tcp") - require.NoError(t, err, "getting container host port failed") - port := natPort.Port() - require.NotEmpty(t, port) - //use the plugin to write to the database // host, port, username, password, dbname - address := fmt.Sprintf("tcp://%v:%v?username=%v&database=%v", host, port, username, dbname) + address := fmt.Sprintf("tcp://%v:%v?username=%v&database=%v", + container.Address, container.Ports[servicePort], username, dbname) p := newSQL() p.Log = testutil.Logger{} p.Driver = "clickhouse" @@ -410,7 +384,7 @@ func TestClickHouseIntegration(t *testing.T) { // dump the database var rc int for _, testMetric := range testMetrics { - rc, err = cont.Exec(ctx, []string{ + rc, err = container.Exec([]string{ "bash", "-c", "clickhouse-client" + diff --git a/testutil/container.go b/testutil/container.go index 936b66a44fc6e..a84bf77041f6b 100644 --- a/testutil/container.go +++ b/testutil/container.go @@ -114,6 +114,10 @@ func (c *Container) LookupMappedPorts() error { return nil } +func (c *Container) Exec(cmds []string) (int, error) { + return c.container.Exec(c.ctx, cmds) +} + func (c *Container) PrintLogs() { fmt.Println("--- Container Logs Start ---") for _, msg := range c.Logs.Msgs { @@ -123,15 +127,17 @@ func (c *Container) PrintLogs() { } func (c *Container) Terminate() error { - err := c.container.Terminate(c.ctx) + err := c.container.StopLogProducer() + if err != nil { + fmt.Println(err) + } + + err = c.container.Terminate(c.ctx) if err != nil { fmt.Printf("failed to terminate the container: %s", err) } - // this needs to happen after the container is terminated otherwise there - // is a huge time penalty on the order of 50% increase in test time - _ = c.container.StopLogProducer() c.PrintLogs() - return err + return nil }