Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update PE to support a new ns scoped pods field and status conditions #70

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
update PE to support a new ns scoped pods field and status conditions
  • Loading branch information
haouc committed Jan 25, 2024
commit 63d44d02826d0d95330f0ab0a2f72df2ab4ddd96
34 changes: 34 additions & 0 deletions api/v1alpha1/policyendpoint_types.go
Original file line number Diff line number Diff line change
@@ -93,12 +93,46 @@ type PolicyEndpointSpec struct {

// Egress is the list of egress rules containing resolved network addresses
Egress []EndpointInfo `json:"egress,omitempty"`

// AllPodsInNameSpace is the boolean value indicating should all pods in the policy namespace be selected
// +optional
AllPodsInNamespace bool `json:"allPodsInNamespace,omitempty"`
}

// PolicyEndpointStatus defines the observed state of PolicyEndpoint
type PolicyEndpointStatus struct {
// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
// Important: Run "make" to regenerate code after modifying this file

// +optional
Conditions []PolicyEndpointCondition `json:"conditions,omitempty"`
}

type PolicyEndpointConditionType string

const (
Packed PolicyEndpointConditionType = "PackedPolicyEndpoint"
Updated PolicyEndpointConditionType = "PatchedPolicyEndpoint"
)

// PolicyEndpointCondition describes the state of a PolicyEndpoint at a certain point.
// For example, binpacking PE slices should be updated as a condition change
type PolicyEndpointCondition struct {
// Type of PolicyEndpoint condition.
// +optional
Type PolicyEndpointConditionType `json:"type"`
// Status of the condition, one of True, False, Unknown.
// +optional
Status corev1.ConditionStatus `json:"status"`
// Last time the condition transitioned from one status to another.
// +optional
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
// The reason for the condition's last transition.
// +optional
Reason string `json:"reason,omitempty"`
// A human readable message indicating details about the transition.
// +optional
Message string `json:"message,omitempty"`
}

//+kubebuilder:object:root=true
31 changes: 31 additions & 0 deletions config/crd/bases/networking.k8s.aws_policyendpoints.yaml
Original file line number Diff line number Diff line change
@@ -35,6 +35,10 @@ spec:
spec:
description: PolicyEndpointSpec defines the desired state of PolicyEndpoint
properties:
allPodsInNamespace:
description: AllPodsInNameSpace is the boolean value indicating should
all pods in the policy namespace be selected
type: boolean
egress:
description: Egress is the list of egress rules containing resolved
network addresses
@@ -225,6 +229,33 @@ spec:
type: object
status:
description: PolicyEndpointStatus defines the observed state of PolicyEndpoint
properties:
conditions:
items:
description: PolicyEndpointCondition describes the state of a PolicyEndpoint
at a certain point. For example, binpacking PE slices should be
updated as a condition change
properties:
lastTransitionTime:
description: Last time the condition transitioned from one status
to another.
format: date-time
type: string
message:
description: A human readable message indicating details about
the transition.
type: string
reason:
description: The reason for the condition's last transition.
type: string
status:
description: Status of the condition, one of True, False, Unknown.
type: string
type:
description: Type of PolicyEndpoint condition.
type: string
type: object
type: array
type: object
type: object
served: true
111 changes: 89 additions & 22 deletions pkg/policyendpoints/manager.go
Original file line number Diff line number Diff line change
@@ -4,13 +4,15 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"strconv"

"golang.org/x/exp/maps"

"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -21,6 +23,8 @@ import (
policyinfo "github.com/aws/amazon-network-policy-controller-k8s/api/v1alpha1"
"github.com/aws/amazon-network-policy-controller-k8s/pkg/k8s"
"github.com/aws/amazon-network-policy-controller-k8s/pkg/resolvers"
"github.com/aws/amazon-network-policy-controller-k8s/pkg/utils/conditions"
"github.com/aws/amazon-network-policy-controller-k8s/pkg/utils/conversions"
)

type PolicyEndpointsManager interface {
@@ -39,6 +43,19 @@ func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, l
}
}

const (
ingressShift = 2
egressShift = 1
psShift = 0

ingBit = 4
egBit = 2
psBit = 1

reasonBinPacking = "PEBinPacked"
reasonPatching = "PEPatched"
)

var _ PolicyEndpointsManager = (*policyEndpointsManager)(nil)

type policyEndpointsManager struct {
@@ -60,12 +77,9 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki
client.MatchingFields{IndexKeyPolicyReferenceName: policy.Name}); err != nil {
return err
}
existingPolicyEndpoints := make([]policyinfo.PolicyEndpoint, 0, len(policyEndpointList.Items))
for _, policyEndpoint := range policyEndpointList.Items {
existingPolicyEndpoints = append(existingPolicyEndpoints, policyEndpoint)
}

createList, updateList, deleteList, err := m.computePolicyEndpoints(policy, existingPolicyEndpoints, ingressRules, egressRules, podSelectorEndpoints)
createList, updateList, deleteList, packed, err := m.computePolicyEndpoints(policy, policyEndpointList.Items, ingressRules, egressRules, podSelectorEndpoints)
m.logger.Info("the controller is packing PE rules", "Packed", conversions.IntToBool(packed))
if err != nil {
return err
}
@@ -79,17 +93,42 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki

for _, policyEndpoint := range updateList {
oldRes := &policyinfo.PolicyEndpoint{}
if err := m.k8sClient.Get(ctx, k8s.NamespacedName(&policyEndpoint), oldRes); err != nil {
peId := k8s.NamespacedName(&policyEndpoint)
if err := m.k8sClient.Get(ctx, peId, oldRes); err != nil {
return err
}
if equality.Semantic.DeepEqual(oldRes.Spec, policyEndpoint.Spec) {
m.logger.V(1).Info("Policy endpoint already up to date", "id", k8s.NamespacedName(&policyEndpoint))
m.logger.V(1).Info("Policy endpoint already up to date", "id", peId)
continue
}

if err := m.k8sClient.Patch(ctx, &policyEndpoint, client.MergeFrom(oldRes)); err != nil {
if cErr := conditions.UpdatePEConditions(ctx, m.k8sClient,
peId,
m.logger,
policyinfo.Updated,
corev1.ConditionFalse,
reasonPatching,
fmt.Sprintf("patching policy endpoint failed: %s", err.Error()),
); cErr != nil {
m.logger.Error(cErr, "Adding PE patch failure condition updates to PE failed", "PENamespacedName", peId)
}
return err
}
m.logger.Info("Updated policy endpoint", "id", k8s.NamespacedName(&policyEndpoint))
m.logger.Info("Updated policy endpoint", "id", peId)

if packed > 0 {
if err := conditions.UpdatePEConditions(ctx, m.k8sClient,
peId,
m.logger,
policyinfo.Packed,
corev1.ConditionTrue,
reasonBinPacking,
fmt.Sprintf("binpacked network policy endpoint slices on Ingress - %t, Egress - %t, PodSelector - %t", packed&ingBit>>ingressShift == 1, packed&egBit>>egressShift == 1, packed&psBit>>psShift == 1),
); err != nil {
m.logger.Error(err, "Adding bingpacking condition updates to PE failed", "PENamespacedName", peId)
}
}
}

for _, policyEndpoint := range deleteList {
@@ -123,7 +162,7 @@ func (m *policyEndpointsManager) Cleanup(ctx context.Context, policy *networking
func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.NetworkPolicy,
existingPolicyEndpoints []policyinfo.PolicyEndpoint, ingressEndpoints []policyinfo.EndpointInfo,
egressEndpoints []policyinfo.EndpointInfo, podSelectorEndpoints []policyinfo.PodEndpoint) ([]policyinfo.PolicyEndpoint,
[]policyinfo.PolicyEndpoint, []policyinfo.PolicyEndpoint, error) {
[]policyinfo.PolicyEndpoint, []policyinfo.PolicyEndpoint, int, error) {

// Loop through ingressEndpoints, egressEndpoints and podSelectorEndpoints and put in map
// also populate them into policy endpoints
@@ -138,11 +177,11 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo
var deletePolicyEndpoints []policyinfo.PolicyEndpoint

// packing new ingress rules
createPolicyEndpoints, doNotDeleteIngress := m.packingIngressRules(policy, ingressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
createPolicyEndpoints, doNotDeleteIngress, ingPacked := m.packingIngressRules(policy, ingressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
// packing new egress rules
createPolicyEndpoints, doNotDeleteEgress := m.packingEgressRules(policy, egressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
createPolicyEndpoints, doNotDeleteEgress, egPacked := m.packingEgressRules(policy, egressEndpointsMap, createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
// packing new pod selector
createPolicyEndpoints, doNotDeletePs := m.packingPodSelectorEndpoints(policy, podSelectorEndpointSet.UnsortedList(), createPolicyEndpoints, modifiedEndpoints, potentialDeletes)
createPolicyEndpoints, doNotDeletePs, psPacked := m.packingPodSelectorEndpoints(policy, podSelectorEndpointSet.UnsortedList(), createPolicyEndpoints, modifiedEndpoints, potentialDeletes)

doNotDelete.Insert(doNotDeleteIngress.UnsortedList()...)
doNotDelete.Insert(doNotDeleteEgress.UnsortedList()...)
@@ -167,7 +206,7 @@ func (m *policyEndpointsManager) computePolicyEndpoints(policy *networking.Netwo
}
}

return createPolicyEndpoints, updatePolicyEndpoints, deletePolicyEndpoints, nil
return createPolicyEndpoints, updatePolicyEndpoints, deletePolicyEndpoints, (conversions.BoolToint(ingPacked) << ingressShift) | (conversions.BoolToint(egPacked) << egressShift) | (conversions.BoolToint(psPacked) << psShift), nil
}

func (m *policyEndpointsManager) newPolicyEndpoint(policy *networking.NetworkPolicy,
@@ -202,6 +241,13 @@ func (m *policyEndpointsManager) newPolicyEndpoint(policy *networking.NetworkPol
Egress: egressRules,
},
}

// if no pod selector is specified, the controller adds a boolean value true to AllPodsInNamespace
if policy.Spec.PodSelector.Size() == 0 {
m.logger.Info("Creating a new PE but requested NP doesn't have pod selector", "NPName", policy.Name, "NPNamespace", policy.Namespace)
policyEndpoint.Spec.AllPodsInNamespace = true
}

return policyEndpoint
}

@@ -319,24 +365,32 @@ func (m *policyEndpointsManager) processExistingPolicyEndpoints(
// it returns the ingress rules packed in policy endpoints and a set of policy endpoints that need to be kept.
func (m *policyEndpointsManager) packingIngressRules(policy *networking.NetworkPolicy,
rulesMap map[string]policyinfo.EndpointInfo,
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) {
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) {
doNotDelete := sets.Set[types.NamespacedName]{}
chunkStartIdx := 0
chunkEndIdx := 0
ingressList := maps.Keys(rulesMap)

packed := false

// try to fill existing polciy endpoints first and then new ones if needed
for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} {
for i := range sliceToCheck {
// reset start pointer if end pointer is updated
chunkStartIdx = chunkEndIdx

// Instead of adding the entire chunk we should try to add to full the slice
if len(sliceToCheck[i].Spec.Ingress) < m.endpointChunkSize && chunkEndIdx < len(ingressList) {
// when new ingress rule list is greater than available spots in current non-empty PE rule's list, we do binpacking
spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.Ingress)
packed = spots > 0 && len(sliceToCheck[i].Spec.Ingress) > 0 && spots < len(ingressList)

if spots > 0 && chunkEndIdx < len(ingressList) {
for len(sliceToCheck[i].Spec.Ingress)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(ingressList)-1 {
chunkEndIdx++
}

sliceToCheck[i].Spec.Ingress = append(sliceToCheck[i].Spec.Ingress, m.getListOfEndpointInfoFromHash(lo.Slice(ingressList, chunkStartIdx, chunkEndIdx+1), rulesMap)...)

// move the end to next available index to prepare next appending
chunkEndIdx++
}
@@ -355,26 +409,33 @@ func (m *policyEndpointsManager) packingIngressRules(policy *networking.NetworkP
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
}
}
return createPolicyEndpoints, doNotDelete
return createPolicyEndpoints, doNotDelete, packed
}

// packingEgressRules iterates over egress rules across available policy endpoints and required egress rule changes.
// it returns the egress rules packed in policy endpoints and a set of policy endpoints that need to be kept.
func (m *policyEndpointsManager) packingEgressRules(policy *networking.NetworkPolicy,
rulesMap map[string]policyinfo.EndpointInfo,
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) {
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) {
doNotDelete := sets.Set[types.NamespacedName]{}
chunkStartIdx := 0
chunkEndIdx := 0
egressList := maps.Keys(rulesMap)

packed := false

// try to fill existing polciy endpoints first and then new ones if needed
for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} {
for i := range sliceToCheck {
// reset start pointer if end pointer is updated
chunkStartIdx = chunkEndIdx

// Instead of adding the entire chunk we should try to add to full the slice
if len(sliceToCheck[i].Spec.Egress) < m.endpointChunkSize && chunkEndIdx < len(egressList) {
// when new egress rule list is greater than available spots in current non-empty PE rule's list, we do binpacking
spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.Egress)
packed = spots > 0 && len(sliceToCheck[i].Spec.Egress) > 0 && spots < len(egressList)

if spots > 0 && chunkEndIdx < len(egressList) {
for len(sliceToCheck[i].Spec.Egress)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(egressList)-1 {
chunkEndIdx++
}
@@ -398,26 +459,32 @@ func (m *policyEndpointsManager) packingEgressRules(policy *networking.NetworkPo
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
}
}
return createPolicyEndpoints, doNotDelete
return createPolicyEndpoints, doNotDelete, packed
}

// packingPodSelectorEndpoints iterates over pod selectors across available policy endpoints and required pod selector changes.
// it returns the pod selectors packed in policy endpoints and a set of policy endpoints that need to be kept.
func (m *policyEndpointsManager) packingPodSelectorEndpoints(policy *networking.NetworkPolicy,
psList []policyinfo.PodEndpoint,
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName]) {
createPolicyEndpoints, modifiedEndpoints, potentialDeletes []policyinfo.PolicyEndpoint) ([]policyinfo.PolicyEndpoint, sets.Set[types.NamespacedName], bool) {

doNotDelete := sets.Set[types.NamespacedName]{}
chunkStartIdx := 0
chunkEndIdx := 0
packed := false

// try to fill existing polciy endpoints first and then new ones if needed
for _, sliceToCheck := range [][]policyinfo.PolicyEndpoint{modifiedEndpoints, potentialDeletes, createPolicyEndpoints} {
for i := range sliceToCheck {
// reset start pointer if end pointer is updated
chunkStartIdx = chunkEndIdx

// Instead of adding the entire chunk we should try to add to full the slice
if len(sliceToCheck[i].Spec.PodSelectorEndpoints) < m.endpointChunkSize && chunkEndIdx < len(psList) {
// when new pods list is greater than available spots in current non-empty PS list, we do binpacking
spots := m.endpointChunkSize - len(sliceToCheck[i].Spec.PodSelectorEndpoints)
packed = spots > 0 && len(sliceToCheck[i].Spec.PodSelectorEndpoints) > 0 && spots < len(psList)

if spots > 0 && chunkEndIdx < len(psList) {
for len(sliceToCheck[i].Spec.PodSelectorEndpoints)+(chunkEndIdx-chunkStartIdx+1) < m.endpointChunkSize && chunkEndIdx < len(psList)-1 {
chunkEndIdx++
}
@@ -441,5 +508,5 @@ func (m *policyEndpointsManager) packingPodSelectorEndpoints(policy *networking.
createPolicyEndpoints = append(createPolicyEndpoints, newEP)
}
}
return createPolicyEndpoints, doNotDelete
return createPolicyEndpoints, doNotDelete, packed
}
2 changes: 1 addition & 1 deletion pkg/policyendpoints/manager_test.go
Original file line number Diff line number Diff line change
@@ -448,7 +448,7 @@ func Test_policyEndpointsManager_computePolicyEndpoints(t *testing.T) {
m := &policyEndpointsManager{
endpointChunkSize: tt.fields.endpointChunkSize,
}
createList, updateList, deleteList, err := m.computePolicyEndpoints(tt.args.policy, tt.args.policyEndpoints,
createList, updateList, deleteList, _, err := m.computePolicyEndpoints(tt.args.policy, tt.args.policyEndpoints,
tt.args.ingressRules, tt.args.egressRules, tt.args.podselectorEndpoints)

if len(tt.wantErr) > 0 {
Loading
Oops, something went wrong.
Loading
Oops, something went wrong.