Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add service and node discovery to test resources #127

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/cli/probe.go
@@ -1,6 +1,8 @@
package cli

import (
"strings"

"github.com/mattfenwick/cyclonus/pkg/connectivity"
"github.com/mattfenwick/cyclonus/pkg/connectivity/probe"
"github.com/mattfenwick/cyclonus/pkg/generator"
Expand All @@ -11,7 +13,6 @@ import (
v1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"strings"
)

type ProbeArgs struct {
Expand Down
6 changes: 6 additions & 0 deletions pkg/connectivity/comparisontable.go
Expand Up @@ -44,6 +44,12 @@ func NewComparisonTable(items []string) *ComparisonTable {
}

func NewComparisonTableFrom(kubeProbe *probe.Table, simulatedProbe *probe.Table) *ComparisonTable {
if kubeProbe == nil {
panic(errors.Errorf("kubeprobe is nil"))
}
if simulatedProbe == nil {
panic(errors.Errorf("sim probe is nil"))
}
if len(kubeProbe.Wrapped.Froms) != len(simulatedProbe.Wrapped.Froms) || len(kubeProbe.Wrapped.Tos) != len(simulatedProbe.Wrapped.Tos) {
panic(errors.Errorf("cannot compare tables of different dimensions"))
}
Expand Down
7 changes: 6 additions & 1 deletion pkg/connectivity/interpreter.go
Expand Up @@ -2,14 +2,15 @@ package connectivity

import (
"fmt"
"time"

"github.com/mattfenwick/cyclonus/pkg/connectivity/probe"
"github.com/mattfenwick/cyclonus/pkg/generator"
"github.com/mattfenwick/cyclonus/pkg/kube"
"github.com/mattfenwick/cyclonus/pkg/matcher"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
networkingv1 "k8s.io/api/networking/v1"
"time"
)

const (
Expand Down Expand Up @@ -116,6 +117,10 @@ func (t *Interpreter) ExecuteTestCase(testCase *generator.TestCase) *Result {
err = testCaseState.SetPodLabels(ns, pod, labels)
} else if action.DeletePod != nil {
err = testCaseState.DeletePod(action.DeletePod.Namespace, action.DeletePod.Pod)
} else if action.CreateService != nil {
err = testCaseState.CreateService(action.CreateService.Service)
} else if action.DeleteService != nil {
err = testCaseState.DeleteService(action.DeleteService.Service)
} else {
err = errors.Errorf("invalid Action at step %d, action %d", stepIndex, actionIndex)
}
Expand Down
43 changes: 43 additions & 0 deletions pkg/connectivity/probe/jobbuilder.go
Expand Up @@ -3,6 +3,7 @@ package probe
import (
"github.com/mattfenwick/cyclonus/pkg/generator"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/intstr"
)
Expand All @@ -12,19 +13,58 @@ type JobBuilder struct {
}

func (j *JobBuilder) GetJobsForProbeConfig(resources *Resources, config *generator.ProbeConfig) *Jobs {
logrus.Debugf("getting jobs for probe config %v", config)
if config.AllAvailable {
return j.GetJobsAllAvailableServers(resources, config.Mode)
} else if config.Mode == generator.ProbeModeNodeIP {
return j.GetJobsForNodeIP(resources, config)
} else if config.PortProtocol != nil {
return j.GetJobsForNamedPortProtocol(resources, config.PortProtocol.Port, config.PortProtocol.Protocol, config.Mode)
} else {
panic(errors.Errorf("invalid ProbeConfig %+v", config))
}
}

func (j *JobBuilder) GetJobsForNodeIP(resources *Resources, config *generator.ProbeConfig) *Jobs {
jobs := &Jobs{}
logrus.Debugf("getting jobs for node ip %v", config)

for _, podFrom := range resources.Pods {
for _, node := range resources.Nodes {
job := &Job{
FromKey: podFrom.PodString().String(),
FromNamespace: podFrom.Namespace,
FromNamespaceLabels: resources.Namespaces[podFrom.Namespace],
FromPod: podFrom.Name,
FromPodLabels: podFrom.Labels,
FromContainer: podFrom.Containers[0].Name,
FromIP: podFrom.IP,
ToKey: node.Name,
ToHost: node.IP,
ToNamespace: "node",
ToNamespaceLabels: map[string]string{},
ToPodLabels: map[string]string{},
ToIP: node.IP,
ResolvedPort: config.PortProtocol.Port.IntValue(),
ResolvedPortName: "Custom",
Protocol: config.PortProtocol.Protocol,
TimeoutSeconds: j.TimeoutSeconds,
}
jobs.Valid = append(jobs.Valid, job)

}
}

return jobs
}

func (j *JobBuilder) GetJobsForNamedPortProtocol(resources *Resources, port intstr.IntOrString, protocol v1.Protocol, mode generator.ProbeMode) *Jobs {
jobs := &Jobs{}
logrus.Debugf("named port getting jobs for resources %+v", resources)
for _, podFrom := range resources.Pods {
logrus.Debugf("named port getting jobs for podfrom %+v", podFrom)
for _, podTo := range resources.Pods {
logrus.Debugf("named port getting jobs for podTo %+v", podTo)
job := &Job{
FromKey: podFrom.PodString().String(),
FromNamespace: podFrom.Namespace,
Expand Down Expand Up @@ -76,8 +116,11 @@ func (j *JobBuilder) GetJobsForNamedPortProtocol(resources *Resources, port ints

func (j *JobBuilder) GetJobsAllAvailableServers(resources *Resources, mode generator.ProbeMode) *Jobs {
var jobs []*Job
logrus.Debugf("all available getting jobs for resources %+v", resources)
for _, podFrom := range resources.Pods {
logrus.Debugf("all available getting jobs for podfrom %+v", podFrom)
for _, podTo := range resources.Pods {
logrus.Debugf("all available getting jobs for podTo %+v", podTo)
for _, contTo := range podTo.Containers {
jobs = append(jobs, &Job{
FromKey: podFrom.PodString().String(),
Expand Down
25 changes: 20 additions & 5 deletions pkg/connectivity/probe/jobrunner.go
@@ -1,13 +1,14 @@
package probe

import (
"strings"

"github.com/mattfenwick/cyclonus/pkg/generator"
"github.com/mattfenwick/cyclonus/pkg/kube"
"github.com/mattfenwick/cyclonus/pkg/matcher"
"github.com/mattfenwick/cyclonus/pkg/utils"
"github.com/mattfenwick/cyclonus/pkg/worker"
"github.com/sirupsen/logrus"
"strings"
)

type Runner struct {
Expand All @@ -28,12 +29,24 @@ func NewKubeBatchRunner(kubernetes kube.IKubernetes, workers int, jobBuilder *Jo
}

func (p *Runner) RunProbeForConfig(probeConfig *generator.ProbeConfig, resources *Resources) *Table {
return NewTableFromJobResults(resources, p.runProbe(p.JobBuilder.GetJobsForProbeConfig(resources, probeConfig)))
jobs := p.JobBuilder.GetJobsForProbeConfig(resources, probeConfig)
logrus.Debugf("got jobs %+v", jobs)
jobresults := p.runProbe(jobs)
if probeConfig.Mode == generator.ProbeModeNodeIP {
return NewNodeTableFromJobResults(resources, jobresults)
} else {
return NewPodTableFromJobResults(resources, jobresults)
}
}

func (p *Runner) runProbe(jobs *Jobs) []*JobResult {
logrus.Debugf("running probe for job %+v", jobs)
resultSlice := p.JobRunner.RunJobs(jobs.Valid)

for _, res := range resultSlice {
logrus.Debugf("resultslice combined: %+v, ingress: %+v, egress %+v", res.Combined, res.Ingress, res.Egress)
}

invalidPP := ConnectivityInvalidPortProtocol
unknown := ConnectivityUnknown
for _, j := range jobs.BadPortProtocol {
Expand Down Expand Up @@ -100,6 +113,7 @@ type KubeJobRunner struct {
}

func (k *KubeJobRunner) RunJobs(jobs []*Job) []*JobResult {
logrus.Debugf("run job single with %+v", jobs)
size := len(jobs)
jobsChan := make(chan *Job, size)
resultsChan := make(chan *JobResult, size)
Expand All @@ -124,6 +138,7 @@ func (k *KubeJobRunner) RunJobs(jobs []*Job) []*JobResult {
// it only writes pass/fail status to a channel and has no failure side effects, this is by design since we do not want to fail inside a goroutine.
func (k *KubeJobRunner) worker(jobs <-chan *Job, results chan<- *JobResult) {
for job := range jobs {
logrus.Debugf("probing connectivity for job %+v", job)
connectivity, _ := probeConnectivity(k.Kubernetes, job)
results <- &JobResult{
Job: job,
Expand All @@ -135,13 +150,13 @@ func (k *KubeJobRunner) worker(jobs <-chan *Job, results chan<- *JobResult) {
func probeConnectivity(k8s kube.IKubernetes, job *Job) (Connectivity, string) {
commandDebugString := strings.Join(job.KubeExecCommand(), " ")
stdout, stderr, commandErr, err := k8s.ExecuteRemoteCommand(job.FromNamespace, job.FromPod, job.FromContainer, job.ClientCommand())
logrus.Debugf("stdout, stderr from %s: \n%s\n%s", commandDebugString, stdout, stderr)
logrus.Debugf("stdout, stderr from [%s]: \n%s\n%s", commandDebugString, stdout, stderr)
if err != nil {
logrus.Errorf("unable to set up command %s: %+v", commandDebugString, err)
logrus.Errorf("unable to set up command [%s]: %+v", commandDebugString, err)
return ConnectivityCheckFailed, commandDebugString
}
if commandErr != nil {
logrus.Debugf("unable to run command %s: %+v", commandDebugString, commandErr)
logrus.Debugf("unable to run command [%s]: %+v", commandDebugString, commandErr)
return ConnectivityBlocked, commandDebugString
}
return ConnectivityAllowed, commandDebugString
Expand Down
15 changes: 15 additions & 0 deletions pkg/connectivity/probe/node.go
@@ -0,0 +1,15 @@
package probe

type Node struct {
Name string
Labels map[string]string
IP string
}

func NewNode(name string, labels map[string]string, ip string) *Node {
return &Node{
Name: name,
Labels: make(map[string]string),
IP: ip,
}
}
35 changes: 34 additions & 1 deletion pkg/connectivity/probe/pod.go
Expand Up @@ -2,13 +2,14 @@ package probe

import (
"fmt"
"strings"

"github.com/mattfenwick/collections/pkg/slice"
"github.com/mattfenwick/cyclonus/pkg/generator"
"github.com/mattfenwick/cyclonus/pkg/kube"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"strings"
)

const (
Expand Down Expand Up @@ -48,6 +49,7 @@ type Pod struct {
Labels map[string]string
ServiceIP string
IP string
NodeIP string
Containers []*Container
}

Expand All @@ -59,6 +61,8 @@ func (p *Pod) Host(probeMode generator.ProbeMode) string {
return p.IP
case generator.ProbeModeServiceIP:
return p.ServiceIP
case generator.ProbeModeNodeIP:
return p.NodeIP
default:
panic(errors.Errorf("invalid mode %s", probeMode))
}
Expand Down Expand Up @@ -89,6 +93,10 @@ func (p *Pod) ServiceName() string {
return fmt.Sprintf("s-%s-%s", p.Namespace, p.Name)
}

func (p *Pod) ServiceNameLoadBalancer() string {
return fmt.Sprintf("s-%s-%s-lb", p.Namespace, p.Name)
}

func (p *Pod) KubePod() *v1.Pod {
zero := int64(0)
return &v1.Pod{
Expand Down Expand Up @@ -117,6 +125,22 @@ func (p *Pod) KubeService() *v1.Service {
}
}

func (p *Pod) KubeServiceLoadBalancer() *v1.Service {
ports := slice.Map(func(cont *Container) v1.ServicePort { return cont.KubeServicePort() }, p.Containers)
tcpPorts := slice.Filter(func(port v1.ServicePort) bool { return port.Protocol == v1.ProtocolTCP }, ports)
return &v1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: p.ServiceNameLoadBalancer(),
Namespace: p.Namespace,
},
Spec: v1.ServiceSpec{
Ports: tcpPorts,
Selector: p.Labels,
Type: v1.ServiceTypeLoadBalancer,
},
}
}

func (p *Pod) KubeContainers() []v1.Container {
return slice.Map(func(cont *Container) v1.Container { return cont.KubeContainer() }, p.Containers)
}
Expand Down Expand Up @@ -188,6 +212,15 @@ func (c *Container) KubeServicePort() v1.ServicePort {
}
}

// when using load balancer types, cannot contain more than 1 protocol
func (c *Container) KubeServicePortTCP() v1.ServicePort {
return v1.ServicePort{
Name: fmt.Sprintf("service-port-%s-%d", strings.ToLower(string(v1.ProtocolTCP)), c.Port),
Protocol: v1.ProtocolTCP,
Port: int32(c.Port),
}
}

func (c *Container) Image() string {
if c.BatchJobs {
return cyclonusWorkerImage
Expand Down