/
create.go
98 lines (83 loc) · 2.15 KB
/
create.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
package runtime
import (
"context"
"fmt"
"os"
"time"
dockertypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
"github.com/moby/moby/pkg/jsonmessage"
"github.com/moby/term"
"github.com/phayes/freeport"
"github.com/tensorchord/openmodelz/agent/api/types"
)
// InferenceCreate registers req in the runtime cache as not-ready and then
// provisions its container asynchronously.
//
// The synchronous part records the deployment under req.Spec.Name with
// Phase set to types.PhaseNotReady, one replica, and the current time as
// CreatedAt. A background goroutine then pulls req.Spec.Image (streaming
// pull progress to stdout), creates a container named req.Spec.Name with an
// "always" restart policy, publishes the container port (req.Spec.Port, or
// 8080 when unset) on a free host port bound to Localhost, starts the
// container, and finally marks the cached deployment as types.PhaseReady.
//
// The function itself always returns nil; failures in the background work
// are reported on stderr and leave the cached deployment in the not-ready
// phase.
//
// NOTE(review): ctx is not used for the background Docker calls, which run
// under context.TODO(). Presumably ctx is request-scoped and would be
// cancelled once the caller returns — confirm before wiring it through.
func (r *Runtime) InferenceCreate(ctx context.Context, req types.InferenceDeployment) error {
	cfg := &container.Config{
		Image:        req.Spec.Image,
		ExposedPorts: nat.PortSet{},
	}

	// Default container port when the spec does not specify one.
	var port int32 = 8080
	if req.Spec.Port != nil {
		port = *req.Spec.Port
	}

	now := time.Now()
	req.Status = types.InferenceDeploymentStatus{
		Phase:     types.PhaseNotReady,
		Replicas:  1,
		CreatedAt: &now,
	}

	// Publish the not-ready deployment before the slow work starts.
	r.mutex.Lock()
	r.cache[req.Spec.Name] = req
	r.mutex.Unlock()

	// launch performs the pull/create/start sequence and flips the cached
	// status to ready on success.
	launch := func() error {
		body, err := r.cli.ImagePull(context.TODO(), req.Spec.Image, dockertypes.ImagePullOptions{})
		if err != nil {
			return fmt.Errorf("pulling image %q: %w", req.Spec.Image, err)
		}
		defer body.Close()

		// Render the pull progress stream on stdout.
		termFd, isTerm := term.GetFdInfo(os.Stdout)
		if err := jsonmessage.DisplayJSONMessagesStream(body, os.Stdout, termFd, isTerm, nil); err != nil {
			return fmt.Errorf("reading pull progress for image %q: %w", req.Spec.Image, err)
		}

		hostPort, err := freeport.GetFreePort()
		if err != nil {
			return fmt.Errorf("finding free host port: %w", err)
		}

		natPort := nat.Port(fmt.Sprintf("%d/tcp", port))
		hostCfg := &container.HostConfig{
			RestartPolicy: container.RestartPolicy{
				Name: "always",
			},
			PortBindings: nat.PortMap{
				natPort: []nat.PortBinding{
					{
						HostIP:   Localhost,
						HostPort: fmt.Sprintf("%d", hostPort),
					},
				},
			},
		}
		cfg.ExposedPorts[natPort] = struct{}{}
		cfg.Labels = expectedLabels(req)

		ctr, err := r.cli.ContainerCreate(context.TODO(), cfg, hostCfg, nil, nil, req.Spec.Name)
		if err != nil {
			return fmt.Errorf("creating container %q: %w", req.Spec.Name, err)
		}
		if err := r.cli.ContainerStart(context.TODO(), ctr.ID, dockertypes.ContainerStartOptions{}); err != nil {
			return fmt.Errorf("starting container %q: %w", req.Spec.Name, err)
		}

		r.mutex.Lock()
		defer r.mutex.Unlock()
		// The map lookup yields a copy of the stored value, so the updated
		// value must be stored back for the new status to take effect. Skip
		// the write if the entry was removed in the meantime.
		if dep, ok := r.cache[req.Spec.Name]; ok {
			dep.Status = types.InferenceDeploymentStatus{
				Phase:             types.PhaseReady,
				Replicas:          1,
				AvailableReplicas: 1,
				CreatedAt:         &now,
			}
			r.cache[req.Spec.Name] = dep
		}
		return nil
	}

	go func() {
		// A goroutine's return value is discarded, so report the failure
		// here instead of dropping it silently.
		if err := launch(); err != nil {
			fmt.Fprintf(os.Stderr, "inference %q: %v\n", req.Spec.Name, err)
		}
	}()

	return nil
}