Skip to content

Commit

Permalink
dra kubelet: implement NodeResourceSlice controller
Browse files Browse the repository at this point in the history
kubelet is responsible for publishing the NodeResourceSlice objects for its
node. The new controller deletes any extra objects that don't match some
registered driver, updates existing ones, and creates new ones as needed to
ensure that the information provided by drivers is replicated correctly in the
cluster.
  • Loading branch information
pohly committed Mar 1, 2024
1 parent 96e5e16 commit 0da6af8
Show file tree
Hide file tree
Showing 13 changed files with 789 additions and 162 deletions.
1 change: 1 addition & 0 deletions cmd/kubelet/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,6 +808,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
kubeDeps.Mounter,
kubeDeps.CAdvisorInterface,
cm.NodeConfig{
NodeName: nodeName,
RuntimeCgroupsName: s.RuntimeCgroups,
SystemCgroupsName: s.SystemCgroups,
KubeletCgroupsName: s.KubeletCgroups,
Expand Down
1 change: 1 addition & 0 deletions pkg/kubelet/cm/container_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ type ContainerManager interface {
}

type NodeConfig struct {
NodeName types.NodeName
RuntimeCgroupsName string
SystemCgroupsName string
KubeletCgroupsName string
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/container_manager_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
// initialize DRA manager
if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.DynamicResourceAllocation) {
klog.InfoS("Creating Dynamic Resource Allocation (DRA) manager")
cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir)
cm.draManager, err = dra.NewManagerImpl(kubeClient, nodeConfig.KubeletRootDir, nodeConfig.NodeName)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/cm/dra/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ type ManagerImpl struct {
}

// NewManagerImpl creates a new manager.
func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string) (*ManagerImpl, error) {
func NewManagerImpl(kubeClient clientset.Interface, stateFileDirectory string, nodeName types.NodeName) (*ManagerImpl, error) {
klog.V(2).InfoS("Creating DRA manager")

claimInfoCache, err := newClaimInfoCache(stateFileDirectory, draManagerStateFileName)
Expand Down
8 changes: 4 additions & 4 deletions pkg/kubelet/cm/dra/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestNewManagerImpl(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory)
manager, err := NewManagerImpl(kubeClient, test.stateFileDirectory, "worker")
if test.wantErr {
assert.Error(t, err)
return
Expand Down Expand Up @@ -287,7 +287,7 @@ func TestGetResources(t *testing.T) {
},
} {
t.Run(test.description, func(t *testing.T) {
manager, err := NewManagerImpl(kubeClient, t.TempDir())
manager, err := NewManagerImpl(kubeClient, t.TempDir(), "worker")
assert.NoError(t, err)

if test.claimInfo != nil {
Expand Down Expand Up @@ -760,7 +760,7 @@ func TestPrepareResources(t *testing.T) {
}
defer draServerInfo.teardownFn()

plg := plugin.NewRegistrationHandler()
plg := plugin.NewRegistrationHandler(nil, "worker")
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
}
Expand Down Expand Up @@ -1060,7 +1060,7 @@ func TestUnprepareResources(t *testing.T) {
}
defer draServerInfo.teardownFn()

plg := plugin.NewRegistrationHandler()
plg := plugin.NewRegistrationHandler(nil, "worker")
if err := plg.RegisterPlugin(test.driverName, draServerInfo.socketName, []string{"1.27"}); err != nil {
t.Fatalf("failed to register plugin %s, err: %v", test.driverName, err)
}
Expand Down
85 changes: 85 additions & 0 deletions pkg/kubelet/cm/dra/plugin/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ func (f *fakeV1alpha3GRPCServer) NodeUnprepareResource(ctx context.Context, in *
return &drapbv1alpha3.NodeUnprepareResourcesResponse{}, nil
}

func (f *fakeV1alpha3GRPCServer) NodeListAndWatchResources(req *drapbv1alpha3.NodeListAndWatchResourcesRequest, srv drapbv1alpha3.Node_NodeListAndWatchResourcesServer) error {
srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{})
srv.Send(&drapbv1alpha3.NodeListAndWatchResourcesResponse{})
return nil
}

type fakeV1alpha2GRPCServer struct {
drapbv1alpha2.UnimplementedNodeServer
}
Expand Down Expand Up @@ -288,3 +294,82 @@ func TestNodeUnprepareResource(t *testing.T) {
})
}
}

func TestListAndWatchResources(t *testing.T) {
for _, test := range []struct {
description string
serverSetup func(string) (string, tearDown, error)
serverVersion string
request *drapbv1alpha3.NodeListAndWatchResourcesRequest
responses []*drapbv1alpha3.NodeListAndWatchResourcesResponse
expectError string
}{
{
description: "server supports NodeResources API",
serverSetup: setupFakeGRPCServer,
serverVersion: v1alpha3Version,
request: &drapbv1alpha3.NodeListAndWatchResourcesRequest{},
responses: []*drapbv1alpha3.NodeListAndWatchResourcesResponse{
{},
{},
},
expectError: "EOF",
},
{
description: "server doesn't support NodeResources API",
serverSetup: setupFakeGRPCServer,
serverVersion: v1alpha2Version,
request: new(drapbv1alpha3.NodeListAndWatchResourcesRequest),
expectError: "Unimplemented",
},
} {
t.Run(test.description, func(t *testing.T) {
addr, teardown, err := setupFakeGRPCServer(test.serverVersion)
if err != nil {
t.Fatal(err)
}
defer teardown()

p := &plugin{
endpoint: addr,
version: v1alpha3Version,
}

conn, err := p.getOrCreateGRPCConn()
defer func() {
err := conn.Close()
if err != nil {
t.Error(err)
}
}()
if err != nil {
t.Fatal(err)
}

draPlugins.add("dummy-plugin", p)
defer draPlugins.delete("dummy-plugin")

client, err := NewDRAPluginClient("dummy-plugin")
if err != nil {
t.Fatal(err)
}

stream, err := client.NodeListAndWatchResources(context.Background(), test.request)
if err != nil {
t.Fatal(err)
}
var actualResponses []*drapbv1alpha3.NodeListAndWatchResourcesResponse
var actualErr error
for {
resp, err := stream.Recv()
if err != nil {
actualErr = err
break
}
actualResponses = append(actualResponses, resp)
}
assert.Equal(t, test.responses, actualResponses)
assert.Contains(t, actualErr.Error(), test.expectError)
})
}
}
Loading

0 comments on commit 0da6af8

Please sign in to comment.