diff --git a/test/extended/project/project.go b/test/extended/project/project.go index 18bec80b4db9..ea8859881a80 100644 --- a/test/extended/project/project.go +++ b/test/extended/project/project.go @@ -1,8 +1,12 @@ package project import ( + "bufio" "context" "fmt" + "strings" + "sync" + "sync/atomic" "time" "github.com/davecgh/go-spew/spew" @@ -564,6 +568,439 @@ func hasExactlyTheseProjects(lister projectv1client.ProjectInterface, projects s return nil } +// cacheRaceTestConfig holds tuning parameters for TestProjectAuthCacheRaceCondition. +// +// These values are calibrated to trigger the concurrent map race (OCPBUGS-57474) in the +// openshift-apiserver authorization cache. The cache does a full rebuild every 15 seconds, +// so 3 rounds × ~30 seconds each spans multiple TTL cycles. 30 concurrent List readers +// (readersPerUser × userCount) plus 10 Watch readers maximize the chance of a List() call +// iterating a subjectRecord.namespaces map while synchronize() mutates it in place. 
+type cacheRaceTestConfig struct {
+	// namespaceCount is the number of namespaces created for the test.
+	namespaceCount int
+
+	// userCount is the number of test users; each user gets readersPerUser
+	// List readers and one Watch reader.
+	userCount int
+
+	// readersPerUser is the number of concurrent List goroutines per user.
+	readersPerUser int
+
+	// setupConcurrency bounds the number of in-flight create/delete API calls.
+	setupConcurrency int
+
+	// maxListLatency is the threshold the slowest observed List call is
+	// compared against.
+	maxListLatency time.Duration
+
+	// deleteFraction selects which rolebindings are deleted each round: those
+	// whose index is a multiple of deleteFraction.
+	deleteFraction int
+
+	// churnRounds is the number of create/delete rounds.
+	churnRounds int
+}
+
+var defaultCacheRaceConfig = cacheRaceTestConfig{
+	namespaceCount:   1000,
+	userCount:        10,
+	readersPerUser:   3,
+	setupConcurrency: 100,
+	maxListLatency:   2 * time.Second,
+	deleteFraction:   3,
+	churnRounds:      3,
+}
+
+// readerErrorBackoff is how long a reader pauses after a failed List or Watch
+// call, so that a persistent failure does not turn into a busy loop.
+const readerErrorBackoff = 100 * time.Millisecond
+
+// cacheRaceUser pairs a test user's name with the client config used to issue
+// requests as that user.
+type cacheRaceUser struct {
+	name   string
+	config *rest.Config
+}
+
+// cacheRaceReaders holds the shared state of the background reader goroutines
+// started by startReaders.
+//
+// wg tracks every reader goroutine. The atomic counters accumulate failures
+// (listErrors, watchErrors, regressionCount), the number of List calls made
+// (listCalls) and the slowest List call in nanoseconds (maxLatencyNs).
+// highWaterMarks holds, per user index, the largest project count any
+// completed List has returned. Closing monotonicDone ends the phase in which
+// project counts are required to be non-decreasing; closing stopCh tells all
+// readers to exit.
+type cacheRaceReaders struct {
+	wg              sync.WaitGroup
+	listErrors      atomic.Int64
+	watchErrors     atomic.Int64
+	maxLatencyNs    atomic.Int64
+	regressionCount atomic.Int64
+	listCalls       atomic.Int64
+	highWaterMarks  []atomic.Int64
+	monotonicDone   chan struct{}
+	stopCh          chan struct{}
+}
+
+// rbRef identifies one created rolebinding and the user it was created for.
+type rbRef struct {
+	namespace string
+	name      string
+	user      string
+}
+
+// createTestNamespaces creates count namespaces with the "cache-race-" name
+// prefix, keeping at most concurrency Create calls in flight, registers each
+// created namespace for cleanup, and returns the generated names in no
+// particular order. It fails the test if any Create call returns an error.
+func createTestNamespaces(ctx context.Context, oc *exutil.CLI, count, concurrency int) []string {
+	var (
+		mu       sync.Mutex
+		wg       sync.WaitGroup
+		firstErr error
+	)
+	names := make([]string, 0, count)
+	// A zero-capacity semaphore would block every worker forever.
+	sem := make(chan struct{}, max(concurrency, 1))
+	for range count {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			sem <- struct{}{}
+			defer func() { <-sem }()
+			ns, err := oc.AdminKubeClient().CoreV1().Namespaces().Create(ctx, &corev1.Namespace{
+				ObjectMeta: metav1.ObjectMeta{GenerateName: "cache-race-"},
+			}, metav1.CreateOptions{})
+
+			mu.Lock()
+			defer mu.Unlock()
+			if err != nil {
+				if firstErr == nil {
+					firstErr = err
+				}
+				return
+			}
+			// Registered under the lock: nothing visible here establishes that
+			// AddResourceToDelete is safe for concurrent callers.
+			oc.AddResourceToDelete(corev1.SchemeGroupVersion.WithResource("namespaces"), ns)
+			names = append(names, ns.Name)
+		}()
+	}
+	wg.Wait()
+	// Asserted on the calling goroutine: a failed Gomega assertion panics, and
+	// a panic on a worker goroutine without GinkgoRecover would take down the
+	// whole test process instead of failing this test.
+	o.Expect(firstErr).NotTo(o.HaveOccurred(), "failed to create namespace")
+	return names
+}
+
+// startReaders launches, for every user, cfg.readersPerUser goroutines that
+// List projects in a tight loop and one goroutine that keeps a project Watch
+// open. The readers run until the returned cacheRaceReaders' stopCh is closed
+// (or ctx is cancelled); callers wait for them to exit via its wg.
+func startReaders(ctx context.Context, users []cacheRaceUser, cfg cacheRaceTestConfig) *cacheRaceReaders {
+	r := &cacheRaceReaders{
+		highWaterMarks: make([]atomic.Int64, len(users)),
+		monotonicDone:  make(chan struct{}),
+		stopCh:         make(chan struct{}),
+	}
+
+	for i, u := range users {
+		for range cfg.readersPerUser {
+			r.wg.Add(1)
+			go r.runListReader(ctx, i, u.config)
+		}
+
+		r.wg.Add(1)
+		go r.runWatchReader(ctx, u.config)
+	}
+
+	return r
+}
+
+// stopped reports whether the readers have been told to exit, either through
+// stopCh or through cancellation of ctx.
+func (r *cacheRaceReaders) stopped(ctx context.Context) bool {
+	select {
+	case <-r.stopCh:
+		return true
+	case <-ctx.Done():
+		return true
+	default:
+		return false
+	}
+}
+
+// pause sleeps for d. It returns false, without waiting out the full duration,
+// if the readers are told to exit in the meantime.
+func (r *cacheRaceReaders) pause(ctx context.Context, d time.Duration) bool {
+	timer := time.NewTimer(d)
+	defer timer.Stop()
+	select {
+	case <-r.stopCh:
+		return false
+	case <-ctx.Done():
+		return false
+	case <-timer.C:
+		return true
+	}
+}
+
+// recordLatency raises maxLatencyNs to elapsed if elapsed is larger. The
+// compare-and-swap loop keeps a concurrent smaller sample from overwriting a
+// larger one, which a separate Load followed by Store would allow.
+func (r *cacheRaceReaders) recordLatency(elapsed time.Duration) {
+	ns := elapsed.Nanoseconds()
+	for {
+		cur := r.maxLatencyNs.Load()
+		if ns <= cur || r.maxLatencyNs.CompareAndSwap(cur, ns) {
+			return
+		}
+	}
+}
+
+// checkMonotonic records a regression if count is below floor, the user's
+// high-water mark as read before the List call was issued; otherwise it raises
+// the user's high-water mark to count.
+//
+// Comparing against the pre-call value matters because several readers share
+// one mark per user: a List that started before another reader raised the mark
+// may legitimately return the older, smaller count.
+func (r *cacheRaceReaders) checkMonotonic(userIdx int, floor, count int64) {
+	if count < floor {
+		framework.Logf("REGRESSION: user %d saw %d projects, previously saw %d", userIdx, count, floor)
+		r.regressionCount.Add(1)
+		return
+	}
+	mark := &r.highWaterMarks[userIdx]
+	for {
+		prev := mark.Load()
+		if count <= prev || mark.CompareAndSwap(prev, count) {
+			return
+		}
+	}
+}
+
+// runListReader lists projects as the user behind restCfg until told to stop,
+// recording call count, latency, errors and (until monotonicDone is closed)
+// project-count regressions for user userIdx.
+func (r *cacheRaceReaders) runListReader(ctx context.Context, userIdx int, restCfg *rest.Config) {
+	// NewForConfigOrDie panics on a bad config; report that as a test failure.
+	defer g.GinkgoRecover()
+	defer r.wg.Done()
+
+	client := projectv1client.NewForConfigOrDie(restCfg)
+	for !r.stopped(ctx) {
+		// Snapshot before the call: only counts already returned by a
+		// completed List are a valid lower bound for this one.
+		floor := r.highWaterMarks[userIdx].Load()
+
+		start := time.Now()
+		projects, err := client.Projects().List(ctx, metav1.ListOptions{})
+		r.recordLatency(time.Since(start))
+		r.listCalls.Add(1)
+
+		if err != nil {
+			framework.Logf("Projects().List() error: %v", err)
+			r.listErrors.Add(1)
+			if !r.pause(ctx, readerErrorBackoff) {
+				return
+			}
+			continue
+		}
+
+		// during the add-only phase, project count must never decrease
+		select {
+		case <-r.monotonicDone:
+		default:
+			r.checkMonotonic(userIdx, floor, int64(len(projects.Items)))
+		}
+	}
+}
+
+// runWatchReader keeps a project Watch open as the user behind restCfg,
+// re-establishing it whenever it ends, until told to stop.
+func (r *cacheRaceReaders) runWatchReader(ctx context.Context, restCfg *rest.Config) {
+	// NewForConfigOrDie panics on a bad config; report that as a test failure.
+	defer g.GinkgoRecover()
+	defer r.wg.Done()
+
+	client := projectv1client.NewForConfigOrDie(restCfg)
+	for !r.stopped(ctx) {
+		w, err := client.Projects().Watch(ctx, metav1.ListOptions{})
+		if err != nil {
+			framework.Logf("Projects().Watch() error: %v", err)
+			r.watchErrors.Add(1)
+			if !r.pause(ctx, readerErrorBackoff) {
+				return
+			}
+			continue
+		}
+		r.drainWatch(w)
+	}
+}
+
+// drainWatch consumes events from w until its channel closes or the readers
+// are stopped, counting watch.Error events, and always stops w on return.
+func (r *cacheRaceReaders) drainWatch(w watch.Interface) {
+	defer w.Stop()
+	for {
+		select {
+		case <-r.stopCh:
+			return
+		case event, ok := <-w.ResultChan():
+			if !ok {
+				return
+			}
+			if event.Type == watch.Error {
+				framework.Logf("Watch error event: %#v", event.Object)
+				r.watchErrors.Add(1)
+			}
+		}
+	}
+}
+
+// runChurnRounds creates and deletes rolebindings in repeated rounds to generate
+// sustained cache invalidation pressure. It returns the set of namespaces in
+// which the final round deleted the rolebinding it had created for the verify
+// user (users[0]); rolebindings left over from earlier rounds may still exist
+// in those namespaces.
+func runChurnRounds(ctx context.Context, oc *exutil.CLI, users []cacheRaceUser, namespaces []string, cfg cacheRaceTestConfig, readers *cacheRaceReaders) sets.Set[string] {
+	// users[0] is the verify user; fail clearly rather than panic on an index.
+	o.Expect(users).NotTo(o.BeEmpty(), "at least one user is required")
+
+	var setupErrors atomic.Int64
+	deletedForVerifyUser := sets.New[string]()
+
+	for round := 0; round < cfg.churnRounds; round++ {
+		framework.Logf("=== churn round %d/%d (listCalls so far: %d) ===", round+1, cfg.churnRounds, readers.listCalls.Load())
+
+		// create rolebindings — we build them directly instead of using
+		// authorization.AddUserViewToProject because we need the created
+		// object's name for the delete phase
+		roundRBs := createRoleBindings(ctx, oc, users, namespaces, cfg.setupConcurrency, &setupErrors)
+		framework.Logf("round %d create done: %d rolebindings", round+1, len(roundRBs))
+
+		// The first deletions are about to start, so project counts may now
+		// legitimately shrink: end the readers' monotonicity check.
+		if round == 0 {
+			close(readers.monotonicDone)
+		}
+
+		// clear per-round tracking — a namespace deleted in a previous round
+		// gets a fresh rolebinding in this round, so only the final round's
+		// deletions matter for the correctness check
+		deletedForVerifyUser = sets.New[string]()
+		deleteRoleBindings(ctx, oc, roundRBs, users[0].name, cfg, &deletedForVerifyUser)
+		framework.Logf("round %d delete done", round+1)
+	}
+
+	framework.Logf("all churn rounds complete (total listCalls: %d, setupErrors: %d)", readers.listCalls.Load(), setupErrors.Load())
+	o.Expect(setupErrors.Load()).To(o.BeZero(), fmt.Sprintf("%d rolebinding create errors occurred", setupErrors.Load()))
+
+	return deletedForVerifyUser
+}
+
+// createRoleBindings binds the "view" ClusterRole to every user in every
+// namespace, creating one rolebinding per (namespace, user) pair with at most
+// concurrency Create calls in flight. Failed creates are logged and counted in
+// errors. The returned refs cover only the rolebindings that were actually
+// created, in no particular order.
+func createRoleBindings(ctx context.Context, oc *exutil.CLI, users []cacheRaceUser, namespaces []string, concurrency int, errors *atomic.Int64) []rbRef {
+	// Buffered for the worst case so that no worker ever blocks on send.
+	refs := make(chan rbRef, len(users)*len(namespaces))
+	// A zero-capacity semaphore would block every worker forever.
+	sem := make(chan struct{}, max(concurrency, 1))
+	var wg sync.WaitGroup
+	for _, nsName := range namespaces {
+		for _, u := range users {
+			wg.Add(1)
+			go func(ns, user string) {
+				defer wg.Done()
+				sem <- struct{}{}
+				defer func() { <-sem }()
+				rb := &rbacv1.RoleBinding{
+					ObjectMeta: metav1.ObjectMeta{
+						GenerateName: "cache-race-view-",
+					},
+					RoleRef: rbacv1.RoleRef{
+						APIGroup: rbacv1.GroupName,
+						Kind:     "ClusterRole",
+						Name:     "view",
+					},
+					Subjects: []rbacv1.Subject{
+						{Kind: "User", Name: user},
+					},
+				}
+				created, err := oc.AdminKubeClient().RbacV1().RoleBindings(ns).Create(ctx, rb, metav1.CreateOptions{})
+				if err != nil {
+					framework.Logf("failed to create rolebinding for user %q in namespace %q: %v", user, ns, err)
+					errors.Add(1)
+					return
+				}
+				refs <- rbRef{namespace: ns, name: created.Name, user: user}
+			}(nsName, u.name)
+		}
+	}
+	wg.Wait()
+	close(refs)
+
+	result := make([]rbRef, 0, len(refs))
+	for ref := range refs {
+		result = append(result, ref)
+	}
+	return result
+}
+
+// deleteRoleBindings deletes every cfg.deleteFraction-th entry of rbs (indices
+// 0, deleteFraction, 2*deleteFraction, ...) with at most cfg.setupConcurrency
+// Delete calls in flight. Each namespace in which a rolebinding belonging to
+// verifyUser was successfully deleted is inserted into deletedForVerifyUser.
+// Failed deletes are logged and otherwise ignored: the rolebinding is then
+// still in place, so the namespace is not recorded as deleted. A non-positive
+// cfg.deleteFraction deletes nothing.
+func deleteRoleBindings(ctx context.Context, oc *exutil.CLI, rbs []rbRef, verifyUser string, cfg cacheRaceTestConfig, deletedForVerifyUser *sets.Set[string]) {
+	if cfg.deleteFraction <= 0 {
+		// i % 0 would panic below.
+		framework.Logf("deleteFraction is %d; skipping rolebinding deletion", cfg.deleteFraction)
+		return
+	}
+
+	// A zero-capacity semaphore would block every worker forever.
+	sem := make(chan struct{}, max(cfg.setupConcurrency, 1))
+	var mu sync.Mutex
+	var wg sync.WaitGroup
+	for i, ref := range rbs {
+		if i%cfg.deleteFraction != 0 {
+			continue
+		}
+		wg.Add(1)
+		go func(r rbRef) {
+			defer wg.Done()
+			sem <- struct{}{}
+			defer func() { <-sem }()
+			if err := oc.AdminKubeClient().RbacV1().RoleBindings(r.namespace).Delete(ctx, r.name, metav1.DeleteOptions{}); err != nil {
+				framework.Logf("failed to delete rolebinding %s/%s: %v", r.namespace, r.name, err)
+				return
+			}
+			if r.user == verifyUser {
+				mu.Lock()
+				deletedForVerifyUser.Insert(r.namespace)
+				mu.Unlock()
+			}
+		}(ref)
+	}
+	wg.Wait()
+}
+
+// assertNoRestarts compares the current container restart totals of the
+// openshift-apiserver pods against baseline and fails the test if any pod
+// present in both has a different total. Pods that no longer exist are logged
+// and skipped. When a restart is detected, crash evidence from the pod logs is
+// dumped before the assertion fails, and every restarted pod is reported.
+func assertNoRestarts(ctx context.Context, oc *exutil.CLI, baseline map[string]int32) {
+	current := getPodRestartCounts(ctx, oc, "openshift-apiserver")
+
+	var restarted []string
+	for podName, base := range baseline {
+		cur, exists := current[podName]
+		if !exists {
+			framework.Logf("pod %s no longer exists (may have been rescheduled)", podName)
+			continue
+		}
+		if cur != base {
+			msg := fmt.Sprintf("pod %s restarted during test (before=%d, after=%d)", podName, base, cur)
+			framework.Logf("%s", msg)
+			restarted = append(restarted, msg)
+		}
+	}
+	if len(restarted) == 0 {
+		return
+	}
+
+	dumpApiserverCrashEvidence(ctx, oc, "openshift-apiserver")
+	o.Expect(restarted).To(o.BeEmpty(), "openshift-apiserver pods restarted during test")
+}
+
+var _ = g.Describe("[sig-auth][Feature:ProjectAPI] ", func() {
+	ctx := context.Background()
+
+	defer g.GinkgoRecover()
+	oc := exutil.NewCLI("project-api")
+
+	g.Describe("TestProjectAuthCacheRaceCondition", func() {
+		g.It("should not crash or block when listing projects under concurrent cache churn [apigroup:project.openshift.io][apigroup:authorization.openshift.io][apigroup:user.openshift.io]", func() {
+			cfg := defaultCacheRaceConfig
+
+			g.By(fmt.Sprintf("creating %d test users", cfg.userCount))
+			users := make([]cacheRaceUser, 0, cfg.userCount)
+			for i := range cfg.userCount {
+				userName := oc.CreateUser(fmt.Sprintf("racetest-%d-", i)).Name
+				users = append(users, cacheRaceUser{
+					name:   userName,
+					config: oc.GetClientConfigForUser(userName),
+				})
+			}
+
+			g.By(fmt.Sprintf("creating %d namespaces", cfg.namespaceCount))
+			namespaceNames := createTestNamespaces(ctx, oc, cfg.namespaceCount, cfg.setupConcurrency)
+
+			g.By("recording openshift-apiserver pod restart counts")
+			baselineRestarts := getPodRestartCounts(ctx, oc, "openshift-apiserver")
+
+			g.By(fmt.Sprintf("starting %d List readers + %d Watch readers", cfg.readersPerUser*cfg.userCount, cfg.userCount))
+			readers := startReaders(ctx, users, cfg)
+
+			// Stop the readers exactly once, including when an assertion in
+			// the churn phase fails; otherwise they would keep hammering the
+			// API server for the rest of the test process.
+			var stopOnce sync.Once
+			stopReaders := func() {
+				stopOnce.Do(func() {
+					close(readers.stopCh)
+					readers.wg.Wait()
+				})
+			}
+			defer stopReaders()
+
+			g.By(fmt.Sprintf("running %d rounds of create/delete churn across %d users × %d namespaces", cfg.churnRounds, cfg.userCount, cfg.namespaceCount))
+			deletedForVerifyUser := runChurnRounds(ctx, oc, users, namespaceNames, cfg, readers)
+
+			stopReaders()
+
+			g.By("checking openshift-apiserver pods did not restart")
+			assertNoRestarts(ctx, oc, baselineRestarts)
+
+			g.By("verifying no List errors occurred")
+			o.Expect(readers.listErrors.Load()).To(o.BeZero(), fmt.Sprintf("%d Projects().List() errors occurred", readers.listErrors.Load()))
+
+			g.By("verifying no project list regressions occurred during the add phase")
+			framework.Logf("project count regressions detected: %d", readers.regressionCount.Load())
+			o.Expect(readers.regressionCount.Load()).To(o.BeZero(), fmt.Sprintf("%d regressions: users saw fewer projects than previously observed", readers.regressionCount.Load()))
+
+			g.By("verifying no List calls were blocked")
+			maxLatency := time.Duration(readers.maxLatencyNs.Load())
+			framework.Logf("max Projects().List() latency: %s", maxLatency)
+			o.Expect(maxLatency).To(o.BeNumerically("<", cfg.maxListLatency), fmt.Sprintf("Projects().List() latency %s exceeded threshold %s", maxLatency, cfg.maxListLatency))
+
+			g.By("verifying the project list is correct")
+			// Only a lower bound is checked: namespaces in deletedForVerifyUser
+			// may still be visible through rolebindings left by earlier rounds.
+			expectedNames := sets.New[string](namespaceNames...).Difference(deletedForVerifyUser)
+			framework.Logf("expecting %d projects visible (%d total - %d deleted)", expectedNames.Len(), len(namespaceNames), deletedForVerifyUser.Len())
+			verifyClient := projectv1client.NewForConfigOrDie(users[0].config)
+			err := wait.PollUntilContextTimeout(ctx, 1*time.Second, 2*time.Minute, true, func(ctx context.Context) (bool, error) {
+				projects, listErr := verifyClient.Projects().List(ctx, metav1.ListOptions{})
+				if listErr != nil {
+					return false, listErr
+				}
+				visible := sets.New[string]()
+				for _, p := range projects.Items {
+					visible.Insert(p.Name)
+				}
+				return visible.HasAll(expectedNames.UnsortedList()...), nil
+			})
+			o.Expect(err).NotTo(o.HaveOccurred(), "project list does not contain all expected namespaces after race phase")
+		})
+	})
+})
+
+// getPodRestartCounts lists the pods in namespace and returns, per pod name,
+// the sum of RestartCount over the pod's container statuses. It fails the test
+// if the pods cannot be listed.
+func getPodRestartCounts(ctx context.Context, oc *exutil.CLI, namespace string) map[string]int32 {
+	pods, err := oc.AdminKubeClient().CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+	o.Expect(err).NotTo(o.HaveOccurred(), "failed to list pods in %s", namespace)
+	counts := make(map[string]int32, len(pods.Items))
+	for _, pod := range pods.Items {
+		var total int32
+		for _, status := range pod.Status.ContainerStatuses {
+			total += status.RestartCount
+		}
+		counts[pod.Name] = total
+	}
+	return counts
+}
+
+// dumpApiserverCrashEvidence logs, for every pod in namespace, the lines from
+// the last ten minutes of the "openshift-apiserver" container's previous and
+// current logs that contain a crash signature (matched case-insensitively).
+// It is best-effort diagnostics: failures to list pods or read logs are logged
+// and never fail the test.
+func dumpApiserverCrashEvidence(ctx context.Context, oc *exutil.CLI, namespace string) {
+	// Signatures are lowercase because log lines are lowercased before matching.
+	crashSignatures := []string{"fatal error", "concurrent map", "panic:", "runtime error", "goroutine"}
+
+	pods, err := oc.AdminKubeClient().CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
+	if err != nil {
+		framework.Logf("failed to list pods in %s: %v", namespace, err)
+		return
+	}
+
+	sinceSeconds := int64(600)
+	for _, pod := range pods.Items {
+		for _, previous := range []bool{true, false} {
+			opts := &corev1.PodLogOptions{
+				Container:    "openshift-apiserver",
+				Previous:     previous,
+				SinceSeconds: &sinceSeconds,
+			}
+			evidence := scanPodLogForSignatures(ctx, oc, namespace, pod.Name, opts, crashSignatures)
+			if len(evidence) == 0 {
+				continue
+			}
+
+			label := "current"
+			if previous {
+				label = "previous"
+			}
+			framework.Logf("=== crash evidence from %s (%s container) ===", pod.Name, label)
+			for _, line := range evidence {
+				framework.Logf("  %s", line)
+			}
+			framework.Logf("=== end crash evidence ===")
+		}
+	}
+}
+
+// scanPodLogForSignatures streams the log selected by opts for the named pod
+// and returns every line that contains one of signatures, compared against the
+// lowercased line. Stream and scan failures are logged; whatever was collected
+// before the failure is still returned.
+func scanPodLogForSignatures(ctx context.Context, oc *exutil.CLI, namespace, podName string, opts *corev1.PodLogOptions, signatures []string) []string {
+	stream, err := oc.AdminKubeClient().CoreV1().Pods(namespace).GetLogs(podName, opts).Stream(ctx)
+	if err != nil {
+		framework.Logf("could not read logs of pod %s (previous=%t): %v", podName, opts.Previous, err)
+		return nil
+	}
+	defer stream.Close()
+
+	var evidence []string
+	scanner := bufio.NewScanner(stream)
+	// Raise the per-line limit above bufio's 64 KiB default: the scanner stops
+	// at the first over-long line, which would hide everything after it.
+	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)
+	for scanner.Scan() {
+		line := scanner.Text()
+		lowered := strings.ToLower(line)
+		for _, sig := range signatures {
+			if strings.Contains(lowered, sig) {
+				evidence = append(evidence, line)
+				break
+			}
+		}
+	}
+	if err := scanner.Err(); err != nil {
+		framework.Logf("log scan of pod %s (previous=%t) ended early: %v", podName, opts.Previous, err)
+	}
+	return evidence
+}
+
 func GetScopedClientForUser(oc *exutil.CLI, username string, scopes []string) (*rest.Config, error) {
 	// make sure the user exists
 	user, err := oc.AdminUserClient().UserV1().Users().Get(context.Background(), username, metav1.GetOptions{})