-
-
Notifications
You must be signed in to change notification settings - Fork 444
/
milvus.go
132 lines (110 loc) · 3.93 KB
/
milvus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package milvus
import (
"bytes"
"context"
_ "embed"
"fmt"
"html/template"
"os"
"time"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
)
// embedEtcdConfigTpl holds the contents of mounts/embedEtcd.yaml.tpl, embedded
// into the binary at build time. renderEmbedEtcdConfig parses it as a template.
//
//go:embed mounts/embedEtcd.yaml.tpl
var embedEtcdConfigTpl string
// embedEtcdContainerPath is the path inside the container that the rendered
// embed etcd config is copied to. The same path is handed to the container
// through the ETCD_CONFIG_PATH environment variable in RunContainer.
const embedEtcdContainerPath string = "/milvus/configs/embedEtcd.yaml"
// MilvusContainer represents the Milvus container type used in the module.
// It embeds testcontainers.Container, so the methods of the underlying
// container are available directly on a MilvusContainer value.
type MilvusContainer struct {
testcontainers.Container
}
// ConnectionString builds the "host:port" address of the Milvus container.
// The host comes from the container itself and the port is whatever host port
// is mapped to the container's 19530/tcp port. Any error from either lookup is
// returned unchanged together with an empty string.
func (c *MilvusContainer) ConnectionString(ctx context.Context) (string, error) {
	hostname, err := c.Host(ctx)
	if err != nil {
		return "", err
	}

	mapped, err := c.MappedPort(ctx, "19530/tcp")
	if err != nil {
		return "", err
	}

	return hostname + ":" + mapped.Port(), nil
}
// RunContainer starts a Milvus standalone container and returns it wrapped in
// a MilvusContainer.
//
// The default request uses the milvusdb/milvus:v2.3.9 image with embedded etcd
// and local storage, exposes ports 19530, 9091 and 2379, and waits for the
// /healthz endpoint on port 9091. Each customizer in opts is applied, in order,
// to the request before the container is created; the first customizer error
// aborts the call. Errors from container creation are returned unchanged.
func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*MilvusContainer, error) {
	// Readiness is probed over HTTP: up to 60s in total, one probe every 30s.
	healthCheck := wait.ForHTTP("/healthz").
		WithPort("9091").
		WithStartupTimeout(60 * time.Second).
		WithPollInterval(30 * time.Second)

	// Copy the default embed etcd config to the container right after it is
	// created. Otherwise the milvus container will panic on startup.
	etcdConfigHooks := testcontainers.ContainerLifecycleHooks{
		PostCreates: []testcontainers.ContainerHook{
			createDefaultEmbedEtcdConfig,
		},
	}

	genericReq := testcontainers.GenericContainerRequest{
		ContainerRequest: testcontainers.ContainerRequest{
			Image:        "milvusdb/milvus:v2.3.9",
			ExposedPorts: []string{"19530/tcp", "9091/tcp", "2379/tcp"},
			Env: map[string]string{
				"ETCD_USE_EMBED":     "true",
				"ETCD_DATA_DIR":      "/var/lib/milvus/etcd",
				"ETCD_CONFIG_PATH":   embedEtcdContainerPath,
				"COMMON_STORAGETYPE": "local",
			},
			Cmd:            []string{"milvus", "run", "standalone"},
			WaitingFor:     healthCheck,
			LifecycleHooks: []testcontainers.ContainerLifecycleHooks{etcdConfigHooks},
		},
		Started: true,
	}

	for i := range opts {
		if err := opts[i].Customize(&genericReq); err != nil {
			return nil, err
		}
	}

	ctr, err := testcontainers.GenericContainer(ctx, genericReq)
	if err != nil {
		return nil, err
	}

	return &MilvusContainer{Container: ctr}, nil
}
// embedEtcdConfigTplParams carries the values handed to the embed etcd config
// template when it is executed by renderEmbedEtcdConfig.
type embedEtcdConfigTplParams struct {
// Port is the port number made available to the template. How the template
// uses it is defined in mounts/embedEtcd.yaml.tpl, which is not part of this file.
Port int
}
func renderEmbedEtcdConfig(port int) ([]byte, error) {
tplParams := embedEtcdConfigTplParams{
Port: port,
}
etcdCfgTpl, err := template.New("embedEtcd.yaml").Parse(embedEtcdConfigTpl)
if err != nil {
return nil, fmt.Errorf("failed to parse embed etcd config file template: %w", err)
}
var embedEtcdYaml bytes.Buffer
if err := etcdCfgTpl.Execute(&embedEtcdYaml, tplParams); err != nil {
return nil, fmt.Errorf("failed to render embed etcd config template: %w", err)
}
return embedEtcdYaml.Bytes(), nil
}
// createDefaultEmbedEtcdConfig renders the embed etcd config with port 2379
// and copies it into the container at embedEtcdContainerPath. It is registered
// as a post-create hook because, without this file, the milvus container
// panics on startup.
//
// The rendered config is staged in a uniquely named temporary file so that
// containers started concurrently do not overwrite or remove each other's
// file. The temporary file is removed before the function returns.
//
// It returns a wrapped error if rendering, staging, or copying fails.
func createDefaultEmbedEtcdConfig(ctx context.Context, c testcontainers.Container) error {
	defaultEmbedEtcdConfig, err := renderEmbedEtcdConfig(2379)
	if err != nil {
		return fmt.Errorf("failed to render default config: %w", err)
	}

	// A unique name per call avoids the race a fixed <tmp>/embedEtcd.yaml path
	// would cause between parallel container startups.
	tmpFile, err := os.CreateTemp("", "embedEtcd-*.yaml")
	if err != nil {
		return fmt.Errorf("can't create default embed etcd config: %w", err)
	}
	defaultEmbedEtcdConfigPath := tmpFile.Name()
	defer os.Remove(defaultEmbedEtcdConfigPath)

	// Always close the file, even when the write failed, so the handle is not
	// leaked and the content is flushed before it is read back for the copy.
	_, writeErr := tmpFile.Write(defaultEmbedEtcdConfig)
	closeErr := tmpFile.Close()
	if writeErr != nil {
		return fmt.Errorf("failed to write default embed etcd config to a temporary dir: %w", writeErr)
	}
	if closeErr != nil {
		return fmt.Errorf("failed to close default embed etcd config: %w", closeErr)
	}

	if err := c.CopyFileToContainer(ctx, defaultEmbedEtcdConfigPath, embedEtcdContainerPath, 0o644); err != nil {
		return fmt.Errorf("can't copy %s to container: %w", defaultEmbedEtcdConfigPath, err)
	}

	return nil
}