Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
ref: collapse pkg/policy into k8s Client (#5014)
Browse files Browse the repository at this point in the history
ref: collapse pkg/policy into k8s Client

Signed-off-by: Allen Leigh <allenlsy@gmail.com>
  • Loading branch information
allenlsy committed Sep 2, 2022
1 parent 68e99eb commit e78aa42
Show file tree
Hide file tree
Showing 41 changed files with 2,027 additions and 1,092 deletions.
Empty file added .force-touch
Empty file.
6 changes: 1 addition & 5 deletions cmd/osm-controller/osm-controller.go
Expand Up @@ -47,7 +47,6 @@ import (
"github.com/openservicemesh/osm/pkg/logger"
"github.com/openservicemesh/osm/pkg/messaging"
"github.com/openservicemesh/osm/pkg/metricsstore"
"github.com/openservicemesh/osm/pkg/policy"
"github.com/openservicemesh/osm/pkg/reconciler"
"github.com/openservicemesh/osm/pkg/signals"
"github.com/openservicemesh/osm/pkg/smi"
Expand Down Expand Up @@ -229,12 +228,9 @@ func main() {

ingress.Initialize(kubeClient, k8sClient, stop, certManager, msgBroker)

policyController := policy.NewPolicyController(informerCollection, k8sClient, msgBroker)

meshCatalog := catalog.NewMeshCatalog(
meshSpec,
certManager,
policyController,
stop,
computeClient,
msgBroker,
Expand All @@ -253,7 +249,7 @@ func main() {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, "Error initializing ADS server")
}

if err := validator.NewValidatingWebhook(ctx, validatorWebhookConfigName, osmNamespace, osmVersion, meshName, enableReconciler, validateTrafficTarget, certManager, kubeClient, policyController); err != nil {
if err := validator.NewValidatingWebhook(ctx, validatorWebhookConfigName, osmNamespace, osmVersion, meshName, enableReconciler, validateTrafficTarget, certManager, kubeClient, computeClient); err != nil {
events.GenericEventRecorder().FatalEvent(err, events.InitializationError, fmt.Sprintf("Error starting the validating webhook server: %s", err))
}

Expand Down
3 changes: 0 additions & 3 deletions mockspec/rules
Expand Up @@ -8,9 +8,6 @@ k8s; pkg/k8s/mock_controller_generated.go; github.com/openservicemesh/osm/pkg/k8
# pkg/smi
smi; pkg/smi/mock_meshspec_generated.go; github.com/openservicemesh/osm/pkg/smi; MeshSpec

# pkg/policy
policy; pkg/policy/mock_client_generated.go; github.com/openservicemesh/osm/pkg/policy; Controller

# pkg/catalog
catalog; pkg/catalog/mock_catalog_generated.go; github.com/openservicemesh/osm/pkg/catalog; MeshCataloger

Expand Down
9 changes: 3 additions & 6 deletions pkg/catalog/catalog.go
Expand Up @@ -6,21 +6,18 @@ import (
"github.com/openservicemesh/osm/pkg/certificate"
"github.com/openservicemesh/osm/pkg/compute"
"github.com/openservicemesh/osm/pkg/messaging"
"github.com/openservicemesh/osm/pkg/policy"
"github.com/openservicemesh/osm/pkg/smi"
"github.com/openservicemesh/osm/pkg/ticker"
)

// NewMeshCatalog creates a new service catalog
func NewMeshCatalog(meshSpec smi.MeshSpec, certManager *certificate.Manager,
policyController policy.Controller, stop <-chan struct{},
stop <-chan struct{},
computeInterface compute.Interface,
msgBroker *messaging.Broker) *MeshCatalog {
mc := &MeshCatalog{
Interface: computeInterface,
meshSpec: meshSpec,
policyController: policyController,

Interface: computeInterface,
meshSpec: meshSpec,
certManager: certManager,
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/catalog/egress.go
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/policy"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/smi"
"github.com/openservicemesh/osm/pkg/trafficpolicy"
Expand All @@ -35,7 +34,7 @@ func (mc *MeshCatalog) GetEgressTrafficPolicy(serviceIdentity identity.ServiceId
var trafficMatches []*trafficpolicy.TrafficMatch
var clusterConfigs []*trafficpolicy.EgressClusterConfig
portToRouteConfigMap := make(map[int][]*trafficpolicy.EgressHTTPRouteConfig)
egressResources := mc.policyController.ListEgressPoliciesForSourceIdentity(serviceIdentity.ToK8sServiceAccount())
egressResources := mc.ListEgressPoliciesForServiceAccount(serviceIdentity.ToK8sServiceAccount())

for _, egress := range egressResources {
upstreamTrafficSetting, err := mc.getUpstreamTrafficSettingForEgress(egress)
Expand Down Expand Up @@ -136,8 +135,7 @@ func (mc *MeshCatalog) getUpstreamTrafficSettingForEgress(egressPolicy *policyv1
Namespace: egressPolicy.Namespace,
Name: match.Name,
}
upstreamtrafficSetting := mc.policyController.GetUpstreamTrafficSetting(
policy.UpstreamTrafficSettingGetOpt{NamespacedName: &namespacedName})
upstreamtrafficSetting := mc.GetUpstreamTrafficSettingByNamespace(&namespacedName)

if upstreamtrafficSetting == nil {
return nil, fmt.Errorf("UpstreamTrafficSetting %s specified in Egress policy %s/%s could not be found, ignoring it",
Expand Down
12 changes: 5 additions & 7 deletions pkg/catalog/egress_test.go
Expand Up @@ -18,7 +18,6 @@ import (
"github.com/openservicemesh/osm/pkg/compute"

"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/policy"

"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/smi"
Expand Down Expand Up @@ -437,19 +436,18 @@ func TestGetEgressTrafficPolicy(t *testing.T) {
for i, tc := range testCases {
t.Run(fmt.Sprintf("Running test case %d: %s", i, tc.name), func(t *testing.T) {
mockMeshSpec := smi.NewMockMeshSpec(mockCtrl)
mockPolicyController := policy.NewMockController(mockCtrl)

for _, rg := range tc.httpRouteGroups {
mockMeshSpec.EXPECT().GetHTTPRouteGroup(fmt.Sprintf("%s/%s", rg.Namespace, rg.Name)).Return(rg).AnyTimes()
}
mockPolicyController.EXPECT().ListEgressPoliciesForSourceIdentity(gomock.Any()).Return(tc.egressPolicies).Times(1)
mockPolicyController.EXPECT().GetUpstreamTrafficSetting(gomock.Any()).Return(tc.upstreamTrafficSetting).AnyTimes()

mockCompute := compute.NewMockInterface(mockCtrl)
mockCompute.EXPECT().ListEgressPoliciesForServiceAccount(gomock.Any()).Return(tc.egressPolicies).Times(1)
mockCompute.EXPECT().GetUpstreamTrafficSettingByService(gomock.Any()).Return(tc.upstreamTrafficSetting).AnyTimes()
mockCompute.EXPECT().GetUpstreamTrafficSettingByNamespace(gomock.Any()).Return(tc.upstreamTrafficSetting).AnyTimes()
mc := &MeshCatalog{
meshSpec: mockMeshSpec,
Interface: mockCompute,
policyController: mockPolicyController,
meshSpec: mockMeshSpec,
Interface: mockCompute,
}

mockCompute.EXPECT().GetMeshConfig().Return(configv1alpha2.MeshConfig{Spec: configv1alpha2.MeshConfigSpec{Traffic: configv1alpha2.TrafficSpec{EnableEgress: false}}}).Times(1) // Enables EgressPolicy
Expand Down
14 changes: 1 addition & 13 deletions pkg/catalog/fake/fake.go
Expand Up @@ -3,33 +3,21 @@ package fake
import (
"time"

"github.com/golang/mock/gomock"
"github.com/onsi/ginkgo"

"github.com/openservicemesh/osm/pkg/compute"

"github.com/openservicemesh/osm/pkg/catalog"
tresorFake "github.com/openservicemesh/osm/pkg/certificate/providers/tresor/fake"
"github.com/openservicemesh/osm/pkg/messaging"
"github.com/openservicemesh/osm/pkg/policy"
smiFake "github.com/openservicemesh/osm/pkg/smi/fake"
)

// NewFakeMeshCatalog creates a new struct implementing catalog.MeshCataloger interface used for testing.
func NewFakeMeshCatalog(provider compute.Interface) *catalog.MeshCatalog {
mockCtrl := gomock.NewController(ginkgo.GinkgoT())
mockPolicyController := policy.NewMockController(mockCtrl)

meshSpec := smiFake.NewFakeMeshSpecClient()

stop := make(<-chan struct{})

certManager := tresorFake.NewFake(1 * time.Hour)

mockPolicyController.EXPECT().ListEgressPoliciesForSourceIdentity(gomock.Any()).Return(nil).AnyTimes()
mockPolicyController.EXPECT().GetIngressBackendPolicy(gomock.Any()).Return(nil).AnyTimes()
mockPolicyController.EXPECT().GetUpstreamTrafficSetting(gomock.Any()).Return(nil).AnyTimes()

return catalog.NewMeshCatalog(meshSpec, certManager,
mockPolicyController, stop, provider, messaging.NewBroker(stop))
return catalog.NewMeshCatalog(meshSpec, certManager, stop, provider, messaging.NewBroker(stop))
}
2 changes: 1 addition & 1 deletion pkg/catalog/helpers_test.go
Expand Up @@ -52,5 +52,5 @@ func newFakeMeshCatalogForRoutes(t *testing.T, testParams testParams) *MeshCatal
mockMeshSpec.EXPECT().ListTrafficSplits().Return([]*split.TrafficSplit{}).AnyTimes()

return NewMeshCatalog(mockMeshSpec, tresorFake.NewFake(1*time.Hour),
nil, stop, provider, messaging.NewBroker(stop))
stop, provider, messaging.NewBroker(stop))
}
4 changes: 1 addition & 3 deletions pkg/catalog/inbound_traffic_policies.go
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/errcode"
"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/policy"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/smi"
"github.com/openservicemesh/osm/pkg/trafficpolicy"
Expand Down Expand Up @@ -67,8 +66,7 @@ func (mc *MeshCatalog) GetInboundMeshTrafficPolicy(upstreamIdentity identity.Ser
}
clusterConfigs = append(clusterConfigs, clusterConfigForSvc)

upstreamTrafficSetting := mc.policyController.GetUpstreamTrafficSetting(
policy.UpstreamTrafficSettingGetOpt{MeshService: &upstreamSvc})
upstreamTrafficSetting := mc.GetUpstreamTrafficSettingByService(&upstreamSvc)
clusterConfigs = append(clusterConfigs, getRateLimitServiceClusters(upstreamTrafficSetting, rlsClusterSet)...)

// ---
Expand Down
120 changes: 91 additions & 29 deletions pkg/catalog/inbound_traffic_policies_test.go
Expand Up @@ -23,7 +23,6 @@ import (
"github.com/openservicemesh/osm/pkg/k8s"

"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/policy"
"github.com/openservicemesh/osm/pkg/service"
"github.com/openservicemesh/osm/pkg/smi"
smiFake "github.com/openservicemesh/osm/pkg/smi/fake"
Expand Down Expand Up @@ -76,7 +75,7 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
httpRouteGroups []*spec.HTTPRouteGroup
tcpRoutes []*spec.TCPRoute
trafficSplits []*split.TrafficSplit
upstreamTrafficSetting *policyv1alpha1.UpstreamTrafficSetting
upstreamTrafficSettings []*policyv1alpha1.UpstreamTrafficSetting
prepare func(mockMeshSpec *smi.MockMeshSpec, trafficSplits []*split.TrafficSplit)
expectedInboundMeshPolicy *trafficpolicy.InboundMeshTrafficPolicy
}{
Expand Down Expand Up @@ -1795,13 +1794,34 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
},
},
trafficSplits: nil,
upstreamTrafficSetting: &policyv1alpha1.UpstreamTrafficSetting{
Spec: policyv1alpha1.UpstreamTrafficSettingSpec{
RateLimit: virtualHostLocalRateLimitConfig,
HTTPRoutes: []policyv1alpha1.HTTPRouteSpec{
{
Path: "/get", // matches route allowed by HTTPRouteGroup
RateLimit: perRouteLocalRateLimitConfig,
upstreamTrafficSettings: []*policyv1alpha1.UpstreamTrafficSetting{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
},
Spec: policyv1alpha1.UpstreamTrafficSettingSpec{
Host: "s1.ns1.svc.cluster.local",
RateLimit: virtualHostLocalRateLimitConfig,
HTTPRoutes: []policyv1alpha1.HTTPRouteSpec{
{
Path: "/get", // matches route allowed by HTTPRouteGroup
RateLimit: perRouteLocalRateLimitConfig,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
},
Spec: policyv1alpha1.UpstreamTrafficSettingSpec{
Host: "s2.ns1.svc.cluster.local",
RateLimit: virtualHostLocalRateLimitConfig,
HTTPRoutes: []policyv1alpha1.HTTPRouteSpec{
{
Path: "/get", // matches route allowed by HTTPRouteGroup
RateLimit: perRouteLocalRateLimitConfig,
},
},
},
},
Expand Down Expand Up @@ -1930,13 +1950,34 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
},
},
permissiveMode: true,
upstreamTrafficSetting: &policyv1alpha1.UpstreamTrafficSetting{
Spec: policyv1alpha1.UpstreamTrafficSettingSpec{
RateLimit: virtualHostLocalRateLimitConfig,
HTTPRoutes: []policyv1alpha1.HTTPRouteSpec{
{
Path: ".*", // matches wildcard path regex for permissive mode
RateLimit: perRouteLocalRateLimitConfig,
upstreamTrafficSettings: []*policyv1alpha1.UpstreamTrafficSetting{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
},
Spec: policyv1alpha1.UpstreamTrafficSettingSpec{
Host: "s1.ns1.svc.cluster.local",
RateLimit: virtualHostLocalRateLimitConfig,
HTTPRoutes: []policyv1alpha1.HTTPRouteSpec{
{
Path: ".*", // matches wildcard path regex for permissive mode
RateLimit: perRouteLocalRateLimitConfig,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
},
Spec: policyv1alpha1.UpstreamTrafficSettingSpec{
Host: "s2.ns1.svc.cluster.local",
RateLimit: virtualHostLocalRateLimitConfig,
HTTPRoutes: []policyv1alpha1.HTTPRouteSpec{
{
Path: ".*", // matches wildcard path regex for permissive mode
RateLimit: perRouteLocalRateLimitConfig,
},
},
},
},
Expand Down Expand Up @@ -2045,13 +2086,34 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {
},
},
permissiveMode: true,
upstreamTrafficSetting: &policyv1alpha1.UpstreamTrafficSetting{
Spec: policyv1alpha1.UpstreamTrafficSettingSpec{
RateLimit: virtualHostGlobalRateLimitConfig,
HTTPRoutes: []policyv1alpha1.HTTPRouteSpec{
{
Path: ".*", // matches wildcard path regex for permissive mode
RateLimit: perRouteGlobalRateLimitConfig,
upstreamTrafficSettings: []*policyv1alpha1.UpstreamTrafficSetting{
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
},
Spec: policyv1alpha1.UpstreamTrafficSettingSpec{
Host: "s1.ns1.svc.cluster.local",
RateLimit: virtualHostGlobalRateLimitConfig,
HTTPRoutes: []policyv1alpha1.HTTPRouteSpec{
{
Path: ".*", // matches wildcard path regex for permissive mode
RateLimit: perRouteGlobalRateLimitConfig,
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Namespace: "ns1",
},
Spec: policyv1alpha1.UpstreamTrafficSettingSpec{
Host: "s2.ns1.svc.cluster.local",
RateLimit: virtualHostGlobalRateLimitConfig,
HTTPRoutes: []policyv1alpha1.HTTPRouteSpec{
{
Path: ".*", // matches wildcard path regex for permissive mode
RateLimit: perRouteGlobalRateLimitConfig,
},
},
},
},
Expand Down Expand Up @@ -2156,19 +2218,19 @@ func TestGetInboundMeshTrafficPolicy(t *testing.T) {

fakeCertManager := tresorFake.NewFake(1 * time.Hour)

mockPolicyController := policy.NewMockController(mockCtrl)
mockMeshSpec := smi.NewMockMeshSpec(mockCtrl)
mockK8s := k8s.NewMockController(mockCtrl)
provider := kube.NewClient(mockK8s)

mc := MeshCatalog{
policyController: mockPolicyController,
certManager: fakeCertManager,
meshSpec: mockMeshSpec,
Interface: provider,
certManager: fakeCertManager,
meshSpec: mockMeshSpec,
Interface: provider,
}

mockPolicyController.EXPECT().GetUpstreamTrafficSetting(gomock.Any()).Return(tc.upstreamTrafficSetting).AnyTimes()
mockK8s.EXPECT().ListUpstreamTrafficSettings().Return(tc.upstreamTrafficSettings).AnyTimes()
mockK8s.EXPECT().ListEgressPolicies().Return([]*policyv1alpha1.Egress{}).AnyTimes()

mockK8s.EXPECT().GetMeshConfig().Return(v1alpha2.MeshConfig{
Spec: v1alpha2.MeshConfigSpec{
Traffic: v1alpha2.TrafficSpec{
Expand Down
2 changes: 1 addition & 1 deletion pkg/catalog/ingress.go
Expand Up @@ -42,7 +42,7 @@ func (mc *MeshCatalog) GetIngressTrafficPolicies(meshServices []service.MeshServ
// Depending on if the IngressBackend API is enabled, the policies will be generated either from the IngressBackend
// or Kubernetes Ingress API.
func (mc *MeshCatalog) GetIngressTrafficPolicy(svc service.MeshService) (*trafficpolicy.IngressTrafficPolicy, error) {
ingressBackendPolicy := mc.policyController.GetIngressBackendPolicy(svc)
ingressBackendPolicy := mc.GetIngressBackendPolicyForService(svc)
if ingressBackendPolicy == nil {
log.Trace().Msgf("Did not find IngressBackend policy for service %s", svc)
return nil, nil
Expand Down
7 changes: 2 additions & 5 deletions pkg/catalog/ingress_test.go
Expand Up @@ -12,7 +12,6 @@ import (
policyV1alpha1 "github.com/openservicemesh/osm/pkg/apis/policy/v1alpha1"
"github.com/openservicemesh/osm/pkg/compute"
"github.com/openservicemesh/osm/pkg/endpoint"
"github.com/openservicemesh/osm/pkg/policy"

"github.com/openservicemesh/osm/pkg/identity"
"github.com/openservicemesh/osm/pkg/service"
Expand Down Expand Up @@ -390,16 +389,14 @@ func TestGetIngressTrafficPolicy(t *testing.T) {
defer mockCtrl.Finish()

mockProvider := compute.NewMockInterface(mockCtrl)
mockPolicyController := policy.NewMockController(mockCtrl)

meshCatalog := &MeshCatalog{
Interface: mockProvider,
policyController: mockPolicyController,
Interface: mockProvider,
}

// Note: if AnyTimes() is used with a mock function, it implies the function may or may not be called
// depending on the test case.
mockPolicyController.EXPECT().GetIngressBackendPolicy(tc.meshSvc).Return(tc.ingressBackend).AnyTimes()
mockProvider.EXPECT().GetIngressBackendPolicyForService(tc.meshSvc).Return(tc.ingressBackend).AnyTimes()
mockProvider.EXPECT().ListEndpointsForService(ingressSourceSvc).Return(ingressBackendSvcEndpoints).AnyTimes()
mockProvider.EXPECT().ListEndpointsForService(sourceSvcWithoutEndpoints).Return(nil).AnyTimes()
mockProvider.EXPECT().UpdateIngressBackendStatus(gomock.Any()).Return(nil, nil).AnyTimes()
Expand Down

0 comments on commit e78aa42

Please sign in to comment.