Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dra: refactoring overall flow of prepare/unprepare resources #120099

Merged
merged 1 commit into from
Oct 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
194 changes: 120 additions & 74 deletions pkg/kubelet/cm/dra/plugin/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,130 +24,176 @@ import (
"google.golang.org/grpc"
grpccodes "google.golang.org/grpc/codes"
grpcstatus "google.golang.org/grpc/status"
"k8s.io/klog/v2"

"k8s.io/klog/v2"
drapbv1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapb "k8s.io/kubelet/pkg/apis/dra/v1alpha3"
)

// PluginClientTimeout is the deadline applied to DRA plugin calls:
// NodePrepareResources and NodeUnprepareResources wrap the caller's context
// with context.WithTimeout using this value before talking to the plugin.
const PluginClientTimeout = 45 * time.Second

// draPluginClient encapsulates all dra plugin methods.
type draPluginClient struct {
pluginName string
plugin *Plugin
// nodeResourceManager is the version-specific strategy used to prepare and
// unprepare resource claims over a plugin's gRPC connection. Requests and
// responses are always expressed in the v1alpha3 (drapb) message types,
// regardless of which API version the implementation speaks on the wire.
type nodeResourceManager interface {
	Prepare(ctx context.Context, conn *grpc.ClientConn, p *plugin, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error)
	Unprepare(ctx context.Context, conn *grpc.ClientConn, p *plugin, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error)
}

// v1alpha2NodeResourceManager talks to plugins through the v1alpha2 node
// client, issuing one RPC per claim.
type v1alpha2NodeResourceManager struct{}

// v1alpha3NodeResourceManager talks to plugins through the v1alpha3 node
// client, sending all claims of a request in a single RPC.
type v1alpha3NodeResourceManager struct{}

// nodeResourceManagers maps a plugin API version string to the
// nodeResourceManager implementation that handles it. Callers look up the
// plugin's current version here and treat a missing entry as an unsupported
// version.
var nodeResourceManagers = map[string]nodeResourceManager{
v1alpha2Version: v1alpha2NodeResourceManager{},
v1alpha3Version: v1alpha3NodeResourceManager{},
}

func (v1alpha2rm v1alpha2NodeResourceManager) Prepare(ctx context.Context, conn *grpc.ClientConn, _ *plugin, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
TommyStarK marked this conversation as resolved.
Show resolved Hide resolved
nodeClient := drapbv1alpha2.NewNodeClient(conn)
response := &drapb.NodePrepareResourcesResponse{
Claims: make(map[string]*drapb.NodePrepareResourceResponse),
}

for _, claim := range req.Claims {
res, err := nodeClient.NodePrepareResource(ctx,
&drapbv1alpha2.NodePrepareResourceRequest{
Namespace: claim.Namespace,
ClaimUid: claim.Uid,
ClaimName: claim.Name,
ResourceHandle: claim.ResourceHandle,
})
result := &drapb.NodePrepareResourceResponse{}
if err != nil {
result.Error = err.Error()
} else {
result.CDIDevices = res.CdiDevices
}
response.Claims[claim.Uid] = result
}

return response, nil
}

func (v1alpha2rm v1alpha2NodeResourceManager) Unprepare(ctx context.Context, conn *grpc.ClientConn, _ *plugin, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
nodeClient := drapbv1alpha2.NewNodeClient(conn)
response := &drapb.NodeUnprepareResourcesResponse{
Claims: make(map[string]*drapb.NodeUnprepareResourceResponse),
}
TommyStarK marked this conversation as resolved.
Show resolved Hide resolved

for _, claim := range req.Claims {
_, err := nodeClient.NodeUnprepareResource(ctx,
&drapbv1alpha2.NodeUnprepareResourceRequest{
Namespace: claim.Namespace,
ClaimUid: claim.Uid,
ClaimName: claim.Name,
ResourceHandle: claim.ResourceHandle,
})
result := &drapb.NodeUnprepareResourceResponse{}
if err != nil {
result.Error = err.Error()
}
response.Claims[claim.Uid] = result
}

return response, nil
}

// Prepare sends the whole request to the plugin in one v1alpha3
// NodePrepareResources call.
//
// If the plugin answers with gRPC code Unimplemented, the plugin is switched
// to v1alpha2 via p.setVersion and the same request is handed to the v1alpha2
// manager registered in nodeResourceManagers. Any other error is returned
// unchanged together with a nil response.
func (v1alpha3rm v1alpha3NodeResourceManager) Prepare(ctx context.Context, conn *grpc.ClientConn, p *plugin, req *drapb.NodePrepareResourcesRequest) (*drapb.NodePrepareResourcesResponse, error) {
	response, err := drapb.NewNodeClient(conn).NodePrepareResources(ctx, req)
	switch {
	case err == nil:
		return response, nil
	case grpcstatus.Code(err) == grpccodes.Unimplemented:
		// The plugin does not serve v1alpha3: record the downgrade on the
		// plugin and delegate this request to the v1alpha2 manager.
		p.setVersion(v1alpha2Version)
		return nodeResourceManagers[v1alpha2Version].Prepare(ctx, conn, p, req)
	default:
		return nil, err
	}
}

// Unprepare sends the whole request to the plugin in one v1alpha3
// NodeUnprepareResources call.
//
// If the plugin answers with gRPC code Unimplemented, the plugin is switched
// to v1alpha2 via p.setVersion and the same request is handed to the v1alpha2
// manager registered in nodeResourceManagers. Any other error is returned
// unchanged together with a nil response.
func (v1alpha3rm v1alpha3NodeResourceManager) Unprepare(ctx context.Context, conn *grpc.ClientConn, p *plugin, req *drapb.NodeUnprepareResourcesRequest) (*drapb.NodeUnprepareResourcesResponse, error) {
	response, err := drapb.NewNodeClient(conn).NodeUnprepareResources(ctx, req)
	switch {
	case err == nil:
		return response, nil
	case grpcstatus.Code(err) == grpccodes.Unimplemented:
		// The plugin does not serve v1alpha3: record the downgrade on the
		// plugin and delegate this request to the v1alpha2 manager.
		p.setVersion(v1alpha2Version)
		return nodeResourceManagers[v1alpha2Version].Unprepare(ctx, conn, p, req)
	default:
		return nil, err
	}
}

func NewDRAPluginClient(pluginName string) (drapb.NodeClient, error) {
if pluginName == "" {
return nil, fmt.Errorf("plugin name is empty")
}

existingPlugin := draPlugins.Get(pluginName)
existingPlugin := draPlugins.get(pluginName)
if existingPlugin == nil {
return nil, fmt.Errorf("plugin name %s not found in the list of registered DRA plugins", pluginName)
}

return &draPluginClient{
pluginName: pluginName,
plugin: existingPlugin,
}, nil
return existingPlugin, nil
}

func (r *draPluginClient) NodePrepareResources(
func (p *plugin) NodePrepareResources(
ctx context.Context,
req *drapb.NodePrepareResourcesRequest,
opts ...grpc.CallOption,
) (resp *drapb.NodePrepareResourcesResponse, err error) {
) (*drapb.NodePrepareResourcesResponse, error) {
logger := klog.FromContext(ctx)
logger.V(4).Info(log("calling NodePrepareResources rpc"), "request", req)
defer logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", resp, "err", err)
TommyStarK marked this conversation as resolved.
Show resolved Hide resolved

conn, err := r.plugin.getOrCreateGRPCConn()
conn, err := p.getOrCreateGRPCConn()
if err != nil {
return nil, err
}
nodeClient := drapb.NewNodeClient(conn)
nodeClientOld := drapbv1alpha2.NewNodeClient(conn)

ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
defer cancel()

resp, err = nodeClient.NodePrepareResources(ctx, req)
if err != nil {
status, _ := grpcstatus.FromError(err)
if status.Code() == grpccodes.Unimplemented {
// Fall back to the older gRPC API.
resp = &drapb.NodePrepareResourcesResponse{
Claims: make(map[string]*drapb.NodePrepareResourceResponse),
}
err = nil
for _, claim := range req.Claims {
respOld, errOld := nodeClientOld.NodePrepareResource(ctx,
&drapbv1alpha2.NodePrepareResourceRequest{
Namespace: claim.Namespace,
ClaimUid: claim.Uid,
ClaimName: claim.Name,
ResourceHandle: claim.ResourceHandle,
})
result := &drapb.NodePrepareResourceResponse{}
if errOld != nil {
result.Error = errOld.Error()
} else {
result.CDIDevices = respOld.CdiDevices
}
resp.Claims[claim.Uid] = result
}
}
version := p.getVersion()
resourceManager, exists := nodeResourceManagers[version]
if !exists {
err := fmt.Errorf("unsupported plugin version: %s", version)
logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", nil, "err", err)
return nil, err
}

return
response, err := resourceManager.Prepare(ctx, conn, p, req)
logger.V(4).Info(log("done calling NodePrepareResources rpc"), "response", response, "err", err)
return response, err
}

func (r *draPluginClient) NodeUnprepareResources(
func (p *plugin) NodeUnprepareResources(
TommyStarK marked this conversation as resolved.
Show resolved Hide resolved
ctx context.Context,
req *drapb.NodeUnprepareResourcesRequest,
opts ...grpc.CallOption,
) (resp *drapb.NodeUnprepareResourcesResponse, err error) {
) (*drapb.NodeUnprepareResourcesResponse, error) {
logger := klog.FromContext(ctx)
logger.V(4).Info(log("calling NodeUnprepareResource rpc"), "request", req)
defer logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", resp, "err", err)

conn, err := r.plugin.getOrCreateGRPCConn()
conn, err := p.getOrCreateGRPCConn()
if err != nil {
return nil, err
}
nodeClient := drapb.NewNodeClient(conn)
nodeClientOld := drapbv1alpha2.NewNodeClient(conn)

ctx, cancel := context.WithTimeout(ctx, PluginClientTimeout)
defer cancel()

resp, err = nodeClient.NodeUnprepareResources(ctx, req)
if err != nil {
status, _ := grpcstatus.FromError(err)
if status.Code() == grpccodes.Unimplemented {
// Fall back to the older gRPC API.
resp = &drapb.NodeUnprepareResourcesResponse{
Claims: make(map[string]*drapb.NodeUnprepareResourceResponse),
}
err = nil
for _, claim := range req.Claims {
_, errOld := nodeClientOld.NodeUnprepareResource(ctx,
&drapbv1alpha2.NodeUnprepareResourceRequest{
Namespace: claim.Namespace,
ClaimUid: claim.Uid,
ClaimName: claim.Name,
ResourceHandle: claim.ResourceHandle,
})
result := &drapb.NodeUnprepareResourceResponse{}
if errOld != nil {
result.Error = errOld.Error()
}
resp.Claims[claim.Uid] = result
}
}
version := p.getVersion()
resourceManager, exists := nodeResourceManagers[version]
if !exists {
err := fmt.Errorf("unsupported plugin version: %s", version)
logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", nil, "err", err)
return nil, err
}

return
response, err := resourceManager.Unprepare(ctx, conn, p, req)
logger.V(4).Info(log("done calling NodeUnprepareResources rpc"), "response", response, "err", err)
return response, err
}