Skip to content

Commit

Permalink
feat: support fieldSelector
Browse files Browse the repository at this point in the history
Signed-off-by: jwcesign <jwcesign@gmail.com>
  • Loading branch information
jwcesign committed May 27, 2023
1 parent ae6c345 commit 4cd3d3f
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 57 deletions.
2 changes: 1 addition & 1 deletion pkg/metricsadapter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (m *MetricsController) updateCluster(oldObj, curObj interface{}) {
m.queue.Add(curCluster.GetName())
}

if util.IsClusterSpecPartCriticalUpdated(curCluster.Spec, oldCluster.Spec) ||
if util.ClusterAccessCredentialChanged(curCluster.Spec, oldCluster.Spec) ||
util.IsClusterReady(&curCluster.Status) != util.IsClusterReady(&oldCluster.Status) {
// Cluster.Spec or Cluster health state is changed, rebuild informer.
m.InformerManager.Stop(curCluster.GetName())
Expand Down
282 changes: 228 additions & 54 deletions pkg/metricsadapter/provider/resourcemetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
Expand All @@ -25,6 +24,9 @@ const (
// labelSelectorAnnotationInternal is the annotation used internal in karmada-metrics-adapter,
// to record the selector specified by the user
labelSelectorAnnotationInternal = "internal.karmada.io/selector"
// namespaceSpecifiedAnnotation is the annotation used in karmada-metrics-adapter,
// to record the namespace specified by the user
namespaceSpecifiedAnnotation = "internal.karmada.io/namespace"
)

var (
Expand Down Expand Up @@ -52,11 +54,12 @@ type ResourceMetricsProvider struct {

// NewResourceMetricsProvider creates a new resource metrics provider
func NewResourceMetricsProvider(clusterLister clusterlister.ClusterLister, informerManager genericmanager.MultiClusterInformerManager) *ResourceMetricsProvider {

return &ResourceMetricsProvider{
clusterLister: clusterLister,
informerManager: informerManager,
PodLister: NewPodLister(),
NodeLister: NewNodeLister(),
PodLister: NewPodLister(clusterLister, informerManager),
NodeLister: NewNodeLister(clusterLister, informerManager),
}
}

Expand Down Expand Up @@ -286,125 +289,291 @@ func (r *ResourceMetricsProvider) queryNodeMetricsBySelector(selector string) ([

// GetPodMetrics queries metrics by the internal constructed pod
func (r *ResourceMetricsProvider) GetPodMetrics(pods ...*metav1.PartialObjectMetadata) ([]metrics.PodMetrics, error) {
var podMetrics []metrics.PodMetrics
var ret []metrics.PodMetrics
if len(pods) == 0 {
return ret, nil
}

// In the previous step, we construct pods list with only one element.
if len(pods) != 1 {
return podMetrics, nil
podsKeyMap := make(map[string]struct{})
for _, pod := range pods {
podKey := generateNamespaceNameKey(pod.Namespace, pod.Name)
podsKeyMap[podKey] = struct{}{}
}

// In the previous step, if query with label selector, the name will be set to empty
if pods[0].Name == "" {
namespace := pods[0].Namespace
var queryData []metrics.PodMetrics
var err error
// In the previous step, we construct the annotations, so it couldn't be nil
if _, ok := pods[0].Annotations[labelSelectorAnnotationInternal]; ok {
namespace := pods[0].Annotations[namespaceSpecifiedAnnotation]
selectorStr := pods[0].Annotations[labelSelectorAnnotationInternal]
return r.queryPodMetricsBySelector(selectorStr, namespace)
queryData, err = r.queryPodMetricsBySelector(selectorStr, namespace)
} else {
queryData, err = r.queryPodMetricsByName(pods[0].Name, pods[0].Namespace)
}

if err != nil {
return nil, err
}

return r.queryPodMetricsByName(pods[0].Name, pods[0].Namespace)
for _, i := range queryData {
podKey := generateNamespaceNameKey(i.Namespace, i.Name)
if _, ok := podsKeyMap[podKey]; ok {
ret = append(ret, i)
}
}

return ret, nil
}

// GetNodeMetrics queries metrics by the internal constructed node
func (r *ResourceMetricsProvider) GetNodeMetrics(nodes ...*corev1.Node) ([]metrics.NodeMetrics, error) {
var nodeMetrics []metrics.NodeMetrics
var ret []metrics.NodeMetrics
if len(nodes) == 0 {
return ret, nil
}

// In the previous step, we construct node list with only one element, this should never happen
if len(nodes) != 1 {
// never reach here
return nodeMetrics, nil
nodesKeyMap := make(map[string]struct{})
for _, node := range nodes {
nodKey := generateNamespaceNameKey(node.Namespace, node.Name)
nodesKeyMap[nodKey] = struct{}{}
}

// In the previous step, if query with label selector, the name will be set to empty
if nodes[0].Name == "" {
var queryData []metrics.NodeMetrics
var err error
// In the previous step, we construct the annotations, so it couldn't be nil
if _, ok := nodes[0].Annotations[labelSelectorAnnotationInternal]; ok {
selectorStr := nodes[0].Annotations[labelSelectorAnnotationInternal]
return r.queryNodeMetricsBySelector(selectorStr)
queryData, err = r.queryNodeMetricsBySelector(selectorStr)
} else {
queryData, err = r.queryNodeMetricsByName(nodes[0].Name)
}

return r.queryNodeMetricsByName(nodes[0].Name)
if err != nil {
return nil, err
}

for _, i := range queryData {
nodKey := generateNamespaceNameKey(i.Namespace, i.Name)
if _, ok := nodesKeyMap[nodKey]; ok {
ret = append(ret, i)
}
}

return ret, nil
}

// PodLister is an internal lister for pods
type PodLister struct {
namespaceSpecified string
clusterLister clusterlister.ClusterLister
informerManager genericmanager.MultiClusterInformerManager
}

// NewPodLister creates an internal new PodLister
func NewPodLister() *PodLister {
return &PodLister{}
func NewPodLister(clusterLister clusterlister.ClusterLister, informerManager genericmanager.MultiClusterInformerManager) *PodLister {
return &PodLister{
clusterLister: clusterLister,
informerManager: informerManager,
}
}

// List returns the internal constructed pod with label selector info
func (p *PodLister) List(selector labels.Selector) (ret []runtime.Object, err error) {
klog.V(4).Infof("List query pods with selector: %v", selector.String())

podData := &v1.PartialObjectMetadata{
TypeMeta: v1.TypeMeta{},
ObjectMeta: v1.ObjectMeta{
Namespace: p.namespaceSpecified,
Annotations: map[string]string{
labelSelectorAnnotationInternal: selector.String(),
},
},
clusters, err := p.clusterLister.List(labels.Everything())
if err != nil {
return nil, err
}

return []runtime.Object{podData}, nil
for _, cluster := range clusters {
sci := p.informerManager.GetSingleClusterManager(cluster.Name)
if sci == nil {
klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name)
continue
}
pods, err := sci.Lister(PodsGVR).ByNamespace(p.namespaceSpecified).List(selector)
if err != nil {
klog.Infof("Failed to list pods from cluster(%s) in namespace(%s): %v", cluster.Name, p.namespaceSpecified, err)
return nil, err
}
for _, pod := range pods {
podTyped := &corev1.Pod{}
err = helper.ConvertToTypedObject(pod, podTyped)
if err != nil {
klog.Infof("Failed to convert to typed object: %v", err)
return nil, err
}
podPartial := p.convertToPodPartialData(podTyped, selector.String(), true)
ret = append(ret, podPartial)
}
}

return ret, nil
}

// convertToPodPartialData converts pod to partial data
func (p *PodLister) convertToPodPartialData(pod *corev1.Pod, selector string, labelSelector bool) *metav1.PartialObjectMetadata {
ret := &metav1.PartialObjectMetadata{
TypeMeta: pod.TypeMeta,
ObjectMeta: pod.ObjectMeta,
}
if ret.Annotations == nil {
ret.Annotations = map[string]string{}
}

//If user sets this annotation, we need to remove it to avoid parsing wrong next.
if !labelSelector {
delete(ret.Annotations, namespaceSpecifiedAnnotation)
delete(ret.Annotations, labelSelectorAnnotationInternal)
return ret
}
ret.Annotations[labelSelectorAnnotationInternal] = selector
ret.Annotations[namespaceSpecifiedAnnotation] = p.namespaceSpecified

return ret
}

// Get returns the internal constructed pod with name info
func (p *PodLister) Get(name string) (runtime.Object, error) {
klog.V(4).Infof("Query pod in namespace(%s) with name:%s", p.namespaceSpecified, name)

podData := &v1.PartialObjectMetadata{
TypeMeta: v1.TypeMeta{},
ObjectMeta: v1.ObjectMeta{
Name: name,
Namespace: p.namespaceSpecified,
},
clusters, err := p.clusterLister.List(labels.Everything())
if err != nil {
return nil, err
}
return podData, nil

for _, cluster := range clusters {
sci := p.informerManager.GetSingleClusterManager(cluster.Name)
if sci == nil {
klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name)
continue
}
pod, err := sci.Lister(PodsGVR).ByNamespace(p.namespaceSpecified).Get(name)
if err != nil {
if !errors.IsNotFound(err) {
klog.Infof("Failed to get pod from clsuster(%s) in namespace(%s): %v", cluster.Name, p.namespaceSpecified, err)
}
continue
}
// We only return the first matched cluster
podTyped := &corev1.Pod{}
err = helper.ConvertToTypedObject(pod, podTyped)
if err != nil {
klog.Infof("Failed to convert to typed object: %v", err)
return nil, err
}
return p.convertToPodPartialData(podTyped, "", false), nil
}

return nil, errors.NewNotFound(PodsGVR.GroupResource(), name)
}

// ByNamespace returns the pod lister with namespace info
func (p *PodLister) ByNamespace(namespace string) cache.GenericNamespaceLister {
klog.V(4).Infof("Query Pods in namespace: %s", namespace)

listerCopy := &PodLister{}
listerCopy := &PodLister{
clusterLister: p.clusterLister,
informerManager: p.informerManager,
}
listerCopy.namespaceSpecified = namespace
return listerCopy
}

// NodeLister is an internal lister for nodes
type NodeLister struct {
clusterLister clusterlister.ClusterLister
informerManager genericmanager.MultiClusterInformerManager
}

// NewNodeLister creates an internal new NodeLister
func NewNodeLister() *NodeLister {
return &NodeLister{}
func NewNodeLister(clusterLister clusterlister.ClusterLister, informerManager genericmanager.MultiClusterInformerManager) *NodeLister {
return &NodeLister{
clusterLister: clusterLister,
informerManager: informerManager,
}
}

// List returns the internal constructed node with label selector info
func (n *NodeLister) List(selector labels.Selector) (ret []*corev1.Node, err error) {
klog.V(4).Infof("Query node metrics with selector: %s", selector.String())

node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
labelSelectorAnnotationInternal: selector.String(),
},
},
clusters, err := n.clusterLister.List(labels.Everything())
if err != nil {
return nil, err
}

for _, cluster := range clusters {
sci := n.informerManager.GetSingleClusterManager(cluster.Name)
if sci == nil {
klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name)
continue
}
nodes, err := sci.Lister(NodesGVR).List(selector)
if err != nil {
klog.Infof("Failed to list nodes from cluster(%s): %v", cluster.Name, err)
return nil, err
}
for index := range nodes {
nodeTyped := &corev1.Node{}
err = helper.ConvertToTypedObject(nodes[index], nodeTyped)
if err != nil {
klog.Errorf("Failed to convert to typed object: %v", err)
return nil, err
}
if nodeTyped.Annotations == nil {
nodeTyped.Annotations = map[string]string{}
}

//If user sets this annotation, we need to remove it to avoid parsing wrong next.
nodeTyped.Annotations[labelSelectorAnnotationInternal] = selector.String()
ret = append(ret, nodeTyped)
}
}
return []*corev1.Node{node}, nil

return ret, nil
}

// Get returns the internal constructed node with name info
func (n *NodeLister) Get(name string) (*corev1.Node, error) {
klog.V(4).Infof("Query node metrics with name:%s", name)

node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
clusters, err := n.clusterLister.List(labels.Everything())
if err != nil {
return nil, err
}

for _, cluster := range clusters {
sci := n.informerManager.GetSingleClusterManager(cluster.Name)
if sci == nil {
klog.Errorf("Failed to get SingleClusterInformerManager for cluster(%s)", cluster.Name)
continue
}
node, err := sci.Lister(NodesGVR).Get(name)
if err != nil {
if !errors.IsNotFound(err) {
klog.Infof("Failed to get node from cluster(%s):%v", cluster.Name, err)
}
continue
}
// We only return the first matched cluster
nodeTyped := &corev1.Node{}
err = helper.ConvertToTypedObject(node, nodeTyped)
if err != nil {
klog.Errorf("Failed to convert to typed object: %v", err)
return nil, err
}
if nodeTyped.Annotations == nil {
nodeTyped.Annotations = map[string]string{}
}

//If user sets this annotation, we need to remove it to avoid parsing wrong next.
delete(nodeTyped.Annotations, labelSelectorAnnotationInternal)
return nodeTyped, nil
}
return node, nil

return nil, errors.NewNotFound(NodesGVR.GroupResource(), name)
}

// metricsConvertV1beta1PodToInternalPod converts metricsv1beta1.PodMetrics to metrics.PodMetrics
Expand Down Expand Up @@ -460,3 +629,8 @@ func metricsConvertV1beta1NodeToInternalNode(objs ...unstructured.Unstructured)

return nodeMetricsInternal, nil
}

// generateNamespaceNameKey generates namespace/name key
func generateNamespaceNameKey(namespace, name string) string {
return namespace + "/" + name
}
Loading

0 comments on commit 4cd3d3f

Please sign in to comment.