Skip to content

Commit

Permalink
Optimize GlobalTrafficPolicy fetching and endpoint generation time (i…
Browse files Browse the repository at this point in the history
…stio-ecosystem#206)

Signed-off-by: psikka1 <pankaj_sikka@intuit.com>
  • Loading branch information
aattuluri authored and psikka1 committed Jun 15, 2022
1 parent 14ab1d1 commit 3a8fcf4
Show file tree
Hide file tree
Showing 11 changed files with 413 additions and 1,455 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@
.idea/vcs.xml

out*
*.tar.gz
*.tar.gz
*.out
4 changes: 0 additions & 4 deletions admiral/pkg/clusters/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,10 @@ import (
"fmt"
"github.com/istio-ecosystem/admiral/admiral/pkg/apis/admiral/v1"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/istio"
v12 "k8s.io/api/apps/v1"
"k8s.io/client-go/rest"
"sync"
"time"

argo "github.com/argoproj/argo-rollouts/pkg/apis/rollouts/v1alpha1"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/admiral"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/common"
"github.com/istio-ecosystem/admiral/admiral/pkg/controller/secret"
Expand Down Expand Up @@ -46,8 +44,6 @@ func InitAdmiral(ctx context.Context, params common.AdmiralParams) (*RemoteRegis

gtpCache := &globalTrafficCache{}
gtpCache.identityCache = make(map[string]*v1.GlobalTrafficPolicy)
gtpCache.dependencyCache = make(map[string]*v12.Deployment)
gtpCache.dependencyRolloutCache = make(map[string]*argo.Rollout)
gtpCache.mutex = &sync.Mutex{}

w.AdmiralCache = &AdmiralCache{
Expand Down
32 changes: 0 additions & 32 deletions admiral/pkg/clusters/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,38 +312,6 @@ func TestAdded(t *testing.T) {

}

func TestPodHandler(t *testing.T) {

p := common.AdmiralParams{
KubeconfigPath: "testdata/fake.config",
}

rr, _ := InitAdmiral(context.Background(), p)

rc, _ := createMockRemoteController(func(i interface{}) {

})
rr.RemoteControllers["test.cluster"] = rc

ph := PodHandler{
RemoteRegistry: rr,
}

pod := k8sCoreV1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
Spec: k8sCoreV1.PodSpec{
Hostname: "test.local",
},
}

ph.Added(&pod)

ph.Deleted(&pod)
}

func TestGetServiceForDeployment(t *testing.T) {
baseRc, _ := createMockRemoteController(func(i interface{}) {
//res := i.(istio.Config)
Expand Down
50 changes: 48 additions & 2 deletions admiral/pkg/clusters/serviceentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math"
"math/rand"
"reflect"
"sort"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -70,6 +71,11 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
var serviceInstance *k8sV1.Service
var weightedServices map[string]*WeightedService
var rollout *admiral.RolloutClusterEntry
var gtps = make(map[string][]*v1.GlobalTrafficPolicy)

var namespace string

var gtpKey = common.ConstructGtpKey(sourceIdentity, env)

start := time.Now()
for _, rc := range remoteRegistry.RemoteControllers {
Expand All @@ -87,7 +93,7 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
if serviceInstance == nil {
continue
}

namespace = deploymentInstance.Namespace
localMeshPorts := GetMeshPorts(rc.ClusterID, serviceInstance, deploymentInstance)

cname = common.GetCname(deploymentInstance, common.GetWorkloadIdentifier(), common.GetHostnameSuffix())
Expand All @@ -106,7 +112,7 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
serviceInstance = sInstance.Service
break
}

namespace = rolloutInstance.Namespace
localMeshPorts := GetMeshPortsForRollout(rc.ClusterID, serviceInstance, rolloutInstance)

cname = common.GetCnameForRollout(rolloutInstance, common.GetWorkloadIdentifier(), common.GetHostnameSuffix())
Expand All @@ -117,6 +123,11 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
continue
}

gtpsInNamespace := rc.GlobalTraffic.Cache.Get(gtpKey, namespace)
if len(gtps) > 0 {
gtps[rc.ClusterID] = gtpsInNamespace
}

remoteRegistry.AdmiralCache.IdentityClusterCache.Put(sourceIdentity, rc.ClusterID, rc.ClusterID)
remoteRegistry.AdmiralCache.CnameClusterCache.Put(cname, rc.ClusterID, rc.ClusterID)
remoteRegistry.AdmiralCache.CnameIdentityCache.Store(cname, sourceIdentity)
Expand All @@ -126,6 +137,9 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s

util.LogElapsedTimeSince("BuildServiceEntry", sourceIdentity, env, "", start)

//cache the latest GTP in global cache to be reused during DR creation
updateGlobalGtpCache(remoteRegistry.AdmiralCache, sourceIdentity, env, gtps)

dependents := remoteRegistry.AdmiralCache.IdentityDependencyCache.Get(sourceIdentity).Copy()

//handle local updates (source clusters first)
Expand Down Expand Up @@ -215,6 +229,38 @@ func modifyServiceEntryForNewServiceOrPod(event admiral.EventType, env string, s
return serviceEntries
}

//Does two things;
//i) Picks the GTP that was created most recently from the passed in GTP list (GTPs from all clusters)
//ii) Updates the global GTP cache with the selected GTP in i)
func updateGlobalGtpCache(cache *AdmiralCache, identity, env string, gtps map[string][]*v1.GlobalTrafficPolicy) {
defer util.LogElapsedTime("updateGlobalGtpCache", identity, env, "")()
gtpsOrdered := make([]*v1.GlobalTrafficPolicy, 0)
for _, gtpsInCluster := range gtps {
gtpsOrdered = append(gtpsOrdered, gtpsInCluster...)
}
if len(gtpsOrdered) == 0 {
cache.GlobalTrafficCache.Delete(env, identity)
return
} else if len(gtpsOrdered) > 1 {
//sort by creation time with most recent at the beginning
sort.Slice(gtpsOrdered, func(i, j int) bool {
iTime := gtpsOrdered[i].CreationTimestamp.Nanosecond()
jTime := gtpsOrdered[j].CreationTimestamp.Nanosecond()
return iTime > jTime
})
}

mostRecentGtp := gtpsOrdered[0]

err := cache.GlobalTrafficCache.Put(mostRecentGtp)

if err != nil {
log.Errorf("Error in updating GTP with name=%s in namespace=%s as actively used for identity=%s with err=%v", mostRecentGtp.Name, mostRecentGtp.Namespace, common.GetGtpKey(mostRecentGtp), err)
} else {
log.Infof("GTP with name=%s in namespace=%s is actively used for identity=%s", mostRecentGtp.Name, mostRecentGtp.Namespace, common.GetGtpKey(mostRecentGtp))
}
}

func updateEndpointsForBlueGreen(rollout *argo.Rollout, weightedServices map[string]*WeightedService, cnames map[string]string,
ep *networking.ServiceEntry_Endpoint, sourceCluster string, meshHost string) {
activeServiceName := rollout.Spec.Strategy.BlueGreen.ActiveService
Expand Down
76 changes: 75 additions & 1 deletion admiral/pkg/clusters/serviceentry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ func TestAddServiceEntriesWithDr(t *testing.T) {

gtpCache := &globalTrafficCache{}
gtpCache.identityCache = make(map[string]*v13.GlobalTrafficPolicy)
gtpCache.dependencyCache = make(map[string]*v14.Deployment)
gtpCache.mutex = &sync.Mutex{}
admiralCache.GlobalTrafficCache = gtpCache

Expand Down Expand Up @@ -890,6 +889,8 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) {
}
s, e := admiral.NewServiceController(make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))

gtpc, e := admiral.NewGlobalTrafficController(make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300))

cacheWithEntry := ServiceEntryAddressStore{
EntryAddresses: map[string]string{"test.test.mesh-se": common.LocalAddressPrefix + ".10.1"},
Addresses: []string{common.LocalAddressPrefix + ".10.1"},
Expand All @@ -912,6 +913,7 @@ func TestCreateServiceEntryForNewServiceOrPodRolloutsUsecase(t *testing.T) {
RolloutController: r,
ServiceController: s,
VirtualServiceController: v,
GlobalTraffic: gtpc,
}
rc.ClusterID = "test.cluster"
rr.RemoteControllers["test.cluster"] = rc
Expand Down Expand Up @@ -1019,6 +1021,7 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) {
t.Fail()
}
s, e := admiral.NewServiceController(make(chan struct{}), &test.MockServiceHandler{}, &config, time.Second*time.Duration(300))
gtpc, e := admiral.NewGlobalTrafficController(make(chan struct{}), &test.MockGlobalTrafficHandler{}, &config, time.Second*time.Duration(300))

cacheWithEntry := ServiceEntryAddressStore{
EntryAddresses: map[string]string{
Expand All @@ -1045,6 +1048,7 @@ func TestCreateServiceEntryForBlueGreenRolloutsUsecase(t *testing.T) {
RolloutController: r,
ServiceController: s,
VirtualServiceController: v,
GlobalTraffic: gtpc,
}
rc.ClusterID = "test.cluster"
rr.RemoteControllers["test.cluster"] = rc
Expand Down Expand Up @@ -1331,6 +1335,76 @@ func TestUpdateEndpointsForWeightedServices(t *testing.T) {

}

func TestUpdateGlobalGtpCache(t *testing.T) {

var (
admiralCache = &AdmiralCache{GlobalTrafficCache: &globalTrafficCache{identityCache: make(map[string]*v13.GlobalTrafficPolicy), mutex: &sync.Mutex{}}}

identity1 = "identity1"

env_stage = "stage"

gtp = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp", Namespace: "namespace1", CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-30))), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{
Policy: []*model.TrafficPolicy {{DnsPrefix: "hello"}},
},}

gtp2 = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp2", Namespace: "namespace1", CreationTimestamp: v12.NewTime(time.Now().Add(time.Duration(-15))), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{
Policy: []*model.TrafficPolicy {{DnsPrefix: "hellogtp2"}},
},}

gtp3 = &v13.GlobalTrafficPolicy{ObjectMeta: v12.ObjectMeta{Name: "gtp3", Namespace: "namespace2", CreationTimestamp: v12.NewTime(time.Now()), Labels: map[string]string{"identity": identity1, "env": env_stage}}, Spec: model.GlobalTrafficPolicy{
Policy: []*model.TrafficPolicy {{DnsPrefix: "hellogtp3"}},
},}

)

testCases := []struct {
name string
identity string
env string
gtps map[string][]*v13.GlobalTrafficPolicy
expectedGtp *v13.GlobalTrafficPolicy
}{ {
name: "Should return nil when no GTP present",
gtps: map[string][]*v13.GlobalTrafficPolicy{},
identity: identity1,
env: env_stage,
expectedGtp: nil,
},
{
name: "Should return the only existing gtp",
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp}},
identity: identity1,
env: env_stage,
expectedGtp: gtp,
},
{
name: "Should return the gtp recently created within the cluster",
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}},
identity: identity1,
env: env_stage,
expectedGtp: gtp2,
},
{
name: "Should return the gtp recently created from another cluster",
gtps: map[string][]*v13.GlobalTrafficPolicy{"c1": {gtp, gtp2}, "c2": {gtp3}},
identity: identity1,
env: env_stage,
expectedGtp: gtp3,
},
}

for _, c := range testCases {
t.Run(c.name, func(t *testing.T) {
updateGlobalGtpCache(admiralCache, c.identity, c.env, c.gtps)
gtp := admiralCache.GlobalTrafficCache.GetFromIdentity(c.identity, c.env)
if !reflect.DeepEqual(c.expectedGtp, gtp) {
t.Errorf("Test %s failed expected gtp: %v got %v", c.name, c.expectedGtp, gtp)
}
})
}
}

func isLower(s string) bool {
for _, r := range s {
if !unicode.IsLower(r) && unicode.IsLetter(r) {
Expand Down
Loading

0 comments on commit 3a8fcf4

Please sign in to comment.