Skip to content

Commit

Permalink
docker-proxy: support createContainer and stopSandbox hook (#236)
Browse files Browse the repository at this point in the history
Signed-off-by: Yue Zhang <huaihuan.zy@alibaba-inc.com>
  • Loading branch information
ZYecho committed Jun 15, 2022
1 parent c619738 commit cef1028
Show file tree
Hide file tree
Showing 6 changed files with 457 additions and 30 deletions.
8 changes: 7 additions & 1 deletion docs/design-archive/koord-runtime-proxy-design-doc.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,18 @@ build: `cd koordinator; make build-koord-runtime-proxy`
TODO

### Setup Kubelet
To make koord-runtime-proxy a proxy between kubelet and containerd(dockerd), kubelet parameters should be altered as shown
Under containerd scenario, to make koord-runtime-proxy a proxy between kubelet and containerd, kubelet parameters should be altered as shown
below:
```
kubelet <other options> --container-runtime=remote --container-runtime=/var/run/koord-runtimeproxy/runtimeproxy.sock
```

Under docker scenario, to make koord-runtime-proxy a proxy between kubelet and dockerd, kubelet parameters should be altered as shown
below:
```
kubelet <other options> --docker-endpoint=/var/run/koord-runtimeproxy/runtimeproxy.sock
```

### Setup KoordRuntimeProxy
Firstly, please make sure your runtime backend is containerd or dockerd.

Expand Down
51 changes: 29 additions & 22 deletions pkg/runtimeproxy/server/docker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@ func (d *RuntimeManagerDockerServer) HandleCreateContainer(ctx context.Context,

// pre check
runtimeResourceType := GetRuntimeResourceType(ContainerConfig.Labels)
containerName := ""
if len(req.URL.Query()["name"]) >= 1 {
containerName = req.URL.Query()["name"][0]
}
containerName := req.URL.Query().Get("name")
tokens := strings.Split(containerName, "_")
if len(tokens) != 6 {
klog.Errorf("Failed to split k8s container name, containerName: %s", containerName)
Expand All @@ -60,7 +57,7 @@ func (d *RuntimeManagerDockerServer) HandleCreateContainer(ctx context.Context,
labels, annos := splitLabelsAndAnnotations(ContainerConfig.Labels)
var podInfo *store.PodSandboxInfo
var containerInfo *store.ContainerInfo
runtimeHookPath := config.NoneRuntimeHookPath
var runtimeHookPath config.RuntimeRequestPath
var hookReq interface{}
if runtimeResourceType == resource_executor.RuntimeContainerResource {
podID := ContainerConfig.Labels[types.SandboxIDLabelKey]
Expand All @@ -70,7 +67,6 @@ func (d *RuntimeManagerDockerServer) HandleCreateContainer(ctx context.Context,
http.Error(wr, "Failed to get pod info", http.StatusInternalServerError)
return
}
// TODO(ZYEcho): implement create container hook
containerInfo = &store.ContainerInfo{
ContainerResourceHookRequest: &v1alpha1.ContainerResourceHookRequest{
PodMeta: podInfo.PodMeta,
Expand All @@ -80,8 +76,13 @@ func (d *RuntimeManagerDockerServer) HandleCreateContainer(ctx context.Context,
},
ContainerAnnotations: annos,
ContainerResources: HostConfigToResource(hostConfig),
PodAnnotations: podInfo.Annotations,
PodLabels: podInfo.Labels,
PodCgroupParent: podInfo.CgroupParent,
},
}
runtimeHookPath = config.CreateContainer
hookReq = containerInfo.GetContainerResourceHookRequest()
} else {
runtimeHookPath = config.RunPodSandbox
podInfo = &store.PodSandboxInfo{
Expand Down Expand Up @@ -117,16 +118,17 @@ func (d *RuntimeManagerDockerServer) HandleCreateContainer(ctx context.Context,
if runtimeResourceType == resource_executor.RuntimePodResource && hookResp != nil {
resp := hookResp.(*v1alpha1.PodSandboxHookResponse)
if resp.Resources != nil {
cfgBody.HostConfig.CPUPeriod = resp.Resources.CpuPeriod
cfgBody.HostConfig.CPUQuota = resp.Resources.CpuQuota
cfgBody.HostConfig.CPUShares = resp.Resources.CpuShares
cfgBody.HostConfig.Memory = resp.Resources.MemoryLimitInBytes
cfgBody.HostConfig.OomScoreAdj = int(resp.Resources.OomScoreAdj)
cfgBody.HostConfig.CpusetCpus = resp.Resources.CpusetCpus
cfgBody.HostConfig.CpusetMems = resp.Resources.CpusetMems
cfgBody.HostConfig.MemorySwap = resp.Resources.MemorySwapLimitInBytes
cfgBody.HostConfig = UpdateHostConfigByResource(cfgBody.HostConfig, resp.Resources)
podInfo.Resources = resp.Resources
}
cfgBody.HostConfig.CgroupParent = resp.CgroupParent
} else if hookResp != nil {
resp := hookResp.(*v1alpha1.ContainerResourceHookResponse)
if resp.ContainerResources != nil {
cfgBody.HostConfig = UpdateHostConfigByResource(cfgBody.HostConfig, resp.ContainerResources)
containerInfo.ContainerResources = resp.ContainerResources
}
cfgBody.HostConfig.CgroupParent = resp.PodCgroupParent
}
// send req to docker
nBody, err := encodeBody(cfgBody)
Expand All @@ -152,6 +154,7 @@ func (d *RuntimeManagerDockerServer) HandleCreateContainer(ctx context.Context,
if runtimeResourceType == resource_executor.RuntimePodResource {
store.WritePodSandboxInfo(createResp.ID, podInfo)
} else {
containerInfo.ContainerMata.Id = createResp.ID
store.WriteContainerInfo(createResp.ID, containerInfo)
}
}
Expand Down Expand Up @@ -196,12 +199,22 @@ func (d *RuntimeManagerDockerServer) HandleStopContainer(ctx context.Context, wr
return
}

runtimeHookPath := config.NoneRuntimeHookPath
var runtimeHookPath config.RuntimeRequestPath
var hookReq interface{}
containerMeta := store.GetContainerInfo(containerID)
if containerMeta != nil {
runtimeHookPath = config.StopContainer
hookReq = containerMeta.GetContainerResourceHookRequest()
} else {
// sandbox container
runtimeHookPath = config.StopPodSandbox
podInfo := store.GetPodSandboxInfo(containerID)
if podInfo == nil {
// refuse the req
http.Error(wr, "Failed to get pod info", http.StatusInternalServerError)
return
}
hookReq = podInfo.GetPodSandboxHookRequest()
}

d.Direct(wr, req)
Expand Down Expand Up @@ -259,13 +272,7 @@ func (d *RuntimeManagerDockerServer) HandleUpdateContainer(ctx context.Context,
resp := response.(*v1alpha1.ContainerResourceHookResponse)
if resp.ContainerResources != nil {
containerMeta.ContainerResources = resp.ContainerResources
containerConfig.CPUPeriod = resp.ContainerResources.CpuPeriod
containerConfig.CPUQuota = resp.ContainerResources.CpuQuota
containerConfig.CPUShares = resp.ContainerResources.CpuShares
containerConfig.Memory = resp.ContainerResources.MemoryLimitInBytes
containerConfig.CpusetCpus = resp.ContainerResources.CpusetCpus
containerConfig.CpusetMems = resp.ContainerResources.CpusetMems
containerConfig.MemorySwap = resp.ContainerResources.MemorySwapLimitInBytes
containerConfig = UpdateUpdateConfigByResource(containerConfig, resp.ContainerResources)
store.WriteContainerInfo(containerID, containerMeta)
}
}
Expand Down
244 changes: 244 additions & 0 deletions pkg/runtimeproxy/server/docker/handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
/*
Copyright 2022 The Koordinator Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package docker

import (
"io"
"io/ioutil"
"log"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"strings"
"testing"

"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network"
"github.com/stretchr/testify/assert"

"github.com/koordinator-sh/koordinator/pkg/runtimeproxy/server/types"
)

func Test_CreateContainer(t *testing.T) {
// create docker reverse proxy
fakeDockerBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.URL.Path, "create") {
w.WriteHeader(200)
w.Write([]byte("{\"Id\": \"containerdID\"}"))
}
}))
defer fakeDockerBackend.Close()
backendURL, err := url.Parse(fakeDockerBackend.URL)
if err != nil {
t.Fatal(err)
}
proxyHandler := httputil.NewSingleHostReverseProxy(backendURL)
proxyHandler.ErrorLog = log.New(io.Discard, "", 0) // quiet for tests
manager := NewRuntimeManagerDockerServer()
manager.reverseProxy = proxyHandler

frontend := httptest.NewServer(manager)
defer frontend.Close()
frontendClient := frontend.Client()
req, _ := http.NewRequest("POST", frontend.URL, nil)
cfg := types.ConfigWrapper{
Config: &container.Config{
Image: "ubuntu",
Labels: map[string]string{
types.ContainerTypeLabelKey: types.ContainerTypeLabelSandbox,
},
},
HostConfig: &container.HostConfig{
Resources: container.Resources{
Memory: 100,
CPUPeriod: 200,
},
},
NetworkingConfig: &network.NetworkingConfig{},
}
nBody, err := encodeBody(cfg)
if err != nil {
t.Fatal("failed to encode", err)
}
req.Body = ioutil.NopCloser(nBody)
nBody, _ = encodeBody(cfg)
newLength, _ := calculateContentLength(nBody)
req.ContentLength = newLength
req.URL.Path = "/v1.3/containers/create"
query := url.Values{}
query.Set("name", "xx_xx_xx_xx_xx_xx")
req.URL.RawQuery = query.Encode()
resp, err := frontendClient.Do(req)
assert.Equal(t, err, nil)
assert.Equal(t, resp.StatusCode, 200)
}

func Test_StopContainer(t *testing.T) {
fakeDockerBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.URL.Path, "stop") {
w.WriteHeader(200)
} else if strings.Contains(r.URL.Path, "create") {
w.WriteHeader(200)
w.Write([]byte("{\"Id\": \"containerdID\"}"))
}
}))
defer fakeDockerBackend.Close()
backendURL, err := url.Parse(fakeDockerBackend.URL)
if err != nil {
t.Fatal(err)
}
proxyHandler := httputil.NewSingleHostReverseProxy(backendURL)
proxyHandler.ErrorLog = log.New(io.Discard, "", 0) // quiet for tests
manager := NewRuntimeManagerDockerServer()
manager.reverseProxy = proxyHandler

frontend := httptest.NewServer(manager)
defer frontend.Close()
frontendClient := frontend.Client()

// create a sandbox first
req, _ := http.NewRequest("POST", frontend.URL, nil)
cfg := types.ConfigWrapper{
Config: &container.Config{
Image: "ubuntu",
Labels: map[string]string{
types.ContainerTypeLabelKey: types.ContainerTypeLabelSandbox,
},
},
HostConfig: &container.HostConfig{
Resources: container.Resources{
Memory: 100,
CPUPeriod: 200,
},
},
NetworkingConfig: &network.NetworkingConfig{},
}
nBody, err := encodeBody(cfg)
if err != nil {
t.Fatal("failed to encode", err)
}
req.Body = ioutil.NopCloser(nBody)
nBody, _ = encodeBody(cfg)
newLength, _ := calculateContentLength(nBody)
req.ContentLength = newLength
req.URL.Path = "/v1.3/containers/create"
query := url.Values{}
query.Set("name", "xx_xx_xx_xx_xx_xx")
req.URL.RawQuery = query.Encode()
resp, err := frontendClient.Do(req)

assert.Equal(t, err, nil)
assert.Equal(t, resp.StatusCode, 200)

req, _ = http.NewRequest("POST", frontend.URL, nil)
req.URL.Path = "/v1.3/containers/containerdID/stop"
resp, err = frontendClient.Do(req)
assert.Equal(t, err, nil)
assert.Equal(t, resp.StatusCode, 200)
}

func Test_StartContainer(t *testing.T) {
fakeDockerBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.URL.Path, "start") {
w.WriteHeader(200)
} else if strings.Contains(r.URL.Path, "create") {
w.WriteHeader(200)
w.Write([]byte("{\"Id\": \"containerdID\"}"))
}
}))
defer fakeDockerBackend.Close()
backendURL, err := url.Parse(fakeDockerBackend.URL)
if err != nil {
t.Fatal(err)
}
proxyHandler := httputil.NewSingleHostReverseProxy(backendURL)
proxyHandler.ErrorLog = log.New(io.Discard, "", 0) // quiet for tests
manager := NewRuntimeManagerDockerServer()
manager.reverseProxy = proxyHandler

frontend := httptest.NewServer(manager)
defer frontend.Close()
frontendClient := frontend.Client()

// create a sandbox first
req, _ := http.NewRequest("POST", frontend.URL, nil)
cfg := types.ConfigWrapper{
Config: &container.Config{
Image: "ubuntu",
Labels: map[string]string{
types.ContainerTypeLabelKey: types.ContainerTypeLabelSandbox,
},
},
HostConfig: &container.HostConfig{
Resources: container.Resources{
Memory: 100,
CPUPeriod: 200,
},
},
NetworkingConfig: &network.NetworkingConfig{},
}
nBody, err := encodeBody(cfg)
if err != nil {
t.Fatal("failed to encode", err)
}
req.Body = ioutil.NopCloser(nBody)
nBody, _ = encodeBody(cfg)
newLength, _ := calculateContentLength(nBody)
req.ContentLength = newLength
req.URL.Path = "/v1.3/containers/create"
query := url.Values{}
query.Set("name", "xx_xx_xx_xx_xx_xx")
req.URL.RawQuery = query.Encode()
resp, err := frontendClient.Do(req)

assert.Equal(t, err, nil)
assert.Equal(t, resp.StatusCode, 200)

req, _ = http.NewRequest("POST", frontend.URL, nil)
req.URL.Path = "/v1.3/containers/containerdID/start"
resp, err = frontendClient.Do(req)
assert.Equal(t, err, nil)
assert.Equal(t, resp.StatusCode, 200)
}

func Test_StopContainerError(t *testing.T) {
fakeDockerBackend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.URL.Path, "stop") {
w.WriteHeader(200)
}
}))
defer fakeDockerBackend.Close()
backendURL, err := url.Parse(fakeDockerBackend.URL)
if err != nil {
t.Fatal(err)
}
proxyHandler := httputil.NewSingleHostReverseProxy(backendURL)
proxyHandler.ErrorLog = log.New(io.Discard, "", 0) // quiet for tests
manager := NewRuntimeManagerDockerServer()
manager.reverseProxy = proxyHandler

frontend := httptest.NewServer(manager)
defer frontend.Close()
frontendClient := frontend.Client()

req, _ := http.NewRequest("POST", frontend.URL, nil)
req.URL.Path = "/v1.3/containers/xxxxxx/stop"
resp, err := frontendClient.Do(req)
assert.Equal(t, err, nil)
assert.Equal(t, resp.StatusCode, 500)
}
8 changes: 1 addition & 7 deletions pkg/runtimeproxy/server/docker/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ func (d *RuntimeManagerDockerServer) failOver() error {
ContainerAnnotations: c.Labels,
ContainerMata: &v1alpha1.ContainerMetadata{
Name: c.Name,
Id: c.ID,
},
PodResources: podCheckPoint.Resources,
},
Expand Down Expand Up @@ -194,10 +195,3 @@ func (d *RuntimeManagerDockerServer) Run() error {
}
return nil
}

func GetRuntimeResourceType(labels map[string]string) resource_executor.RuntimeResourceType {
if labels[types.ContainerTypeLabelKey] == types.ContainerTypeLabelSandbox {
return resource_executor.RuntimePodResource
}
return resource_executor.RuntimeContainerResource
}
Loading

0 comments on commit cef1028

Please sign in to comment.