/
containers.go
120 lines (97 loc) · 3.13 KB
/
containers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package containers
import (
"context"
"fmt"
"io"
"log"
"strconv"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
apiClient "github.com/kuskoman/logstash-metric-samples/internal/client"
"github.com/kuskoman/logstash-metric-samples/internal/manager"
)
var containerManager = manager.NewContainerManager()
const logstashRegistry = "docker.elastic.co/logstash/logstash"
func RemoveContainer(ctx context.Context, dockerClient *client.Client, containerID string) {
log.Printf("Removing container %s", containerID)
err := dockerClient.ContainerStop(ctx, containerID, container.StopOptions{})
handleRemoveContainerError(err, containerID)
err = dockerClient.ContainerRemove(ctx, containerID, types.ContainerRemoveOptions{})
handleRemoveContainerError(err, containerID)
}
func handleRemoveContainerError(err error, containerID string) {
if err == nil {
return
}
log.Printf("Error removing container: %s", err)
fmt.Printf("To remove container manually, run: docker rm %s", containerID)
}
func pullLogstashImage(ctx context.Context, dockerClient *client.Client, version string) (string, error) {
imageName := fmt.Sprintf("%s:%s", logstashRegistry, version)
log.Printf("Pulling image %s", imageName)
pullResp, err := dockerClient.ImagePull(ctx, imageName, types.ImagePullOptions{})
if err != nil {
return "", err
}
defer pullResp.Close()
if _, err := io.Copy(io.Discard, pullResp); err != nil {
return "", err
}
return imageName, nil
}
func createLogstashContainerConfig(imageName string, port int) (*container.Config, *container.HostConfig) {
containerConfig := &container.Config{
Image: imageName,
ExposedPorts: nat.PortSet{
"9600/tcp": struct{}{},
},
}
hostConfig := &container.HostConfig{
PortBindings: nat.PortMap{
"9600/tcp": []nat.PortBinding{
{HostIP: "0.0.0.0", HostPort: strconv.Itoa(port)},
},
},
}
return containerConfig, hostConfig
}
func SpawnLogstashContainer(ctx context.Context, dockerClient *client.Client, version string) (string, int, error) {
imageName, err := pullLogstashImage(ctx, dockerClient, version)
if err != nil {
return "", 0, err
}
port, err := containerManager.AssignFreePort()
if err != nil {
return "", 0, err
}
containerName := fmt.Sprintf("logstash-%s", version)
log.Printf("Creating container %s on port %d", containerName, port)
containerConfig, hostConfig := createLogstashContainerConfig(imageName, port)
resp, err := dockerClient.ContainerCreate(ctx, containerConfig, hostConfig, nil, nil, containerName)
if err != nil {
return "", 0, err
}
containerID := resp.ID
if err := dockerClient.ContainerStart(ctx, containerID, types.ContainerStartOptions{}); err != nil {
return "", 0, err
}
return containerID, port, nil
}
func WaitForContainerToBeReady(ctx context.Context, port int) error {
url := fmt.Sprintf("http://localhost:%d/_node/stats", port)
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
data, _ := apiClient.GetJSONFromAPI(ctx, url)
if data != nil {
return nil
}
time.Sleep(1 * time.Second)
}
}
}