Skip to content

Commit

Permalink
feat(pkg/cmd): run creates trace job via factory clients
Browse files Browse the repository at this point in the history
Signed-off-by: Leonardo Di Donato <leodidonato@gmail.com>
  • Loading branch information
leodido authored and fntlnz committed Nov 25, 2018
1 parent 493a75e commit 618ba65
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 38 deletions.
5 changes: 4 additions & 1 deletion pkg/cmd/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package cmd
import (
"fmt"

"github.com/davecgh/go-spew/spew"
"github.com/fntlnz/kubectl-trace/pkg/factory"
"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand Down Expand Up @@ -102,6 +103,7 @@ func (o *GetOptions) Validate(cmd *cobra.Command, args []string) error {

// Complete completes the setup of the command.
func (o *GetOptions) Complete(factory factory.Factory, cmd *cobra.Command, args []string) error {
// Prepare namespace
var err error
o.namespace, o.explicitNamespace, _ = factory.ToRawKubeConfigLoader().Namespace()
if err != nil {
Expand All @@ -119,12 +121,13 @@ func (o *GetOptions) Complete(factory factory.Factory, cmd *cobra.Command, args
return fmt.Errorf(missingTargetErr)
}

// todo > init printers (need PrintFlags)
// todo > init printers (need o.PrintFlags)

return nil
}

// Run executes the get command.
func (o *GetOptions) Run() error {
spew.Dump(o)
return nil
}
109 changes: 96 additions & 13 deletions pkg/cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,17 @@ package cmd

import (
"fmt"
"io/ioutil"

"github.com/davecgh/go-spew/spew"
"github.com/fntlnz/kubectl-trace/pkg/factory"
"github.com/fntlnz/kubectl-trace/pkg/tracejob"
"github.com/spf13/cobra"
"k8s.io/api/core/v1"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes/scheme"

batchv1client "k8s.io/client-go/kubernetes/typed/batch/v1"
)

var (
Expand All @@ -23,13 +29,16 @@ var (
# Run an bpftrace inline program on a pod container
%[1]s trace run pod/nginx -c nginx -e "tracepoint:syscalls:sys_enter_* { @[probe] = count(); }"
%[1]s trace run pod/nginx nginx -e "tracepoint:syscalls:sys_enter_* { @[probe] = count(); }"
%[1]s trace run pod/nginx nginx -e "tracepoint:syscalls:sys_enter_* { @[probe] = count(); }"`

runCommand = "run"
usageString = "(POD | TYPE/NAME)"
requiredArgErrString = fmt.Sprintf("%s is a required argument for the %s command", usageString, runCommand)
containerAsArgOrFlagErrString = "specify container inline as argument or via its flag"
bpftraceMissingErrString = "the bpftrace program is mandatory"
bpftraceDoubleErrString = "specify the bpftrace program either via an external file or via a literal string, not both"
bpftraceEmptyErrString = "the bpftrace programm cannot be empty"
)

// RunOptions ...
Expand All @@ -44,6 +53,8 @@ type RunOptions struct {
eval string
program string
resourceArg string

client batchv1client.BatchV1Interface
}

// NewRunOptions provides an instance of RunOptions with default values.
Expand Down Expand Up @@ -86,19 +97,16 @@ func NewRunCommand(factory factory.Factory, streams genericclioptions.IOStreams)

// Validate validates the arguments and flags populating RunOptions accordingly.
func (o *RunOptions) Validate(cmd *cobra.Command, args []string) error {
containerDefined := cmd.Flag("container").Changed
containerFlagDefined := cmd.Flag("container").Changed
switch len(args) {
case 1:
o.resourceArg = args[0]
if !containerDefined {
return fmt.Errorf(containerAsArgOrFlagErrString)
}
break
// 2nd argument interpreted as container when provided
case 2:
o.resourceArg = args[0]
o.container = args[1]
if containerDefined {
if containerFlagDefined {
return fmt.Errorf(containerAsArgOrFlagErrString)
}
break
Expand All @@ -109,39 +117,114 @@ func (o *RunOptions) Validate(cmd *cobra.Command, args []string) error {
if !cmd.Flag("eval").Changed && !cmd.Flag("program").Changed {
return fmt.Errorf(bpftraceMissingErrString)
}
if cmd.Flag("eval").Changed == cmd.Flag("program").Changed {
return fmt.Errorf(bpftraceDoubleErrString)
}
if (cmd.Flag("eval").Changed && len(o.eval) == 0) || (cmd.Flag("program").Changed && len(o.program) == 0) {
return fmt.Errorf(bpftraceEmptyErrString)
}

// todo > complete validation
// - make errors
// - make validators
if len(o.container) == 0 {
return fmt.Errorf("invalid container")
}
// if len(o.eval) == 0 || file not exist(o.program) || file is empty(o.program) {
// return fmt.Errorf("invalid bpftrace program")
// }

return nil
}

// Complete completes the setup of the command.
func (o *RunOptions) Complete(factory factory.Factory, cmd *cobra.Command, args []string) error {
// Prepare program
if len(o.program) > 0 {
b, err := ioutil.ReadFile(o.program)
if err != nil {
return fmt.Errorf("error opening program file")
}
o.program = string(b)
} else {
o.program = o.eval
}

// Prepare namespace
var err error
o.namespace, o.explicitNamespace, err = factory.ToRawKubeConfigLoader().Namespace()
if err != nil {
return err
}

spew.Dump(o)
b := factory.NewBuilder()
spew.Dump(b)
// Look for the target object
x := factory.
NewBuilder().
WithScheme(scheme.Scheme, scheme.Scheme.PrioritizedVersionsAllGroups()...).
NamespaceParam(o.namespace).
SingleResourceType().
ResourceNames("pods", o.resourceArg). // Search pods by default
Do()

obj, err := x.Object()
if err != nil {
return err
}

spew.Dump(obj)

// Check we got a pod or a node
// isPod := false
switch obj.(type) {
case *v1.Pod:
// isPod = true
break
case *v1.Node:
break
default:
return fmt.Errorf("first argument must be %s", usageString)
}

// Check we also have container if we got a pod
// if o.container == "" && isPod {
// return fmt.Errorf("missing pod container")
// }

// get resource by pod | type/name
// get container
// Prepare client
clientConfig, err := factory.ToRESTConfig()
if err != nil {
return err
}
o.client, err = batchv1client.NewForConfig(clientConfig)
if err != nil {
return err
}

// todo > setup printer
// printer, err := o.PrintFlags.ToPrinter()
// if err != nil {
// return err
// }
// o.print = func(obj runtime.Object) error {
// return printer.PrintObj(obj, o.Out)
// }

return nil
}

// Run executes the run command.
func (o *RunOptions) Run() error {
tj := tracejob.TraceJob{
Namespace: o.namespace,
Program: o.program,
Hostname: o.resourceArg,
}

spew.Dump(tj)
fmt.Println(o.container)

_, err := o.client.Jobs(o.namespace).Create(tracejob.Create(tj))
if err != nil {
return err
}

// o.print(_)
return nil
}
6 changes: 5 additions & 1 deletion pkg/meta/constants.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
package meta

const (
// TracePrefix is the prefix to identify objects created by this tool
TracePrefix = "kubectl-trace-"
// TraceIDLabelKey is a meta to annotate objects created by this tool
TraceIDLabelKey = "fntlnz.wtf/kubectl-trace-id"
TraceLabelKey = "fntlnz.wtf/kubectl-trace"
// TraceLabelKey is a meta to annotate objects created by this tool
TraceLabelKey = "fntlnz.wtf/kubectl-trace"
)
53 changes: 30 additions & 23 deletions pkg/tracejob/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
batchv1 "k8s.io/api/batch/v1"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
batchv1typed "k8s.io/client-go/kubernetes/typed/batch/v1"
corev1typed "k8s.io/client-go/kubernetes/typed/core/v1"
)
Expand Down Expand Up @@ -140,44 +141,44 @@ func (t *TraceJobClient) DeleteJob(nf TraceJobFilter) error {
return nil
}

// todo(fntlnz): deal with programs that needs the user to send a signal to complete,
// like how the hist() function does
// Will likely need to allocate a TTY for this one thing.
func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) {
bpfTraceCmd := []string{
// Create setup a new Job for bpftrace program.
func Create(j TraceJob) *batchv1.Job {
j.ID = string(uuid.NewUUID())
j.Name = fmt.Sprintf("%s%s", meta.TracePrefix, j.ID)

bpftraceCommand := []string{
"bpftrace",
"/programs/program.bt",
}

commonMeta := metav1.ObjectMeta{
Name: nj.Name,
Namespace: nj.Namespace,
Name: j.Name,
Namespace: j.Namespace,
Labels: map[string]string{
meta.TraceLabelKey: nj.Name,
meta.TraceIDLabelKey: nj.ID,
meta.TraceLabelKey: j.Name,
meta.TraceIDLabelKey: j.ID,
},
Annotations: map[string]string{
meta.TraceLabelKey: nj.Name,
meta.TraceIDLabelKey: nj.ID,
meta.TraceLabelKey: j.Name,
meta.TraceIDLabelKey: j.ID,
},
}

cm := &apiv1.ConfigMap{
ObjectMeta: commonMeta,
Data: map[string]string{
"program.bt": nj.Program,
"program.bt": j.Program,
},
}

job := &batchv1.Job{
return &batchv1.Job{
ObjectMeta: commonMeta,
Spec: batchv1.JobSpec{
TTLSecondsAfterFinished: int32Ptr(5),
Parallelism: int32Ptr(1),
Completions: int32Ptr(1),
// This is why your tracing job is being killed after 100 seconds,
// someone should work on it to make it configurable and let it run
// indefinitely by default.
// someone should work on it to make it configurable and let it run indefinitely by default.
ActiveDeadlineSeconds: int64Ptr(100), // TODO(fntlnz): allow canceling from kubectl and increase this,
BackoffLimit: int32Ptr(1),
Template: apiv1.PodTemplateSpec{
Expand Down Expand Up @@ -213,11 +214,11 @@ func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) {
},
Containers: []apiv1.Container{
apiv1.Container{
Name: nj.Name,
Name: j.Name,
Image: "quay.io/fntlnz/kubectl-trace-bpftrace:master", //TODO(fntlnz): yes this should be configurable!
Command: bpfTraceCmd,
TTY: true,
Stdin: true,
Command: bpftraceCommand,
VolumeMounts: []apiv1.VolumeMount{
apiv1.VolumeMount{
Name: "program",
Expand Down Expand Up @@ -250,7 +251,7 @@ func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) {
apiv1.NodeSelectorRequirement{
Key: "kubernetes.io/hostname",
Operator: apiv1.NodeSelectorOpIn,
Values: []string{nj.Hostname},
Values: []string{j.Hostname},
},
},
},
Expand All @@ -262,13 +263,19 @@ func (t *TraceJobClient) CreateJob(nj TraceJob) (*batchv1.Job, error) {
},
},
}

if _, err := t.ConfigClient.Create(cm); err != nil {
return nil, err
}
return t.JobClient.Create(job)
}

// todo(fntlnz): deal with programs that needs the user to send a signal to complete,
// like how the hist() function does
// Will likely need to allocate a TTY for this one thing.
// func (t *TraceJobClient) CreateJob(j TraceJob) (*batchv1.Job, error) {

// if _, err := t.ConfigClient.Create(cm); err != nil {
// return nil, err
// }
// return t.JobClient.Create(job)
// }

func int32Ptr(i int32) *int32 { return &i }
func int64Ptr(i int64) *int64 { return &i }
func boolPtr(b bool) *bool { return &b }
Expand Down

0 comments on commit 618ba65

Please sign in to comment.