Refactor code to use typed clients and informers
vladimirvivien committed Mar 7, 2022
1 parent 24eea62 commit e0ddeb1
Showing 7 changed files with 179 additions and 296 deletions.
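The heart of the change: the dynamic client and its GVR-keyed informers (which yield *unstructured.Unstructured values) are replaced with typed clients and informers whose listers and handlers deal in concrete types such as *v1.Pod. A minimal sketch of the typed pattern, assuming only a populated rest.Config (illustrative, not the commit's exact code):

package example

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// newPodFactory builds a typed shared informer factory and registers the
// Pod informer with it before the factory is started.
func newPodFactory(config *rest.Config) (informers.SharedInformerFactory, error) {
	clientset, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, err
	}
	factory := informers.NewSharedInformerFactory(clientset, 30*time.Second)
	factory.Core().V1().Pods().Informer() // register the type before Start
	return factory, nil
}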
5 changes: 5 additions & 0 deletions cmd/ktop.go
@@ -77,6 +77,11 @@ func (o *ktopCmdOptions) runKtop(c *cobra.Command, args []string) error {
fmt.Printf("main: failed to create Kubernetes client: %s\n", err)
os.Exit(1)
}
// Get server version (test server availability before continuing)
if _, err := k8sC.GetServerVersion(); err != nil {
return fmt.Errorf("ktop: %s", err)
}

fmt.Printf("Connected to: %s\n", k8sC.Config().Host)

app := application.New(k8sC)
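A note on the pre-flight check added above: it fails fast only if the underlying HTTP request does. One way to bound it (an assumption on my part, not something this commit does) is a client-side timeout on the rest.Config before the clients are built:

// Hypothetical sketch: cap the probe so an unreachable API server errors
// out within seconds. flags is the *genericclioptions.ConfigFlags that
// ktop already carries; rest.Config's Timeout field covers discovery
// calls such as ServerVersion().
config, err := flags.ToRESTConfig()
if err != nil {
	return err
}
config.Timeout = 5 * time.Second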
64 changes: 45 additions & 19 deletions k8s/client.go
@@ -11,6 +11,7 @@ import (
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/plugin/pkg/client/auth"
_ "k8s.io/client-go/plugin/pkg/client/auth/oidc"
restclient "k8s.io/client-go/rest"
@@ -22,19 +23,20 @@ import (
const (
AllNamespaces = "*"
)

type Client struct {
- namespace string
- namespaces []string
- config *restclient.Config
- dynaClient dynamic.Interface
- discoClient discovery.CachedDiscoveryInterface
- metricsClient *metricsclient.Clientset
- metricsAvailable bool
- refreshTimeout time.Duration
- controller *Controller
+ namespace string
+ config *restclient.Config
+ kubeClient kubernetes.Interface
+ dynaClient dynamic.Interface
+ discoClient discovery.CachedDiscoveryInterface
+ metricsClient *metricsclient.Clientset
+ metricsAvailCount int
+ refreshTimeout time.Duration
+ controller *Controller
}

- func New(flags *genericclioptions.ConfigFlags)(*Client, error) {
+ func New(flags *genericclioptions.ConfigFlags) (*Client, error) {
if flags == nil {
return nil, fmt.Errorf("configuration flagset is nil")
}
@@ -44,6 +46,11 @@ func New(flags *genericclioptions.ConfigFlags)(*Client, error) {
return nil, err
}

kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}

dyna, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
@@ -60,13 +67,14 @@ func New(flags *genericclioptions.ConfigFlags)(*Client, error) {
}

var namespace = *flags.Namespace
- if namespace == "" || namespace == "*"{
+ if namespace == "" || namespace == "*" {
namespace = AllNamespaces
}

client := &Client{
namespace: namespace,
config: config,
kubeClient: kubeClient,
dynaClient: dyna,
discoClient: disco,
metricsClient: metrics,
@@ -91,7 +99,26 @@ func (k8s *Client) Config() *restclient.Config {
return k8s.config
}

func (k8s *Client) GetServerVersion() (string, error) {
version, err := k8s.discoClient.ServerVersion()
if err != nil {
return "", fmt.Errorf("failed to connect to server: %s", err)
}
return version.String(), nil
}

// AssertMetricsAvailable checks for an available metrics server on every 10th invocation;
// otherwise, it returns the last known availability state of the metrics server.
func (k8s *Client) AssertMetricsAvailable() error {
if k8s.metricsAvailCount != 0 {
if k8s.metricsAvailCount%10 != 0 {
k8s.metricsAvailCount++
} else {
k8s.metricsAvailCount = 0
}
return nil
}

groups, err := k8s.discoClient.ServerGroups()
if err != nil {
return err
@@ -104,7 +131,6 @@ func (k8s *Client) AssertMetricsAvailable() error {
}
}

- k8s.metricsAvailable = avail
if !avail {
return fmt.Errorf("metrics api not available")
}
@@ -113,8 +139,8 @@ func (k8s *Client) AssertMetricsAvailable() error {

// GetNodeMetrics returns metrics for specified node
func (k8s *Client) GetNodeMetrics(ctx context.Context, nodeName string) (*metricsV1beta1.NodeMetrics, error) {
- if !k8s.metricsAvailable {
- return nil, fmt.Errorf("metrics api not available")
+ if err := k8s.AssertMetricsAvailable(); err != nil {
+ return nil, fmt.Errorf("node metrics: %s", err)
}

metrics, err := k8s.metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, nodeName, metav1.GetOptions{})
@@ -126,9 +152,9 @@ func (k8s *Client) GetNodeMetrics(ctx context.Context, nodeName string) (*metric
}

// GetPodMetricsByName returns metrics for specified pod
- func (k8s *Client) GetPodMetricsByName(ctx context.Context, pod coreV1.Pod) (*metricsV1beta1.PodMetrics, error) {
- if !k8s.metricsAvailable {
- return nil, fmt.Errorf("metrics api not available")
+ func (k8s *Client) GetPodMetricsByName(ctx context.Context, pod *coreV1.Pod) (*metricsV1beta1.PodMetrics, error) {
+ if err := k8s.AssertMetricsAvailable(); err != nil {
+ return nil, fmt.Errorf("pod metrics by name: %s", err)
}

metrics, err := k8s.metricsClient.MetricsV1beta1().PodMetricses(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{})
@@ -140,8 +166,8 @@ func (k8s *Client) GetPodMetricsByName(ctx context.Context, pod coreV1.Pod) (*me
}

func (k8s *Client) GetAllPodMetrics(ctx context.Context) ([]metricsV1beta1.PodMetrics, error) {
- if !k8s.metricsAvailable {
- return nil, fmt.Errorf("metrics api not available")
+ if err := k8s.AssertMetricsAvailable(); err != nil {
+ return nil, fmt.Errorf("pod metrics: %s", err)
}

metricsList, err := k8s.metricsClient.MetricsV1beta1().PodMetricses(k8s.namespace).List(ctx, metav1.ListOptions{})
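For orientation, a caller of the accessors above might look like this; k8sC is assumed to be a *Client from New, and ctx a context.Context (a usage sketch, not code from this commit):

// Print per-container CPU and memory usage for every pod in scope.
metricsList, err := k8sC.GetAllPodMetrics(ctx)
if err != nil {
	return fmt.Errorf("pod metrics unavailable: %w", err)
}
for _, m := range metricsList {
	for _, c := range m.Containers {
		fmt.Printf("%s/%s %s cpu=%s mem=%s\n",
			m.Namespace, m.Name, c.Name,
			c.Usage.Cpu().String(), c.Usage.Memory().String())
	}
}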
116 changes: 81 additions & 35 deletions k8s/client_controller.go
@@ -10,8 +10,11 @@ import (
appsV1 "k8s.io/api/apps/v1"
batchV1 "k8s.io/api/batch/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/informers"
appsV1Informers "k8s.io/client-go/informers/apps/v1"
batchV1Informers "k8s.io/client-go/informers/batch/v1"
coreV1Informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
)

var (
@@ -35,25 +38,26 @@ type RefreshPodsFunc func(ctx context.Context, items []model.PodModel) error
type RefreshSummaryFunc func(ctx context.Context, items model.ClusterSummary) error

type Controller struct {
- client *Client
- namespaceInformer informers.GenericInformer
- nodeInformer informers.GenericInformer
- nodeRefreshFunc RefreshNodesFunc
- podInformer informers.GenericInformer
- podRefreshFunc RefreshPodsFunc

- deploymentInformer informers.GenericInformer
- daemonSetInformer informers.GenericInformer
- replicaSetInformer informers.GenericInformer
- statefulSetInformer informers.GenericInformer

- jobInformer informers.GenericInformer
- cronJobInformer informers.GenericInformer

- pvInformer informers.GenericInformer
- pvcInformer informers.GenericInformer
- summaryRefreshFunc RefreshSummaryFunc
+ client *Client

+ informer informers.SharedInformerFactory
+ namespaceInformer coreV1Informers.NamespaceInformer
+ nodeInformer coreV1Informers.NodeInformer
+ podInformer coreV1Informers.PodInformer
+ pvInformer coreV1Informers.PersistentVolumeInformer
+ pvcInformer coreV1Informers.PersistentVolumeClaimInformer

+ jobInformer batchV1Informers.JobInformer
+ cronJobInformer batchV1Informers.CronJobInformer

+ deploymentInformer appsV1Informers.DeploymentInformer
+ daemonSetInformer appsV1Informers.DaemonSetInformer
+ replicaSetInformer appsV1Informers.ReplicaSetInformer
+ statefulSetInformer appsV1Informers.StatefulSetInformer

+ nodeRefreshFunc RefreshNodesFunc
+ podRefreshFunc RefreshPodsFunc
+ summaryRefreshFunc RefreshSummaryFunc
}

func newController(client *Client) *Controller {
@@ -80,25 +84,67 @@ func (c *Controller) Start(ctx context.Context, resync time.Duration) error {
return errors.New("context cannot be nil")
}

- factory := dynamicinformer.NewDynamicSharedInformerFactory(c.client.dynaClient, resync)
- c.namespaceInformer = factory.ForResource(GVRs["namespaces"])
- c.nodeInformer = factory.ForResource(GVRs["nodes"])
- c.podInformer = factory.ForResource(GVRs["pods"])
- c.deploymentInformer = factory.ForResource(GVRs["deployments"])
- c.daemonSetInformer = factory.ForResource(GVRs["daemonsets"])
- c.replicaSetInformer = factory.ForResource(GVRs["replicasets"])
- c.statefulSetInformer = factory.ForResource(GVRs["statefulsets"])
- c.jobInformer = factory.ForResource(GVRs["jobs"])
- c.cronJobInformer = factory.ForResource(GVRs["cronjobs"])
- c.pvInformer = factory.ForResource(GVRs["persistentvolumes"])
- c.pvcInformer = factory.ForResource(GVRs["persistentvolumeclaims"])
+ var factory informers.SharedInformerFactory
+ if c.client.namespace == AllNamespaces {
+ factory = informers.NewSharedInformerFactory(c.client.kubeClient, resync)
+ } else {
+ factory = informers.NewSharedInformerFactoryWithOptions(c.client.kubeClient, resync, informers.WithNamespace(c.client.namespace))
+ }

+ // NOTE: the following captures each informer and calls its Informer()
+ // method to register the cached type with the factory. The call to
+ // Informer() must happen before factory.Start() or the cache sync hangs.

+ // core/V1 informers
+ coreInformers := factory.Core().V1()
+ c.namespaceInformer = coreInformers.Namespaces()
+ namespaceHasSynced := c.namespaceInformer.Informer().HasSynced
+ c.nodeInformer = coreInformers.Nodes()
+ nodeHasSynced := c.nodeInformer.Informer().HasSynced
+ c.podInformer = coreInformers.Pods()
+ podHasSynced := c.podInformer.Informer().HasSynced
+ c.pvInformer = coreInformers.PersistentVolumes()
+ pvHasSynced := c.pvInformer.Informer().HasSynced
+ c.pvcInformer = coreInformers.PersistentVolumeClaims()
+ pvcHasSynced := c.pvcInformer.Informer().HasSynced

+ // Apps/v1 informers
+ appsInformers := factory.Apps().V1()
+ c.deploymentInformer = appsInformers.Deployments()
+ deploymentHasSynced := c.deploymentInformer.Informer().HasSynced
+ c.daemonSetInformer = appsInformers.DaemonSets()
+ daemonsetHasSynced := c.daemonSetInformer.Informer().HasSynced
+ c.replicaSetInformer = appsInformers.ReplicaSets()
+ replicasetHasSynced := c.replicaSetInformer.Informer().HasSynced
+ c.statefulSetInformer = appsInformers.StatefulSets()
+ statefulsetHasSynced := c.statefulSetInformer.Informer().HasSynced

+ // Batch/v1 informers
+ batchInformers := factory.Batch().V1()
+ c.jobInformer = batchInformers.Jobs()
+ jobHasSynced := c.jobInformer.Informer().HasSynced
+ c.cronJobInformer = batchInformers.CronJobs()
+ cronJobHasSynced := c.cronJobInformer.Informer().HasSynced

factory.Start(ctx.Done())
- for name, gvr := range GVRs {
- if synced := factory.WaitForCacheSync(ctx.Done()); !synced[gvr] {
- return fmt.Errorf("resource not synced: %s", name)
- }
- }

+ ok := cache.WaitForCacheSync(ctx.Done(),
+ namespaceHasSynced,
+ nodeHasSynced,
+ podHasSynced,
+ pvHasSynced,
+ pvcHasSynced,
+ deploymentHasSynced,
+ daemonsetHasSynced,
+ replicasetHasSynced,
+ statefulsetHasSynced,
+ jobHasSynced,
+ cronJobHasSynced,
+ )
+ if !ok {
+ return fmt.Errorf("one or more resources failed to sync")
+ }

c.setupSummaryHandler(ctx, c.summaryRefreshFunc)
c.setupNodeHandler(ctx, c.nodeRefreshFunc)
c.installPodsHandler(ctx, c.podRefreshFunc)
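The hunk ends before the handler wiring (setupSummaryHandler, setupNodeHandler, installPodsHandler), but the payoff of typed informers shows up exactly there: handlers receive concrete objects with no unstructured conversion. A sketch of the standard client-go pattern (illustrative only; the commit's actual handlers are elided above):

c.podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		pod := obj.(*coreV1.Pod) // typed, no runtime conversion needed
		_ = pod                  // map into a model.PodModel and refresh the view
	},
	UpdateFunc: func(oldObj, newObj interface{}) { /* re-render affected rows */ },
	DeleteFunc: func(obj interface{}) { /* drop the row from the view */ },
})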
