-
Notifications
You must be signed in to change notification settings - Fork 228
USHIFT-6795: C2CC: Router controller #6599
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
pmtk
wants to merge
35
commits into
openshift:main
Choose a base branch
from
pmtk:c2cc/routes
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
35 commits
Select commit
Hold shift + click to select a range
5efb8ca
Add extra host variables to variables.yaml
pmtk 00ff01c
Multi cluster keywords
pmtk 2898697
c2cc sanity test
pmtk dda675d
Init C2CC Route Manager
pmtk cf6866e
Vendor libovsdb & knftables
pmtk d5216b0
Routes controller
pmtk 9aad82c
Service Routes Controller
pmtk e215be9
NBDB
pmtk 72dd158
OVN-K controller
pmtk 98a24cd
nftables controller
pmtk 2324c74
Node annotation controller
pmtk 4638ea6
default NetworkPolicy controller
pmtk 7a0e9c8
Integrate sub-controllers
pmtk 47b9edd
Cleanup on reconcile failure
pmtk a668f1c
Subscribe to events to retrigger reconciliation
pmtk 97fe31c
Refactor common parts for routes
pmtk 79554f2
routes: remove LinkIndex so kernel can resolve nexthop via main table
pmtk 3d79493
Infra Test Suite
pmtk c463a56
Connectivity Test Suite
pmtk 467dfcd
Inject network config before installation
pmtk 331506c
Test: wait on greenboot before enabling c2cc
pmtk cb25307
Don't rollback on failure
pmtk 4172943
Address local CR review
pmtk 913ae39
Add negative tests for reconciliation
pmtk 9000560
Debounce 'nftables-change' log
pmtk 1094edf
CR fixes
pmtk 3188ea9
Filter annotation subscribe to SNAT annotation changes only
pmtk ec33707
Add context to nftables subscribe for explicit shutdown
pmtk d3f3321
Drain reconcile channel to coalesce queued events
pmtk 8e59bc6
Apply CodeRabbit suggestions from PR #6599
pmtk 625d5d7
self review
pmtk 305e342
Merge/subtract SNAT annotation CIDRs instead of owning the whole value
pmtk 41e3ab2
Add E2E tests for C2CC cleanup on disable
pmtk 43b3bd9
Check OVSDB per-operation errors in OVN cleanup
pmtk 22dbfdd
Use copy_file_from_vm for IPv6-safe kubeconfig fetch
pmtk File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,208 @@ | ||
| package c2cc | ||
|
|
||
| import ( | ||
| "context" | ||
| "encoding/json" | ||
| "fmt" | ||
| "sort" | ||
| "time" | ||
|
|
||
| corev1 "k8s.io/api/core/v1" | ||
| metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" | ||
| "k8s.io/apimachinery/pkg/types" | ||
| "k8s.io/apimachinery/pkg/watch" | ||
| "k8s.io/client-go/kubernetes" | ||
| "k8s.io/klog/v2" | ||
| ) | ||
|
|
||
| const ( | ||
| ovnNodeDontSNATSubnets = "k8s.ovn.org/node-ingress-snat-exclude-subnets" | ||
| c2ccSNATTrackingAnnotation = "microshift.io/c2cc-snat-subnets" | ||
| ) | ||
|
|
||
| type annotationManager struct { | ||
| kubeClient kubernetes.Interface | ||
| nodeName string | ||
| desiredCIDRs []string | ||
| } | ||
|
|
||
| func newAnnotationManager(kubeClient kubernetes.Interface, nodeName string, remoteCIDRs []string) *annotationManager { | ||
| sorted := make([]string, len(remoteCIDRs)) | ||
| copy(sorted, remoteCIDRs) | ||
| sort.Strings(sorted) | ||
|
|
||
| return &annotationManager{ | ||
| kubeClient: kubeClient, | ||
| nodeName: nodeName, | ||
| desiredCIDRs: sorted, | ||
| } | ||
| } | ||
|
|
||
| func parseCIDRAnnotation(value string) []string { | ||
| if value == "" { | ||
| return nil | ||
| } | ||
| var cidrs []string | ||
| if err := json.Unmarshal([]byte(value), &cidrs); err != nil { | ||
| return nil | ||
| } | ||
| return cidrs | ||
| } | ||
|
|
||
| func cidrSetContainsAll(superset, subset []string) bool { | ||
| set := make(map[string]bool, len(superset)) | ||
| for _, c := range superset { | ||
| set[c] = true | ||
| } | ||
| for _, c := range subset { | ||
| if !set[c] { | ||
| return false | ||
| } | ||
| } | ||
| return true | ||
| } | ||
|
|
||
| func (a *annotationManager) reconcile(ctx context.Context) error { | ||
| node, err := a.kubeClient.CoreV1().Nodes().Get(ctx, a.nodeName, metav1.GetOptions{}) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get node %q: %w", a.nodeName, err) | ||
| } | ||
|
|
||
| existing := parseCIDRAnnotation(node.Annotations[ovnNodeDontSNATSubnets]) | ||
| previous := parseCIDRAnnotation(node.Annotations[c2ccSNATTrackingAnnotation]) | ||
|
|
||
| // Target = (existing - previous) + desired | ||
| // This replaces only the CIDRs C2CC previously wrote, preserving anything added by other components. | ||
| foreignCIDRs := make(map[string]bool, len(existing)) | ||
| for _, c := range existing { | ||
| foreignCIDRs[c] = true | ||
| } | ||
| for _, c := range previous { | ||
| delete(foreignCIDRs, c) | ||
| } | ||
|
|
||
| targetSet := make(map[string]bool, len(foreignCIDRs)+len(a.desiredCIDRs)) | ||
| for c := range foreignCIDRs { | ||
| targetSet[c] = true | ||
| } | ||
| for _, c := range a.desiredCIDRs { | ||
| targetSet[c] = true | ||
| } | ||
|
|
||
| target := make([]string, 0, len(targetSet)) | ||
| for c := range targetSet { | ||
| target = append(target, c) | ||
| } | ||
| sort.Strings(target) | ||
|
|
||
| targetJSON, _ := json.Marshal(target) | ||
| desiredJSON, _ := json.Marshal(a.desiredCIDRs) | ||
|
|
||
| if node.Annotations[ovnNodeDontSNATSubnets] == string(targetJSON) && | ||
| node.Annotations[c2ccSNATTrackingAnnotation] == string(desiredJSON) { | ||
| return nil | ||
| } | ||
|
|
||
| patch := fmt.Sprintf(`{"metadata":{"annotations":{%q:%q,%q:%q}}}`, | ||
| ovnNodeDontSNATSubnets, string(targetJSON), | ||
| c2ccSNATTrackingAnnotation, string(desiredJSON)) | ||
| _, err = a.kubeClient.CoreV1().Nodes().Patch(ctx, a.nodeName, | ||
| types.MergePatchType, []byte(patch), metav1.PatchOptions{}) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to patch node annotation: %w", err) | ||
| } | ||
| klog.V(2).Infof("Updated node annotation %s = %s (tracking: %s)", ovnNodeDontSNATSubnets, string(targetJSON), string(desiredJSON)) | ||
| return nil | ||
| } | ||
|
|
||
| func (a *annotationManager) cleanup(ctx context.Context) error { | ||
| node, err := a.kubeClient.CoreV1().Nodes().Get(ctx, a.nodeName, metav1.GetOptions{}) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to get node %q for cleanup: %w", a.nodeName, err) | ||
| } | ||
|
|
||
| tracked := parseCIDRAnnotation(node.Annotations[c2ccSNATTrackingAnnotation]) | ||
| if len(tracked) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| existing := parseCIDRAnnotation(node.Annotations[ovnNodeDontSNATSubnets]) | ||
| trackedSet := make(map[string]bool, len(tracked)) | ||
| for _, c := range tracked { | ||
| trackedSet[c] = true | ||
| } | ||
|
|
||
| var remaining []string | ||
| for _, c := range existing { | ||
| if !trackedSet[c] { | ||
| remaining = append(remaining, c) | ||
| } | ||
| } | ||
|
|
||
| var snatValue string | ||
| if len(remaining) == 0 { | ||
| snatValue = "null" | ||
| } else { | ||
| sort.Strings(remaining) | ||
| data, _ := json.Marshal(remaining) | ||
| snatValue = fmt.Sprintf("%q", string(data)) | ||
| } | ||
|
|
||
| patch := fmt.Sprintf(`{"metadata":{"annotations":{%s:%s,%q:null}}}`, | ||
| fmt.Sprintf("%q", ovnNodeDontSNATSubnets), snatValue, | ||
| c2ccSNATTrackingAnnotation) | ||
| _, err = a.kubeClient.CoreV1().Nodes().Patch(ctx, a.nodeName, | ||
| types.MergePatchType, []byte(patch), metav1.PatchOptions{}) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to cleanup node annotation: %w", err) | ||
| } | ||
| klog.V(2).Infof("Cleaned up node annotation %s (removed %d C2CC CIDRs, %d remaining)", ovnNodeDontSNATSubnets, len(tracked), len(remaining)) | ||
| return nil | ||
| } | ||
|
|
||
| func (a *annotationManager) subscribe(ctx context.Context, reconcileCh chan<- string) { | ||
| go func() { | ||
| for { | ||
| watcher, err := a.kubeClient.CoreV1().Nodes().Watch(ctx, metav1.ListOptions{ | ||
| FieldSelector: "metadata.name=" + a.nodeName, | ||
| }) | ||
| if err != nil { | ||
| klog.Warningf("Could not watch node for annotation changes: %v", err) | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-time.After(10 * time.Second): | ||
| continue | ||
| } | ||
| } | ||
| for event := range watcher.ResultChan() { | ||
| if event.Type != watch.Modified { | ||
| continue | ||
| } | ||
| node, ok := event.Object.(*corev1.Node) | ||
| if !ok { | ||
| continue | ||
| } | ||
| current := parseCIDRAnnotation(node.Annotations[ovnNodeDontSNATSubnets]) | ||
| if cidrSetContainsAll(current, a.desiredCIDRs) { | ||
| continue | ||
| } | ||
| select { | ||
| case reconcileCh <- "node-annotation-changed": | ||
| default: | ||
| } | ||
| } | ||
| watcher.Stop() | ||
| if ctx.Err() != nil { | ||
| return | ||
| } | ||
| klog.V(4).Infof("Node watch closed unexpectedly, reconnecting") | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| case <-time.After(1 * time.Second): | ||
| } | ||
| } | ||
| }() | ||
| klog.V(2).Infof("Subscribed to node annotation changes for %s", a.nodeName) | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.