Skip to content

Commit

Permalink
DRA:add example driver
Browse files Browse the repository at this point in the history
Signed-off-by: Hyeongju Johannes Lee <hyeongju.lee@intel.com>
  • Loading branch information
hj-johannes-lee committed Aug 9, 2022
1 parent 233abf2 commit 90dd92e
Show file tree
Hide file tree
Showing 5 changed files with 478 additions and 1 deletion.
151 changes: 151 additions & 0 deletions test/integration/cdi/example-driver/app/kubeletplugin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package app

import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"

cdiapi "github.com/container-orchestrated-devices/container-device-interface/pkg/cdi"
specs "github.com/container-orchestrated-devices/container-device-interface/specs-go"
"k8s.io/klog/v2"
drapbv1 "k8s.io/kubernetes/pkg/kubelet/apis/dra/v1alpha1"
plugin "k8s.io/kubernetes/test/integration/cdi/example-driver/kubeletplugin"
)

const (
deviceName = "exampledevice"
cdiVersion = "0.2.0"
)

type examplePlugin struct {
cdiDir string
driverName string
draAddress string
pluginRegistrationPath string
}

// newExamplePlugin returns an initialized examplePlugin instance.
func newExamplePlugin(cdiDir, driverName, draAddress, pluginRegistrationPath string) (*examplePlugin, error) {
return &examplePlugin{
cdiDir: cdiDir,
driverName: driverName,
draAddress: draAddress,
pluginRegistrationPath: pluginRegistrationPath,
}, nil
}

// getJsonFilePath returns the absolute path where json file is/should be.
func (ex *examplePlugin) getJsonFilePath(claimUID string) string {
jsonfileName := fmt.Sprintf("%s-%s.json", ex.driverName, claimUID)
return filepath.Join(ex.cdiDir, jsonfileName)
}

// startPlugin starts servers that are necessary for plugins and registers the plugin with kubelet.
func (ex examplePlugin) startPlugin() error {
klog.Infof("Starting kubelet plugin")

// create CDI directory if not exists
if err := os.MkdirAll(ex.cdiDir, os.FileMode(0750)); err != nil {
return fmt.Errorf("failed to create directory: %v", err)
}

// create DRA driver socket directory if not exists.
if err := os.MkdirAll(filepath.Dir(ex.draAddress), 0750); err != nil {
return fmt.Errorf("failed to create DRA driver socket directory: %v", err)
}

// Run a grpc server for the driver, that listens on draAddress
if err := plugin.StartNonblockingGrpcServer(ex.draAddress, &ex); err != nil {
return fmt.Errorf("failed to start a nonblocking grpc server: %v", err)
}

// Run a registration server and registers plugin with kubelet
if err := plugin.StartRegistrar(ex.driverName, ex.draAddress, ex.pluginRegistrationPath); err != nil {
return fmt.Errorf("failed to start registrar: %v", err)
}

return nil

}

func (ex *examplePlugin) NodePrepareResource(ctx context.Context, req *drapbv1.NodePrepareResourceRequest) (*drapbv1.NodePrepareResourceResponse, error) {

kind := ex.driverName + "/" + deviceName

klog.Infof("NodePrepareResource is called: request: %+v", req)

klog.Infof("Creating CDI File")
// make resourceHandle in the form of map[string]string
var decodedResourceHandle map[string]string
if err := json.Unmarshal([]byte(req.ResourceHandle), &decodedResourceHandle); err != nil {
return nil, fmt.Errorf("failed to unmarshal resourceHandle: %v", err)
}

// create ContainerEdits in device spec and ContainerEdits
envs := []string{}
for key, val := range decodedResourceHandle {
envs = append(envs, key+"="+val)
}
// create spec
spec := specs.Spec{
Version: cdiVersion,
Kind: kind,
Devices: []specs.Device{{
Name: deviceName,
ContainerEdits: specs.ContainerEdits{
Env: envs,
},
}},
ContainerEdits: specs.ContainerEdits{},
}
cdiSpec, err := cdiapi.NewSpec(&spec, ex.getJsonFilePath(req.ClaimUid), 1)
if err != nil {
return nil, fmt.Errorf("failed to create new spec: %v", err)
}
klog.V(5).InfoS("CDI spec is validated")

// create bytes from spec, which would be written into the json file
jsonBytes, err := json.Marshal(*cdiSpec)
if err != nil {
return nil, fmt.Errorf("failed to marshal CDI spec: %v", err)
}

// create json file
filePath := ex.getJsonFilePath(req.ClaimUid)
if err := os.WriteFile(filePath, jsonBytes, os.FileMode(0644)); err != nil {
return nil, fmt.Errorf("failed to write CDI file %v", err)
}
klog.Infof("CDI file is created at " + filePath)

dev := cdiSpec.GetDevice(deviceName).GetQualifiedName()
resp := &drapbv1.NodePrepareResourceResponse{CdiDevice: []string{dev}}
klog.Infof("NodePrepareResource is successfuly finished, response: %+v", resp)
return resp, nil
}

func (ex *examplePlugin) NodeUnprepareResource(ctx context.Context, req *drapbv1.NodeUnprepareResourceRequest) (*drapbv1.NodeUnprepareResourceResponse, error) {
klog.Infof("NodeUnprepareResource is called: request: %+v", req)
filePath := ex.getJsonFilePath(req.ClaimUid)
if err := os.Remove(filePath); err != nil {
return nil, fmt.Errorf("error removing CDI file: %v", err)
}
klog.Infof("CDI file at %s is removed" + filePath)
return &drapbv1.NodeUnprepareResourceResponse{}, nil
}
18 changes: 17 additions & 1 deletion test/integration/cdi/example-driver/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"strings"
"time"

"github.com/container-orchestrated-devices/container-device-interface/pkg/cdi"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
Expand All @@ -44,6 +45,8 @@ import (
"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/component-base/term"
"k8s.io/klog/v2"
"k8s.io/kubernetes/cmd/kubelet/app/options"
"k8s.io/kubernetes/pkg/kubelet/cm/dra"
"k8s.io/kubernetes/test/integration/cdi/example-driver/leaderelection"
)

Expand Down Expand Up @@ -77,6 +80,7 @@ func NewCommand() *cobra.Command {

fs = sharedFlagSets.FlagSet("CDI")
driverName := fs.String("drivername", "example-driver.cdi.k8s.io", "Resource driver name.")
cdiDir := fs.String("cdi-dir", cdi.DefaultDynamicDir, "CDI directory where the driver sockets and CDI files will be")

fs = sharedFlagSets.FlagSet("other")
featureGate := featuregate.NewFeatureGate()
Expand Down Expand Up @@ -244,13 +248,25 @@ func NewCommand() *cobra.Command {
}
cmd.AddCommand(controller)

// kubelet plugin
kubeletPluginFlagSets := cliflag.NamedFlagSets{}
fs = kubeletPluginFlagSets.FlagSet("kubelet plugin")

kubeletDir := fs.String("kubelet-dir", options.NewKubeletFlags().RootDirectory, "Path to the Kubelet directory.")
draAddress := fs.String("dra-address", path.Join(dra.DRACheckpointDir, *driverName, "dra.sock"), "Path to the DRA driver socket kubelet will use to issue CDI operations.")
pluginRegistrationPath := fs.String("plugin-registration-path", path.Join(*kubeletDir, "plugins_registry"), "Path to Kubernetes plugin registration socket.")

kubeletPlugin := &cobra.Command{
Use: "kubelet-plugin",
Short: "run as kubelet plugin",
Long: "cdi-example-driver kubelet-plugin runs as a device plugin for kubelet that supports dynamic resource allocation.",
Args: cobra.ExactArgs(0),
RunE: func(cmd *cobra.Command, args []string) error {
return fmt.Errorf("%s: not implemented", cmd.Use)
plugin, err := newExamplePlugin(*cdiDir, *driverName, *draAddress, *pluginRegistrationPath)
if err != nil {
return err
}
return plugin.startPlugin()
},
}
cmd.AddCommand(kubeletPlugin)
Expand Down
128 changes: 128 additions & 0 deletions test/integration/cdi/example-driver/kubeletplugin/noderegistrar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

// Package app does all of the work necessary to configure and run a
// Kubernetes app process.
package kubeletplugin

import (
"errors"
"fmt"
"net"
"os"
"os/signal"
"runtime"
"syscall"

"google.golang.org/grpc"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/util"
registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
)

type nodeRegistrarConfig struct {
draDriverName string
draAddress string
pluginRegistrationPath string
}
type nodeRegistrar struct {
config nodeRegistrarConfig
}

func buildSocketPath(cdiDriverName string, pluginRegistrationPath string) string {
return fmt.Sprintf("%s/%s-reg.sock", pluginRegistrationPath, cdiDriverName)
}

func removeRegSocket(cdiDriverName, pluginRegistrationPath string) {
sigc := make(chan os.Signal, 1)
signal.Notify(sigc, syscall.SIGTERM)
<-sigc
socketPath := buildSocketPath(cdiDriverName, pluginRegistrationPath)
err := os.Remove(socketPath)
if err != nil && !os.IsNotExist(err) {
klog.Errorf("failed to remove socket: %s with error: %+v", socketPath, err)
}
}

func cleanupSocketFile(socketPath string) error {
if err := os.Remove(socketPath); err == nil {
return nil
} else if errors.Is(err, os.ErrNotExist) {
return nil
} else {
return fmt.Errorf("unexpected error removing socket: %v", err)
}
}

// newRegistrar returns an initialized nodeRegistrar instance.
func newRegistrar(config nodeRegistrarConfig) *nodeRegistrar {
return &nodeRegistrar{
config: config,
}
}

// nodeRegister runs a registration server and registers plugin with kubelet.
func (nr nodeRegistrar) nodeRegister() error {
// Run a registration server.
registrationServer := newRegistrationServer(nr.config.draDriverName, nr.config.draAddress, []string{"1.0.0"})
socketPath := buildSocketPath(nr.config.draDriverName, nr.config.pluginRegistrationPath)
if err := cleanupSocketFile(socketPath); err != nil {
return fmt.Errorf(err.Error())
}

var oldmask int
if runtime.GOOS == "linux" {
oldmask, _ = util.Umask(0077)
}

klog.Infof("Starting registration server at: %s\n", socketPath)
lis, err := net.Listen("unix", socketPath)
if err != nil {
return fmt.Errorf("failed to listen on socket: %s with error: %+v", socketPath, err)
}

if runtime.GOOS == "linux" {
util.Umask(oldmask)
}
grpcServer := grpc.NewServer()

// Register kubelet plugin watcher api.
registerapi.RegisterRegistrationServer(grpcServer, registrationServer)

go removeRegSocket(nr.config.draDriverName, nr.config.pluginRegistrationPath)
// Start service.
klog.Infof("Registration server started at: %s\n", socketPath)
if err := grpcServer.Serve(lis); err != nil {
return fmt.Errorf("registration server stopped serving: %v", err)
}
return nil
}

// StartRegistrar creates a new registrar, and run the function nodeRegister.
func StartRegistrar(driverName, draAddress, pluginRegistrationPath string) error {
klog.Infof("Starting noderegistrar")
registrarConfig := nodeRegistrarConfig{
draDriverName: driverName,
draAddress: draAddress,
pluginRegistrationPath: pluginRegistrationPath,
}
registrar := newRegistrar(registrarConfig)
if err := registrar.nodeRegister(); err != nil {
return fmt.Errorf("failed to register node: %v", err)
}

return nil
}
Loading

0 comments on commit 90dd92e

Please sign in to comment.