Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update PE to support a new ns scoped pods field and status conditions #70

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
using awslab package to manage conditions
  • Loading branch information
haouc committed Jan 26, 2024
commit 3c29dc77d27525aae8e6675a90a97635becbc49d
35 changes: 14 additions & 21 deletions api/v1alpha1/policyendpoint_types.go
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@ limitations under the License.
package v1alpha1

import (
"github.com/awslabs/operatorpkg/status"
corev1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -105,7 +106,7 @@ type PolicyEndpointStatus struct {
// Important: Run "make" to regenerate code after modifying this file

// +optional
Conditions []PolicyEndpointCondition `json:"conditions,omitempty"`
Conditions []status.Condition `json:"conditions,omitempty"`
}

type PolicyEndpointConditionType string
@@ -115,26 +116,6 @@ const (
Updated PolicyEndpointConditionType = "PatchedPolicyEndpoint"
)

// PolicyEndpointCondition describes the state of a PolicyEndpoint at a certain point.
// For example, binpacking PE slices should be updated as a condition change
type PolicyEndpointCondition struct {
// Type of PolicyEndpoint condition.
// +optional
Type PolicyEndpointConditionType `json:"type"`
// Status of the condition, one of True, False, Unknown.
// +optional
Status corev1.ConditionStatus `json:"status"`
// Last time the condition transitioned from one status to another.
// +optional
LastTransitionTime metav1.Time `json:"lastTransitionTime,omitempty"`
// The reason for the condition's last transition.
// +optional
Reason string `json:"reason,omitempty"`
// A human readable message indicating details about the transition.
// +optional
Message string `json:"message,omitempty"`
}

//+kubebuilder:object:root=true
//+kubebuilder:subresource:status

@@ -159,3 +140,15 @@ type PolicyEndpointList struct {
func init() {
SchemeBuilder.Register(&PolicyEndpoint{}, &PolicyEndpointList{})
}

func (s *PolicyEndpoint) GetConditions() []status.Condition {
return []status.Condition(s.Status.Conditions)
}

func (s *PolicyEndpoint) SetConditions(conds []status.Condition) {
s.Status.Conditions = conds
}

func (s *PolicyEndpoint) StatusConditions() status.ConditionSet {
return status.NewReadyConditions().For(s)
}
10 changes: 9 additions & 1 deletion api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
@@ -116,7 +116,7 @@ func main() {
os.Exit(1)
}

policyEndpointsManager := policyendpoints.NewPolicyEndpointsManager(mgr.GetClient(),
policyEndpointsManager := policyendpoints.NewPolicyEndpointsManager(ctx, mgr.GetClient(),
controllerCFG.EndpointChunkSize, ctrl.Log.WithName("endpoints-manager"))
finalizerManager := k8s.NewDefaultFinalizerManager(mgr.GetClient(), ctrl.Log.WithName("finalizer-manager"))
policyController := controllers.NewPolicyReconciler(mgr.GetClient(), policyEndpointsManager,
55 changes: 45 additions & 10 deletions config/crd/bases/networking.k8s.aws_policyendpoints.yaml
Original file line number Diff line number Diff line change
@@ -232,28 +232,63 @@ spec:
properties:
conditions:
items:
description: PolicyEndpointCondition describes the state of a PolicyEndpoint
at a certain point. For example, binpacking PE slices should be
updated as a condition change
description: Condition aliases the upstream type and adds additional
helper methods
properties:
lastTransitionTime:
description: Last time the condition transitioned from one status
to another.
description: lastTransitionTime is the last time the condition
transitioned from one status to another. This should be when
the underlying condition changed. If that is not known, then
using the time when the API field changed is acceptable.
format: date-time
type: string
message:
description: A human readable message indicating details about
the transition.
description: message is a human readable message indicating
details about the transition. This may be an empty string.
maxLength: 32768
type: string
observedGeneration:
description: observedGeneration represents the .metadata.generation
that the condition was set based upon. For instance, if .metadata.generation
is currently 12, but the .status.conditions[x].observedGeneration
is 9, the condition is out of date with respect to the current
state of the instance.
format: int64
minimum: 0
type: integer
reason:
description: The reason for the condition's last transition.
description: reason contains a programmatic identifier indicating
the reason for the condition's last transition. Producers
of specific condition types may define expected values and
meanings for this field, and whether the values are considered
a guaranteed API. The value should be a CamelCase string.
This field may not be empty.
maxLength: 1024
minLength: 1
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
type: string
status:
description: Status of the condition, one of True, False, Unknown.
description: status of the condition, one of True, False, Unknown.
enum:
- "True"
- "False"
- Unknown
type: string
type:
description: Type of PolicyEndpoint condition.
description: type of condition in CamelCase or in foo.example.com/CamelCase.
--- Many .condition.type values are consistent across resources
like Available, but because arbitrary conditions can be useful
(see .node.status.conditions), the ability to deconflict is
important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
maxLength: 316
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
type: string
required:
- lastTransitionTime
- message
- reason
- status
- type
type: object
type: array
type: object
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ require (
)

require (
github.com/awslabs/operatorpkg v0.0.0-20231211224023-fce5f0fa8592
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/awslabs/operatorpkg v0.0.0-20231211224023-fce5f0fa8592 h1:LSaLHzJ4IMZZLgVIx/2YIcvUCIAaE5OqLhjWzdwF060=
github.com/awslabs/operatorpkg v0.0.0-20231211224023-fce5f0fa8592/go.mod h1:kqgbtyanB/ObfvsSUdGZOk1f3K807kvoibKoKX0wMK4=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
23 changes: 17 additions & 6 deletions pkg/policyendpoints/manager.go
Original file line number Diff line number Diff line change
@@ -12,7 +12,6 @@ import (
"github.com/go-logr/logr"
"github.com/pkg/errors"
"github.com/samber/lo"
corev1 "k8s.io/api/core/v1"
networking "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -33,9 +32,10 @@ type PolicyEndpointsManager interface {
}

// NewPolicyEndpointsManager constructs a new policyEndpointsManager
func NewPolicyEndpointsManager(k8sClient client.Client, endpointChunkSize int, logger logr.Logger) *policyEndpointsManager {
func NewPolicyEndpointsManager(ctx context.Context, k8sClient client.Client, endpointChunkSize int, logger logr.Logger) *policyEndpointsManager {
endpointsResolver := resolvers.NewEndpointsResolver(k8sClient, logger.WithName("endpoints-resolver"))
return &policyEndpointsManager{
ctx: ctx,
k8sClient: k8sClient,
endpointsResolver: endpointsResolver,
endpointChunkSize: endpointChunkSize,
@@ -59,6 +59,7 @@ const (
var _ PolicyEndpointsManager = (*policyEndpointsManager)(nil)

type policyEndpointsManager struct {
ctx context.Context
k8sClient client.Client
endpointsResolver resolvers.EndpointsResolver
endpointChunkSize int
@@ -88,6 +89,12 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki
if err := m.k8sClient.Create(ctx, &policyEndpoint); err != nil {
return err
}
// initialize the PE's conditions
conditions.CreatePEInitCondition(m.ctx,
m.k8sClient,
types.NamespacedName{Name: policyEndpoint.Name, Namespace: policyEndpoint.Namespace},
m.logger,
)
m.logger.Info("Created policy endpoint", "id", k8s.NamespacedName(&policyEndpoint))
}

@@ -107,11 +114,13 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki
peId,
m.logger,
policyinfo.Updated,
corev1.ConditionFalse,
metav1.ConditionFalse,
reasonPatching,
fmt.Sprintf("patching policy endpoint failed: %s", err.Error()),
// keep condition history for error states
true,
); cErr != nil {
m.logger.Error(cErr, "Adding PE patch failure condition updates to PE failed", "PENamespacedName", peId)
m.logger.Error(cErr, "Adding PE patch failure condition updates to PE failed", "PENamespacedName", peId, "RV", policyEndpoint.ResourceVersion)
}
return err
}
@@ -122,9 +131,11 @@ func (m *policyEndpointsManager) Reconcile(ctx context.Context, policy *networki
peId,
m.logger,
policyinfo.Packed,
corev1.ConditionTrue,
metav1.ConditionTrue,
reasonBinPacking,
fmt.Sprintf("binpacked network policy endpoint slices on Ingress - %t, Egress - %t, PodSelector - %t", packed&ingBit>>ingressShift == 1, packed&egBit>>egressShift == 1, packed&psBit>>psShift == 1),
fmt.Sprintf("binpacked network policy endpoint slices on Ingress - %t, Egress - %t, PodSelector - %t with RV %s", packed&ingBit>>ingressShift == 1, packed&egBit>>egressShift == 1, packed&psBit>>psShift == 1, policyEndpoint.ResourceVersion),
// don't keep packing states history. if required, this can be changed to true later.
false,
); err != nil {
m.logger.Error(err, "Adding bingpacking condition updates to PE failed", "PENamespacedName", peId)
}
67 changes: 61 additions & 6 deletions pkg/utils/conditions/conditions.go
Original file line number Diff line number Diff line change
@@ -2,34 +2,89 @@ package conditions

import (
"context"
"time"

policyinfo "github.com/aws/amazon-network-policy-controller-k8s/api/v1alpha1"
"github.com/awslabs/operatorpkg/status"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
jitterWaitTime = time.Millisecond * 100
)

func CreatePEInitCondition(ctx context.Context, k8sClient client.Client, key types.NamespacedName, log logr.Logger) {
// using a goroutine to add the condition with jitter wait.
go func() {
// since adding an init condition immediate after the PE is created
// waiting for a small time before calling
time.Sleep(wait.Jitter(jitterWaitTime, 0.25))
err := retry.OnError(
wait.Backoff{
Duration: time.Millisecond * 100,
Factor: 3.0,
Jitter: 0.1,
Steps: 5,
Cap: time.Second * 10,
},
func(err error) bool { return errors.IsNotFound(err) },
func() error {
pe := &policyinfo.PolicyEndpoint{}
var err error
if err = k8sClient.Get(ctx, key, pe); err != nil {
log.Error(err, "getting PE for conditions update failed", "PEName", pe.Name, "PENamespace", pe.Namespace)
} else {
copy := pe.DeepCopy()
copy.StatusConditions()
if err = k8sClient.Status().Patch(ctx, copy, client.MergeFrom(pe)); err != nil {
log.Error(err, "creating PE init status failed", "PEName", pe.Name, "PENamespace", pe.Namespace)
}
}
return err
},
)
if err != nil {
log.Error(err, "adding PE init condition failed after retries", "PENamespacedName", key)
} else {
log.Info("added PE init condition", "PENamespacedName", key)
}
}()
}

func UpdatePEConditions(ctx context.Context, k8sClient client.Client, key types.NamespacedName, log logr.Logger,
cType policyinfo.PolicyEndpointConditionType,
cStatus corev1.ConditionStatus,
cStatus metav1.ConditionStatus,
cReason string,
cMsg string) error {
cMsg string,
keepConditions bool) error {
pe := &policyinfo.PolicyEndpoint{}
var err error
if err = k8sClient.Get(ctx, key, pe); err != nil {
log.Error(err, "getting PE for conditions update failed", "PEName", pe.Name, "PENamespace", pe.Namespace)
} else {
copy := pe.DeepCopy()
cond := policyinfo.PolicyEndpointCondition{
Type: cType,
cond := status.Condition{
Type: string(cType),
Status: cStatus,
LastTransitionTime: metav1.Now(),
Reason: cReason,
Message: cMsg,
}
copy.Status.Conditions = append(copy.Status.Conditions, cond)
if keepConditions {
// not overwrite old conditions that have the same type
conds := copy.GetConditions()
conds = append(conds, cond)
copy.SetConditions(conds)
} else {
// overwrite old conditions that have the same type
copy.StatusConditions().Set(cond)
}
log.Info("the controller added condition to PE", "PEName", copy.Name, "PENamespace", copy.Namespace, "Conditions", copy.Status.Conditions)
if err = k8sClient.Status().Patch(ctx, copy, client.MergeFrom(pe)); err != nil {
log.Error(err, "updating PE status failed", "PEName", pe.Name, "PENamespace", pe.Namespace)
Loading
Oops, something went wrong.