Skip to content

Commit

Permalink
Merge pull request #31 from ahinvinith/kubviz
Browse files Browse the repository at this point in the history
added new features #23 outdated #24 kubepug # ketall
  • Loading branch information
jebinjeb committed Apr 29, 2023
2 parents cd67052 + 29f0141 commit 2d8758c
Show file tree
Hide file tree
Showing 13 changed files with 2,240 additions and 406 deletions.
85 changes: 58 additions & 27 deletions agent/k8smetrics_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"os"
"strconv"
"strings"
"sync"
"time"

"github.com/kube-tarian/kubviz/model"
Expand Down Expand Up @@ -34,50 +35,92 @@ import (
"k8s.io/client-go/tools/cache"
)

// constants for jetstream
const (
streamName = "METRICS"
streamSubjects = "METRICS.*"
eventSubject = "METRICS.event"
allSubject = "METRICS.all"
)

// to read the token from env variables
// env variables for getting
// nats token, natsurl, clustername
var (
ClusterName = os.Getenv("CLUSTER_NAME")
ClusterName string = os.Getenv("CLUSTER_NAME")
token string = os.Getenv("NATS_TOKEN")
natsurl string = os.Getenv("NATS_ADDRESS")
)

func main() {

// Connect to NATS
// error channels declared for the go routines
outdatedErrChan := make(chan error, 1)
kubePreUpgradeChan := make(chan error, 1)
getAllResourceChan := make(chan error, 1)
clusterMetricsChan := make(chan error, 1)
var wg sync.WaitGroup
// waiting for 4 go routines
wg.Add(4)
// connecting with nats ...
nc, err := nats.Connect(natsurl, nats.Name("K8s Metrics"), nats.Token(token))
checkErr(err)
// Creates JetStreamContext
// creating a jetstream connection using the nats connection
js, err := nc.JetStream()
checkErr(err)
// Creates stream
// creating a stream with stream name METRICS
err = createStream(js)
checkErr(err)
// Create pull METRICS and publish them to nats JetStream
// getting kubernetes clientset
clientset := getK8sClient()
//getK8sEvents(clientset)
err = publishMetrics(clientset, js)
checkErr(err)
// starting all the go routines
go outDatedImages(js, &wg, outdatedErrChan)
go KubePreUpgradeDetector(js, &wg, kubePreUpgradeChan)
go GetAllResources(js, &wg, getAllResourceChan)
getK8sEvents(clientset)
go publishMetrics(clientset, js, &wg, clusterMetricsChan)
wg.Wait()
// once the go routines completes we will close the error channels
close(outdatedErrChan)
close(kubePreUpgradeChan)
close(getAllResourceChan)
close(clusterMetricsChan)
// for loop will wait for the error channels
// logs if any error occurs
for {
select {
case err := <-outdatedErrChan:
if err != nil {
log.Println(err)
}
case err := <-kubePreUpgradeChan:
if err != nil {
log.Println(err)
}
case err := <-getAllResourceChan:
if err != nil {
log.Println(err)
}
case err := <-clusterMetricsChan:
if err != nil {
log.Println(err)
}
}
}

}

// publishMetrics publishes stream of events
// with subject "METRICS.created"
func publishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext) error {
// //Publish Nodes data
func publishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) {
defer wg.Done()
//Publish Nodes data
// for i := 1; i <= 10; i++ {
// shouldReturn, returnValue := publishK8sMetrics(i, "Node", getK8sNodes(clientset), js)
// if shouldReturn {
// return returnValue
// }
// time.Sleep(100 * time.Millisecond)
// }
// //Publish Pods data
//Publish Pods data
// for i := 1; i <= 10; i++ {

// shouldReturn, returnValue := publishK8sMetrics(i, "Pod", getK8sPods(clientset), js)
Expand All @@ -90,7 +133,7 @@ func publishMetrics(clientset *kubernetes.Clientset, js nats.JetStreamContext) e
//publishK8sMetrics(1, "Event", getK8sEvents(clientset), js)
watchK8sEvents(clientset, js)

return nil
errCh <- nil
}

func publishK8sMetrics(id string, mtype string, mdata *v1.Event, js nats.JetStreamContext) (bool, error) {
Expand Down Expand Up @@ -126,22 +169,10 @@ func createStream(js nats.JetStreamContext) error {
checkErr(err)
}
return nil

}

func getK8sClient() *kubernetes.Clientset {

// var kubeconfig *string
// if home := homedir.HomeDir(); home != "" {
// kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
// } else {
// kubeconfig = flag.String("kubeconfig", "", "/Users/avikn/Documents/kubeconfig/167")
// }
// flag.Parse()

// // use the current context in kubeconfig
// config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
// checkErr(err)

// creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
Expand Down
110 changes: 110 additions & 0 deletions agent/ketall.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package main

import (
"context"
"encoding/json"
"path/filepath"
"sync"
"time"

"github.com/kube-tarian/kubviz/model"
"github.com/nats-io/nats.go"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)

const (
eventSubject_getall_resource = "METRICS.ketall"
)

func PublishAllResources(result model.Resource, js nats.JetStreamContext) error {
metrics := result
metrics.ClusterName = ClusterName
metricsJson, _ := json.Marshal(metrics)
_, err := js.Publish(eventSubject_getall_resource, metricsJson)
if err != nil {
return err
}
log.Printf("Metrics with resource %s in the %s namespace has been published", result.Resource, result.Namespace)
return nil
}

func GetAllResources(js nats.JetStreamContext, wg *sync.WaitGroup, errCh chan error) {
defer wg.Done()
// TODO:should be removed after testing
var kubeconfig string
if home := homedir.HomeDir(); home != "" {
kubeconfig = filepath.Join(home, ".kube", "dev-config")
}

// Build the configuration from the kubeconfig file
config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
if err != nil {
log.Error(err)
errCh <- err
}
// TODO:upto here delete
// TODO:Production code:
// config, err := rest.InClusterConfig()
// if err != nil {
// panic(err.Error())
// }
// TODO: upto this uncomment for production
// Create a new discovery client to discover all resources in the cluster
dc := discovery.NewDiscoveryClientForConfigOrDie(config)

// Create a new dynamic client to list resources in the cluster
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
log.Error(err)
errCh <- err
}
// Get a list of all available API groups and versions in the cluster
resourceLists, err := dc.ServerPreferredResources()
if err != nil {
log.Error(err)
errCh <- err
}
gvrs, err := discovery.GroupVersionResources(resourceLists)
if err != nil {
panic(err)
errCh <- err
}
// Iterate over all available API groups and versions and list all resources in each group
for gvr := range gvrs {
// List all resources in the group
list, err := dynamicClient.Resource(gvr).Namespace("").List(context.Background(), metav1.ListOptions{})
if err != nil {
// fmt.Printf("Error listing %s: %v\n", gvr.String(), err)
continue
}

for _, item := range list.Items {
age := time.Since(item.GetCreationTimestamp().Time).Round(time.Second).String()
var resource model.Resource
if item.GetNamespace() == "" {
resource = model.Resource{
Resource: item.GetName(),
Namespace: "Default",
Age: age,
}
} else {
resource = model.Resource{
Resource: item.GetName(),
Namespace: item.GetNamespace(),
Age: age,
}

}
err := PublishAllResources(resource, js)
if err != nil {
errCh <- err
}
}
}
errCh <- nil
}
Loading

0 comments on commit 2d8758c

Please sign in to comment.