Skip to content

Commit

Permalink
Update branch for latest changes from master
Browse files Browse the repository at this point in the history
  • Loading branch information
grs committed Aug 11, 2020
2 parents e310003 + 7e6a1aa commit cf48d53
Show file tree
Hide file tree
Showing 18 changed files with 698 additions and 85 deletions.
2 changes: 2 additions & 0 deletions api/types/types.go
Expand Up @@ -212,6 +212,7 @@ type AssemblySpec struct {
}

type VanRouterStatusSpec struct {
SiteName string `json:"siteName,omitempty"`
Mode string `json:"mode,omitempty"`
TransportReadyReplicas int32 `json:"transportReadyReplicas,omitempty"`
ConnectedSites TransportConnectedSites `json:"connectedSites,omitempty"`
Expand Down Expand Up @@ -315,6 +316,7 @@ type TransportConnectedSites struct {
Direct int
Indirect int
Total int
Warnings []string
}

type ServiceInterface struct {
Expand Down
4 changes: 4 additions & 0 deletions client/van_router_inspect.go
Expand Up @@ -17,6 +17,10 @@ func (cli *VanClient) VanRouterInspect(ctx context.Context) (*types.VanRouterIns

current, err := cli.KubeClient.AppsV1().Deployments(cli.Namespace).Get(types.TransportDeploymentName, metav1.GetOptions{})
if err == nil {
siteConfig, err := cli.VanSiteConfigInspect(ctx, nil)
if err == nil && siteConfig != nil {
vir.Status.SiteName = siteConfig.Spec.SkupperName
}
vir.Status.Mode = string(qdr.GetTransportMode(current))
vir.Status.TransportReadyReplicas = current.Status.ReadyReplicas
connected, err := qdr.GetConnectedSites(vir.Status.Mode == types.TransportModeEdge, cli.Namespace, cli.KubeClient, cli.RestConfig)
Expand Down
125 changes: 125 additions & 0 deletions client/van_serviceinterface_inspect_test.go
@@ -0,0 +1,125 @@
package client

import (
"context"
"strings"
"testing"

"github.com/skupperproject/skupper/api/types"
"github.com/skupperproject/skupper/pkg/kube"

"gotest.tools/assert"
)

func TestVanServiceInterfaceInspect(t *testing.T) {
testcases := []struct {
namespace string
doc string
addr string
proto string
port int
init bool
expectedCreationError string
}{
{
namespace: "vsii-1",
addr: "vsii-1-addr",
proto: "tcp",
port: 5672,
init: true,
expectedCreationError: "",
},
{
namespace: "vsii-2",
addr: "vsii-2-addr",
proto: "tcp",
port: 5672,
init: false,
expectedCreationError: "Skupper not initialised",
},
}
for _, testcase := range testcases {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Run in a real cluster, or in a mock environment.
var cli *VanClient
var err error
isCluster := *clusterRun
if isCluster {
cli, err = NewClient(testcase.namespace, "", "")
} else {
cli, err = newMockClient(testcase.namespace, "", "")
}
assert.Check(t, err, testcase.namespace)

_, err = kube.NewNamespace(testcase.namespace, cli.KubeClient)
assert.Check(t, err, "%s: Namespace creation failed.", testcase.namespace)
defer kube.DeleteNamespace(testcase.namespace, cli.KubeClient)

// Create a skupper router -- or don't if the test
// wants a creation error.
if testcase.init {
err = cli.VanRouterCreate(ctx, types.VanSiteConfig{
Spec: types.VanSiteConfigSpec{
SkupperName: testcase.namespace,
IsEdge: false,
EnableController: true,
EnableServiceSync: true,
EnableConsole: false,
AuthMode: "",
User: "",
Password: "",
ClusterLocal: true,
},
})
assert.Check(t, err, "%s: Unable to create VAN router", testcase.namespace)
}

// Create the VanServiceInterface.
service := types.ServiceInterface{
Address: testcase.addr,
Protocol: testcase.proto,
Port: testcase.port,
}
err = cli.VanServiceInterfaceCreate(ctx, &service)

// If initialization was not done, we should see an error.
// In this case, don't try to check the Service Interface --
// it isn't there.
if testcase.expectedCreationError != "" {
assert.Check(t,
err != nil && strings.Contains(err.Error(), testcase.expectedCreationError),
"\n\nTest %s failure: The expected error |%s| was not reported.\n",
testcase.namespace,
testcase.expectedCreationError)
} else {
assert.Check(t, err, "\n\nTest %s failure: Creation failed.\n", testcase.namespace)

// When we inspect the VanServiceInterface, make sure that the
// expected values have been set.
serviceInterface, err := cli.VanServiceInterfaceInspect(ctx, testcase.addr)
assert.Check(t, err, "Inspection failed.")

assert.Equal(t, testcase.addr, serviceInterface.Address,
"\n\nTest %s failure: Address was |%s| but should be |%s|.\n",
testcase.namespace,
serviceInterface.Address,
testcase.addr)
assert.Equal(t, testcase.proto, serviceInterface.Protocol,
"\n\nTest %s failure: Protocol was |%s| but should be |%s|.\n",
testcase.namespace,
serviceInterface.Protocol,
testcase.proto)
assert.Equal(t, testcase.port, serviceInterface.Port,
"\n\nTest %s failure: Port was %d but should be %d.\n",
testcase.namespace,
serviceInterface.Port,
testcase.port)
assert.Assert(t, nil == serviceInterface.Headless,
"\n\nTest %s failure: Headless was |%#v| but should be nil.\n",
testcase.namespace,
serviceInterface.Headless)
}
}
}
5 changes: 3 additions & 2 deletions cmd/service-controller/bridges.go
Expand Up @@ -357,11 +357,12 @@ func getStringByKey(attributes map[string]interface{}, key string) string {
}

func getIntByKey(attributes map[string]interface{}, key string) int {
i, ok := attributes[key].(int)
// Unmarshal stores float64 in interface value for JSON numbers
i, ok := attributes[key].(float64)
if !ok {
return 0
}
return i
return int(i)
}

func getBoolByKey(attributes map[string]interface{}, key string) bool {
Expand Down
5 changes: 4 additions & 1 deletion cmd/service-controller/controller.go
Expand Up @@ -47,9 +47,10 @@ type Controller struct {
amqpClient *amqp.Client
amqpSession *amqp.Session
byOrigin map[string]map[string]types.ServiceInterface
Local []types.ServiceInterface
localServices map[string]types.ServiceInterface
byName map[string]types.ServiceInterface
desiredServices map[string]types.ServiceInterface
heardFrom map[string]time.Time

definitionMonitor *DefinitionMonitor
consoleServer *ConsoleServer
Expand Down Expand Up @@ -129,8 +130,10 @@ func NewController(cli *client.VanClient, origin string, tlsConfig *tls.Config)

// Organize service definitions
controller.byOrigin = make(map[string]map[string]types.ServiceInterface)
controller.localServices = make(map[string]types.ServiceInterface)
controller.byName = make(map[string]types.ServiceInterface)
controller.desiredServices = make(map[string]types.ServiceInterface)
controller.heardFrom = make(map[string]time.Time)

log.Println("Setting up event handlers")
svcDefInformer.AddEventHandler(controller.newEventHandler("servicedefs", AnnotatedKey, ConfigMapResourceVersionTest))
Expand Down
7 changes: 6 additions & 1 deletion cmd/service-controller/definition_monitor.go
Expand Up @@ -156,10 +156,15 @@ func (m *DefinitionMonitor) getServiceDefinitionFromAnnotatedDeployment(deployme
} else {
svc.Address = deployment.ObjectMeta.Name
}

selector := ""
if deployment.Spec.Selector != nil {
selector = utils.StringifySelector(deployment.Spec.Selector.MatchLabels)
}
svc.Targets = []types.ServiceInterfaceTarget{
types.ServiceInterfaceTarget{
Name: deployment.ObjectMeta.Name,
Selector: utils.StringifySelector(deployment.Spec.Selector.MatchLabels),
Selector: selector,
},
}
svc.Origin = "annotation"
Expand Down
16 changes: 16 additions & 0 deletions cmd/service-controller/main.go
Expand Up @@ -9,9 +9,13 @@ import (
"os"
"os/signal"
"syscall"
"time"

corev1 "k8s.io/api/core/v1"

"github.com/skupperproject/skupper/api/types"
"github.com/skupperproject/skupper/client"
"github.com/skupperproject/skupper/pkg/kube"
)

func describe(i interface{}) {
Expand Down Expand Up @@ -89,6 +93,18 @@ func main() {
log.Fatal("Error getting new controller", err.Error())
}

log.Println("Waiting for Skupper transport to start")
pods, err := kube.GetDeploymentPods(types.TransportDeploymentName, namespace, cli.KubeClient)
if err != nil {
log.Fatal("Error getting transport deployment pods", err.Error())
}
for _, pod := range pods {
_, err := kube.WaitForPodStatus(namespace, cli.KubeClient, pod.Name, corev1.PodRunning, time.Second*180, time.Second*5)
if err != nil {
log.Fatal("Error waiting for skupper transport pod running status", err.Error())
}
}

// start the controller workers
if err = controller.Run(stopCh); err != nil {
log.Fatal("Error running controller: ", err.Error())
Expand Down

0 comments on commit cf48d53

Please sign in to comment.