Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pkg/cri: switch internals to CRI v1, add ingress/egress v1/v1alpha2 CRI conversion. #781

Merged
merged 9 commits into from Nov 23, 2022
303 changes: 285 additions & 18 deletions pkg/cri/client/client.go
Expand Up @@ -15,6 +15,7 @@
package client

import (
"context"
"fmt"
"net"
"os"
Expand All @@ -24,11 +25,14 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"

api "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
criv1 "k8s.io/cri-api/pkg/apis/runtime/v1"

"github.com/intel/cri-resource-manager/pkg/instrumentation"
logger "github.com/intel/cri-resource-manager/pkg/log"
"github.com/intel/cri-resource-manager/pkg/utils"

v1 "github.com/intel/cri-resource-manager/pkg/cri/client/v1"
v1alpha2 "github.com/intel/cri-resource-manager/pkg/cri/client/v1alpha2"
)

// DialNotifyFn is a function to call after a successful net.Dial[Timeout]().
Expand Down Expand Up @@ -62,22 +66,27 @@ type Client interface {
CheckConnection(ConnectOptions) error
// HasRuntimeService checks if the client is configured with runtime services.
HasRuntimeService() bool
// HasImageService checks if the client is configured with image services.
HasImageService() bool

// We expose full image and runtime client services.
api.ImageServiceClient
api.RuntimeServiceClient
criv1.ImageServiceClient
criv1.RuntimeServiceClient
}

type criClient interface {
criv1.ImageServiceClient
criv1.RuntimeServiceClient
}

// client is the implementation of Client.
type client struct {
logger.Logger
api.ImageServiceClient
api.RuntimeServiceClient
criv1.ImageServiceClient
criv1.RuntimeServiceClient
options Options // client options
icc *grpc.ClientConn // our gRPC connection to the image service
rcc *grpc.ClientConn // our gRPC connection to the runtime service

client criClient
}

const (
Expand Down Expand Up @@ -108,11 +117,6 @@ func (c *client) Connect(options ConnectOptions) error {
return err
}

if c.icc != nil {
c.Debug("starting %s client on socket %s...", kind, socket)
c.ImageServiceClient = api.NewImageServiceClient(c.icc)
}

kind, socket = "runtime services", c.options.RuntimeSocket
if socket == c.options.ImageSocket {
c.rcc = c.icc
Expand All @@ -123,11 +127,15 @@ func (c *client) Connect(options ConnectOptions) error {
}
}

if c.rcc != nil {
c.Debug("starting %s client on socket %s...", kind, socket)
c.RuntimeServiceClient = api.NewRuntimeServiceClient(c.rcc)
client, err := v1.Connect(c.rcc, c.icc)
if err != nil {
client, err = v1alpha2.Connect(c.rcc, c.icc)
}
if err != nil {
return err
}

c.client = client
return nil
}

Expand Down Expand Up @@ -173,9 +181,18 @@ func (c *client) HasRuntimeService() bool {
return c.options.RuntimeSocket != "" && c.options.RuntimeSocket != DontConnect
}

// HasImageService checks if the client is configured with image services.
func (c *client) HasImageService() bool {
return c.options.ImageSocket != "" && c.options.ImageSocket != DontConnect
func (c *client) checkRuntimeService() error {
if c.client == nil || c.rcc == nil {
return clientError("no CRI RuntimeService client")
}
return nil
}

func (c *client) checkImageService() error {
if c.client == nil || c.icc == nil {
return clientError("no CRI ImageService client")
}
return nil
}

// connect attempts to create a gRPC client connection to the given socket.
Expand Down Expand Up @@ -237,6 +254,256 @@ func (c *client) dialNotify(socket string) {
c.options.DialNotify(socket, uid, gid, mode, nil)
}

func (c *client) Version(ctx context.Context, in *criv1.VersionRequest, opts ...grpc.CallOption) (*criv1.VersionResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.Version(ctx, in)
}

func (c *client) RunPodSandbox(ctx context.Context, in *criv1.RunPodSandboxRequest, opts ...grpc.CallOption) (*criv1.RunPodSandboxResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.RunPodSandbox(ctx, in)
}

func (c *client) StopPodSandbox(ctx context.Context, in *criv1.StopPodSandboxRequest, opts ...grpc.CallOption) (*criv1.StopPodSandboxResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.StopPodSandbox(ctx, in)
}

func (c *client) RemovePodSandbox(ctx context.Context, in *criv1.RemovePodSandboxRequest, opts ...grpc.CallOption) (*criv1.RemovePodSandboxResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.RemovePodSandbox(ctx, in)
}

func (c *client) PodSandboxStatus(ctx context.Context, in *criv1.PodSandboxStatusRequest, opts ...grpc.CallOption) (*criv1.PodSandboxStatusResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.PodSandboxStatus(ctx, in)
}

func (c *client) ListPodSandbox(ctx context.Context, in *criv1.ListPodSandboxRequest, opts ...grpc.CallOption) (*criv1.ListPodSandboxResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.ListPodSandbox(ctx, in)
}

func (c *client) CreateContainer(ctx context.Context, in *criv1.CreateContainerRequest, opts ...grpc.CallOption) (*criv1.CreateContainerResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.CreateContainer(ctx, in)
}

func (c *client) StartContainer(ctx context.Context, in *criv1.StartContainerRequest, opts ...grpc.CallOption) (*criv1.StartContainerResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.StartContainer(ctx, in)
}

func (c *client) StopContainer(ctx context.Context, in *criv1.StopContainerRequest, opts ...grpc.CallOption) (*criv1.StopContainerResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.StopContainer(ctx, in)
}

func (c *client) RemoveContainer(ctx context.Context, in *criv1.RemoveContainerRequest, opts ...grpc.CallOption) (*criv1.RemoveContainerResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.RemoveContainer(ctx, in)
}

func (c *client) ListContainers(ctx context.Context, in *criv1.ListContainersRequest, opts ...grpc.CallOption) (*criv1.ListContainersResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.ListContainers(ctx, in)
}

func (c *client) ContainerStatus(ctx context.Context, in *criv1.ContainerStatusRequest, opts ...grpc.CallOption) (*criv1.ContainerStatusResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.ContainerStatus(ctx, in)
}

func (c *client) UpdateContainerResources(ctx context.Context, in *criv1.UpdateContainerResourcesRequest, opts ...grpc.CallOption) (*criv1.UpdateContainerResourcesResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.UpdateContainerResources(ctx, in)
}

func (c *client) ReopenContainerLog(ctx context.Context, in *criv1.ReopenContainerLogRequest, opts ...grpc.CallOption) (*criv1.ReopenContainerLogResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.ReopenContainerLog(ctx, in)
}

func (c *client) ExecSync(ctx context.Context, in *criv1.ExecSyncRequest, opts ...grpc.CallOption) (*criv1.ExecSyncResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.ExecSync(ctx, in)
}

func (c *client) Exec(ctx context.Context, in *criv1.ExecRequest, opts ...grpc.CallOption) (*criv1.ExecResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.Exec(ctx, in)
}

func (c *client) Attach(ctx context.Context, in *criv1.AttachRequest, opts ...grpc.CallOption) (*criv1.AttachResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.Attach(ctx, in)
}

func (c *client) PortForward(ctx context.Context, in *criv1.PortForwardRequest, opts ...grpc.CallOption) (*criv1.PortForwardResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.PortForward(ctx, in)
}

func (c *client) ContainerStats(ctx context.Context, in *criv1.ContainerStatsRequest, opts ...grpc.CallOption) (*criv1.ContainerStatsResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.ContainerStats(ctx, in)
}

func (c *client) ListContainerStats(ctx context.Context, in *criv1.ListContainerStatsRequest, opts ...grpc.CallOption) (*criv1.ListContainerStatsResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.ListContainerStats(ctx, in)
}

func (c *client) PodSandboxStats(ctx context.Context, in *criv1.PodSandboxStatsRequest, opts ...grpc.CallOption) (*criv1.PodSandboxStatsResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.PodSandboxStats(ctx, in)
}

func (c *client) ListPodSandboxStats(ctx context.Context, in *criv1.ListPodSandboxStatsRequest, opts ...grpc.CallOption) (*criv1.ListPodSandboxStatsResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.ListPodSandboxStats(ctx, in)
}

func (c *client) UpdateRuntimeConfig(ctx context.Context, in *criv1.UpdateRuntimeConfigRequest, opts ...grpc.CallOption) (*criv1.UpdateRuntimeConfigResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.UpdateRuntimeConfig(ctx, in)
}

func (c *client) Status(ctx context.Context, in *criv1.StatusRequest, opts ...grpc.CallOption) (*criv1.StatusResponse, error) {
if err := c.checkRuntimeService(); err != nil {
return nil, err
}

return c.client.Status(ctx, in)
}

/*
func (c *client) CheckpointContainer(ctx context.Context, in *criv1.CheckpointContainerRequest, opts ...grpc.CallOption) (*criv1.CheckpointContainerResponse, error) {
return nil, fmt.Errorf("unimplemented by CRI RuntimeService")
}

func (c *client) GetContainerEvents(ctx context.Context, in *criv1.GetContainerEventsRequest, opts ...grpc.CallOption) (criv1.RuntimeService_GetContainerEventsClient, error) {
return nil, fmt.Errorf("unimplemented by CRI RuntimeService")
}

func (c *client) ListMetricDescriptors(ctx context.Context, in *criv1.ListMetricDescriptorsRequest, opts ...grpc.CallOption) (*criv1.ListMetricDescriptorsResponse, error) {
return nil, fmt.Errorf("unimplemented by CRI RuntimeService")
}

func (c *client) ListPodSandboxMetrics(ctx context.Context, in *criv1.ListPodSandboxMetricsRequest, opts ...grpc.CallOption) (*criv1.ListPodSandboxMetricsResponse, error) {
return nil, fmt.Errorf("unimplemented by CRI RuntimeService")
}
*/

func (c *client) ListImages(ctx context.Context, in *criv1.ListImagesRequest, opts ...grpc.CallOption) (*criv1.ListImagesResponse, error) {
if err := c.checkImageService(); err != nil {
return nil, err
}

return c.client.ListImages(ctx, in)
}

func (c *client) ImageStatus(ctx context.Context, in *criv1.ImageStatusRequest, opts ...grpc.CallOption) (*criv1.ImageStatusResponse, error) {
if err := c.checkImageService(); err != nil {
return nil, err
}

return c.client.ImageStatus(ctx, in)
}

func (c *client) PullImage(ctx context.Context, in *criv1.PullImageRequest, opts ...grpc.CallOption) (*criv1.PullImageResponse, error) {
if err := c.checkImageService(); err != nil {
return nil, err
}

return c.client.PullImage(ctx, in)
}

func (c *client) RemoveImage(ctx context.Context, in *criv1.RemoveImageRequest, opts ...grpc.CallOption) (*criv1.RemoveImageResponse, error) {
if err := c.checkImageService(); err != nil {
return nil, err
}

return c.client.RemoveImage(ctx, in)
}

func (c *client) ImageFsInfo(ctx context.Context, in *criv1.ImageFsInfoRequest, opts ...grpc.CallOption) (*criv1.ImageFsInfoResponse, error) {
if err := c.checkImageService(); err != nil {
return nil, err
}

return c.client.ImageFsInfo(ctx, in)
}

// Return a formatted client-specific error.
func clientError(format string, args ...interface{}) error {
return fmt.Errorf("cri/client: "+format, args...)
Expand Down