Skip to content

Commit

Permalink
MESH-1656 - Adding workload selector based on service
Browse files Browse the repository at this point in the history
  • Loading branch information
vinaygonuguntla committed Aug 1, 2022
1 parent 5ed0b07 commit d11bc66
Show file tree
Hide file tree
Showing 7 changed files with 121 additions and 18 deletions.
17 changes: 13 additions & 4 deletions admiral/pkg/clusters/envoyfilter.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package clusters

import (
"bytes"
"encoding/gob"
"fmt"
"github.com/gogo/protobuf/types"
v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1"
Expand All @@ -18,11 +20,18 @@ var (
)
const hostsKey = "hosts: "

func createOrUpdateEnvoyFilter( rc *RemoteController, routingPolicy *v1.RoutingPolicy, eventType admiral.EventType, workloadIdentityKey string, admiralCache *AdmiralCache) (*networking.EnvoyFilter, error) {

func createOrUpdateEnvoyFilter( rc *RemoteController, routingPolicy *v1.RoutingPolicy, eventType admiral.EventType, workloadIdentityKey string, admiralCache *AdmiralCache, workloadSelectorString string) (*networking.EnvoyFilter, error) {
workloadSelectors := make(map[string]string)
workloadSelectors[common.GetWorkloadIdentifier()] = workloadIdentityKey
workloadSelectors[common.GetEnvKey()] = common.GetRoutingPolicyEnv(routingPolicy)
buffer := new(bytes.Buffer)
buffer.WriteString(workloadSelectorString)
decoder := gob.NewDecoder(buffer)

// Decoding the serialized data into map
err := decoder.Decode(&workloadSelectors)
if err != nil {
log.Error("error during decoding map data")
return nil, err
}

envoyfilterSpec := constructEnvoyFilterStruct(routingPolicy, workloadSelectors)

Expand Down
21 changes: 15 additions & 6 deletions admiral/pkg/clusters/envoyfilter_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package clusters

import (
"bytes"
"context"
"encoding/gob"
"errors"
"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/model"
v1 "github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1"
Expand Down Expand Up @@ -82,21 +84,28 @@ func TestCreateOrUpdateEnvoyFilter(t *testing.T) {
Status: v1.RoutingPolicyStatus{},
}

selectors := map[string]string{"one":"test1", "two":"test2"}

workloadSelectorBuffer := new(bytes.Buffer)
encoder := gob.NewEncoder(workloadSelectorBuffer)
err := encoder.Encode(selectors)
assert.Nil(t, err)

getSha1 = getSha1Error

envoyfilter, err := createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "barstage", registry.AdmiralCache)
envoyfilter, err := createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "barstage", registry.AdmiralCache, workloadSelectorBuffer.String())

assert.NotNil(t, err)
assert.Nil(t, envoyfilter)

getSha1 = common.GetSha1

envoyfilter, err = createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "bar", registry.AdmiralCache)
assert.Equal(t, "bar", envoyfilter.Spec.WorkloadSelector.GetLabels()["identity"])
assert.Equal(t, "stage", envoyfilter.Spec.WorkloadSelector.GetLabels()["admiral.io/env"])
envoyfilter, err = createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "bar", registry.AdmiralCache, workloadSelectorBuffer.String())
assert.Equal(t, "test1", envoyfilter.Spec.WorkloadSelector.GetLabels()["one"])
assert.Equal(t, "test2", envoyfilter.Spec.WorkloadSelector.GetLabels()["two"])
assert.Equal(t, "test-dynamicrouting-d0fdd-1.13", envoyfilter.Name)

envoyfilter, err = createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Update, "bar", registry.AdmiralCache)
envoyfilter, err = createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Update, "bar", registry.AdmiralCache, workloadSelectorBuffer.String())
assert.Nil(t, err)


Expand All @@ -105,7 +114,7 @@ func TestCreateOrUpdateEnvoyFilter(t *testing.T) {
return true, nil, errors.New("error creating envoyfilter")
},
)
envoyfilter3, err := createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "bar2", registry.AdmiralCache)
envoyfilter3, err := createOrUpdateEnvoyFilter(remoteController, routingPolicyFoo, admiral.Add, "bar2", registry.AdmiralCache, workloadSelectorBuffer.String())
assert.NotNil(t, err)
assert.Nil(t, envoyfilter3)

Expand Down
1 change: 1 addition & 0 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis

w.AdmiralCache = &AdmiralCache{
IdentityClusterCache: common.NewMapOfMaps(),
WorkloadSelectorCache: common.NewMapOfMaps(),
CnameClusterCache: common.NewMapOfMaps(),
CnameDependentClusterCache: common.NewMapOfMaps(),
ClusterLocalityCache: common.NewMapOfMaps(),
Expand Down
4 changes: 4 additions & 0 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,10 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
}

remoteRegistry.AdmiralCache.IdentityClusterCache.Put(sourceIdentity, rc.ClusterID, rc.ClusterID)
workloadSelectors := GetServiceSelector(rc.ClusterID, serviceInstance)
if workloadSelectors != "" {
remoteRegistry.AdmiralCache.WorkloadSelectorCache.Put(sourceIdentity, rc.ClusterID, workloadSelectors)
}
remoteRegistry.AdmiralCache.CnameClusterCache.Put(cname, rc.ClusterID, rc.ClusterID)
remoteRegistry.AdmiralCache.CnameIdentityCache.Store(cname, sourceIdentity)
sourceServices[rc.ClusterID] = serviceInstance
Expand Down
15 changes: 9 additions & 6 deletions admiral/pkg/clusters/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type AdmiralCache struct {
CnameDependentClusterCache *common.MapOfMaps
CnameIdentityCache *sync.Map
IdentityClusterCache *common.MapOfMaps
WorkloadSelectorCache *common.MapOfMaps
ClusterLocalityCache *common.MapOfMaps
IdentityDependencyCache *common.MapOfMaps
SubsetServiceEntryIdentityCache *sync.Map
Expand Down Expand Up @@ -242,13 +243,15 @@ func (r RoutingPolicyHandler) processroutingPolicy(dependents map[string]string,

// Check if the dependent exists in this remoteCluster. If so, we create an envoyFilter with dependent identity as workload selector
if _, ok := r.RemoteRegistry.AdmiralCache.IdentityClusterCache.Get(dependent).Copy()[remoteController.ClusterID]; ok {
if selectors, ok := r.RemoteRegistry.AdmiralCache.WorkloadSelectorCache.Get(dependent).Copy()[remoteController.ClusterID]; ok {

filter, err := createOrUpdateEnvoyFilter(remoteController, routingPolicy, eventType, dependent, r.RemoteRegistry.AdmiralCache)
if err != nil {
// Best effort create
log.Errorf(LogErrFormat, eventType, "routingpolicy", routingPolicy.Name, remoteController.ClusterID, err)
} else {
log.Infof("msg=%s name=%s cluster=%s", "created envoyfilter", filter.Name, remoteController.ClusterID)
filter, err := createOrUpdateEnvoyFilter(remoteController, routingPolicy, eventType, dependent, r.RemoteRegistry.AdmiralCache, selectors)
if err != nil {
// Best effort create
log.Errorf(LogErrFormat, eventType, "routingpolicy", routingPolicy.Name, remoteController.ClusterID, err)
} else {
log.Infof("msg=%s name=%s cluster=%s", "created envoyfilter", filter.Name, remoteController.ClusterID)
}
}
}
}
Expand Down
19 changes: 19 additions & 0 deletions admiral/pkg/clusters/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package clusters

import (
"bytes"
"encoding/gob"
"errors"
argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
Expand All @@ -27,6 +29,23 @@ func GetMeshPortsForRollout(clusterName string, destService *k8sV1.Service,
return ports
}

func GetServiceSelector(clusterName string, destService *k8sV1.Service) string {
var selectors = destService.Spec.Selector
if len(selectors) == 0{
return ""
}
buffer := new(bytes.Buffer)
encoder := gob.NewEncoder(buffer)
// Encoding the map
err := encoder.Encode(selectors)
if err != nil {
log.Warnf(LogErrFormat, "GetServiceLabels", "Failed to encode map", destService.Name, clusterName, err)
}else {
log.Debugf(LogFormat, "GetServiceLabels", "Encoded Map", destService.Name, clusterName, selectors)
}
return buffer.String()
}

func getMeshPortsHelper(meshPorts string, destService *k8sV1.Service, clusterName string) map[string]uint32 {
var ports = make(map[string]uint32)

Expand Down
62 changes: 60 additions & 2 deletions admiral/pkg/clusters/util_test.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
package clusters

import (
"bytes"
"encoding/gob"
"errors"
argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"github.com/stretchr/testify/assert"
k8sAppsV1 "k8s.io/api/apps/v1"
coreV1 "k8s.io/api/core/v1"
k8sV1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"
"reflect"
"strconv"
"testing"

k8sV1 "k8s.io/api/core/v1"
)

func TestGetMeshPorts(t *testing.T) {
Expand Down Expand Up @@ -215,6 +217,62 @@ func TestValidateConfigmapBeforePutting(t *testing.T) {

}

func TestGetServiceSelector(t *testing.T) {

selector := map[string]string {"app":"test1"}
buffer := new(bytes.Buffer)

e := gob.NewEncoder(buffer)

// Encoding the map
err := e.Encode(selector)
assert.Nil(t, err)

testCases := []struct {
name string
clusterName string
service k8sV1.Service
expected string
}{
{
name: "should return a selectors based on service",
clusterName: "test-cluster",
service: k8sV1.Service{
ObjectMeta: v1.ObjectMeta{Name: "server", Labels: map[string]string{"asset": "Intuit.platform.mesh.server"}},
Spec: k8sV1.ServiceSpec{Selector: selector},
},
expected: buffer.String(),
},
{
name: "should return empty selectors",
clusterName: "test-cluster",
service: k8sV1.Service{
ObjectMeta: v1.ObjectMeta{Name: "server", Labels: map[string]string{"asset": "Intuit.platform.mesh.server"}},
Spec: k8sV1.ServiceSpec{Selector: map[string]string{}},
},
expected: "",
},
{
name: "should return nil",
clusterName: "test-cluster",
service: k8sV1.Service{
ObjectMeta: v1.ObjectMeta{Name: "server", Labels: map[string]string{"asset": "Intuit.platform.mesh.server"}},
Spec: k8sV1.ServiceSpec{Selector: nil},
},
expected: "",
},
}

for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
selectors := GetServiceSelector(c.clusterName,&c.service)
if !reflect.DeepEqual(selectors, c.expected) {
t.Errorf("Wanted selectors: %v, got: %v", c.expected, selectors)
}
})
}
}

func TestGetMeshPortsForRollout(t *testing.T) {

annotatedPort := 8090
Expand Down

0 comments on commit d11bc66

Please sign in to comment.