Skip to content

Commit

Permalink
switch to kubelet plugin watcher mode
Browse files Browse the repository at this point in the history
  • Loading branch information
zshi-redhat committed Mar 28, 2019
1 parent ce0274a commit be8d74d
Show file tree
Hide file tree
Showing 6 changed files with 82 additions and 23 deletions.
2 changes: 2 additions & 0 deletions cmd/sriovdp/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ func main() {
flag.Parse()
rm := newResourceManager(cp)

rm.detectPluginWatchMode()

glog.Infof("resource manager reading configs")
if err := rm.readConfig(); err != nil {
glog.Errorf("error getting resources from file %v", err)
Expand Down
23 changes: 21 additions & 2 deletions cmd/sriovdp/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"os"

"github.com/golang/glog"

Expand All @@ -26,6 +27,10 @@ import (
"github.com/intel/sriov-network-device-plugin/pkg/utils"
)

var (
// pluginWatch records whether the Kubelet plugin-watcher registration mode is
// in use. It is assigned by detectPluginWatchMode and holds the zero value
// (false) until that method has run.
pluginWatch bool
)

const (
// socketSuffix is the suffix handed to the resource factory; resource servers
// use it when naming their socket files.
socketSuffix = "sock"
)
Expand All @@ -46,8 +51,20 @@ type resourceManager struct {
// newResourceManager builds a resourceManager from the parsed CLI parameters
// and creates the resource factory it will use.
//
// NOTE(review): the factory receives the value pluginWatch has at the moment
// of this call. main() invokes detectPluginWatchMode only after this function
// has returned, so the factory appears to always be built with pluginWatch ==
// false regardless of what is detected later — confirm, and consider
// detecting the mode before the factory is created.
func newResourceManager(cp *cliParams) *resourceManager {
return &resourceManager{
cliParams: *cp,
rFactory: resources.NewResourceFactory(cp.resourcePrefix, socketSuffix),
rFactory: resources.NewResourceFactory(cp.resourcePrefix, socketSuffix, pluginWatch),
}
}

// detectPluginWatchMode probes for the Kubelet plugin registry directory
// (types.SockDir) and stores the outcome in the package-level pluginWatch
// flag: true when the directory can be stat'ed, false otherwise.
//
// A stat failure other than "does not exist" (for example a permission
// problem) still selects the deprecated mode, but is reported with a warning
// so the fallback is not silent.
//
// NOTE(review): main() calls this after newResourceManager has already passed
// pluginWatch to the resource factory, so the factory does not observe the
// value set here — confirm the intended call order.
func (rm *resourceManager) detectPluginWatchMode() {
	_, err := os.Stat(types.SockDir)
	pluginWatch = err == nil

	if pluginWatch {
		glog.Infof("Using Kubelet Plugin Registry Mode")
		return
	}

	if !os.IsNotExist(err) {
		// The directory may exist but be unreadable; make the reason visible.
		glog.Warningf("unable to stat %s: %v", types.SockDir, err)
	}
	glog.Infof("Using Deprecated Device Plugin Registry Path")
}

// Read and validate configurations from Config file
Expand Down Expand Up @@ -100,7 +117,9 @@ func (rm *resourceManager) startAllServers() error {
}

// start watcher
go rs.Watch()
if !pluginWatch {
go rs.Watch()
}
}
return nil
}
Expand Down
1 change: 1 addition & 0 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import:
- package: k8s.io/kubernetes
subpackages:
- pkg/kubelet/apis/deviceplugin/v1beta1
- pkg/kubelet/apis/pluginregistration/v1
testImport:
- package: github.com/onsi/ginkgo
subpackages:
Expand Down
6 changes: 4 additions & 2 deletions pkg/resources/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,19 @@ import (
// resourceFactory creates resource servers that share the same prefix,
// suffix and registration mode.
type resourceFactory struct {
// endPointPrefix is handed to every resource server as its prefix argument.
endPointPrefix string
// endPointSuffix is handed to every resource server as its suffix argument,
// which the server uses when naming its socket file.
endPointSuffix string
// pluginWatch is forwarded to every resource server created by this factory.
pluginWatch bool
}

var instance *resourceFactory

// NewResourceFactory returns an instance of Resource Server factory
func NewResourceFactory(prefix, suffix string) types.ResourceFactory {
func NewResourceFactory(prefix, suffix string, pluginWatch bool) types.ResourceFactory {

if instance == nil {
return &resourceFactory{
endPointPrefix: prefix,
endPointSuffix: suffix,
pluginWatch: pluginWatch,
}
}
return instance
Expand All @@ -43,7 +45,7 @@ func NewResourceFactory(prefix, suffix string) types.ResourceFactory {
// GetResourceServer returns an instance of ResourceServer for a ResourcePool.
// The server is created from the factory's stored settings. When rp is nil no
// server is created and an error is returned instead.
func (rf *resourceFactory) GetResourceServer(rp types.ResourcePool) (types.ResourceServer, error) {
if rp != nil {
return newResourceServer(rf.endPointPrefix, rf.endPointSuffix, rp), nil
return newResourceServer(rf.endPointPrefix, rf.endPointSuffix, rf.pluginWatch, rp), nil
}
return nil, fmt.Errorf("factory: unable to get resource pool object")
}
Expand Down
69 changes: 51 additions & 18 deletions pkg/resources/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,14 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
)

type resourceServer struct {
resourcePool types.ResourcePool
pluginWatch bool
endPoint string // Socket file
sockPath string // Socket file path
resourceNamePrefix string
grpcServer *grpc.Server
termSignal chan bool
Expand All @@ -40,11 +43,17 @@ type resourceServer struct {
checkIntervals int // health check intervals in seconds
}

func newResourceServer(prefix, suffix string, rp types.ResourcePool) types.ResourceServer {
func newResourceServer(prefix, suffix string, pluginWatch bool, rp types.ResourcePool) types.ResourceServer {
sockName := fmt.Sprintf("%s.%s", rp.GetResourceName(), suffix)
sockPath := filepath.Join(types.SockDir, sockName)
if !pluginWatch {
sockPath = filepath.Join(types.DeprecatedSockDir, sockName)
}
return &resourceServer{
resourcePool: rp,
pluginWatch: pluginWatch,
endPoint: sockName,
sockPath: sockPath,
resourceNamePrefix: prefix,
grpcServer: grpc.NewServer(),
termSignal: make(chan bool, 1),
Expand All @@ -55,7 +64,7 @@ func newResourceServer(prefix, suffix string, rp types.ResourcePool) types.Resou
}

func (rs *resourceServer) register() error {
kubeletEndpoint := filepath.Join(types.SockDir, types.KubeEndPoint)
kubeletEndpoint := filepath.Join(types.DeprecatedSockDir, types.KubeEndPoint)
conn, err := grpc.Dial(kubeletEndpoint, grpc.WithInsecure(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
Expand All @@ -81,6 +90,26 @@ func (rs *resourceServer) register() error {
return nil
}

// GetInfo is the registration-service method through which this resource
// server describes itself: the plugin type, the resource name in the form
// "<prefix>/<resource name>", the socket endpoint and the supported versions.
//
// Neither ctx nor rqt is consulted, and the returned error is always nil.
func (rs *resourceServer) GetInfo(ctx context.Context, rqt *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
	return &registerapi.PluginInfo{
		Type: registerapi.DevicePlugin,
		Name: fmt.Sprintf("%s/%s", rs.resourceNamePrefix, rs.resourcePool.GetResourceName()),
		// Advertise the path the server actually listens on instead of
		// recomputing it, so the two can never drift apart.
		Endpoint:          rs.sockPath,
		SupportedVersions: []string{"v1alpha1", "v1beta1"},
	}, nil
}

// NotifyRegistrationStatus is the registration-service callback that reports
// whether registering this plugin succeeded.
//
// On success the outcome is only logged. On failure the reported error is
// logged at error severity and the gRPC server is stopped. The response is
// always empty and the returned error is always nil.
//
// NOTE(review): the failure message says "restarting", but nothing in this
// method restarts the server — confirm that something else brings it back up.
func (rs *resourceServer) NotifyRegistrationStatus(ctx context.Context, regstat *registerapi.RegistrationStatus) (*registerapi.RegistrationStatusResponse, error) {
	resp := &registerapi.RegistrationStatusResponse{}

	if regstat.PluginRegistered {
		glog.Infof("Plugin: %s gets registered successfully at Kubelet\n", rs.endPoint)
		return resp, nil
	}

	// A rejected registration is a failure, so it must not be logged as Info.
	glog.Errorf("Plugin: %s failed to be registered at Kubelet: %v; restarting.\n", rs.endPoint, regstat.Error)
	rs.grpcServer.Stop()
	return resp, nil
}

func (rs *resourceServer) Allocate(ctx context.Context, rqt *pluginapi.AllocateRequest) (*pluginapi.AllocateResponse, error) {
glog.Infof("Allocate() called with %+v", rqt)
resp := new(pluginapi.AllocateResponse)
Expand Down Expand Up @@ -164,19 +193,21 @@ func (rs *resourceServer) Start() error {
resourceName := rs.resourcePool.GetResourceName()
_ = rs.cleanUp() // try tp clean up and continue
glog.Infof("starting %s device plugin endpoint at: %s\n", resourceName, rs.endPoint)
sockPath := filepath.Join(types.SockDir, rs.endPoint)
lis, err := net.Listen("unix", sockPath)
lis, err := net.Listen("unix", rs.sockPath)
if err != nil {
glog.Errorf("error starting %s device plugin endpoint: %v", resourceName, err)
return err
}

// Register all services
if rs.pluginWatch {
registerapi.RegisterRegistrationServer(rs.grpcServer, rs)
}
pluginapi.RegisterDevicePluginServer(rs.grpcServer, rs)

go rs.grpcServer.Serve(lis)
// Wait for server to start by launching a blocking connection
conn, err := grpc.Dial(sockPath, grpc.WithInsecure(), grpc.WithBlock(),
conn, err := grpc.Dial(rs.sockPath, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithTimeout(5*time.Second),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
Expand All @@ -192,14 +223,17 @@ func (rs *resourceServer) Start() error {

rs.triggerUpdate()

// Register with Kubelet.
err = rs.register()
if err != nil {
// Stop server
rs.grpcServer.Stop()
glog.Fatal(err)
return err
if !rs.pluginWatch {
// Register with Kubelet.
err = rs.register()
if err != nil {
// Stop server
rs.grpcServer.Stop()
glog.Fatal(err)
return err
}
}

return nil
}

Expand All @@ -226,7 +260,9 @@ func (rs *resourceServer) Stop() error {
}
// Send terminate signal to ListAndWatch()
rs.termSignal <- true
rs.stopWatcher <- true
if !rs.pluginWatch {
rs.stopWatcher <- true
}

rs.grpcServer.Stop()
rs.grpcServer = nil
Expand All @@ -236,15 +272,14 @@ func (rs *resourceServer) Stop() error {

func (rs *resourceServer) Watch() {
// Watch for socket file; if not present restart server
sockPath := filepath.Join(types.SockDir, rs.endPoint)
for {
select {
case stop := <-rs.stopWatcher:
if stop {
return
}
default:
_, err := os.Lstat(sockPath)
_, err := os.Lstat(rs.sockPath)
if err != nil {
// Socket file not found; restart server
glog.Warningf("server endpoint not found %s", rs.endPoint)
Expand All @@ -253,16 +288,14 @@ func (rs *resourceServer) Watch() {
glog.Fatalf("unable to restart server %v", err)
}
}

}
// Sleep for some intervals; TODO: investigate on suggested interval
time.Sleep(time.Second * time.Duration(5))
}
}

func (rs *resourceServer) cleanUp() error {
sockPath := filepath.Join(types.SockDir, rs.endPoint)
if err := os.Remove(sockPath); err != nil && !os.IsNotExist(err) {
if err := os.Remove(rs.sockPath); err != nil && !os.IsNotExist(err) {
return err
}
return nil
Expand Down
4 changes: 3 additions & 1 deletion pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (

const (
// SockDir is the Kubelet plugin registry directory; its presence is what
// selects the plugin watcher mode, and sockets are placed here in that mode.
SockDir = "/var/lib/kubelet/device-plugins"
SockDir = "/var/lib/kubelet/plugins_registry"
// DeprecatedSockDir is the deprecated Kubelet device plugin socket directory,
// used when the plugin watcher mode is not active.
DeprecatedSockDir = "/var/lib/kubelet/device-plugins"
// KubeEndPoint is the Kubelet socket file name, joined with DeprecatedSockDir
// when registering directly with the Kubelet.
KubeEndPoint = "kubelet.sock"
)
Expand Down

0 comments on commit be8d74d

Please sign in to comment.