Skip to content

Commit

Permalink
feat: Bump kubelet gRPC API to v1alpha3
Browse files Browse the repository at this point in the history
  • Loading branch information
toVersus committed Apr 20, 2024
1 parent 87e127a commit 8ac27dd
Showing 1 changed file with 62 additions and 23 deletions.
85 changes: 62 additions & 23 deletions cmd/fake-dra-kubeletplugin/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@ import (
"context"
"errors"
"fmt"
"sync"

"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
drav1alpha2 "k8s.io/kubelet/pkg/apis/dra/v1alpha2"
drapbv1 "k8s.io/kubelet/pkg/apis/dra/v1alpha3"

nascrd "github.com/toVersus/fake-dra-driver/api/3-shake.com/resource/fake/nas/v1alpha1"
nasclient "github.com/toVersus/fake-dra-driver/api/3-shake.com/resource/fake/nas/v1alpha1/client"
)

var _ drav1alpha2.NodeServer = &driver{}
var _ drapbv1.NodeServer = &driver{}

type driver struct {
sync.Mutex

nascrd *nascrd.NodeAllocationState
nasclient *nasclient.Client
state *DeviceState
Expand Down Expand Up @@ -79,14 +82,35 @@ func (d *driver) Shutdown(ctx context.Context) error {
})
}

func (d *driver) NodePrepareResource(ctx context.Context, req *drav1alpha2.NodePrepareResourceRequest) (*drav1alpha2.NodePrepareResourceResponse, error) {
logger := klog.FromContext(ctx).WithValues(
"resourceClaim", klog.KRef(req.Namespace, req.ClaimName),
"resourceClaimUID", req.ClaimUid,
"nodeAllocationState", klog.KObj(d.nascrd),
)
func (d *driver) NodeListAndWatchResources(req *drapbv1.NodeListAndWatchResourcesRequest, stream drapbv1.Node_NodeListAndWatchResourcesServer) error {
// DRA Structured Parameters is not supported yet
return nil
}

func (d *driver) NodePrepareResources(ctx context.Context, req *drapbv1.NodePrepareResourcesRequest) (*drapbv1.NodePrepareResourcesResponse, error) {
logger := klog.FromContext(ctx)
logger.V(4).Info("NodePrepareResource is called", "numClaims", len(req.Claims))

preparedResources := &drapbv1.NodePrepareResourcesResponse{Claims: map[string]*drapbv1.NodePrepareResourceResponse{}}

// In production version some common operations of d.nodeUnprepareResources
// should be done outside of the loop, for instance updating the CR could
// be done once after all HW was prepared.
for _, claim := range req.Claims {
prepared := d.nodePrepareResource(ctx, claim)
klog.V(4).Info("Prepared devices for allocated claims", "devices", klog.Format(prepared))
preparedResources.Claims[claim.Uid] = prepared
}

return preparedResources, nil
}

func (d *driver) nodePrepareResource(ctx context.Context, claim *drapbv1.Claim) *drapbv1.NodePrepareResourceResponse {
logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger)
logger.V(4).Info("NodePrepareResource is called")
d.Lock()
defer d.Unlock()

var prepared []string
err := retry.RetryOnConflict(retry.DefaultRetry, func() error {
Expand All @@ -96,14 +120,14 @@ func (d *driver) NodePrepareResource(ctx context.Context, req *drav1alpha2.NodeP
return err
}
logger.V(4).Info("Preparing devices for claim")
prepared, err = d.state.Prepare(ctx, req.ClaimUid, d.nascrd.Spec.AllocatedClaims[req.ClaimUid])
prepared, err = d.state.Prepare(ctx, claim.Uid, d.nascrd.Spec.AllocatedClaims[claim.Uid])
if err != nil {
return fmt.Errorf("error preparing devices for claim %q: %w", req.ClaimUid, err)
return fmt.Errorf("error preparing devices for claim %q: %w", claim.Uid, err)
}

logger.V(4).Info("Updating spec of NodeAllocationState and add prepared devices to PreparedDevices field")
if err := d.nasclient.Update(d.state.GetUpdatedSpec(&d.nascrd.Spec)); err != nil {
if nestedErr := d.state.Unprepare(ctx, req.ClaimUid); nestedErr != nil {
if nestedErr := d.state.Unprepare(ctx, claim.Uid); nestedErr != nil {
logger.Error(errors.Join(err, nestedErr), "Error unpreparing resource after claim Update() failed")
} else {
logger.Error(err, "Error updating NodeAllocationState status to NotReady, so unpreparing earlier prepared resource")
Expand All @@ -114,19 +138,32 @@ func (d *driver) NodePrepareResource(ctx context.Context, req *drav1alpha2.NodeP
return nil
})
if err != nil {
return nil, fmt.Errorf("failed to retry preparing resource: %w", err)
return &drapbv1.NodePrepareResourceResponse{
Error: fmt.Sprintf("failed to retry preparing resource: %s", err),
}
}

logger.V(4).Info("Prepared devices for allocated claims", "devices", klog.Format(prepared))
return &drav1alpha2.NodePrepareResourceResponse{CdiDevices: prepared}, nil
return &drapbv1.NodePrepareResourceResponse{CDIDevices: prepared}
}

func (d *driver) NodeUnprepareResources(ctx context.Context, req *drapbv1.NodeUnprepareResourcesRequest) (*drapbv1.NodeUnprepareResourcesResponse, error) {
logger := klog.FromContext(ctx)
logger.Info("NodeUnPrepareResource is called", "nclaims", len(req.Claims))
unpreparedResources := &drapbv1.NodeUnprepareResourcesResponse{Claims: map[string]*drapbv1.NodeUnprepareResourceResponse{}}

for _, claim := range req.Claims {
unpreparedResources.Claims[claim.Uid] = d.nodeUnprepareResource(ctx, claim)
}

return unpreparedResources, nil
}

func (d *driver) NodeUnprepareResource(ctx context.Context, req *drav1alpha2.NodeUnprepareResourceRequest) (*drav1alpha2.NodeUnprepareResourceResponse, error) {
logger := klog.FromContext(ctx).WithValues(
"resourceClaim", klog.KRef(req.Namespace, req.ClaimName),
"resourceClaimUID", req.ClaimUid,
"nodeAllocationState", klog.KObj(d.nascrd),
)
func (d *driver) nodeUnprepareResource(ctx context.Context, claim *drapbv1.Claim) *drapbv1.NodeUnprepareResourceResponse {
d.Lock()
defer d.Unlock()

logger := klog.FromContext(ctx)
ctx = klog.NewContext(ctx, logger)
logger.V(4).Info("NodeUnprepareResource is called")

Expand All @@ -136,8 +173,8 @@ func (d *driver) NodeUnprepareResource(ctx context.Context, req *drav1alpha2.Nod
return err
}
logger.V(4).Info("Unpreparing devices for claim")
if err := d.state.Unprepare(ctx, req.ClaimUid); err != nil {
return fmt.Errorf("error unpreparing devices for claim %q: %w", req.ClaimUid, err)
if err := d.state.Unprepare(ctx, claim.Uid); err != nil {
return fmt.Errorf("error unpreparing devices for claim %q: %w", claim.Uid, err)
}

if err := d.nasclient.Update(d.state.GetUpdatedSpec(&d.nascrd.Spec)); err != nil {
Expand All @@ -147,9 +184,11 @@ func (d *driver) NodeUnprepareResource(ctx context.Context, req *drav1alpha2.Nod
return nil
})
if err != nil {
return nil, fmt.Errorf("error unpreparing resource: %w", err)
return &drapbv1.NodeUnprepareResourceResponse{
Error: fmt.Sprintf("error unpreparing resource: %s", err),
}
}

logger.V(4).Info("Unprepared devices for unallocated resource claim")
return &drav1alpha2.NodeUnprepareResourceResponse{}, nil
return &drapbv1.NodeUnprepareResourceResponse{}
}

0 comments on commit 8ac27dd

Please sign in to comment.