Skip to content

Commit

Permalink
runtime: shim: dedup client, socket addr code
Browse files Browse the repository at this point in the history
(1) Add an accessor function, SocketAddress, to the shim-v2 code for
determining the shim's abstract domain socket address, given the sandbox
ID.

(2) In kata monitor, create a function, BuildShimClient, for obtaining the appropriate
http.Client for communicating with the shim's monitoring endpoint.

(3) Update the kata CLI and kata-monitor code to make use of these.

(4) Migrate some kata monitor methods to be functions, in order to ease
future reuse.

(5) drop unused namespace from functions where it is no longer needed.

Signed-off-by: Eric Ernst <eric_ernst@apple.com>
(cherry picked from commit 3caed6f)
Signed-off-by: Fabiano Fidêncio <fidencio@redhat.com>
  • Loading branch information
egernst authored and fidencio committed May 10, 2021
1 parent 7f7c794 commit d369095
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 43 deletions.
19 changes: 2 additions & 17 deletions src/runtime/cli/kata-exec.go
Expand Up @@ -14,7 +14,6 @@ import (
"net/http"
"net/url"
"os"
"path/filepath"
"strings"

"sync"
Expand All @@ -26,7 +25,6 @@ import (
clientUtils "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/agent/protocols/client"
"github.com/pkg/errors"
"github.com/urfave/cli"
"go.opentelemetry.io/otel/label"
)

const (
Expand All @@ -38,10 +36,8 @@ const (

subCommandName = "exec"
// command-line parameters name
paramRuntimeNamespace = "runtime-namespace"
paramDebugConsolePort = "kata-debug-port"
defaultKernelParamDebugConsoleVPortValue = 1026
defaultRuntimeNamespace = "k8s.io"
)

var (
Expand All @@ -57,21 +53,12 @@ var kataExecCLICommand = cli.Command{
Name: subCommandName,
Usage: "Enter into guest by debug console",
Flags: []cli.Flag{
cli.StringFlag{
Name: paramRuntimeNamespace,
Usage: "Namespace that containerd or CRI-O are using for containers. (Default: k8s.io, only works for containerd)",
},
cli.Uint64Flag{
Name: paramDebugConsolePort,
Usage: "Port that debug console is listening on. (Default: 1026)",
},
},
Action: func(context *cli.Context) error {
namespace := context.String(paramRuntimeNamespace)
if namespace == "" {
namespace = defaultRuntimeNamespace
}

port := context.Uint64(paramDebugConsolePort)
if port == 0 {
port = defaultKernelParamDebugConsoleVPortValue
Expand All @@ -83,7 +70,6 @@ var kataExecCLICommand = cli.Command{
return err
}

span.SetAttributes(label.Key("sandbox").String(sandboxID))
conn, err := getConn(sandboxID, port)

if err != nil {
Expand Down Expand Up @@ -169,8 +155,7 @@ func (s *iostream) Read(data []byte) (n int, err error) {
}

func getConn(sandboxID string, port uint64) (net.Conn, error) {
socketAddr := filepath.Join(string(filepath.Separator), "run", "vc", sandboxID, "shim-monitor")
client, err := kataMonitor.BuildUnixSocketClient(socketAddr, defaultTimeout)
client, err := kataMonitor.BuildShimClient(sandboxID, defaultTimeout)
if err != nil {
return nil, err
}
Expand All @@ -181,7 +166,7 @@ func getConn(sandboxID string, port uint64) (net.Conn, error) {
}

if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("Failed to get %s: %d", socketAddr, resp.StatusCode)
return nil, fmt.Errorf("Failure from %s shim-monitor: %d", sandboxID, resp.StatusCode)
}

defer resp.Body.Close()
Expand Down
12 changes: 5 additions & 7 deletions src/runtime/containerd-shim-v2/shim_management.go
Expand Up @@ -128,11 +128,7 @@ func decodeAgentMetrics(body string) []*dto.MetricFamily {

func (s *service) startManagementServer(ctx context.Context, ociSpec *specs.Spec) {
// metrics socket will under sandbox's bundle path
metricsAddress, err := socketAddress(ctx, s.id)
if err != nil {
shimMgtLog.WithError(err).Error("failed to create socket address")
return
}
metricsAddress := SocketAddress(s.id)

listener, err := cdshim.NewSocket(metricsAddress)
if err != nil {
Expand Down Expand Up @@ -187,6 +183,8 @@ func (s *service) mountPprofHandle(m *http.ServeMux, ociSpec *specs.Spec) {
m.Handle("/debug/pprof/trace", http.HandlerFunc(pprof.Trace))
}

func socketAddress(ctx context.Context, id string) (string, error) {
return filepath.Join(string(filepath.Separator), "run", "vc", id, "shim-monitor"), nil
// SocketAddress returns the address of the abstract domain socket for communicating with the
// shim management endpoint
func SocketAddress(id string) string {
return filepath.Join(string(filepath.Separator), "run", "vc", id, "shim-monitor")
}
6 changes: 3 additions & 3 deletions src/runtime/pkg/kata-monitor/metrics.go
Expand Up @@ -176,7 +176,7 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
for sandboxID, namespace := range sandboxes {
wg.Add(1)
go func(sandboxID, namespace string, results chan<- []*dto.MetricFamily) {
sandboxMetrics, err := km.getSandboxMetrics(sandboxID, namespace)
sandboxMetrics, err := getSandboxMetrics(sandboxID)
if err != nil {
monitorLog.WithError(err).WithField("sandbox_id", sandboxID).Errorf("failed to get metrics for sandbox")
}
Expand Down Expand Up @@ -234,8 +234,8 @@ func (km *KataMonitor) aggregateSandboxMetrics(encoder expfmt.Encoder) error {
}

// getSandboxMetrics will get sandbox's metrics from shim
func (km *KataMonitor) getSandboxMetrics(sandboxID, namespace string) ([]*dto.MetricFamily, error) {
body, err := km.doGet(sandboxID, namespace, defaultTimeout, "metrics")
func getSandboxMetrics(sandboxID string) ([]*dto.MetricFamily, error) {
body, err := doGet(sandboxID, defaultTimeout, "metrics")
if err != nil {
return nil, err
}
Expand Down
7 changes: 1 addition & 6 deletions src/runtime/pkg/kata-monitor/monitor.go
Expand Up @@ -87,13 +87,8 @@ func (km *KataMonitor) GetAgentURL(w http.ResponseWriter, r *http.Request) {
commonServeError(w, http.StatusBadRequest, err)
return
}
namespace, err := km.getSandboxNamespace(sandboxID)
if err != nil {
commonServeError(w, http.StatusBadRequest, err)
return
}

data, err := km.doGet(sandboxID, namespace, defaultTimeout, "agent-url")
data, err := doGet(sandboxID, defaultTimeout, "agent-url")
if err != nil {
commonServeError(w, http.StatusBadRequest, err)
return
Expand Down
19 changes: 9 additions & 10 deletions src/runtime/pkg/kata-monitor/shim_client.go
Expand Up @@ -11,6 +11,8 @@ import (
"net"
"net/http"
"time"

shim "github.com/kata-containers/kata-containers/src/runtime/containerd-shim-v2"
)

const (
Expand All @@ -33,16 +35,13 @@ func getSandboxIDFromReq(r *http.Request) (string, error) {
return "", fmt.Errorf("sandbox not found in %+v", r.URL.Query())
}

func (km *KataMonitor) buildShimClient(sandboxID, namespace string, timeout time.Duration) (*http.Client, error) {
socketAddr, err := km.getMonitorAddress(sandboxID, namespace)
if err != nil {
return nil, err
}
return BuildUnixSocketClient(socketAddr, timeout)
// BuildShimClient builds and returns an http client for communicating with the provided sandbox
func BuildShimClient(sandboxID string, timeout time.Duration) (*http.Client, error) {
return buildUnixSocketClient(shim.SocketAddress(sandboxID), timeout)
}

// BuildUnixSocketClient build http client for Unix socket
func BuildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Client, error) {
// buildUnixSocketClient build http client for Unix socket
func buildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Client, error) {
transport := &http.Transport{
DisableKeepAlives: true,
Dial: func(proto, addr string) (conn net.Conn, err error) {
Expand All @@ -61,8 +60,8 @@ func BuildUnixSocketClient(socketAddr string, timeout time.Duration) (*http.Clie
return client, nil
}

func (km *KataMonitor) doGet(sandboxID, namespace string, timeoutInSeconds time.Duration, urlPath string) ([]byte, error) {
client, err := km.buildShimClient(sandboxID, namespace, timeoutInSeconds)
func doGet(sandboxID string, timeoutInSeconds time.Duration, urlPath string) ([]byte, error) {
client, err := BuildShimClient(sandboxID, timeoutInSeconds)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit d369095

Please sign in to comment.