Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Reworked collectors and runner
Signed-off-by: Andreas Neumann <aneumann@mesosphere.com>
  • Loading branch information
ANeumann82 committed Jun 2, 2020
1 parent fd9864a commit 2a439b9
Show file tree
Hide file tree
Showing 4 changed files with 202 additions and 168 deletions.
60 changes: 39 additions & 21 deletions pkg/kudoctl/cmd/diagnostics/collectors.go
Expand Up @@ -9,6 +9,8 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"

"github.com/kudobuilder/kudo/pkg/kudoctl/clog"
)

// Ensure collector is implemented
Expand All @@ -21,41 +23,44 @@ type resourceCollector struct {
parentDir func() string // parent dir to attach the printer's output
failOnError bool // define whether the collector should return the error
callback func(runtime.Object) // will be called with the retrieved resource after collection to update shared context
printer *nonFailingPrinter
printMode printMode
}

// collect - load a resource and send either the resource or collection error to printer
// return error if failOnError field is set to true
// if failOnError is true, finding no object(s) is treated as an error
func (c *resourceCollector) collect() error {
func (c *resourceCollector) collect(printer *nonFailingPrinter) error {
clog.V(4).Printf("Collect Resource %s in parent dir %s", c.name, c.parentDir())
obj, err := c._collect(c.failOnError)
if err != nil {
c.printer.printError(err, c.parentDir(), c.name)
printer.printError(err, c.parentDir(), c.name)
if c.failOnError {
return err
}
}
if obj != nil {
c.printer.printObject(obj, c.parentDir(), c.printMode)
printer.printObject(obj, c.parentDir(), c.printMode)
}
return nil
}

func emptyResult(obj runtime.Object) bool {
return obj == nil || reflect.ValueOf(obj).IsNil() || (meta.IsListType(obj) && meta.LenList(obj) == 0)
}

func (c *resourceCollector) _collect(failOnError bool) (runtime.Object, error) {
obj, err := c.loadResourceFn()
switch {
case err != nil:
if err != nil {
return nil, fmt.Errorf("failed to retrieve object(s) of kind %s: %v", c.name, err)
case obj == nil || reflect.ValueOf(obj).IsNil() || (meta.IsListType(obj) && meta.LenList(obj) == 0):
}
if emptyResult(obj) {
obj = nil
if failOnError {
return nil, fmt.Errorf("no object(s) of kind %s retrieved", c.name)
}
default:
if c.callback != nil {
c.callback(obj)
}
}
if c.callback != nil {
c.callback(obj)
}
return obj, nil
}
Expand All @@ -67,24 +72,24 @@ var _ collector = &resourceCollectorGroup{}
// each other's side-effects on the shared context
type resourceCollectorGroup struct {
collectors []resourceCollector
parentDir func() string
}

// collect - collect resource and run callback for each collector, print all afterwards
// collection failures are treated as fatal regardless of the collectors failOnError flag setting
func (g resourceCollectorGroup) collect() error {
func (g resourceCollectorGroup) collect(printer *nonFailingPrinter) error {
clog.V(0).Printf("Collect ResourceGroup for %d collectors", len(g.collectors))
objs := make([]runtime.Object, len(g.collectors))
modes := make([]printMode, len(g.collectors))
for i, c := range g.collectors {
obj, err := c._collect(true)
if err != nil {
c.printer.printError(err, g.parentDir(), c.name)
printer.printError(err, c.parentDir(), c.name)
return err
}
objs[i], modes[i] = obj, c.printMode
}
for i, c := range g.collectors {
c.printer.printObject(objs[i], c.parentDir(), modes[i])
printer.printObject(objs[i], c.parentDir(), modes[i])
}
return nil
}
Expand All @@ -94,22 +99,35 @@ var _ collector = &logsCollector{}

type logsCollector struct {
loadLogFn func(string, string) (io.ReadCloser, error)
pods []v1.Pod
pods func() []v1.Pod
parentDir func() string
printer *nonFailingPrinter
}

func (c *logsCollector) collect() error {
for _, pod := range c.pods {
func (c *logsCollector) collect(printer *nonFailingPrinter) error {
clog.V(0).Printf("Collect Logs for %d pods", len(c.pods()))
for _, pod := range c.pods() {
for _, container := range pod.Spec.Containers {
log, err := c.loadLogFn(pod.Name, container.Name)
if err != nil {
c.printer.printError(err, filepath.Join(c.parentDir(), fmt.Sprintf("pod_%s", pod.Name)), fmt.Sprintf("%s.log", container.Name))
printer.printError(err, filepath.Join(c.parentDir(), fmt.Sprintf("pod_%s", pod.Name)), fmt.Sprintf("%s.log", container.Name))
} else {
c.printer.printLog(log, filepath.Join(c.parentDir(), fmt.Sprintf("pod_%s", pod.Name)), container.Name)
printer.printLog(log, filepath.Join(c.parentDir(), fmt.Sprintf("pod_%s", pod.Name)), container.Name)
_ = log.Close()
}
}
}
return nil
}

var _ collector = &objCollector{}

type objCollector struct {
obj interface{}
parentDir stringGetter
name string
}

func (c *objCollector) collect(printer *nonFailingPrinter) error {
printer.printYaml(c.obj, c.parentDir(), c.name)
return nil
}
8 changes: 6 additions & 2 deletions pkg/kudoctl/cmd/diagnostics/processing_context.go
Expand Up @@ -3,10 +3,10 @@ package diagnostics
import (
"fmt"

"github.com/kudobuilder/kudo/pkg/apis/kudo/v1beta1"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"

"github.com/kudobuilder/kudo/pkg/apis/kudo/v1beta1"
)

// processingContext - shared data for the resource collectors
Expand Down Expand Up @@ -51,3 +51,7 @@ func (ctx *processingContext) operatorVersionName() string {
func (ctx *processingContext) operatorName() string {
return ctx.opName
}

func (ctx *processingContext) podList() []v1.Pod {
return ctx.pods
}
29 changes: 19 additions & 10 deletions pkg/kudoctl/cmd/diagnostics/runner.go
Expand Up @@ -3,22 +3,31 @@ package diagnostics
// collector - generic interface for diagnostic data collection
// implementors are expected to return only fatal errors and handle non-fatal ones themselves
type collector interface {
collect() error
collect(printer *nonFailingPrinter) error
}

// runner - sequential runner for Collectors reducing error checking boilerplate code
type runner struct {
fatalErr error
collectors []collector
}

func (r *runner) run(c collector) *runner {
if r.fatalErr == nil {
r.fatalErr = c.collect()
}
return r
func (r *runner) addCollector(c collector) {
r.collectors = append(r.collectors, c)
}

func (r *runner) addObjDump(v interface{}, dir stringGetter, name string) {
r.addCollector(&objCollector{
obj: v,
parentDir: dir,
name: name,
})
}

func (r *runner) dumpToYaml(v interface{}, dir stringGetter, name string, p *nonFailingPrinter) *runner {
p.printYaml(v, dir(), name)
return r
func (r *runner) run(printer *nonFailingPrinter) error {
for _, c := range r.collectors {
if err := c.collect(printer); err != nil {
return err
}
}
return nil
}

0 comments on commit 2a439b9

Please sign in to comment.