Skip to content

Commit

Permalink
pkg/cri: add bridging for pre-v1 CRI API.
Browse files Browse the repository at this point in the history
Add bridging for pre-v1 CRI client requests. Note that
this only allows pre-v1 *clients* to talk to us. We do
expect the CRI runtime/server to talk CRI v1 to us.
  • Loading branch information
klihub committed Mar 1, 2022
1 parent 25051c8 commit 0fed389
Show file tree
Hide file tree
Showing 5 changed files with 246 additions and 0 deletions.
3 changes: 3 additions & 0 deletions pkg/cri/relay/relay.go
Expand Up @@ -37,6 +37,8 @@ type Options struct {
ImageSocket string
// RuntimeSocket is the socket path for the (real) CRI runtime services.
RuntimeSocket string
// PreV1CRI enables request bridging for pre-v1alpha2 CRI clients.
PreV1CRI bool
// QualifyReqFn produces context for disambiguating a CRI request/reply.
QualifyReqFn func(interface{}) string
}
Expand Down Expand Up @@ -87,6 +89,7 @@ func NewRelay(options Options) (Relay, error) {
User: -1,
Group: -1,
Mode: 0660,
PreV1CRI: options.PreV1CRI,
QualifyReqFn: r.options.QualifyReqFn,
}
if r.server, err = server.NewServer(srvopts); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/cri/resource-manager/flags.go
Expand Up @@ -41,6 +41,7 @@ type options struct {
MetricsTimer time.Duration
RebalanceTimer time.Duration
DisableUI bool
DisablePreV1CRI bool
}

// Relay command line options.
Expand Down Expand Up @@ -83,4 +84,7 @@ func init() {

flag.BoolVar(&opt.DisableUI, "disable-ui", false,
"Disable serving container placement visualization UIs.")

flag.BoolVar(&opt.DisablePreV1CRI, "disable-pre-v1-cri", false,
"Disable request bridging for pre-v1 CRI clients.")
}
1 change: 1 addition & 0 deletions pkg/cri/resource-manager/resource-manager.go
Expand Up @@ -465,6 +465,7 @@ func (m *resmgr) setupRelay() error {
RelaySocket: opt.RelaySocket,
ImageSocket: opt.ImageSocket,
RuntimeSocket: opt.RuntimeSocket,
PreV1CRI: !opt.DisablePreV1CRI,
QualifyReqFn: m.disambiguate,
}
if m.relay, err = relay.NewRelay(options); err != nil {
Expand Down
13 changes: 13 additions & 0 deletions pkg/cri/server/server.go
Expand Up @@ -27,6 +27,7 @@ import (

"google.golang.org/grpc"

v1alpha2 "github.com/intel/cri-resource-manager/pkg/cri/server/v1alpha2"
api "k8s.io/cri-api/pkg/apis/runtime/v1"

"github.com/intel/cri-resource-manager/pkg/cri/resource-manager/sockets"
Expand All @@ -48,6 +49,8 @@ type Options struct {
Group int
// Mode is the permission mode bits for our gRPC socket.
Mode os.FileMode
// PreV1CRI enables request bridging for v1alpha2 CRI clients.
PreV1CRI bool
// QualifyReqFn produces return context for disambiguating a CRI request/reply.
QualifyReqFn func(interface{}) string
}
Expand Down Expand Up @@ -88,6 +91,7 @@ type server struct {
chkBypassFn func() bool // function to check interception bypass
runtime *api.RuntimeServiceServer // CRI runtime service
image *api.ImageServiceServer // CRI image service
v1alpha2 *v1alpha2.Server // CRI v1alpha2 bridging service
}

// NewServer creates a new server instance.
Expand Down Expand Up @@ -118,6 +122,7 @@ func (s *server) RegisterImageService(service api.ImageServiceServer) error {
is := service
s.image = &is
api.RegisterImageServiceServer(s.server, s)
s.v1alpha2.RegisterImageService(s)

return nil
}
Expand All @@ -135,6 +140,7 @@ func (s *server) RegisterRuntimeService(service api.RuntimeServiceServer) error
rs := service
s.runtime = &rs
api.RegisterRuntimeServiceServer(s.server, s)
s.v1alpha2.RegisterRuntimeService(s)

return nil
}
Expand Down Expand Up @@ -229,6 +235,13 @@ func (s *server) createGrpcServer() error {

s.server = grpc.NewServer(instrumentation.InjectGrpcServerTrace()...)

if s.options.PreV1CRI {
s.Info("pre-V1 CRI client bridging enabled")
s.v1alpha2 = v1alpha2.NewServer(s.server)
} else {
s.Info("pre-V1 CRI client bridging disabled")
}

return nil
}

Expand Down
225 changes: 225 additions & 0 deletions pkg/cri/server/v1alpha2/server.go
@@ -0,0 +1,225 @@
// Copyright 2022 Intel Corporation. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"unsafe"

"google.golang.org/grpc"

v1 "k8s.io/cri-api/pkg/apis/runtime/v1"
"k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
)

type Server struct {
server *grpc.Server
runtime v1.RuntimeServiceServer
image v1.ImageServiceServer
}

func NewServer(s *grpc.Server) *Server {
return &Server{
server: s,
}
}

func (s *Server) RegisterImageService(image v1.ImageServiceServer) {
if s == nil {
return
}
s.image = image
v1alpha2.RegisterImageServiceServer(s.server, s)
}

func (s *Server) RegisterRuntimeService(runtime v1.RuntimeServiceServer) {
if s == nil {
return
}
s.runtime = runtime
v1alpha2.RegisterRuntimeServiceServer(s.server, s)
}

func (s *Server) ListImages(ctx context.Context,
req *v1alpha2.ListImagesRequest) (*v1alpha2.ListImagesResponse, error) {
rsp, err := s.image.ListImages(ctx, (*v1.ListImagesRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ListImagesResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) ImageStatus(ctx context.Context,
req *v1alpha2.ImageStatusRequest) (*v1alpha2.ImageStatusResponse, error) {
rsp, err := s.image.ImageStatus(ctx, (*v1.ImageStatusRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ImageStatusResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) PullImage(ctx context.Context,
req *v1alpha2.PullImageRequest) (*v1alpha2.PullImageResponse, error) {
rsp, err := s.image.PullImage(ctx, (*v1.PullImageRequest)(unsafe.Pointer(req)))
return (*v1alpha2.PullImageResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) RemoveImage(ctx context.Context,
req *v1alpha2.RemoveImageRequest) (*v1alpha2.RemoveImageResponse, error) {
rsp, err := s.image.RemoveImage(ctx, (*v1.RemoveImageRequest)(unsafe.Pointer(req)))
return (*v1alpha2.RemoveImageResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) ImageFsInfo(ctx context.Context,
req *v1alpha2.ImageFsInfoRequest) (*v1alpha2.ImageFsInfoResponse, error) {
rsp, err := s.image.ImageFsInfo(ctx, (*v1.ImageFsInfoRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ImageFsInfoResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) Version(ctx context.Context,
req *v1alpha2.VersionRequest) (*v1alpha2.VersionResponse, error) {
rsp, err := s.runtime.Version(ctx, (*v1.VersionRequest)(unsafe.Pointer(req)))
return (*v1alpha2.VersionResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) RunPodSandbox(ctx context.Context,
req *v1alpha2.RunPodSandboxRequest) (*v1alpha2.RunPodSandboxResponse, error) {
rsp, err := s.runtime.RunPodSandbox(ctx, (*v1.RunPodSandboxRequest)(unsafe.Pointer(req)))
return (*v1alpha2.RunPodSandboxResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) StopPodSandbox(ctx context.Context,
req *v1alpha2.StopPodSandboxRequest) (*v1alpha2.StopPodSandboxResponse, error) {
rsp, err := s.runtime.StopPodSandbox(ctx, (*v1.StopPodSandboxRequest)(unsafe.Pointer(req)))
return (*v1alpha2.StopPodSandboxResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) RemovePodSandbox(ctx context.Context,
req *v1alpha2.RemovePodSandboxRequest) (*v1alpha2.RemovePodSandboxResponse, error) {
rsp, err := s.runtime.RemovePodSandbox(ctx, (*v1.RemovePodSandboxRequest)(unsafe.Pointer(req)))
return (*v1alpha2.RemovePodSandboxResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) PodSandboxStatus(ctx context.Context,
req *v1alpha2.PodSandboxStatusRequest) (*v1alpha2.PodSandboxStatusResponse, error) {
rsp, err := s.runtime.PodSandboxStatus(ctx, (*v1.PodSandboxStatusRequest)(unsafe.Pointer(req)))
return (*v1alpha2.PodSandboxStatusResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) ListPodSandbox(ctx context.Context,
req *v1alpha2.ListPodSandboxRequest) (*v1alpha2.ListPodSandboxResponse, error) {
rsp, err := s.runtime.ListPodSandbox(ctx, (*v1.ListPodSandboxRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ListPodSandboxResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) CreateContainer(ctx context.Context,
req *v1alpha2.CreateContainerRequest) (*v1alpha2.CreateContainerResponse, error) {
rsp, err := s.runtime.CreateContainer(ctx, (*v1.CreateContainerRequest)(unsafe.Pointer(req)))
return (*v1alpha2.CreateContainerResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) StartContainer(ctx context.Context,
req *v1alpha2.StartContainerRequest) (*v1alpha2.StartContainerResponse, error) {
rsp, err := s.runtime.StartContainer(ctx, (*v1.StartContainerRequest)(unsafe.Pointer(req)))
return (*v1alpha2.StartContainerResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) StopContainer(ctx context.Context,
req *v1alpha2.StopContainerRequest) (*v1alpha2.StopContainerResponse, error) {
rsp, err := s.runtime.StopContainer(ctx, (*v1.StopContainerRequest)(unsafe.Pointer(req)))
return (*v1alpha2.StopContainerResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) RemoveContainer(ctx context.Context,
req *v1alpha2.RemoveContainerRequest) (*v1alpha2.RemoveContainerResponse, error) {
rsp, err := s.runtime.RemoveContainer(ctx, (*v1.RemoveContainerRequest)(unsafe.Pointer(req)))
return (*v1alpha2.RemoveContainerResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) ListContainers(ctx context.Context,
req *v1alpha2.ListContainersRequest) (*v1alpha2.ListContainersResponse, error) {
rsp, err := s.runtime.ListContainers(ctx, (*v1.ListContainersRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ListContainersResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) ContainerStatus(ctx context.Context,
req *v1alpha2.ContainerStatusRequest) (*v1alpha2.ContainerStatusResponse, error) {
rsp, err := s.runtime.ContainerStatus(ctx, (*v1.ContainerStatusRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ContainerStatusResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) UpdateContainerResources(ctx context.Context,
req *v1alpha2.UpdateContainerResourcesRequest) (*v1alpha2.UpdateContainerResourcesResponse, error) {
rsp, err := s.runtime.UpdateContainerResources(ctx, (*v1.UpdateContainerResourcesRequest)(unsafe.Pointer(req)))
return (*v1alpha2.UpdateContainerResourcesResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) ReopenContainerLog(ctx context.Context,
req *v1alpha2.ReopenContainerLogRequest) (*v1alpha2.ReopenContainerLogResponse, error) {
rsp, err := s.runtime.ReopenContainerLog(ctx, (*v1.ReopenContainerLogRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ReopenContainerLogResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) ExecSync(ctx context.Context,
req *v1alpha2.ExecSyncRequest) (*v1alpha2.ExecSyncResponse, error) {
rsp, err := s.runtime.ExecSync(ctx, (*v1.ExecSyncRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ExecSyncResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) Exec(ctx context.Context,
req *v1alpha2.ExecRequest) (*v1alpha2.ExecResponse, error) {
rsp, err := s.runtime.Exec(ctx, (*v1.ExecRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ExecResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) Attach(ctx context.Context,
req *v1alpha2.AttachRequest) (*v1alpha2.AttachResponse, error) {
rsp, err := s.runtime.Attach(ctx, (*v1.AttachRequest)(unsafe.Pointer(req)))
return (*v1alpha2.AttachResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) PortForward(ctx context.Context,
req *v1alpha2.PortForwardRequest) (*v1alpha2.PortForwardResponse, error) {
rsp, err := s.runtime.PortForward(ctx, (*v1.PortForwardRequest)(unsafe.Pointer(req)))
return (*v1alpha2.PortForwardResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) ContainerStats(ctx context.Context,
req *v1alpha2.ContainerStatsRequest) (*v1alpha2.ContainerStatsResponse, error) {
rsp, err := s.runtime.ContainerStats(ctx, (*v1.ContainerStatsRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ContainerStatsResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) ListContainerStats(ctx context.Context,
req *v1alpha2.ListContainerStatsRequest) (*v1alpha2.ListContainerStatsResponse, error) {
rsp, err := s.runtime.ListContainerStats(ctx, (*v1.ListContainerStatsRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ListContainerStatsResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) PodSandboxStats(ctx context.Context, req *v1alpha2.PodSandboxStatsRequest) (*v1alpha2.PodSandboxStatsResponse, error) {
rsp, err := s.runtime.PodSandboxStats(ctx, (*v1.PodSandboxStatsRequest)(unsafe.Pointer(req)))
return (*v1alpha2.PodSandboxStatsResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) ListPodSandboxStats(ctx context.Context, req *v1alpha2.ListPodSandboxStatsRequest) (*v1alpha2.ListPodSandboxStatsResponse, error) {
rsp, err := s.runtime.ListPodSandboxStats(ctx, (*v1.ListPodSandboxStatsRequest)(unsafe.Pointer(req)))
return (*v1alpha2.ListPodSandboxStatsResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) UpdateRuntimeConfig(ctx context.Context,
req *v1alpha2.UpdateRuntimeConfigRequest) (*v1alpha2.UpdateRuntimeConfigResponse, error) {
rsp, err := s.runtime.UpdateRuntimeConfig(ctx, (*v1.UpdateRuntimeConfigRequest)(unsafe.Pointer(req)))
return (*v1alpha2.UpdateRuntimeConfigResponse)(unsafe.Pointer(rsp)), err
}

func (s *Server) Status(ctx context.Context,
req *v1alpha2.StatusRequest) (*v1alpha2.StatusResponse, error) {
rsp, err := s.runtime.Status(ctx, (*v1.StatusRequest)(unsafe.Pointer(req)))
return (*v1alpha2.StatusResponse)(unsafe.Pointer(rsp)), err
}

0 comments on commit 0fed389

Please sign in to comment.