-
-
Notifications
You must be signed in to change notification settings - Fork 436
/
influxdb.go
155 lines (134 loc) · 4.7 KB
/
influxdb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package influxdb
import (
	"context"
	"fmt"
	"net"
	"path"
	"strings"

	"github.com/testcontainers/testcontainers-go"
	"github.com/testcontainers/testcontainers-go/wait"
)
// defaultImage {
// defaultImage is the image RunContainer places in the container request
// before any customizer is applied.
const defaultImage = "influxdb:1.8"
// }
// InfluxDbContainer represents the InfluxDB container type used in the module.
// It embeds testcontainers.Container, so all methods of that interface are
// available directly on the value.
type InfluxDbContainer struct {
testcontainers.Container
}
// RunContainer creates an instance of the InfluxDB container type.
//
// The request starts from defaultImage with ports 8086/tcp and 8088/tcp
// exposed and a wait on the 8086/tcp listening port. Every customizer in opts
// is then applied in order, after which the wait strategy is adjusted based on
// the files and image present in the final request.
//
// It returns the first error reported by a customizer, or the error returned
// by testcontainers.GenericContainer.
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*InfluxDbContainer, error) {
	req := testcontainers.ContainerRequest{
		Image:        defaultImage,
		ExposedPorts: []string{"8086/tcp", "8088/tcp"},
		Env: map[string]string{
			"INFLUXDB_BIND_ADDRESS":          ":8088",
			"INFLUXDB_HTTP_BIND_ADDRESS":     ":8086",
			"INFLUXDB_REPORTING_DISABLED":    "true",
			"INFLUXDB_MONITOR_STORE_ENABLED": "false",
			"INFLUXDB_HTTP_HTTPS_ENABLED":    "false",
			"INFLUXDB_HTTP_AUTH_ENABLED":     "false",
		},
		WaitingFor: wait.ForListeningPort("8086/tcp"),
	}
	genericContainerReq := testcontainers.GenericContainerRequest{
		ContainerRequest: req,
		Started:          true,
	}

	for _, opt := range opts {
		// A failing customizer must abort the run rather than be silently dropped.
		if err := opt.Customize(&genericContainerReq); err != nil {
			return nil, err
		}
	}

	hasInitDb := false
	for _, f := range genericContainerReq.Files {
		if f.ContainerFilePath == "/" && strings.HasSuffix(f.HostFilePath, "docker-entrypoint-initdb.d") {
			// Init service in container will start influxdb, run scripts in docker-entrypoint-initdb.d and then
			// terminate the influxdb server, followed by restart of influxdb. This is tricky to wait for, and
			// in this case, we are assuming that data was added by init script, so we then look for an
			// "Open shard" which is the last thing that happens before the server is ready to accept connections.
			// This is probably different for InfluxDB 2.x, but that is left as an exercise for the reader.
			strategies := []wait.Strategy{
				genericContainerReq.WaitingFor,
				wait.ForLog("influxdb init process in progress..."),
				wait.ForLog("Server shutdown completed"),
				wait.ForLog("Opened shard"),
			}
			genericContainerReq.WaitingFor = wait.ForAll(strategies...)
			hasInitDb = true
			break
		}
	}

	if !hasInitDb {
		if lastIndex := strings.LastIndex(genericContainerReq.Image, ":"); lastIndex != -1 {
			tag := genericContainerReq.Image[lastIndex+1:]
			// HasPrefix is safe on an empty tag (image ending in ":"),
			// where indexing tag[0] would panic.
			if tag == "latest" || strings.HasPrefix(tag, "2") {
				genericContainerReq.WaitingFor = wait.ForLog(`Listening log_id=[0-9a-zA-Z_~]+ service=tcp-listener transport=http`).AsRegexp()
			}
		} else {
			// No ":" in the image reference at all.
			genericContainerReq.WaitingFor = wait.ForLog("Listening for signals")
		}
	}

	container, err := testcontainers.GenericContainer(ctx, genericContainerReq)
	if err != nil {
		return nil, err
	}

	return &InfluxDbContainer{container}, nil
}
// MustConnectionUrl returns the result of ConnectionUrl and panics with the
// underlying error if ConnectionUrl fails.
func (c *InfluxDbContainer) MustConnectionUrl(ctx context.Context) string {
	url, err := c.ConnectionUrl(ctx)
	if err == nil {
		return url
	}
	panic(err)
}
// ConnectionUrl returns an "http://host:port" URL built from the container's
// host and the host port mapped to 8086/tcp.
//
// It returns an empty string and the underlying error if either the mapped
// port or the host cannot be resolved.
func (c *InfluxDbContainer) ConnectionUrl(ctx context.Context) (string, error) {
	containerPort, err := c.MappedPort(ctx, "8086/tcp")
	if err != nil {
		return "", err
	}
	host, err := c.Host(ctx)
	if err != nil {
		return "", err
	}
	// JoinHostPort wraps IPv6 literals in brackets so the URL stays parseable;
	// for hostnames and IPv4 addresses the output is unchanged.
	return fmt.Sprintf("http://%s", net.JoinHostPort(host, containerPort.Port())), nil
}
// WithUsername returns a request option that sets the INFLUXDB_USER
// environment variable on the container request to username.
//
// The Env map is created on demand, so the option does not panic on a request
// whose Env is nil.
func WithUsername(username string) testcontainers.CustomizeRequestOption {
	return func(req *testcontainers.GenericContainerRequest) error {
		if req.Env == nil {
			// Assigning into a nil map panics.
			req.Env = make(map[string]string)
		}
		req.Env["INFLUXDB_USER"] = username
		return nil
	}
}
// WithPassword returns a request option that sets the INFLUXDB_PASSWORD
// environment variable on the container request to password.
//
// The Env map is created on demand, so the option does not panic on a request
// whose Env is nil.
func WithPassword(password string) testcontainers.CustomizeRequestOption {
	return func(req *testcontainers.GenericContainerRequest) error {
		if req.Env == nil {
			// Assigning into a nil map panics.
			req.Env = make(map[string]string)
		}
		req.Env["INFLUXDB_PASSWORD"] = password
		return nil
	}
}
// WithDatabase returns a request option that sets the INFLUXDB_DATABASE
// environment variable on the container request to database.
//
// The Env map is created on demand, so the option does not panic on a request
// whose Env is nil.
func WithDatabase(database string) testcontainers.CustomizeRequestOption {
	return func(req *testcontainers.GenericContainerRequest) error {
		if req.Env == nil {
			// Assigning into a nil map panics.
			req.Env = make(map[string]string)
		}
		req.Env["INFLUXDB_DATABASE"] = database
		return nil
	}
}
// WithConfigFile returns a request option that appends a file entry to the
// container request: the host file at configFile is targeted at
// /etc/influxdb/influxdb.conf in the container with file mode 0o755.
func WithConfigFile(configFile string) testcontainers.CustomizeRequestOption {
	return func(req *testcontainers.GenericContainerRequest) error {
		req.Files = append(req.Files, testcontainers.ContainerFile{
			HostFilePath:      configFile,
			ContainerFilePath: "/etc/influxdb/influxdb.conf",
			FileMode:          0o755,
		})
		return nil
	}
}
// WithInitDb returns a request option that copies a 'docker-entrypoint-initdb.d'
// directory into the container.
// srcPath is the directory on the host machine that contains
// 'docker-entrypoint-initdb.d'; the joined path is used as the copy source.
// The directory is copied to the root ("/") of the container with mode 0o755.
func WithInitDb(srcPath string) testcontainers.CustomizeRequestOption {
	return func(req *testcontainers.GenericContainerRequest) error {
		initDir := path.Join(srcPath, "docker-entrypoint-initdb.d")
		req.Files = append(req.Files, testcontainers.ContainerFile{
			HostFilePath:      initDir,
			ContainerFilePath: "/",
			FileMode:          0o755,
		})
		return nil
	}
}