diff --git a/api/types/types.go b/api/types/types.go
index 8058f7f64..6e5b4bdca 100644
--- a/api/types/types.go
+++ b/api/types/types.go
@@ -212,6 +212,7 @@ type AssemblySpec struct {
 }
 
 type VanRouterStatusSpec struct {
+    SiteName               string                  `json:"siteName,omitempty"`
     Mode                   string                  `json:"mode,omitempty"`
     TransportReadyReplicas int32                   `json:"transportReadyReplicas,omitempty"`
     ConnectedSites         TransportConnectedSites `json:"connectedSites,omitempty"`
@@ -315,6 +316,7 @@ type TransportConnectedSites struct {
     Direct   int
     Indirect int
     Total    int
+    Warnings []string
 }
 
 type ServiceInterface struct {
diff --git a/client/van_router_inspect.go b/client/van_router_inspect.go
index fded5fc4a..780855a21 100644
--- a/client/van_router_inspect.go
+++ b/client/van_router_inspect.go
@@ -17,6 +17,10 @@ func (cli *VanClient) VanRouterInspect(ctx context.Context) (*types.VanRouterIns
 
     current, err := cli.KubeClient.AppsV1().Deployments(cli.Namespace).Get(types.TransportDeploymentName, metav1.GetOptions{})
     if err == nil {
+        siteConfig, err := cli.VanSiteConfigInspect(ctx, nil)
+        if err == nil && siteConfig != nil {
+            vir.Status.SiteName = siteConfig.Spec.SkupperName
+        }
         vir.Status.Mode = string(qdr.GetTransportMode(current))
         vir.Status.TransportReadyReplicas = current.Status.ReadyReplicas
         connected, err := qdr.GetConnectedSites(vir.Status.Mode == types.TransportModeEdge, cli.Namespace, cli.KubeClient, cli.RestConfig)
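Taken together, the new SiteName status field and the VanRouterInspect change let callers read the site name, and, via the Warnings slice added to TransportConnectedSites, any topology warnings, from a single inspect call. A minimal consumer sketch; the namespace value is illustrative and the NewClient argument pattern is assumed from the client tests below:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/skupperproject/skupper/client"
)

func main() {
	// "my-namespace" is a placeholder; the empty strings mirror the client tests.
	cli, err := client.NewClient("my-namespace", "", "")
	if err != nil {
		log.Fatal(err)
	}
	vir, err := cli.VanRouterInspect(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("site %q, mode %s, ready replicas %d\n",
		vir.Status.SiteName, vir.Status.Mode, vir.Status.TransportReadyReplicas)
	for _, w := range vir.Status.ConnectedSites.Warnings {
		fmt.Println("warning:", w)
	}
}
```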
diff --git a/client/van_serviceinterface_inspect_test.go b/client/van_serviceinterface_inspect_test.go
new file mode 100644
index 000000000..0be3083bd
--- /dev/null
+++ b/client/van_serviceinterface_inspect_test.go
@@ -0,0 +1,125 @@
+package client
+
+import (
+    "context"
+    "strings"
+    "testing"
+
+    "github.com/skupperproject/skupper/api/types"
+    "github.com/skupperproject/skupper/pkg/kube"
+
+    "gotest.tools/assert"
+)
+
+func TestVanServiceInterfaceInspect(t *testing.T) {
+    testcases := []struct {
+        namespace             string
+        doc                   string
+        addr                  string
+        proto                 string
+        port                  int
+        init                  bool
+        expectedCreationError string
+    }{
+        {
+            namespace:             "vsii-1",
+            addr:                  "vsii-1-addr",
+            proto:                 "tcp",
+            port:                  5672,
+            init:                  true,
+            expectedCreationError: "",
+        },
+        {
+            namespace:             "vsii-2",
+            addr:                  "vsii-2-addr",
+            proto:                 "tcp",
+            port:                  5672,
+            init:                  false,
+            expectedCreationError: "Skupper not initialised",
+        },
+    }
+    for _, testcase := range testcases {
+        ctx, cancel := context.WithCancel(context.Background())
+        defer cancel()
+
+        // Run in a real cluster, or in a mock environment.
+        var cli *VanClient
+        var err error
+        isCluster := *clusterRun
+        if isCluster {
+            cli, err = NewClient(testcase.namespace, "", "")
+        } else {
+            cli, err = newMockClient(testcase.namespace, "", "")
+        }
+        assert.Check(t, err, testcase.namespace)
+
+        _, err = kube.NewNamespace(testcase.namespace, cli.KubeClient)
+        assert.Check(t, err, "%s: Namespace creation failed.", testcase.namespace)
+        defer kube.DeleteNamespace(testcase.namespace, cli.KubeClient)
+
+        // Create a skupper router -- or don't if the test
+        // wants a creation error.
+        if testcase.init {
+            err = cli.VanRouterCreate(ctx, types.VanSiteConfig{
+                Spec: types.VanSiteConfigSpec{
+                    SkupperName:       testcase.namespace,
+                    IsEdge:            false,
+                    EnableController:  true,
+                    EnableServiceSync: true,
+                    EnableConsole:     false,
+                    AuthMode:          "",
+                    User:              "",
+                    Password:          "",
+                    ClusterLocal:      true,
+                },
+            })
+            assert.Check(t, err, "%s: Unable to create VAN router", testcase.namespace)
+        }
+
+        // Create the VanServiceInterface.
+        service := types.ServiceInterface{
+            Address:  testcase.addr,
+            Protocol: testcase.proto,
+            Port:     testcase.port,
+        }
+        err = cli.VanServiceInterfaceCreate(ctx, &service)
+
+        // If initialization was not done, we should see an error.
+        // In this case, don't try to check the Service Interface --
+        // it isn't there.
+        if testcase.expectedCreationError != "" {
+            assert.Check(t,
+                err != nil && strings.Contains(err.Error(), testcase.expectedCreationError),
+                "\n\nTest %s failure: The expected error |%s| was not reported.\n",
+                testcase.namespace,
+                testcase.expectedCreationError)
+        } else {
+            assert.Check(t, err, "\n\nTest %s failure: Creation failed.\n", testcase.namespace)
+
+            // When we inspect the VanServiceInterface, make sure that the
+            // expected values have been set.
+            serviceInterface, err := cli.VanServiceInterfaceInspect(ctx, testcase.addr)
+            assert.Check(t, err, "Inspection failed.")
+
+            assert.Equal(t, testcase.addr, serviceInterface.Address,
+                "\n\nTest %s failure: Address was |%s| but should be |%s|.\n",
+                testcase.namespace,
+                serviceInterface.Address,
+                testcase.addr)
+            assert.Equal(t, testcase.proto, serviceInterface.Protocol,
+                "\n\nTest %s failure: Protocol was |%s| but should be |%s|.\n",
+                testcase.namespace,
+                serviceInterface.Protocol,
+                testcase.proto)
+            assert.Equal(t, testcase.port, serviceInterface.Port,
+                "\n\nTest %s failure: Port was %d but should be %d.\n",
+                testcase.namespace,
+                serviceInterface.Port,
+                testcase.port)
+            assert.Assert(t, nil == serviceInterface.Headless,
+                "\n\nTest %s failure: Headless was |%#v| but should be nil.\n",
+                testcase.namespace,
+                serviceInterface.Headless)
+        }
+    }
+}
diff --git a/cmd/service-controller/bridges.go b/cmd/service-controller/bridges.go
index 91371d3d0..4604de712 100644
--- a/cmd/service-controller/bridges.go
+++ b/cmd/service-controller/bridges.go
@@ -357,11 +357,12 @@ func getStringByKey(attributes map[string]interface{}, key string) string {
 }
 
 func getIntByKey(attributes map[string]interface{}, key string) int {
-    i, ok := attributes[key].(int)
+    // Unmarshal stores float64 in interface value for JSON numbers
+    i, ok := attributes[key].(float64)
     if !ok {
        return 0
     }
-    return i
+    return int(i)
 }
 
 func getBoolByKey(attributes map[string]interface{}, key string) bool {
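The getIntByKey change above fixes a genuine decoding pitfall: encoding/json stores every JSON number as a float64 when unmarshalling into an interface{}, so the old int assertion could never succeed and the function always returned 0. A self-contained illustration (the "port" attribute is made up for the example):

```go
package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	var attributes map[string]interface{}
	if err := json.Unmarshal([]byte(`{"port": 5672}`), &attributes); err != nil {
		panic(err)
	}

	_, ok := attributes["port"].(int)
	fmt.Println(ok) // false: JSON numbers arrive as float64, never int

	f, ok := attributes["port"].(float64)
	fmt.Println(int(f), ok) // 5672 true
}
```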
diff --git a/cmd/service-controller/controller.go b/cmd/service-controller/controller.go
index 3b70533e9..5e905e181 100644
--- a/cmd/service-controller/controller.go
+++ b/cmd/service-controller/controller.go
@@ -47,9 +47,10 @@ type Controller struct {
     amqpClient      *amqp.Client
     amqpSession     *amqp.Session
     byOrigin        map[string]map[string]types.ServiceInterface
-    Local           []types.ServiceInterface
+    localServices   map[string]types.ServiceInterface
     byName          map[string]types.ServiceInterface
     desiredServices map[string]types.ServiceInterface
+    heardFrom       map[string]time.Time
 
     definitionMonitor *DefinitionMonitor
     consoleServer     *ConsoleServer
@@ -129,8 +130,10 @@ func NewController(cli *client.VanClient, origin string, tlsConfig *tls.Config)
 
     // Organize service definitions
     controller.byOrigin = make(map[string]map[string]types.ServiceInterface)
+    controller.localServices = make(map[string]types.ServiceInterface)
     controller.byName = make(map[string]types.ServiceInterface)
     controller.desiredServices = make(map[string]types.ServiceInterface)
+    controller.heardFrom = make(map[string]time.Time)
 
     log.Println("Setting up event handlers")
     svcDefInformer.AddEventHandler(controller.newEventHandler("servicedefs", AnnotatedKey, ConfigMapResourceVersionTest))
diff --git a/cmd/service-controller/definition_monitor.go b/cmd/service-controller/definition_monitor.go
index 4205c5dfe..872b2c0e3 100644
--- a/cmd/service-controller/definition_monitor.go
+++ b/cmd/service-controller/definition_monitor.go
@@ -156,10 +156,15 @@ func (m *DefinitionMonitor) getServiceDefinitionFromAnnotatedDeployment(deployme
     } else {
         svc.Address = deployment.ObjectMeta.Name
     }
+
+    selector := ""
+    if deployment.Spec.Selector != nil {
+        selector = utils.StringifySelector(deployment.Spec.Selector.MatchLabels)
+    }
     svc.Targets = []types.ServiceInterfaceTarget{
         types.ServiceInterfaceTarget{
             Name:     deployment.ObjectMeta.Name,
-            Selector: utils.StringifySelector(deployment.Spec.Selector.MatchLabels),
+            Selector: selector,
         },
     }
     svc.Origin = "annotation"
diff --git a/cmd/service-controller/main.go b/cmd/service-controller/main.go
index 40d637c17..c509ffd95 100644
--- a/cmd/service-controller/main.go
+++ b/cmd/service-controller/main.go
@@ -9,9 +9,13 @@ import (
     "os"
     "os/signal"
     "syscall"
+    "time"
+
+    corev1 "k8s.io/api/core/v1"
 
     "github.com/skupperproject/skupper/api/types"
     "github.com/skupperproject/skupper/client"
+    "github.com/skupperproject/skupper/pkg/kube"
 )
 
 func describe(i interface{}) {
@@ -89,6 +93,18 @@ func main() {
         log.Fatal("Error getting new controller", err.Error())
     }
 
+    log.Println("Waiting for Skupper transport to start")
+    pods, err := kube.GetDeploymentPods(types.TransportDeploymentName, namespace, cli.KubeClient)
+    if err != nil {
+        log.Fatal("Error getting transport deployment pods", err.Error())
+    }
+    for _, pod := range pods {
+        _, err := kube.WaitForPodStatus(namespace, cli.KubeClient, pod.Name, corev1.PodRunning, time.Second*180, time.Second*5)
+        if err != nil {
+            log.Fatal("Error waiting for skupper transport pod running status", err.Error())
+        }
+    }
+
     // start the controller workers
     if err = controller.Run(stopCh); err != nil {
         log.Fatal("Error running controller: ", err.Error())
diff --git a/cmd/service-controller/service_sync.go b/cmd/service-controller/service_sync.go
index 647652a36..f9432cc31 100644
--- a/cmd/service-controller/service_sync.go
+++ b/cmd/service-controller/service_sync.go
@@ -6,7 +6,6 @@ import (
     "fmt"
     "log"
     "reflect"
-    "sort"
     "time"
 
     amqp "github.com/interconnectedcloud/go-amqp"
@@ -16,8 +15,17 @@ import (
     "github.com/skupperproject/skupper/pkg/kube"
 )
 
+func (c *Controller) pareByOrigin(service string) {
+    for _, origin := range c.byOrigin {
+        if _, ok := origin[service]; ok {
+            delete(origin, service)
+            return
+        }
+    }
+}
+
 func (c *Controller) serviceSyncDefinitionsUpdated(definitions map[string]types.ServiceInterface) {
-    var latest []types.ServiceInterface // becomes c.Local
+    latest := make(map[string]types.ServiceInterface) // becomes c.localServices
     byName := make(map[string]types.ServiceInterface)
     var added []types.ServiceInterface
     var modified []types.ServiceInterface
@@ -38,31 +46,22 @@ func (c *Controller) serviceSyncDefinitionsUpdated(definitions map[string]types.
             }
             c.byOrigin[service.Origin][name] = service
         } else {
-            latest = append(latest, service)
+            latest[service.Address] = service
+            // may have previously been tracked by origin
+            c.pareByOrigin(service.Address)
         }
         byName[service.Address] = service
     }
 
-    sort.Sort(types.ByServiceInterfaceAddress(latest))
-
-    last := make(map[string]types.ServiceInterface)
-    for _, def := range c.Local {
-        last[def.Address] = def
-    }
-    current := make(map[string]types.ServiceInterface)
-    for _, def := range latest {
-        current[def.Address] = def
-    }
-
-    for _, def := range last {
-        if _, ok := current[def.Address]; !ok {
+    for _, def := range c.localServices {
+        if _, ok := latest[def.Address]; !ok {
             removed = append(removed, def)
-        } else if !reflect.DeepEqual(def, current[def.Address]) {
+        } else if !reflect.DeepEqual(def, latest[def.Address]) {
             modified = append(modified, def)
         }
     }
-    for _, def := range current {
-        if _, ok := last[def.Address]; !ok {
+    for _, def := range latest {
+        if _, ok := c.localServices[def.Address]; !ok {
             added = append(added, def)
         }
     }
@@ -78,7 +77,7 @@ func (c *Controller) serviceSyncDefinitionsUpdated(definitions map[string]types.
         log.Println("Service interface(s) modified", modified)
     }
 
-    c.Local = latest
+    c.localServices = latest
     c.byName = byName
 }
@@ -103,6 +102,8 @@ func (c *Controller) ensureServiceInterfaceDefinitions(origin string, serviceInt
     var changed []types.ServiceInterface
     var deleted []string
 
+    c.heardFrom[origin] = time.Now()
+
     for _, def := range serviceInterfaceDefs {
         existing, ok := c.byName[def.Address]
         if !ok || (existing.Origin == origin && !equivalentServiceDefinition(&def, &existing)) {
             changed = append(changed, def)
         }
     }
@@ -110,7 +111,6 @@ func (c *Controller) ensureServiceInterfaceDefinitions(origin string, serviceInt
 
-    // TODO: think about aging entries
     if _, ok := c.byOrigin[origin]; !ok {
         c.byOrigin[origin] = make(map[string]types.ServiceInterface)
     } else {
@@ -121,7 +121,12 @@ func (c *Controller) ensureServiceInterfaceDefinitions(origin string, serviceInt
             }
         }
     }
+
     kube.UpdateSkupperServices(changed, deleted, origin, c.vanClient.Namespace, c.vanClient.KubeClient)
+
+    for _, name := range deleted {
+        delete(c.byOrigin[origin], name)
+    }
 }
@@ -138,7 +143,8 @@ func (c *Controller) syncSender(sendLocal chan bool) {
         sender.Close(ctx)
     }()
 
-    ticker := time.NewTicker(5 * time.Second)
+    tickerSend := time.NewTicker(5 * time.Second)
+    tickerAge := time.NewTicker(30 * time.Second)
 
     properties.Subject = "service-sync-update"
     request.Properties = &properties
@@ -147,10 +153,10 @@ func (c *Controller) syncSender(sendLocal chan bool) {
 
     for {
         select {
-        case <-ticker.C:
+        case <-tickerSend.C:
             local := make([]types.ServiceInterface, 0)
 
-            for _, si := range c.Local {
+            for _, si := range c.localServices {
                 local = append(local, si)
             }
@@ -161,6 +167,34 @@ func (c *Controller) syncSender(sendLocal chan bool) {
             }
             request.Value = string(encoded)
             err = sender.Send(ctx, &request)
+
+        case <-tickerAge.C:
+            var agedOrigins []string
+
+            now := time.Now()
+
+            for origin, _ := range c.byOrigin {
+                var deleted []string
+
+                if lastHeard, ok := c.heardFrom[origin]; ok {
+                    if now.Sub(lastHeard) >= 60*time.Second {
+                        agedOrigins = append(agedOrigins, origin)
+                        agedDefinitions := c.byOrigin[origin]
+                        for name, _ := range agedDefinitions {
+                            deleted = append(deleted, name)
+                        }
+                        if len(deleted) > 0 {
+                            kube.UpdateSkupperServices([]types.ServiceInterface{}, deleted, origin, c.vanClient.Namespace, c.vanClient.KubeClient)
+                        }
+                    }
+                }
+            }
+
+            for _, originName := range agedOrigins {
+                log.Println("Service sync aged out service definitions from origin ", originName)
+                delete(c.heardFrom, originName)
+                delete(c.byOrigin, originName)
+            }
         }
     }
 }
@@ -168,14 +202,14 @@ func (c *Controller) syncSender(sendLocal chan bool) {
 func (c *Controller) runServiceSync() {
     ctx := context.Background()
 
-    log.Println("Establishing connection to skupper-messaging service...")
+    log.Println("Establishing connection to skupper-messaging service for service sync")
 
     client, err := amqp.Dial("amqps://skupper-messaging:5671", amqp.ConnSASLExternal(), amqp.ConnMaxFrameSize(4294967295), amqp.ConnTLSConfig(c.tlsConfig))
     if err != nil {
         utilruntime.HandleError(fmt.Errorf("Failed to create amqp connection %s", err.Error()))
         return
     }
-    log.Println("connection to skupper-messaging service established")
+    log.Println("Service sync connection to skupper-messaging service established")
     c.amqpClient = client
     defer c.amqpClient.Close()
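The tickerAge branch above replaces the removed "TODO: think about aging entries": each service-sync update stamps its origin in heardFrom, and every 30 seconds any origin not heard from for 60 seconds or more has its definitions deleted and its bookkeeping dropped. The decision itself reduces to a timestamp comparison; a runnable distillation (the helper name aged is mine, not the patch's):

```go
package main

import (
	"fmt"
	"time"
)

const ageLimit = 60 * time.Second // the cutoff used in syncSender above

// aged reports whether an origin's definitions should be dropped.
func aged(lastHeard, now time.Time) bool {
	return now.Sub(lastHeard) >= ageLimit
}

func main() {
	now := time.Now()
	fmt.Println(aged(now.Add(-30*time.Second), now)) // false: origin is still fresh
	fmt.Println(aged(now.Add(-2*time.Minute), now))  // true: delete and forget the origin
}
```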
diff --git a/cmd/service-controller/site_query.go b/cmd/service-controller/site_query.go
index b1f1ec0ef..b044280d1 100644
--- a/cmd/service-controller/site_query.go
+++ b/cmd/service-controller/site_query.go
@@ -87,6 +87,7 @@ func (s *SiteQueryServer) run() {
         utilruntime.HandleError(fmt.Errorf("Failed to create amqp connection for site query server: %s", err.Error()))
         return
     }
+    log.Println("Site query server connection to skupper-messaging service established")
     defer client.Close()
 
     session, err := client.NewSession()
@@ -125,8 +126,6 @@ func (s *SiteQueryServer) run() {
         correlationId, ok := qdr.AsUint64(msg.Properties.CorrelationID)
         if !ok {
             log.Printf("WARN: Could not get correlationid from site query request: %#v (%T)", msg.Properties.CorrelationID, msg.Properties.CorrelationID)
-        } else {
-            log.Printf("Sending site query response for %v: %s", correlationId, string(bytes))
         }
         response := amqp.Message{
             Properties: &amqp.MessageProperties{
diff --git a/cmd/skupper/skupper.go b/cmd/skupper/skupper.go
index 1fd522640..2af31ca3d 100644
--- a/cmd/skupper/skupper.go
+++ b/cmd/skupper/skupper.go
@@ -64,6 +64,9 @@ func expose(cli *client.VanClient, ctx context.Context, targetType string, targe
     } else if options.Protocol != "" && service.Protocol != options.Protocol {
         return fmt.Errorf("Invalid protocol %s for service with mapping %s", options.Protocol, service.Protocol)
     }
+
+    // service may exist from remote origin
+    service.Origin = ""
     return cli.VanServiceInterfaceBind(ctx, service, targetType, targetName, options.Protocol, options.TargetPort)
 }
 
@@ -413,10 +416,20 @@ func main() {
             if vir.Status.Mode == types.TransportModeEdge {
                 modedesc = " in edge mode"
             }
+            sitename := ""
+            if vir.Status.SiteName != "" && vir.Status.SiteName != cli.Namespace {
+                sitename = fmt.Sprintf(" with site name %q", vir.Status.SiteName)
+            }
+            fmt.Printf("Skupper is enabled for namespace %q%s%s.", cli.Namespace, sitename, modedesc)
             if vir.Status.TransportReadyReplicas == 0 {
-                fmt.Printf("Skupper is enabled for namespace '%q%s'. Status pending...", cli.Namespace, modedesc)
+                fmt.Printf(" Status pending...")
             } else {
-                fmt.Printf("Skupper is enabled for namespace '%q%s'.", cli.Namespace, modedesc)
+                if len(vir.Status.ConnectedSites.Warnings) > 0 {
+                    for _, w := range vir.Status.ConnectedSites.Warnings {
+                        fmt.Printf("Warning: %s", w)
+                        fmt.Println()
+                    }
+                }
                 if vir.Status.ConnectedSites.Total == 0 {
                     fmt.Printf(" It is not connected to any other sites.")
                 } else if vir.Status.ConnectedSites.Total == 1 {
diff --git a/go.mod b/go.mod
index 86e03e332..938c94aac 100644
--- a/go.mod
+++ b/go.mod
@@ -4,6 +4,7 @@ go 1.13
 
 require (
     github.com/Azure/go-autorest/autorest v0.10.0 // indirect
+    github.com/davecgh/go-spew v1.1.1
     github.com/google/go-cmp v0.4.0
     github.com/gophercloud/gophercloud v0.8.0 // indirect
     github.com/imdario/mergo v0.3.8 // indirect
@@ -12,6 +13,7 @@ require (
     github.com/openshift/client-go v0.0.0-20200109173103-2763c6378941
     github.com/pkg/errors v0.8.1 // indirect
     github.com/spf13/cobra v0.0.6
+    github.com/tsenart/vegeta/v12 v12.8.3
     gotest.tools v2.2.0+incompatible
     k8s.io/api v0.17.0
     k8s.io/apimachinery v0.17.0
diff --git a/pkg/kube/configmaps.go b/pkg/kube/configmaps.go
index c40b5e6b2..c86e630cc 100644
--- a/pkg/kube/configmaps.go
+++ b/pkg/kube/configmaps.go
@@ -50,7 +50,7 @@ func NewConfigMap(name string, data *map[string]string, owner *metav1.OwnerRefer
 
     created, err := configMaps.Create(cm)
     if err != nil {
-        return nil, fmt.Errorf("Failed to crate config map: %w", err)
+        return nil, fmt.Errorf("Failed to create config map: %w", err)
     } else {
         return created, nil
     }
diff --git a/pkg/kube/deployments.go b/pkg/kube/deployments.go
index cb226c6e7..56317895d 100644
--- a/pkg/kube/deployments.go
+++ b/pkg/kube/deployments.go
@@ -14,6 +14,19 @@ import (
     "github.com/skupperproject/skupper/api/types"
 )
 
+func GetDeploymentPods(name string, namespace string, cli kubernetes.Interface) ([]corev1.Pod, error) {
+    deployment, err := GetDeployment(name, namespace, cli)
+    if err != nil {
+        return nil, err
+    }
+    options := metav1.ListOptions{LabelSelector: "application=" + deployment.Name}
+    podList, err := cli.CoreV1().Pods(namespace).List(options)
+    if err != nil {
+        return nil, err
+    }
+    return podList.Items, err
+}
+
 func GetDeploymentOwnerReference(dep *appsv1.Deployment) metav1.OwnerReference {
     return metav1.OwnerReference{
         APIVersion: "apps/v1",
diff --git a/pkg/kube/pods.go b/pkg/kube/pods.go
index f1d9022d4..dbff0b08f 100644
--- a/pkg/kube/pods.go
+++ b/pkg/kube/pods.go
@@ -15,13 +15,17 @@ limitations under the License.
 package kube
 
 import (
+    "context"
     "errors"
     "fmt"
     "strings"
+    "time"
 
     corev1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/kubernetes"
+
+    "github.com/skupperproject/skupper/pkg/utils"
 )
 
 func IsPodReady(pod *corev1.Pod) bool {
@@ -84,3 +88,21 @@ func GetComponentVersion(namespace string, clientset kubernetes.Interface, compo
         return "not-found"
     }
 }
+
+func WaitForPodStatus(namespace string, clientset kubernetes.Interface, podName string, status corev1.PodPhase, timeout time.Duration, interval time.Duration) (*corev1.Pod, error) {
+    var pod *corev1.Pod
+    var err error
+
+    ctx, cancel := context.WithTimeout(context.TODO(), timeout)
+    defer cancel()
+    err = utils.RetryWithContext(ctx, interval, func() (bool, error) {
+        pod, err = clientset.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
+        if err != nil {
+            // pod does not exist yet
+            return false, nil
+        }
+        return pod.Status.Phase == status, nil
+    })
+
+    return pod, err
+}
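GetDeploymentPods and WaitForPodStatus are the two halves of the startup gate added to cmd/service-controller/main.go; note that WaitForPodStatus treats a failed Get as "pod not there yet" and keeps polling rather than returning the error. A usage sketch under those assumptions (the package, function name, and error wrapping are illustrative):

```go
package example

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/kubernetes"

	"github.com/skupperproject/skupper/api/types"
	"github.com/skupperproject/skupper/pkg/kube"
)

// waitForTransport blocks until every pod of the skupper transport
// deployment reaches PodRunning, mirroring the gate in main.go.
func waitForTransport(namespace string, clientset kubernetes.Interface) error {
	pods, err := kube.GetDeploymentPods(types.TransportDeploymentName, namespace, clientset)
	if err != nil {
		return err
	}
	for _, pod := range pods {
		// Poll every 5s; give up after 3 minutes.
		if _, err := kube.WaitForPodStatus(namespace, clientset, pod.Name, corev1.PodRunning, 180*time.Second, 5*time.Second); err != nil {
			return fmt.Errorf("pod %s never reached Running: %w", pod.Name, err)
		}
	}
	return nil
}
```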
diff --git a/pkg/qdr/mgmt.go b/pkg/qdr/mgmt.go
index 3bdb713b3..58a271b3b 100644
--- a/pkg/qdr/mgmt.go
+++ b/pkg/qdr/mgmt.go
@@ -47,28 +47,113 @@ type Connection struct {
     Dir       string `json:"dir"`
 }
 
-func getConnectedSitesFromNodes(nodes []RouterNode, direct bool, namespace string, clientset kubernetes.Interface, config *restclient.Config) (types.TransportConnectedSites, error) {
+func getConnectedSitesFromNodesEdge(namespace string, clientset kubernetes.Interface, config *restclient.Config) (types.TransportConnectedSites, error) {
     result := types.TransportConnectedSites{}
-    for _, n := range nodes {
-        edges, err := GetEdgeSitesForRouter(n.Id, namespace, clientset, config)
+    direct := make(map[string]bool)
+    indirect := make(map[string]bool)
+    interiors := make(map[string]RouterNode)
+
+    uplinks, err := getEdgeUplinkConnections(namespace, clientset, config)
+    if err != nil {
+        return result, err
+    }
+    // Go through this list once to add all of its
+    // members to the directly-connected list...
+    for _, c := range uplinks {
+        nodeName := fmt.Sprintf("router.node/%s", c.Container)
+        direct[nodeName] = true
+    }
+    // ...and go through it again to get all of
+    // the indirect nodes.
+    interiorsRetrieved := false
+    for _, c := range uplinks {
+        if interiorsRetrieved {
+            key := fmt.Sprintf("router.node/%s", c.Container)
+            if _, ok := interiors[key]; !ok {
+                result.Warnings = append(result.Warnings, "There are edge uplinks to distinct networks, please verify topology (connected counts may not be accurate).")
+                continue
+            }
+        }
+        interiorNodes, err := getNodesForRouter(c.Container, namespace, clientset, config)
         if err != nil {
-            return result, fmt.Errorf("Failed to check edge nodes for %s: %w", n.Id, err)
+            return result, err
+        } else {
+            interiorsRetrieved = true
+            for _, interiorNode := range interiorNodes {
+                if _, present := interiors[interiorNode.Name]; !present {
+                    interiors[interiorNode.Name] = interiorNode
+                }
+                // Don't count a node as being indirectly connected
+                // if we already know that it is directly connected.
+                if _, present := direct[interiorNode.Name]; !present {
+                    indirect[interiorNode.Name] = true
+                }
+            }
         }
+    }
+    localId, err := getLocalRouterId(namespace, clientset, config)
+    if err != nil {
+        return result, err
+    }
+    for _, interiorNode := range interiors {
+        edges, err := getEdgeConnectionsForInterior(interiorNode.Id, namespace, clientset, config)
+        if err != nil {
+            return result, err
+        } else {
+            for _, edge := range edges {
+                key := fmt.Sprintf("router.node/%s", edge.Container)
+                if _, present := direct[key]; !present && edge.Container != localId {
+                    indirect[key] = true
+                }
+            }
+        }
+    }
+    result.Direct = len(direct)
+    result.Indirect = len(indirect)
+    result.Total = result.Direct + result.Indirect
+    return result, nil
+}
+
+func getConnectedSitesFromNodesInterior(nodes []RouterNode, namespace string, clientset kubernetes.Interface, config *restclient.Config) (types.TransportConnectedSites, error) {
+    result := types.TransportConnectedSites{}
+    direct := make(map[string]bool)
+    indirect := make(map[string]bool)
+    for _, n := range nodes {
         if n.NextHop == "(self)" {
-            if direct {
-                result.Direct += edges
-            } else {
-                result.Indirect += edges
+            edges, err := getEdgeConnectionsForInterior(n.Id, namespace, clientset, config)
+            if err != nil {
+                return result, fmt.Errorf("Failed to check edge nodes for %s: %w", n.Id, err)
+            }
+            for _, edge := range edges {
+                if _, present := direct[edge.Container]; !present {
+                    direct[edge.Container] = true
+                }
+            }
+            break
+        }
+    }
+    for _, n := range nodes {
+        if n.NextHop != "(self)" {
+            edges, err := getEdgeConnectionsForInterior(n.Id, namespace, clientset, config)
+            if err != nil {
+                return result, fmt.Errorf("Failed to check edge nodes for %s: %w", n.Id, err)
+            }
+            for _, edge := range edges {
+                if _, present := direct[edge.Container]; !present {
+                    if _, present = indirect[edge.Container]; !present {
+                        indirect[edge.Container] = true
+                    }
+                }
             }
-        } else {
-            result.Indirect += edges
             if n.NextHop == "" {
-                result.Direct++
+                direct[n.Id] = true
             } else {
-                result.Indirect++
+                indirect[n.Id] = true
             }
         }
     }
+    result.Direct = len(direct)
+    result.Indirect = len(indirect)
     result.Total = result.Direct + result.Indirect
     return result, nil
 }
@@ -76,35 +161,11 @@ func getConnectedSitesFromNodes(nodes []RouterNode, direct bool, namespace strin
 func GetConnectedSites(edge bool, namespace string, clientset kubernetes.Interface, config *restclient.Config) (types.TransportConnectedSites, error) {
     result := types.TransportConnectedSites{}
     if edge {
-        uplink, err := getEdgeUplink(namespace, clientset, config)
-        if err == nil {
-            if uplink == nil {
-                return result, nil
-            } else {
-                nodes, err := getNodesForRouter(uplink.Container, namespace, clientset, config)
-                if err == nil {
-                    result, err := getConnectedSitesFromNodes(nodes, false, namespace, clientset, config)
-                    if err != nil {
-                        return result, err
-                    }
-                    return result, nil
-                } else {
-                    fmt.Println("Failed to get nodes from uplink:", err)
-                    return result, err
-                }
-            }
-        } else {
-            fmt.Println("Failed to get edge uplink:", err)
-            return result, err
-        }
+        return getConnectedSitesFromNodesEdge(namespace, clientset, config)
     } else {
         nodes, err := GetNodes(namespace, clientset, config)
         if err == nil {
-            result, err = getConnectedSitesFromNodes(nodes, true, namespace, clientset, config)
-            if err != nil {
-                return result, err
-            }
-            return result, nil
+            return getConnectedSitesFromNodesInterior(nodes, namespace, clientset, config)
         } else {
             return result, err
         }
@@ -113,6 +174,7 @@ func GetConnectedSites(edge bool, namespace string, clientset kubernetes.Interfa
 
 func GetEdgeSitesForRouter(routerid string, namespace string, clientset kubernetes.Interface, config *restclient.Config) (int, error) {
     connections, err := getConnectionsForRouter(routerid, namespace, clientset, config)
+
     if err == nil {
         count := 0
         for _, c := range connections {
@@ -173,22 +235,36 @@ func GetInterRouterOrEdgeConnection(host string, connections []Connection) *Conn
     return nil
 }
 
-func getEdgeUplink(namespace string, clientset kubernetes.Interface, config *restclient.Config) (*Connection, error) {
+func GetConnections(namespace string, clientset kubernetes.Interface, config *restclient.Config) ([]Connection, error) {
+    return getConnectionsForRouter("", namespace, clientset, config)
+}
+
+func getEdgeUplinkConnections(namespace string, clientset kubernetes.Interface, config *restclient.Config) ([]Connection, error) {
     connections, err := GetConnections(namespace, clientset, config)
-    if err == nil {
-        for _, c := range connections {
-            if c.Role == "edge" && c.Dir == "out" {
-                return &c, nil
-            }
-        }
-        return nil, nil
-    } else {
+    if err != nil {
         return nil, err
     }
+
+    return getEdgeConnections("out", connections)
 }
 
-func GetConnections(namespace string, clientset kubernetes.Interface, config *restclient.Config) ([]Connection, error) {
-    return getConnectionsForRouter("", namespace, clientset, config)
+func getEdgeConnectionsForInterior(routerid string, namespace string, clientset kubernetes.Interface, config *restclient.Config) ([]Connection, error) {
+    connections, err := getConnectionsForRouter(routerid, namespace, clientset, config)
+    if err != nil {
+        return nil, err
+    }
+
+    return getEdgeConnections("in", connections)
+}
+
+func getEdgeConnections(direction string, connections []Connection) ([]Connection, error) {
+    result := []Connection{}
+    for _, c := range connections {
+        if c.Role == "edge" && c.Dir == direction {
+            result = append(result, c)
+        }
+    }
+    return result, nil
 }
 
 func getConnectionsForRouter(routerid string, namespace string, clientset kubernetes.Interface, config *restclient.Config) ([]Connection, error) {
@@ -208,6 +284,27 @@ func getConnectionsForRouter(routerid string, namespace string, clientset kubern
     }
 }
 
+func getLocalRouterId(namespace string, clientset kubernetes.Interface, config *restclient.Config) (string, error) {
+    command := get_query("router")
+    buffer, err := router_exec(command, namespace, clientset, config)
+    if err != nil {
+        return "", err
+    } else {
+        results := []interface{}{}
+        err = json.Unmarshal(buffer.Bytes(), &results)
+        if err != nil {
+            return "", fmt.Errorf("Failed to parse JSON: %s %q", err, buffer.String())
+        } else {
+            if router, ok := results[0].(map[string]interface{}); ok {
+                if id, ok := router["id"].(string); ok {
+                    return id, nil
+                }
+            }
+            return "", fmt.Errorf("Could not get router id from %#v", results)
+        }
+    }
+}
+
 func router_exec(command []string, namespace string, clientset kubernetes.Interface, config *restclient.Config) (*bytes.Buffer, error) {
     pod, err := kube.GetReadyPod(namespace, clientset, "router")
     if err != nil {
@@ -248,7 +345,7 @@ func router_exec(command []string, namespace string, clientset kubernetes.Interf
         Stderr: nil,
     })
     if err != nil {
-        return nil, err
+        return nil, fmt.Errorf("Error executing %s: %v", command, err)
     } else {
         return &buffer, nil
     }
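The counting rewrite above is essentially a change from incrementing counters while walking connections to collecting identifiers into sets and taking their sizes, which is what eliminates double counting when the same router is reachable more than once. The pattern in isolation (router names are made up):

```go
package main

import "fmt"

func main() {
	direct := make(map[string]bool)
	indirect := make(map[string]bool)

	// Duplicate sightings of router-a collapse onto one map key,
	// where the old += arithmetic would have counted it twice.
	for _, id := range []string{"router-a", "router-b", "router-a"} {
		direct[id] = true
	}
	// A router already known to be direct must not also count as indirect.
	for _, id := range []string{"router-c", "router-b"} {
		if !direct[id] {
			indirect[id] = true
		}
	}
	fmt.Println(len(direct), len(indirect), len(direct)+len(indirect)) // 2 1 3
}
```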
diff --git a/pkg/utils/retry.go b/pkg/utils/retry.go
new file mode 100644
index 000000000..2fa58e07e
--- /dev/null
+++ b/pkg/utils/retry.go
@@ -0,0 +1,84 @@
+/*
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+    "context"
+    "fmt"
+    "time"
+)
+
+type RetryError struct {
+    n int
+}
+
+func (e *RetryError) Error() string {
+    return fmt.Sprintf("still failing after %d retries", e.n)
+}
+
+func IsRetryFailure(err error) bool {
+    _, ok := err.(*RetryError)
+    return ok
+}
+
+type ConditionFunc func() (bool, error)
+
+// Retry retries f every interval until after maxRetries.
+// The interval won't be affected by how long f takes.
+// For example, if interval is 3s, f takes 1s, another f will be called 2s later.
+// However, if f takes longer than interval, it will be delayed.
+func Retry(interval time.Duration, maxRetries int, f ConditionFunc) error {
+    if maxRetries <= 0 {
+        return fmt.Errorf("maxRetries (%d) should be > 0", maxRetries)
+    }
+    tick := time.NewTicker(interval)
+    defer tick.Stop()
+
+    for i := 0; ; i++ {
+        ok, err := f()
+        if err != nil {
+            return err
+        }
+        if ok {
+            return nil
+        }
+        if i == maxRetries {
+            break
+        }
+        <-tick.C
+    }
+    return &RetryError{maxRetries}
+}
+
+// RetryWithContext retries f every interval until the specified context times out.
+func RetryWithContext(ctx context.Context, interval time.Duration, f ConditionFunc) error {
+    tick := time.NewTicker(interval)
+    defer tick.Stop()
+
+    for {
+        select {
+        case <-ctx.Done():
+            return fmt.Errorf("the context timeout has been reached")
+        case <-tick.C:
+            r, err := f()
+            if err != nil {
+                return err
+            }
+            if r {
+                return nil
+            }
+        }
+    }
+}
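pkg/utils/retry.go gives the codebase two polling primitives: Retry for a bounded number of attempts and RetryWithContext for deadline-based polling (WaitForPodStatus above is built on the latter). A small usage sketch; the condition functions are toy examples:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/skupperproject/skupper/pkg/utils"
)

func main() {
	attempts := 0

	// Retry: up to 5 retries, 10ms apart, stopping early once the condition holds.
	err := utils.Retry(10*time.Millisecond, 5, func() (bool, error) {
		attempts++
		return attempts == 3, nil
	})
	fmt.Println(attempts, err) // 3 <nil>

	// RetryWithContext: polls until the context deadline expires.
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	err = utils.RetryWithContext(ctx, 10*time.Millisecond, func() (bool, error) {
		return false, nil // never succeeds, so the deadline ends the loop
	})
	fmt.Println(err) // the context timeout has been reached
}
```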
diff --git a/test/integration/http/http.go b/test/integration/http/http.go
new file mode 100644
index 000000000..81ed33327
--- /dev/null
+++ b/test/integration/http/http.go
@@ -0,0 +1,177 @@
+package http
+
+import (
+    "context"
+    "fmt"
+    "os/exec"
+    "time"
+
+    "github.com/skupperproject/skupper/api/types"
+    "github.com/skupperproject/skupper/test/cluster"
+    "gotest.tools/assert"
+
+    appsv1 "k8s.io/api/apps/v1"
+    apiv1 "k8s.io/api/core/v1"
+    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+
+    "github.com/davecgh/go-spew/spew"
+    vegeta "github.com/tsenart/vegeta/v12/lib"
+)
+
+type HttpClusterTestRunner struct {
+    cluster.ClusterTestRunnerBase
+}
+
+func int32Ptr(i int32) *int32 { return &i }
+
+const minute time.Duration = 60
+
+var httpbinDep *appsv1.Deployment = &appsv1.Deployment{
+    TypeMeta: metav1.TypeMeta{
+        APIVersion: "apps/v1",
+        Kind:       "Deployment",
+    },
+    ObjectMeta: metav1.ObjectMeta{
+        Name: "httpbin",
+    },
+    Spec: appsv1.DeploymentSpec{
+        Replicas: int32Ptr(1),
+        Selector: &metav1.LabelSelector{
+            MatchLabels: map[string]string{"application": "httpbin"},
+        },
+        Template: apiv1.PodTemplateSpec{
+            ObjectMeta: metav1.ObjectMeta{
+                Labels: map[string]string{
+                    "application": "httpbin",
+                },
+            },
+            Spec: apiv1.PodSpec{
+                Containers: []apiv1.Container{
+                    {
+                        Name:            "httpbin",
+                        Image:           "docker.io/kennethreitz/httpbin",
+                        ImagePullPolicy: apiv1.PullIfNotPresent,
+                        Ports: []apiv1.ContainerPort{
+                            {
+                                Name:          "http",
+                                Protocol:      apiv1.ProtocolTCP,
+                                ContainerPort: 80,
+                            },
+                        },
+                    },
+                },
+            },
+        },
+    },
+}
+
+func sendReceive(servAddr string) {
+    return
+}
"License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package utils + +import ( + "context" + "fmt" + "time" +) + +type RetryError struct { + n int +} + +func (e *RetryError) Error() string { + return fmt.Sprintf("still failing after %d retries", e.n) +} + +func IsRetryFailure(err error) bool { + _, ok := err.(*RetryError) + return ok +} + +type ConditionFunc func() (bool, error) + +// Retry retries f every interval until after maxRetries. +// The interval won't be affected by how long f takes. +// For example, if interval is 3s, f takes 1s, another f will be called 2s later. +// However, if f takes longer than interval, it will be delayed. +func Retry(interval time.Duration, maxRetries int, f ConditionFunc) error { + if maxRetries <= 0 { + return fmt.Errorf("maxRetries (%d) should be > 0", maxRetries) + } + tick := time.NewTicker(interval) + defer tick.Stop() + + for i := 0; ; i++ { + ok, err := f() + if err != nil { + return err + } + if ok { + return nil + } + if i == maxRetries { + break + } + <-tick.C + } + return &RetryError{maxRetries} +} + +// RetryWithContext retries f every interval until the specified context times out. +func RetryWithContext(ctx context.Context, interval time.Duration, f ConditionFunc) error { + tick := time.NewTicker(interval) + defer tick.Stop() + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("the context timeout has been reached") + case <-tick.C: + r, err := f() + if err != nil { + return err + } + if r { + return nil + } + } + } +} diff --git a/test/integration/http/http.go b/test/integration/http/http.go new file mode 100644 index 000000000..81ed33327 --- /dev/null +++ b/test/integration/http/http.go @@ -0,0 +1,177 @@ +package http + +import ( + "context" + "fmt" + "os/exec" + "time" + + "github.com/skupperproject/skupper/api/types" + "github.com/skupperproject/skupper/test/cluster" + "gotest.tools/assert" + + appsv1 "k8s.io/api/apps/v1" + apiv1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/davecgh/go-spew/spew" + vegeta "github.com/tsenart/vegeta/v12/lib" +) + +type HttpClusterTestRunner struct { + cluster.ClusterTestRunnerBase +} + +func int32Ptr(i int32) *int32 { return &i } + +const minute time.Duration = 60 + +var httpbinDep *appsv1.Deployment = &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "httpbin", + }, + Spec: appsv1.DeploymentSpec{ + Replicas: int32Ptr(1), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"application": "httpbin"}, + }, + Template: apiv1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "application": "httpbin", + }, + }, + Spec: apiv1.PodSpec{ + Containers: []apiv1.Container{ + { + Name: "httpbin", + Image: "docker.io/kennethreitz/httpbin", + ImagePullPolicy: apiv1.PullIfNotPresent, + Ports: []apiv1.ContainerPort{ + { + Name: "http", + Protocol: apiv1.ProtocolTCP, + ContainerPort: 80, + }, + }, + }, + }, + }, + }, + }, +} + +func sendReceive(servAddr string) { + return +} + +func (r *HttpClusterTestRunner) 
+func (r *HttpClusterTestRunner) Setup(ctx context.Context) {
+    var err error
+    err = r.Pub1Cluster.CreateNamespace()
+    assert.Assert(r.T, err)
+
+    err = r.Priv1Cluster.CreateNamespace()
+    assert.Assert(r.T, err)
+
+    privateDeploymentsClient := r.Priv1Cluster.VanClient.KubeClient.AppsV1().Deployments(r.Priv1Cluster.CurrentNamespace)
+
+    fmt.Println("Creating deployment...")
+    result, err := privateDeploymentsClient.Create(httpbinDep)
+    assert.Assert(r.T, err)
+
+    fmt.Printf("Created deployment %q.\n", result.GetObjectMeta().GetName())
+
+    vanRouterCreateOpts := types.VanSiteConfig{
+        Spec: types.VanSiteConfigSpec{
+            SkupperName:       "",
+            IsEdge:            false,
+            EnableController:  true,
+            EnableServiceSync: true,
+            EnableConsole:     false,
+            AuthMode:          types.ConsoleAuthModeUnsecured,
+            User:              "nicob?",
+            Password:          "nopasswordd",
+            ClusterLocal:      false,
+            Replicas:          1,
+        },
+    }
+
+    vanRouterCreateOpts.Spec.SkupperNamespace = r.Priv1Cluster.CurrentNamespace
+    r.Priv1Cluster.VanClient.VanRouterCreate(ctx, vanRouterCreateOpts)
+
+    service := types.ServiceInterface{
+        Address:  "httpbin",
+        Protocol: "http",
+        Port:     80,
+    }
+
+    err = r.Priv1Cluster.VanClient.VanServiceInterfaceCreate(ctx, &service)
+    assert.Assert(r.T, err)
+
+    err = r.Priv1Cluster.VanClient.VanServiceInterfaceBind(ctx, &service, "deployment", "httpbin", "http", 0)
+    assert.Assert(r.T, err)
+
+    vanRouterCreateOpts.Spec.SkupperNamespace = r.Pub1Cluster.CurrentNamespace
+    err = r.Pub1Cluster.VanClient.VanRouterCreate(ctx, vanRouterCreateOpts)
+
+    err = r.Pub1Cluster.VanClient.VanConnectorTokenCreateFile(ctx, types.DefaultVanName, "/tmp/public_secret.yaml")
+    assert.Assert(r.T, err)
+
+    var vanConnectorCreateOpts types.VanConnectorCreateOptions = types.VanConnectorCreateOptions{
+        SkupperNamespace: r.Priv1Cluster.CurrentNamespace,
+        Name:             "",
+        Cost:             0,
+    }
+    r.Priv1Cluster.VanClient.VanConnectorCreateFromFile(ctx, "/tmp/public_secret.yaml", vanConnectorCreateOpts)
+}
+
+func (r *HttpClusterTestRunner) TearDown(ctx context.Context) {
+    r.Pub1Cluster.DeleteNamespace()
+    r.Priv1Cluster.DeleteNamespace()
+}
+
+func (r *HttpClusterTestRunner) Run(ctx context.Context) {
+    defer r.TearDown(ctx)
+    r.Setup(ctx)
+    r.RunTests(ctx)
+}
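One candidate fix for the hardcoded sleeps flagged in the RunTests TODO is the new utils.Retry helper: poll the forwarded port until it accepts TCP connections instead of sleeping a fixed 60 seconds. A hedged sketch; the function name, address, and timings are illustrative, not part of the patch:

```go
package http

import (
	"net"
	"time"

	"github.com/skupperproject/skupper/pkg/utils"
)

// waitForPortForward polls addr (e.g. "localhost:8080") until the
// kubectl port-forward behind it starts accepting connections.
func waitForPortForward(addr string) error {
	return utils.Retry(2*time.Second, 30, func() (bool, error) {
		conn, err := net.DialTimeout("tcp", addr, time.Second)
		if err != nil {
			return false, nil // not listening yet; keep polling
		}
		conn.Close()
		return true, nil
	})
}
```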
diff --git a/test/integration/http/http_test.go b/test/integration/http/http_test.go
new file mode 100644
index 000000000..6d75ad92a
--- /dev/null
+++ b/test/integration/http/http_test.go
@@ -0,0 +1,16 @@
+// +build integration
+
+package http
+
+import (
+    "context"
+    "testing"
+)
+
+func TestHttp(t *testing.T) {
+    testRunner := &HttpClusterTestRunner{}
+
+    testRunner.Build(t, "http")
+    ctx := context.Background()
+    testRunner.Run(ctx)
+}
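Because http_test.go carries the integration build tag, the new suite is skipped by a plain go test run against a cluster-less checkout; it has to be requested explicitly, with something like:

```
go test -tags integration ./test/integration/http/...
```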