Skip to content

Commit

Permalink
Implementation of kube_capture starlark function
Browse files Browse the repository at this point in the history
This patch implements the Go code for starlark builtin function kube_capture().
The function allows Crashd script to capture and write information about various
kubernetes objects and logs
  • Loading branch information
srm09 committed Jun 25, 2020
1 parent 6787b0d commit 12be3fc
Show file tree
Hide file tree
Showing 18 changed files with 912 additions and 21 deletions.
1 change: 1 addition & 0 deletions go.mod
Expand Up @@ -6,6 +6,7 @@ require (
github.com/imdario/mergo v0.3.7 // indirect
github.com/onsi/ginkgo v1.10.1
github.com/onsi/gomega v1.7.0
github.com/pkg/errors v0.8.0
github.com/sirupsen/logrus v1.4.2
github.com/spf13/cobra v0.0.5
github.com/vladimirvivien/echo v0.0.1-alpha.4
Expand Down
1 change: 1 addition & 0 deletions go.sum
Expand Up @@ -126,6 +126,7 @@ github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
10 changes: 0 additions & 10 deletions k8s/client.go
Expand Up @@ -31,16 +31,6 @@ type Client struct {
JsonPrinter printers.JSONPrinter
}

type SearchResult struct {
ListKind string
ResourceName string
ResourceKind string
GroupVersionResource schema.GroupVersionResource
List *unstructured.UnstructuredList
Namespaced bool
Namespace string
}

// New returns a *Client
func New(kubeconfig string) (*Client, error) {
// creating cfg for each client type because each
Expand Down
49 changes: 49 additions & 0 deletions k8s/container.go
@@ -0,0 +1,49 @@
package k8s

import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
)

func GetContainers(podItem unstructured.Unstructured) ([]Container, error) {
var containers []Container
coreContainers, err := _getPodContainers(podItem)
if err != nil {
return containers, err
}

for _, c := range coreContainers {
containers = append(containers, NewContainerLogger(podItem.GetNamespace(), podItem.GetName(), c))
}
return containers, nil
}

func _getPodContainers(podItem unstructured.Unstructured) ([]corev1.Container, error) {
var containers []corev1.Container

pod := new(corev1.Pod)
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(podItem.Object, &pod); err != nil {
return nil, fmt.Errorf("error converting container objects: %s", err)
}

for _, c := range pod.Spec.InitContainers {
containers = append(containers, c)
}

for _, c := range pod.Spec.Containers {
containers = append(containers, c)
}
containers = append(containers, _getPodEphemeralContainers(pod)...)
return containers, nil
}

func _getPodEphemeralContainers(pod *corev1.Pod) []corev1.Container {
var containers []corev1.Container
for _, ec := range pod.Spec.EphemeralContainers {
containers = append(containers, corev1.Container(ec.EphemeralContainerCommon))
}
return containers
}
64 changes: 64 additions & 0 deletions k8s/container_logger.go
@@ -0,0 +1,64 @@
package k8s

import (
"fmt"
"io"
"os"
"path/filepath"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
)

type ContainerLogsImpl struct {
namespace string
podName string
container corev1.Container
}

func NewContainerLogger(namespace, podName string, container corev1.Container) ContainerLogsImpl {
return ContainerLogsImpl{
namespace: namespace,
podName: podName,
container: container,
}
}

func (c ContainerLogsImpl) Fetch(restApi rest.Interface) (io.ReadCloser, error) {
opts := &corev1.PodLogOptions{Container: c.container.Name}
req := restApi.Get().Namespace(c.namespace).Name(c.podName).Resource("pods").SubResource("log").VersionedParams(opts, scheme.ParameterCodec)
stream, err := req.Stream()
if err != nil {
err = errors.Wrap(err, "failed to create container log stream")
}
return stream, err
}

func (c ContainerLogsImpl) Write(reader io.ReadCloser, rootDir string) error {
containerLogDir := filepath.Join(rootDir, c.container.Name)
if err := os.MkdirAll(containerLogDir, 0744); err != nil && !os.IsExist(err) {
return fmt.Errorf("error creating container log dir: %s", err)
}

path := filepath.Join(containerLogDir, fmt.Sprintf("%s.log", c.container.Name))
logrus.Debugf("Writing pod container log %s", path)

file, err := os.Create(path)
if err != nil {
return err
}
defer file.Close()

defer reader.Close()
if _, err := io.Copy(file, reader); err != nil {
cpErr := fmt.Errorf("failed to copy container log:\n%s", err)
if wErr := writeError(cpErr, file); wErr != nil {
return fmt.Errorf("failed to write previous err [%s] to file: %s", err, wErr)
}
return err
}
return nil
}
20 changes: 20 additions & 0 deletions k8s/k8s.go
@@ -0,0 +1,20 @@
package k8s

import (
"fmt"
"io"

"k8s.io/client-go/rest"
)

const BaseDirname = "kubecapture"

type Container interface {
Fetch(rest.Interface) (io.ReadCloser, error)
Write(io.ReadCloser, string) error
}

func writeError(errStr error, w io.Writer) error {
_, err := fmt.Fprintln(w, errStr.Error())
return err
}
13 changes: 13 additions & 0 deletions k8s/k8s_suite_test.go
@@ -0,0 +1,13 @@
package k8s

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestK8s(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "K8s Suite")
}
42 changes: 42 additions & 0 deletions k8s/object_writer.go
@@ -0,0 +1,42 @@
package k8s

import (
"fmt"
"os"
"path/filepath"

"github.com/sirupsen/logrus"
"k8s.io/cli-runtime/pkg/printers"
)

type ObjectWriter struct {
writeDir string
}

func (w ObjectWriter) Write(result SearchResult) (string, error) {
resultDir := w.writeDir
if result.Namespaced {
resultDir = filepath.Join(w.writeDir, result.Namespace)
}
if err := os.MkdirAll(resultDir, 0744); err != nil && !os.IsExist(err) {
return "", fmt.Errorf("failed to create search result dir: %s", err)
}

path := filepath.Join(resultDir, fmt.Sprintf("%s.json", result.ResourceName))
file, err := os.Create(path)
if err != nil {
return "", err
}
defer file.Close()

logrus.Debugf("kube_capture(): saving %s search results to: %s", result.ResourceName, path)

printer := new(printers.JSONPrinter)
if err := printer.PrintObj(result.List, file); err != nil {
if wErr := writeError(err, file); wErr != nil {
return "", fmt.Errorf("failed to write previous err [%s] to file: %s", err, wErr)
}
return "", err
}
return resultDir, nil
}
81 changes: 81 additions & 0 deletions k8s/result_writer.go
@@ -0,0 +1,81 @@
package k8s

import (
"fmt"
"os"
"path/filepath"

"k8s.io/client-go/rest"
)

type ResultWriter struct {
workdir string
writeLogs bool
restApi rest.Interface
}

func NewResultWriter(workdir, what string, restApi rest.Interface) (*ResultWriter, error) {
var err error
workdir = filepath.Join(workdir, BaseDirname)
if err := os.MkdirAll(workdir, 0744); err != nil && !os.IsExist(err) {
return nil, err
}

writeLogs := what == "logs" || what == "all"
return &ResultWriter{
workdir: workdir,
writeLogs: writeLogs,
restApi: restApi,
}, err
}

func (w *ResultWriter) GetResultDir() string {
return w.workdir
}

func (w *ResultWriter) Write(searchResults []SearchResult) error {
if searchResults == nil || len(searchResults) == 0 {
return fmt.Errorf("cannot write empty (or nil) search result")
}

// each result represents a list of searched item
// write each list in a namespaced location in working dir
for _, result := range searchResults {
objWriter := ObjectWriter{
writeDir: w.workdir,
}
writeDir, err := objWriter.Write(result)
if err != nil {
return err
}

if w.writeLogs && result.ListKind == "PodList" {
if len(result.List.Items) == 0 {
continue
}
for _, podItem := range result.List.Items {
logDir := filepath.Join(writeDir, podItem.GetName())
if err := os.MkdirAll(logDir, 0744); err != nil && !os.IsExist(err) {
return fmt.Errorf("failed to create pod log dir: %s", err)
}

containers, err := GetContainers(podItem)
if err != nil {
return err
}
for _, containerLogger := range containers {
reader, err := containerLogger.Fetch(w.restApi)
if err != nil {
return err
}
err = containerLogger.Write(reader, logDir)
if err != nil {
return err
}
}
}
}
}

return nil
}

0 comments on commit 12be3fc

Please sign in to comment.