fix: possible race condition where multiple organizations could be created #2162

Merged
wilsonrivera merged 18 commits into main from wilson/eng-7670-organization-created-twice
Oct 1, 2025

Conversation

@wilsonrivera
Contributor

@wilsonrivera wilsonrivera commented Aug 22, 2025

Summary by CodeRabbit

  • New Features
    • First-time login now runs a guarded, transactional onboarding to avoid duplicate work: creates your workspace, adds you as owner/admin, seeds and imports identity-provider groups, and links the workspace to the provider root group.
    • Default namespace is provisioned automatically so you can start immediately.
    • Successful registration emits a USER_REGISTER_SUCCESS webhook that now includes first and last name.
    • If you already have memberships, onboarding is skipped and normal sign-in continues.
  • Bug Fixes
    • If the onboarding lock cannot be acquired, sign-in responds with a 429 (please slow down).

Checklist

  • I have discussed my proposed changes in an issue and have received approval to proceed.
  • I have followed the coding standards of the project.
  • Tests or benchmarks have been added or updated.
  • Documentation has been updated on https://github.com/wundergraph/cosmo-docs.
  • I have read the Contributors Guide.

@coderabbitai
Contributor

coderabbitai bot commented Aug 22, 2025

Walkthrough

Onboarding and membership checks now run inside a single DB transaction guarded by an advisory lock. When the user has no memberships, the flow seeds Keycloak, creates the Organization with kcGroupId, imports groups, assigns admin membership, creates a default namespace, and emits a USER_REGISTER_SUCCESS webhook.

Changes

Cohort / File(s) Summary of Changes
Auth onboarding flow
controlplane/src/core/controllers/auth.ts
Consolidated membership check and onboarding into a single transaction using pg_try_advisory_xact_lock(hashtext(userId)). Inside the transaction: check memberships, authenticate with Keycloak, seed root group, create Organization (store kcGroupId), import/create org groups, add user to org/admin group if present, create default namespace, and return an indicator. After the transaction: handle advisory-lock failure (429), emit USER_REGISTER_SUCCESS including user first/last names, and adjust redirect logic based on onboarding outcome.
Organization repository update
controlplane/src/core/repositories/OrganizationRepository.ts
Extended updateOrganization input to accept optional kcGroupId?: string and include kcGroupId in the SQL UPDATE payload alongside name and slug.
Test import formatting
controlplane/test/utils.test.ts
Reformatted import from ../src/core/util.js into a multi-line named import block; no behavioral changes.
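
The advisory-lock guard named in the auth.ts row above can be sketched roughly as follows. This is a minimal illustration, not the code from the PR: the Queryable interface and the tryAdvisoryXactLock helper are hypothetical names, and only the SQL functions pg_try_advisory_xact_lock and hashtext come from the summary.

```typescript
// Hypothetical minimal query interface; the real code uses the project's own DB client.
interface Queryable {
  query(sql: string, params: unknown[]): Promise<{ rows: Array<Record<string, unknown>> }>;
}

// Tries to take a transaction-scoped advisory lock keyed by the user ID.
// Returns false right away (no waiting) when another transaction holds the lock.
async function tryAdvisoryXactLock(tx: Queryable, userId: string): Promise<boolean> {
  const result = await tx.query(
    "SELECT pg_try_advisory_xact_lock(hashtext($1)) AS locked",
    [userId],
  );
  return result.rows[0]?.locked === true;
}
```

Because the lock is transaction-scoped, PostgreSQL releases it when the transaction commits or rolls back, so no explicit unlock call is needed. A caller that receives false would respond with the 429 described in the summary.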

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Pre-merge checks and finishing touches

✅ Passed checks (3 passed)
Check name | Status | Explanation
Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check | ✅ Passed | The title concisely and accurately describes the primary change in this pull request, which is fixing a race condition that could lead to multiple organizations being created; it follows the Conventional Commits style and clearly communicates the intent of the change.
Docstring Coverage | ✅ Passed | No functions found in the changes. Docstring coverage check skipped.
✨ Finishing touches
  • 📝 Generate Docstrings

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between d0561df and e5971f7.

📒 Files selected for processing (1)
  • controlplane/src/core/controllers/auth.ts (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • controlplane/src/core/controllers/auth.ts
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: build_test
  • GitHub Check: Analyze (go)

🧪 Early access (Sonnet 4.5): enabled

We are currently testing the Sonnet 4.5 model, which is expected to improve code review quality. However, this model may lead to increased noise levels in the review comments. Please disable the early access features if the noise level causes any inconvenience.

Note:

  • Public repositories are always opted into early access features.
  • You can enable or disable early access features from the CodeRabbit UI or by updating the CodeRabbit configuration file.

Comment @coderabbitai help to get the list of available commands and usage tips.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
controlplane/src/core/controllers/auth.ts (1)

259-329: Bug: Creating an organization but returning [] triggers unintended migrate redirect.

Inside runLocking, you may create an org for first-time users, but you still return an empty array. This causes userOrganizations.length === 0 and redirects to ?migrate=true even though an org was created. Choose one path:

  • A) Remove auto-creation entirely (safer for race conditions) and rely on the migrate flow.
  • B) Keep auto-creation but return the fresh memberships to avoid the migrate redirect.

Given the PR objective to prevent duplicate org creation, Option A is recommended.

Here is Option A (remove creation path, keep single-flight around membership fetch):

-        const userOrganizations = await runLocking(userId, async () => {
-          const orgs = await opts.organizationRepository.memberships({
-            userId,
-          });
-
-          if (orgs.length > 0) {
-            return orgs;
-          }
-
-          await opts.keycloakClient.authenticateClient();
-
-          const organizationSlug = uid(8);
-
-          const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({
-            userID: userId,
-            organizationSlug,
-            realm: opts.keycloakRealm,
-          });
-
-          await opts.db.transaction(async (tx) => {
-            const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
-            const orgGroupRepo = new OrganizationGroupRepository(tx);
-
-            const insertedOrg = await orgRepo.createOrganization({
-              organizationName: userEmail.split('@')[0],
-              organizationSlug,
-              ownerID: userId,
-              kcGroupId: kcRootGroupId,
-            });
-
-            const orgMember = await orgRepo.addOrganizationMember({
-              organizationID: insertedOrg.id,
-              userID: userId,
-            });
-
-            await orgGroupRepo.importKeycloakGroups({
-              organizationId: insertedOrg.id,
-              kcGroups: kcCreatedGroups,
-            });
-
-            const orgAdminGroup = await orgGroupRepo.byName({
-              organizationId: insertedOrg.id,
-              name: 'admin',
-            });
-
-            if (orgAdminGroup) {
-              await orgGroupRepo.addUserToGroup({
-                organizationMemberId: orgMember.id,
-                groupId: orgAdminGroup.groupId,
-              });
-            }
-
-            const namespaceRepo = new NamespaceRepository(tx, insertedOrg.id);
-            const ns = await namespaceRepo.create({
-              name: DefaultNamespace,
-              createdBy: userId,
-            });
-            if (!ns) {
-              throw new Error(`Could not create ${DefaultNamespace} namespace`);
-            }
-          });
-
-          opts.platformWebhooks.send(PlatformEventName.USER_REGISTER_SUCCESS, {
-            user_id: userId,
-            user_email: userEmail,
-            user_first_name: firstName,
-            user_last_name: lastName,
-          });
-
-          return [];
-        });
+        const userOrganizations = await runLocking(userId, () =>
+          opts.organizationRepository.memberships({ userId }),
+        );

If you instead prefer Option B (keep creation), return the refreshed memberships:

-          return [];
+          return await opts.organizationRepository.memberships({ userId });

Also applies to: 353-354

🧹 Nitpick comments (7)
controlplane/src/core/util.ts (3)

657-659: Prefer Promise.resolve().then(action) over an async IIFE.

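Nearly equivalent but simpler, and it avoids an extra async frame. It still converts sync throws into a rejected promise. One difference: with Promise.resolve().then(action), action is invoked in a microtask rather than synchronously.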

-  const promise = (async () => await action())();
+  const promise = Promise.resolve().then(action);
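
A small sketch of how the two wrappers behave, using made-up helper names (throwsSync, record) that are not part of the PR. It shows that both forms turn a synchronous throw into a rejection, and that they differ in when the action actually runs.

```typescript
// An action that throws synchronously, before returning any promise.
function throwsSync(): Promise<string> {
  throw new Error("boom");
}

// Both wrappers turn the synchronous throw into a rejected promise.
const viaIife = (async () => await throwsSync())();
const viaThen = Promise.resolve().then(throwsSync);

// Map each outcome to a boolean so the rejections are handled.
const iifeRejected = viaIife.then(() => false, () => true);
const thenRejected = viaThen.then(() => false, () => true);

// Timing differs: the IIFE calls the action immediately, `then` defers it to a microtask.
const order: string[] = [];
const record = (label: string) => async () => {
  order.push(label);
};
void (async () => await record("iife")())();
void Promise.resolve().then(record("then"));
order.push("sync code after both calls");
// Here `order` holds "iife" followed by "sync code after both calls"; "then" is appended later.
```

The timing difference matters for a helper like runLocking: with the deferred form, the map entry can be registered before the action starts.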

651-671: Clarify single-flight semantics and keying; add brief docs.

This is a per-process, per-key single-flight deduper (not a reentrant mutex or a queue). Consider adding a short JSDoc comment to set caller expectations and to warn about cross-process behavior.

+/**
+ * Single-flight: coalesce concurrent calls with the same key into one in-flight Promise.
+ * Notes:
+ * - No queuing: later callers share the same Promise, not serialized execution.
+ * - In-memory: only effective within the same Node.js process.
+ * - The entry is removed on settle (resolve/reject).
+ */
 export function runLocking<TResult>(key: string, action: () => Promise<TResult>): Promise<TResult> {

45-46: In-memory locking won’t prevent duplicates across instances; add a durable guard.

If the goal is to prevent duplicate organization creation, this Map won’t help across multiple pods/processes. Add a DB-level idempotency guard (e.g., unique partial index on “default organization per user” or an idempotency key table) and make the creation path use INSERT ... ON CONFLICT. Keep runLocking for intra-process coalescing.

Would you like me to draft a DB schema change (partial unique index + repository upsert) and a small refactor to adopt it?

Also applies to: 651-671

controlplane/test/test-util.ts (1)

987-991: Make expectPending configurable and resilient.

Allow passing a custom timeout and document that fake timers should be advanced by at least that amount to complete. Minor ergonomics improvement.

-export async function expectPending(p: Promise<unknown>) {
-  const timeout = new Promise((resolve) => setTimeout(() => resolve("timeout"), 20));
+export async function expectPending(p: Promise<unknown>, ms = 20) {
+  const timeout = new Promise((resolve) => setTimeout(() => resolve("timeout"), ms));
   const race = await Promise.race([p.then(() => "resolved" as const), timeout]);
   expect(race).toBe("timeout");
 }
controlplane/test/utils.test.ts (2)

1-9: Nit: Consistent import order.

Optional: group local test utilities after util imports to match common style in the codebase.


127-151: Add a test for rejection cleanup semantics.

Ensure that a rejection clears the entry and a subsequent call proceeds independently.

   test('starts fresh after a call completes (entry cleanup)', async () => {
@@
     await p2;
   })
+
+  test('cleans up after rejection and allows a fresh call', async () => {
+    let gate = deferred<void>();
+    const worker = vi.fn(async (shouldFail = false) => {
+      await gate.promise;
+      if (shouldFail) throw new Error('boom');
+      return "ok";
+    });
+
+    // First run rejects
+    const p1 = runLocking("user:err", () => worker(true));
+    gate.resolve();
+    await expect(p1).rejects.toThrow('boom');
+
+    // Second run should start fresh and resolve
+    gate = deferred<void>();
+    const p2 = runLocking("user:err", () => worker(false));
+    gate.resolve();
+    await expect(p2).resolves.toBe("ok");
+    expect(worker).toHaveBeenCalledTimes(2);
+  });
controlplane/src/core/controllers/auth.ts (1)

259-329: Cross-instance race still possible; add an idempotency guard at the DB.

Even with runLocking, two callbacks can land on different pods. If you keep auto-creation (Option B), enforce a durable uniqueness policy (e.g., a “default organization” per user with a partial unique index) and use an upsert to make creation idempotent. Consider also making the USER_REGISTER_SUCCESS webhook idempotent (e.g., an event key).

I can propose a schema change and repository upsert to guarantee idempotency across pods. Want me to draft it?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between fb585be and c988f2a.

📒 Files selected for processing (4)
  • controlplane/src/core/controllers/auth.ts (4 hunks)
  • controlplane/src/core/util.ts (2 hunks)
  • controlplane/test/test-util.ts (1 hunks)
  • controlplane/test/utils.test.ts (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
controlplane/test/utils.test.ts (2)
controlplane/src/core/util.ts (1)
  • runLocking (651-671)
controlplane/test/test-util.ts (2)
  • deferred (979-985)
  • expectPending (987-991)
controlplane/src/core/controllers/auth.ts (1)
controlplane/src/core/util.ts (1)
  • runLocking (651-671)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: build_test
  • GitHub Check: build_push_image
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
🔇 Additional comments (7)
controlplane/src/core/util.ts (1)

651-671: Add a test for rejection cleanup.

Currently tests cover success. Please add a test ensuring that after a rejected action the entry is removed and a subsequent call starts fresh.

I can add this to controlplane/test/utils.test.ts if you want.

controlplane/test/test-util.ts (1)

979-985: LGTM: Handy deferred helper for orchestration.

controlplane/test/utils.test.ts (4)

60-67: LGTM: Properly scoping fake timers to the runLocking suite.


69-93: LGTM: Same-key calls coalesce; verifies reference equality and single invocation.


129-151: LGTM: Confirms cleanup and fresh start on subsequent call.


95-103: Fix invalid TypeScript type for deferred.

ReturnType<typeof deferred<void>> is not valid. Use ReturnType<typeof deferred>.

-      const worker = vi.fn(async (val: string, gate: ReturnType<typeof deferred<void>>) => {
+      const worker = vi.fn(async (val: string, gate: ReturnType<typeof deferred>) => {
         await gate.promise;
         return val;
       });

Likely an incorrect or invalid review comment.

controlplane/src/core/controllers/auth.ts (1)

21-21: LGTM: Importing runLocking at the controller layer.

Import path looks correct with ESM .js resolution.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (1)
controlplane/src/core/util.ts (1)

651-669: Eliminate re-entrancy window: set the lock before invoking action

There’s a small window where action() is invoked before lockedPromises.set(key, promise) runs. If action (directly or indirectly) re-enters runLocking with the same key, it can start a second execution instead of coalescing. Not common, but easy to guard against.

Apply this diff to create a deferred promise, store it first, then execute the action. This makes the lock visible to re-entrant calls and keeps the existing cleanup/race-safe delete:

-export function runLocking<TResult>(key: string, action: () => Promise<TResult>): Promise<TResult> {
-  const existingPromise = lockedPromises.get(key);
-  if (existingPromise) {
-    return existingPromise as Promise<TResult>;
-  }
-
-  const promise = (async () => await action())();
-  lockedPromises.set(key, promise);
-
-  promise.finally(() => {
-    const other = lockedPromises.get(key);
-    if (other === promise) {
-      // Remove only if we're still pointing at the same promise (avoid race).
-      lockedPromises.delete(key);
-    }
-  });
-
-  return promise;
-}
+export function runLocking<TResult>(key: string, action: () => Promise<TResult>): Promise<TResult> {
+  const existing = lockedPromises.get(key);
+  if (existing) {
+    return existing as Promise<TResult>;
+  }
+
+  // Create a deferred promise and register it before invoking the action
+  let resolve!: (value: TResult | PromiseLike<TResult>) => void;
+  let reject!: (reason?: unknown) => void;
+  const promise = new Promise<TResult>((res, rej) => {
+    resolve = res;
+    reject = rej;
+  });
+  lockedPromises.set(key, promise);
+
+  (async () => {
+    try {
+      const result = await action();
+      resolve(result);
+    } catch (err) {
+      reject(err);
+    } finally {
+      const other = lockedPromises.get(key);
+      if (other === promise) {
+        // Remove only if we're still pointing at the same promise (avoid race).
+        lockedPromises.delete(key);
+      }
+    }
+  })();
+
+  return promise;
+}

Additional nits:

  • Add a brief TSDoc noting that only the first caller’s action executes and that later same-key callers receive its promise; side effects in later callers’ action closures will not run.
  • Consider an optional timeout/TTL for extremely long-running actions to avoid holding the lock indefinitely if a caller disconnects. Happy to sketch this if useful.

I can add a focused unit test for re-entrancy (same key invoked inside action) to ensure only one execution occurs. Want me to open a follow-up?
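
The optional timeout mentioned in the nits could look roughly like this. It is a sketch under assumptions: withTimeout is a hypothetical helper, not something in the PR, and it only bounds how long a caller waits rather than cancelling the underlying action.

```typescript
// Rejects with a timeout error if `work` has not settled within `ms` milliseconds.
// Note: this does not cancel `work`; it only stops waiting for it.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`timed out after ${ms}ms`)), ms);
  });
  // Whichever settles first wins; the timer is always cleared afterwards.
  return Promise.race([work, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer);
  });
}
```

A caller could wrap the action passed to runLocking, for example runLocking(key, () => withTimeout(action(), 30_000)), so a stuck action releases the map entry once the timeout rejects. The 30-second figure is illustrative only.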

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between c988f2a and 8db60b6.

📒 Files selected for processing (1)
  • controlplane/src/core/util.ts (2 hunks)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
  • GitHub Check: build_test
  • GitHub Check: build_push_image
🔇 Additional comments (1)
controlplane/src/core/util.ts (1)

45-46: Per-process single-flight map is fine; confirm cross-instance safety and key hygiene

Short summary

  • Good: lockedPromises is module-scoped (no logging of its contents) so the local single-flight/coalescing behavior is fine.
  • Important: that only coalesces within a single Node process — you still need DB-level idempotency if organization creation can be triggered from multiple replicas.

What I verified (from your script output)

  • runLocking call sites found:
    • controlplane/src/core/controllers/auth.ts — runLocking(userId, ...).
    • controlplane/test/utils.test.ts — tests exercising runLocking (multiple test calls).
    • No call sites using runLocking for org creation were found.
  • Schema/indexes found in: controlplane/src/db/schema.ts
    • uniqueIndex('organization_name_idx').on(t.organizationId, t.type, t.name, t.namespaceId)
    • uniqueIndex('organization_integration_idx').on(t.organizationId, t.name)
  • Search for upsert / ON CONFLICT patterns returned no matches (no obvious upsert/ON CONFLICT usage found).

Concrete requests / recommendations

  • Please confirm where organization creation is implemented and whether it uses DB-level uniqueness + upsert/ON CONFLICT handling. If creation can be raced across replicas, enforce idempotency at the DB (unique constraint on slug/name + safe upsert or handle unique-violation retry). File(s) to check: controlplane/src/db/schema.ts and the org-creation code path.
  • Confirm runLocking keys are stable and namespaced (example: org:create:${orgSlug}) and do NOT embed secrets/emails. If sensitive identifiers might be logged, use a hash (e.g. org:create:${sha256(orgSlug)}) instead.
  • If you rely only on the in-process map for org creation, change approach: either add DB constraints/upsert or add a distributed lock.
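
A minimal sketch of the hashed, namespaced key suggested above, assuming Node's built-in crypto module. The lockKey helper is a hypothetical name; only the org:create prefix and the idea of a SHA-256 digest come from the comment.

```typescript
import { createHash } from "node:crypto";

// Builds a stable, namespaced lock key without embedding the raw identifier.
function lockKey(namespace: string, identifier: string): string {
  const digest = createHash("sha256").update(identifier, "utf8").digest("hex");
  return `${namespace}:${digest}`;
}

// Same input always yields the same key, so concurrent callers still coalesce.
const exampleKey = lockKey("org:create", "my-org-slug");
```

Hashing keeps the key deterministic while keeping the original identifier out of logs that might print the key.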

Places to look (from verification)

  • controlplane/src/core/util.ts — const lockedPromises = new Map<string, Promise>();
  • controlplane/src/core/controllers/auth.ts — found runLocking(userId, ...).
  • controlplane/src/db/schema.ts — found organization_name_idx and organization_integration_idx unique indexes.
  • No upsert / ON CONFLICT matches found in the repo search output.

If you want I can run a targeted search for the exact org-creation function(s) (or for INSERT sites that reference the organization table) and look for error-handling around unique-violation paths.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
controlplane/src/core/controllers/auth.ts (1)

259-334: End-to-end, make onboarding single-execution per user with locking and refreshed state

Putting it all together, here’s a consolidated patch that addresses the race, the transaction scope, and the wrong ID bug, and refreshes state for redirects:

Apply this diff (adjust runLocking signature if needed):

-        const orgs = await opts.organizationRepository.memberships({ userId });
-        if (orgs.length === 0) {
-          const organizationSlug = uid(8);
-
-          // First, we need to create the organization and add the user as an organization member
-          const [insertedOrg, orgMember] = await opts.db.transaction(async (tx) => {
-            const orgRepo = new OrganizationRepository(req.log, opts.db, opts.defaultBillingPlanId);
-
-            // Create the organization...
-            const inserted = await orgRepo.createOrganization({
-              organizationName: userEmail.split('@')[0],
-              organizationSlug,
-              ownerID: userId,
-            });
-
-            // ...and add the user as an organization member.
-            const orgMember = await orgRepo.addOrganizationMember({
-              organizationID: inserted.id,
-              userID: userId,
-            });
-
-            return [inserted, orgMember];
-          });
-
-          // Finalize the organization setup by seeding the Keycloak group structure
-          await opts.keycloakClient.authenticateClient();
-          await new Promise((resolve) => setTimeout(resolve, 10_000));
-          const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({
-            userID: userId,
-            organizationSlug,
-            realm: opts.keycloakRealm,
-          });
-
-          await opts.db.transaction(async (tx) => {
-            const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
-            const orgGroupRepo = new OrganizationGroupRepository(tx);
-
-            await orgRepo.updateOrganization({
-              id: insertedSession.id,
-              kcGroupId: kcRootGroupId,
-            });
-
-            await orgGroupRepo.importKeycloakGroups({
-              organizationId: insertedOrg.id,
-              kcGroups: kcCreatedGroups,
-            });
-
-            const orgAdminGroup = await orgGroupRepo.byName({
-              organizationId: insertedOrg.id,
-              name: 'admin',
-            });
-
-            if (orgAdminGroup) {
-              await orgGroupRepo.addUserToGroup({
-                organizationMemberId: orgMember.id,
-                groupId: orgAdminGroup.groupId,
-              });
-            }
-
-            const namespaceRepo = new NamespaceRepository(tx, insertedOrg.id);
-            const ns = await namespaceRepo.create({
-              name: DefaultNamespace,
-              createdBy: userId,
-            });
-            if (!ns) {
-              throw new Error(`Could not create ${DefaultNamespace} namespace`);
-            }
-          });
-
-          opts.platformWebhooks.send(PlatformEventName.USER_REGISTER_SUCCESS, {
-            user_id: userId,
-            user_email: userEmail,
-            user_first_name: firstName,
-            user_last_name: lastName,
-          });
-        }
+        let orgs = await opts.organizationRepository.memberships({ userId });
+        await runLocking(opts.db, `auth:onboard:${userId}`, async () => {
+          // Re-check inside the lock to avoid races.
+          const fresh = await opts.organizationRepository.memberships({ userId });
+          if (fresh.length > 0) {
+            orgs = fresh;
+            return;
+          }
+
+          const organizationSlug = uid(8);
+
+          // Create organization and upsert membership atomically.
+          const { insertedOrg, orgMember } = await opts.db.transaction(async (tx) => {
+            const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
+            const insertedOrg = await orgRepo.createOrganization({
+              organizationName: userEmail.split('@')[0],
+              organizationSlug,
+              ownerID: userId,
+            });
+            const [orgMember] = await tx
+              .insert(organizationsMembers)
+              .values({ userId, organizationId: insertedOrg.id })
+              .onConflictDoUpdate({
+                target: [organizationsMembers.userId, organizationsMembers.organizationId],
+                set: { userId, organizationId: insertedOrg.id },
+              })
+              .returning()
+              .execute();
+            return { insertedOrg, orgMember };
+          });
+
+          // Finalize organization: seed Keycloak groups and persist kcGroupId.
+          await opts.keycloakClient.authenticateClient();
+          const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({
+            userID: userId,
+            organizationSlug,
+            realm: opts.keycloakRealm,
+          });
+
+          await opts.db.transaction(async (tx) => {
+            const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
+            const orgGroupRepo = new OrganizationGroupRepository(tx);
+
+            await orgRepo.updateOrganization({ id: insertedOrg.id, kcGroupId: kcRootGroupId });
+            await orgGroupRepo.importKeycloakGroups({ organizationId: insertedOrg.id, kcGroups: kcCreatedGroups });
+
+            const orgAdminGroup = await orgGroupRepo.byName({ organizationId: insertedOrg.id, name: 'admin' });
+            if (orgAdminGroup) {
+              await orgGroupRepo.addUserToGroup({
+                organizationMemberId: orgMember.id,
+                groupId: orgAdminGroup.groupId,
+              });
+            }
+
+            const namespaceRepo = new NamespaceRepository(tx, insertedOrg.id);
+            const ns = await namespaceRepo.create({ name: DefaultNamespace, createdBy: userId });
+            if (!ns) throw new Error(`Could not create ${DefaultNamespace} namespace`);
+          });
+
+          opts.platformWebhooks.send(PlatformEventName.USER_REGISTER_SUCCESS, {
+            user_id: userId,
+            user_email: userEmail,
+            user_first_name: firstName,
+            user_last_name: lastName,
+          });
+        });
🧹 Nitpick comments (5)
controlplane/src/core/repositories/OrganizationRepository.ts (1)

98-106: Extend updateOrganization to include kcGroupId — OK, but avoid writing undefined fields

The new optional kcGroupId is aligned with schema (uuid, unique). However, the current .set({ name, slug, kcGroupId }) may try to write undefined depending on caller usage. Prefer building the update payload with only defined keys to prevent accidental NULL/constraint issues and noisy diffs.

Apply this refactor:

-  public async updateOrganization(input: { id: string; slug?: string; name?: string; kcGroupId?: string }) {
-    await this.db
-      .update(organizations)
-      .set({
-        name: input.name,
-        slug: input.slug,
-        kcGroupId: input.kcGroupId,
-      })
-      .where(eq(organizations.id, input.id))
-      .execute();
-  }
+  public async updateOrganization(input: { id: string; slug?: string; name?: string; kcGroupId?: string }) {
+    const set: Partial<typeof organizations.$inferInsert> = {};
+    if (input.name !== undefined) set.name = input.name;
+    if (input.slug !== undefined) set.slug = input.slug;
+    if (input.kcGroupId !== undefined) set.kcGroupId = input.kcGroupId;
+
+    await this.db.update(organizations).set(set).where(eq(organizations.id, input.id)).execute();
+  }

Additionally, consider returning the number of affected rows (or the updated row) to enable call sites to detect “no-op” updates due to wrong IDs. I’m calling this out because a caller currently passes the wrong ID (see auth.ts Lines 296-299).

controlplane/src/core/controllers/auth.ts (4)

283-291: Keycloak seeding: 10-second fixed sleep degrades UX and is brittle

A hardcoded await new Promise(...10_000) in the auth callback path increases login latency and still doesn’t guarantee readiness. Prefer polling with a bounded retry/backoff or querying Keycloak until the client/realm is ready.

Proposed approach (pseudocode):

  • Authenticate client.
  • Retry seedGroup up to N times with exponential backoff on 409/404/5xx readiness failures.
  • Fail fast on non-retryable errors.
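
The steps above can be sketched as a generic retry helper. This is an illustration under assumptions: retryWithBackoff, RetryOptions, and the shouldRetry predicate are hypothetical, and which Keycloak errors count as retryable would depend on the actual client.

```typescript
interface RetryOptions {
  attempts: number; // maximum number of tries, including the first
  baseDelayMs: number; // delay before the second try; doubles after each failure
  shouldRetry: (error: unknown) => boolean; // hypothetical error classification
  sleep?: (ms: number) => Promise<void>; // injectable so tests need no real timers
}

const defaultSleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

async function retryWithBackoff<T>(action: () => Promise<T>, options: RetryOptions): Promise<T> {
  const sleep = options.sleep ?? defaultSleep;
  for (let attempt = 1; ; attempt++) {
    try {
      return await action();
    } catch (error) {
      // Fail fast on non-retryable errors or once the attempt budget is exhausted.
      if (attempt >= options.attempts || !options.shouldRetry(error)) {
        throw error;
      }
      // Exponential backoff: baseDelayMs, then 2x, 4x, and so on.
      await sleep(options.baseDelayMs * 2 ** (attempt - 1));
    }
  }
}
```

In the auth callback, the seedGroup call would be the action, replacing the fixed 10-second sleep with a bounded number of short waits.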

292-326: Make the post-seed step idempotent and bound to the same onboarding flow

Importing groups and adding the user to the admin group should be safe to run multiple times. Ensure importKeycloakGroups and addUserToGroup are idempotent (onConflictDoNothing, or prior existence checks). From the snippet, import seems safe; verify addUserToGroup behavior.

Would you like me to add an upsert wrapper for group membership to the OrganizationGroupRepository?


358-362: Redirect decision uses stale orgs array after onboarding

After creating an org, orgs still reflects the pre-onboarding state, so the user is redirected with ?migrate=true. Refresh memberships (or set a flag) before the redirect.

Apply this diff:

-        } else if (orgs.length === 0) {
+        // Refresh after potential onboarding to ensure correct UX.
+        orgs = await opts.organizationRepository.memberships({ userId });
+        } else if (orgs.length === 0) {
           res.redirect(opts.webBaseUrl + '?migrate=true');
         } else {
           res.redirect(opts.webBaseUrl);
         }

259-362: Optional: add tests to prove the race is closed

Please add a test simulating two concurrent /callback requests for the same user where both initially see zero memberships. Assert that only one organization is created and the user is a member/admin of exactly one org. I can scaffold this for you.
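
As a starting point for such a test, the scenario can be modelled in memory. Everything here is a stand-in: organizationsByUser, heldLocks, and onboard are hypothetical and do not touch the real repositories or database; the Set only imitates a non-blocking try-lock.

```typescript
// In-memory stand-ins for the database; purely illustrative.
const organizationsByUser = new Map<string, string[]>();
const heldLocks = new Set<string>();

type OnboardResult = "created" | "skipped" | "busy";

async function onboard(userId: string): Promise<OnboardResult> {
  // Non-blocking try-lock: a second caller is refused instead of waiting.
  if (heldLocks.has(userId)) return "busy";
  heldLocks.add(userId);
  try {
    // Check memberships only after the lock is held, closing the check-then-create gap.
    if ((organizationsByUser.get(userId) ?? []).length > 0) return "skipped";
    await new Promise<void>((resolve) => setTimeout(resolve, 5)); // simulated slow setup work
    organizationsByUser.set(userId, [`org-of-${userId}`]);
    return "created";
  } finally {
    heldLocks.delete(userId);
  }
}

// Two callbacks for the same user arrive at the same time.
const concurrentRun = Promise.all([onboard("user-1"), onboard("user-1")]);
```

A real test would replace the in-memory pieces with the test database and assert on the organization and membership rows for the user.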

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 8db60b6 and 52a85e9.

📒 Files selected for processing (3)
  • controlplane/src/core/controllers/auth.ts (3 hunks)
  • controlplane/src/core/repositories/OrganizationRepository.ts (1 hunks)
  • controlplane/test/utils.test.ts (1 hunks)
✅ Files skipped from review due to trivial changes (1)
  • controlplane/test/utils.test.ts
🧰 Additional context used
🧬 Code graph analysis (2)
controlplane/src/core/repositories/OrganizationRepository.ts (1)
controlplane/src/db/schema.ts (1)
  • organizations (1192-1215)
controlplane/src/core/controllers/auth.ts (1)
controlplane/src/core/repositories/OrganizationRepository.ts (1)
  • OrganizationRepository (50-1661)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: build_push_image
  • GitHub Check: build_test
  • GitHub Check: Analyze (go)
  • GitHub Check: Analyze (javascript-typescript)
🔇 Additional comments (2)
controlplane/src/core/controllers/auth.ts (2)

328-334: Webhook emission after onboarding — LGTM

Emitting USER_REGISTER_SUCCESS with basic user details is appropriate here.


259-263: Please verify the locking helper and wrap the TOCTOU-sensitive block

I wasn’t able to locate the definition of runLocking in the codebase—please confirm that:

  • The helper is exported from controlplane/src/core/util.ts (imported as ../util.js)
  • Its signature accepts (db: DBClient, key: string, fn: () => Promise<void>) or similar

Once confirmed, wrap the membership check and onboarding transaction in a lock and re-check inside it to prevent concurrent sign-ups from both seeing zero memberships:

• Before lock:

let orgs = await opts.organizationRepository.memberships({ userId });

• Inside lock:

await runLocking(opts.db, `auth:onboard:${userId}`, async () => {
  const freshOrgs = await opts.organizationRepository.memberships({ userId });
  if (freshOrgs.length > 0) { orgs = freshOrgs; return; }
  const organizationSlug = uid(8);
  // …onboarding logic…
});

• After lock, proceed based on orgs.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
controlplane/src/core/controllers/auth.ts (1)

283-290: Please make seedGroup idempotent to tolerate retries and avoid duplicate groups

The current seedGroup implementation in controlplane/src/core/services/Keycloak.ts always issues a POST to create groups, which will fail (409 or 400) or create duplicates when retried. To ensure safe retries, update seedGroup to follow the GET-then-POST-then-GET pattern:

• GET /admin/realms/{realm}/groups?search={groupName} for each group (root and sub-groups); if found, reuse its id.
• If not found, POST to create the group.
• After a successful POST (201), GET again to retrieve the new id.
• Catch expected 409/400 during POST and fall back to the GET lookup.

This guarantees that calling seedGroup multiple times will always return the existing group IDs without error or duplication.

Locations to update:

  • controlplane/src/core/services/Keycloak.ts → public async seedGroup({ … }) { … }
  • Add helper (e.g. ensureGroupExists(name: string): Promise<string>) to encapsulate this pattern.
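
A minimal sketch of the GET-then-POST-then-GET pattern described above. The `GroupApi` interface is a hypothetical stand-in for whatever the Keycloak client exposes, and `findByName` is assumed to match the group name exactly; the real admin API and method names may differ.

```typescript
// Hypothetical minimal view of the group endpoints; not the real Keycloak client API.
interface GroupApi {
  // Assumed to wrap the GET lookup and to return only an exact name match.
  findByName(name: string): Promise<{ id: string } | undefined>;
  // Assumed to wrap the POST that creates the group.
  create(name: string): Promise<void>;
}

interface HttpLikeError {
  status?: number;
}

// Returns the id of the group, creating it only when it does not exist yet.
async function ensureGroupExists(api: GroupApi, name: string): Promise<string> {
  const existing = await api.findByName(name);
  if (existing) return existing.id;

  try {
    await api.create(name);
  } catch (err) {
    const status = (err as HttpLikeError | null)?.status;
    // 409/400 are treated as "already created elsewhere"; fall through to the lookup.
    if (status !== 409 && status !== 400) throw err;
  }

  const created = await api.findByName(name);
  if (!created) throw new Error(`Group "${name}" not found after create`);
  return created.id;
}
```

Calling the helper twice with the same name returns the same id and issues at most one successful create, which is the property seedGroup needs in order to be retried safely.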
♻️ Duplicate comments (2)
controlplane/src/core/controllers/auth.ts (2)

259-361: Critical: onboarding race still possible — wrap the entire “no memberships → create org” flow in runLocking and re-check inside the lock

As written, concurrent /callback requests can both observe 0 memberships and each create an organization. You imported runLocking but didn’t apply it here, so the race remains. Fix by:

  • Re-checking memberships inside the lock (to close the TOCTOU gap).
  • Performing the create+seed+import+assign+namespace sequence within the locked section.
  • Refreshing orgs before redirect, or track a didOnboard flag to drive redirect logic deterministically.

Apply this diff (also converts orgs to let and adds didOnboard; incorporates the tx fix called out separately):

-        const orgs = await opts.organizationRepository.memberships({ userId });
+        let orgs = await opts.organizationRepository.memberships({ userId });
+        let didOnboard = false;
         if (orgs.length === 0) {
-          const organizationSlug = uid(8);
-
-          // First, we need to create the organization and add the user as an organization member
-          const [insertedOrg, orgMember] = await opts.db.transaction(async (tx) => {
-            const orgRepo = new OrganizationRepository(req.log, opts.db, opts.defaultBillingPlanId);
-
-            // Create the organization...
-            const inserted = await orgRepo.createOrganization({
-              organizationName: userEmail.split('@')[0],
-              organizationSlug,
-              ownerID: userId,
-            });
-
-            // ...and add the user as an organization member.
-            const orgMember = await orgRepo.addOrganizationMember({
-              organizationID: inserted.id,
-              userID: userId,
-            });
-
-            return [inserted, orgMember];
-          });
-
-          // Finalize the organization setup by seeding the Keycloak group structure
-          await opts.keycloakClient.authenticateClient();
-          const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({
-            userID: userId,
-            organizationSlug,
-            realm: opts.keycloakRealm,
-          });
-
-          await opts.db.transaction(async (tx) => {
-            const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
-            const orgGroupRepo = new OrganizationGroupRepository(tx);
-
-            await orgRepo.updateOrganization({
-              id: insertedOrg.id,
-              kcGroupId: kcRootGroupId,
-            });
-
-            await orgGroupRepo.importKeycloakGroups({
-              organizationId: insertedOrg.id,
-              kcGroups: kcCreatedGroups,
-            });
-
-            const orgAdminGroup = await orgGroupRepo.byName({
-              organizationId: insertedOrg.id,
-              name: 'admin',
-            });
-
-            if (orgAdminGroup) {
-              await orgGroupRepo.addUserToGroup({
-                organizationMemberId: orgMember.id,
-                groupId: orgAdminGroup.groupId,
-              });
-            }
-
-            const namespaceRepo = new NamespaceRepository(tx, insertedOrg.id);
-            const ns = await namespaceRepo.create({
-              name: DefaultNamespace,
-              createdBy: userId,
-            });
-            if (!ns) {
-              throw new Error(`Could not create ${DefaultNamespace} namespace`);
-            }
-          });
-
-          opts.platformWebhooks.send(PlatformEventName.USER_REGISTER_SUCCESS, {
-            user_id: userId,
-            user_email: userEmail,
-            user_first_name: firstName,
-            user_last_name: lastName,
-          });
+          await runLocking(async () => {
+            // Re-check inside the lock to avoid TOCTOU
+            const lockedRepo = new OrganizationRepository(req.log, opts.db, opts.defaultBillingPlanId);
+            const orgsLocked = await lockedRepo.memberships({ userId });
+            if (orgsLocked.length > 0) {
+              orgs = orgsLocked;
+              return;
+            }
+
+            const organizationSlug = uid(8);
+
+            // Create org and add member atomically
+            const [insertedOrg, orgMember] = await opts.db.transaction(async (tx) => {
+              const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
+              const inserted = await orgRepo.createOrganization({
+                organizationName: userEmail.split('@')[0],
+                organizationSlug,
+                ownerID: userId,
+              });
+              const member = await orgRepo.addOrganizationMember({
+                organizationID: inserted.id,
+                userID: userId,
+              });
+              return [inserted, member] as const;
+            });
+
+            // Seed KC groups (external side-effect) then persist their mapping in one DB tx
+            await opts.keycloakClient.authenticateClient();
+            const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({
+              userID: userId,
+              organizationSlug,
+              realm: opts.keycloakRealm,
+            });
+
+            await opts.db.transaction(async (tx) => {
+              const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
+              const orgGroupRepo = new OrganizationGroupRepository(tx);
+              await orgRepo.updateOrganization({ id: insertedOrg.id, kcGroupId: kcRootGroupId });
+              await orgGroupRepo.importKeycloakGroups({ organizationId: insertedOrg.id, kcGroups: kcCreatedGroups });
+              const orgAdminGroup = await orgGroupRepo.byName({ organizationId: insertedOrg.id, name: 'admin' });
+              if (orgAdminGroup) {
+                await orgGroupRepo.addUserToGroup({ organizationMemberId: orgMember.id, groupId: orgAdminGroup.groupId });
+              }
+              const namespaceRepo = new NamespaceRepository(tx, insertedOrg.id);
+              const ns = await namespaceRepo.create({ name: DefaultNamespace, createdBy: userId });
+              if (!ns) throw new Error(`Could not create ${DefaultNamespace} namespace`);
+            });
+
+            opts.platformWebhooks.send(PlatformEventName.USER_REGISTER_SUCCESS, {
+              user_id: userId,
+              user_email: userEmail,
+              user_first_name: firstName,
+              user_last_name: lastName,
+            });
+
+            // Refresh for redirect logic
+            orgs = await new OrganizationRepository(req.log, opts.db, opts.defaultBillingPlanId).memberships({ userId });
+            didOnboard = true;
+          }, `onboard:${userId}`);
         }
@@
-        } else if (orgs.length === 0) {
+        } else if (orgs.length === 0 && !didOnboard) {
           res.redirect(opts.webBaseUrl + '?migrate=true');
         } else {
           res.redirect(opts.webBaseUrl);
         }

263-281: Operations inside the transaction are executed against opts.db instead of tx — breaks atomicity and amplifies races

Inside the transaction callback you construct OrganizationRepository with opts.db, so both createOrganization and addOrganizationMember run outside the transaction. Instantiate it with tx.

-          const [insertedOrg, orgMember] = await opts.db.transaction(async (tx) => {
-            const orgRepo = new OrganizationRepository(req.log, opts.db, opts.defaultBillingPlanId);
+          const [insertedOrg, orgMember] = await opts.db.transaction(async (tx) => {
+            const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
  • Optional: make the member insert idempotent (onConflictDoNothing/Update) to be resilient under retries.
🧹 Nitpick comments (4)
controlplane/src/core/controllers/auth.ts (4)

317-325: Make default namespace creation idempotent

If the default namespace already exists (e.g., due to a retry across processes), create() may error. Consider a find-or-create helper or a unique constraint on (organizationId, name) with onConflictDoNothing.


261-271: Slug generation: consider collision handling and readability

uid(8) has a low collision risk, but the resulting slug is opaque. Consider:

  • Deriving a slug from the email prefix (slugify + short hash suffix).
  • Handling unique constraint violations with a short retry loop.
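
One possible shape for these two suggestions, as a sketch only. `slugFromEmail`, `createWithUniqueSlug`, and the `tryInsert` callback are hypothetical names; `tryInsert` is assumed to return false when the insert hits the unique constraint on the slug.

```typescript
// Small non-cryptographic hash (FNV-1a style, 32-bit), used only to derive a short suffix.
function fnv1a(input: string): number {
  let hash = 0x811c9dc5;
  for (let i = 0; i < input.length; i++) {
    hash ^= input.charCodeAt(i);
    hash = Math.imul(hash, 0x01000193) >>> 0;
  }
  return hash;
}

// Builds "<readable-prefix>-<6-char-suffix>" from the local part of the email.
function slugFromEmail(email: string, salt = ''): string {
  const prefix =
    (email.split('@')[0] ?? '')
      .toLowerCase()
      .replace(/[^a-z0-9]+/g, '-')
      .slice(0, 24)
      .replace(/^-+|-+$/g, '') || 'org';
  const suffix = fnv1a(email + salt).toString(36).padStart(7, '0').slice(-6);
  return `${prefix}-${suffix}`;
}

// Short retry loop: a different salt is used after each unique-constraint conflict.
async function createWithUniqueSlug(
  email: string,
  tryInsert: (slug: string) => Promise<boolean>, // hypothetical: false means "slug taken"
  maxAttempts = 3,
): Promise<string> {
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const slug = slugFromEmail(email, attempt === 0 ? '' : String(attempt));
    if (await tryInsert(slug)) return slug;
  }
  throw new Error('Could not allocate a unique slug');
}
```

The suffix is deterministic for a given email and salt, so a retried onboarding for the same user would propose the same first slug rather than a fresh random one.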

327-333: Potential duplicate USER_REGISTER_SUCCESS under race/retry

Without the locking change, this webhook could be emitted twice for the same user. Even with locking, network retries can re-send. If your webhook infra isn’t idempotent, add a dedupe key (e.g., userId) or ensure the receiver is idempotent.
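
As an illustration of the dedupe-key idea, here is a receiver-side sketch. The `RegisterEvent` shape mirrors the payload fields shown in this PR, but `dedupeKey` and `IdempotentReceiver` are hypothetical, and the in-memory set stands in for what would need to be persistent storage in a real receiver.

```typescript
interface RegisterEvent {
  name: 'USER_REGISTER_SUCCESS';
  payload: { user_id: string; user_email: string };
}

// Assumed dedupe key: at most one registration event per user.
const dedupeKey = (event: RegisterEvent): string => `${event.name}:${event.payload.user_id}`;

// In-memory sketch; a real receiver would persist seen keys (for example in a unique column).
class IdempotentReceiver {
  private readonly seen = new Set<string>();
  readonly processed: RegisterEvent[] = [];

  // Returns true when the event was processed, false when it was a duplicate.
  receive(event: RegisterEvent): boolean {
    const key = dedupeKey(event);
    if (this.seen.has(key)) return false;
    this.seen.add(key);
    this.processed.push(event);
    return true;
  }
}
```

With this shape, a re-sent webhook for the same user is acknowledged but not processed a second time.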


259-334: Distributed-safety: in-process locking won’t protect across instances

runLocking likely guards only within a single Node process. If you run multiple instances, also add a DB-level guard, e.g.:

  • Postgres advisory lock keyed by userId during onboarding, or
  • Insert a “user_onboarding” sentinel row on a unique(userId) and only the inserter proceeds, or
  • Atomically set users.onboardedAt via UPDATE … WHERE onboardedAt IS NULL RETURNING to elect a single winner.
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 4f41451 and cc38fe3.

📒 Files selected for processing (1)
  • controlplane/src/core/controllers/auth.ts (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-01T13:53:54.146Z
Learnt from: wilsonrivera
PR: wundergraph/cosmo#1919
File: controlplane/src/core/repositories/OrganizationGroupRepository.ts:193-224
Timestamp: 2025-07-01T13:53:54.146Z
Learning: In the Cosmo codebase, database transactions are typically managed at the service layer (e.g., in buf services like deleteOrganizationGroup.ts), where repositories are instantiated with the transaction handle and all operations within those repositories are automatically part of the same transaction.

Applied to files:

  • controlplane/src/core/controllers/auth.ts
🧬 Code graph analysis (1)
controlplane/src/core/controllers/auth.ts (1)
controlplane/src/core/repositories/OrganizationRepository.ts (1)
  • OrganizationRepository (50-1661)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Analyze (go)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: build_test
  • GitHub Check: build_push_image
🔇 Additional comments (1)
controlplane/src/core/controllers/auth.ts (1)

295-298: Good fix: updateOrganization now targets the correct organization id

Using insertedOrg.id (rather than a session id) is correct and ensures kcGroupId is persisted on the newly created organization.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
controlplane/src/core/controllers/auth.ts (1)

295-296: Correct target for updateOrganization (uses insertedOrg.id).

This fixes the earlier bug where sessions.id was mistakenly used. Looks good.

🧹 Nitpick comments (3)
controlplane/src/core/controllers/auth.ts (3)

273-279: Nit: avoid shadowing and tuple returns to reduce footguns.

orgMember is declared both inside the transaction and as a variable returned to the outer scope. Prefer distinct naming and return an object to avoid positional mistakes later.

Apply either of the following small changes:

  • Minimal rename:
-            const orgMember = await orgRepo.addOrganizationMember({
+            const insertedMember = await orgRepo.addOrganizationMember({
               organizationID: inserted.id,
               userID: userId,
             });
-
-            return [inserted, orgMember];
+            return [inserted, insertedMember];
  • Or switch to an object (clearer):
-          const [insertedOrg, orgMember] = await opts.db.transaction(async (tx) => {
+          const { insertedOrg, orgMember } = await opts.db.transaction(async (tx) => {
@@
-            const orgMember = await orgRepo.addOrganizationMember({
+            const insertedMember = await orgRepo.addOrganizationMember({
               organizationID: inserted.id,
               userID: userId,
             });
-
-            return [inserted, orgMember];
+            return { insertedOrg: inserted, orgMember: insertedMember };
           });

282-289: Use the persisted slug and consider idempotency guard for seeding.

Seed with the actual saved slug to avoid divergence if the repo sanitizes/adjusts it. Also safe to no-op if kcGroupId is already set.

           const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({
             userID: userId,
-            organizationSlug,
+            organizationSlug: insertedOrg.slug,
             realm: opts.keycloakRealm,
           });

Optionally, short-circuit before seeding:

// After creating the org but before authenticating Keycloak:
if (insertedOrg.kcGroupId) {
  req.log.info({ orgId: insertedOrg.id }, 'KC already seeded, skipping');
  return;
}

258-258: Minor perf: memberships() is heavy for an existence check.

memberships() builds RBAC evaluators and features. For the “do I have any org?” gate, consider a lightweight count/select on organizationsMembers (or a repo method like hasMemberships(userId)) to reduce latency on the hot path.

📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between cc38fe3 and c393048.

📒 Files selected for processing (1)
  • controlplane/src/core/controllers/auth.ts (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-01T13:53:54.146Z
Learnt from: wilsonrivera
PR: wundergraph/cosmo#1919
File: controlplane/src/core/repositories/OrganizationGroupRepository.ts:193-224
Timestamp: 2025-07-01T13:53:54.146Z
Learning: In the Cosmo codebase, database transactions are typically managed at the service layer (e.g., in buf services like deleteOrganizationGroup.ts), where repositories are instantiated with the transaction handle and all operations within those repositories are automatically part of the same transaction.

Applied to files:

  • controlplane/src/core/controllers/auth.ts
🧬 Code graph analysis (1)
controlplane/src/core/controllers/auth.ts (1)
controlplane/src/core/repositories/OrganizationRepository.ts (1)
  • OrganizationRepository (50-1661)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Analyze (go)
  • GitHub Check: build_push_image
  • GitHub Check: build_test
🔇 Additional comments (1)
controlplane/src/core/controllers/auth.ts (1)

262-281: Good: operations use the transaction handle (tx) per repository best practice.

Instantiating OrganizationRepository with tx ensures createOrganization and addOrganizationMember are atomic. This aligns with the service-layer transaction pattern used across the codebase.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (3)
controlplane/src/core/controllers/auth.ts (3)

258-268: Shorten critical section: replace full memberships() with an existence check under the lock

orgRepo.memberships({ userId }) builds RBAC, features, etc., which is heavy and prolongs the transaction + advisory lock. Use a lightweight existence probe first; fetch full memberships only when exiting early. This reduces lock duration and contention.

Apply this diff:

-        const orgs = await opts.db.transaction(async (tx) => {
+        const onboarding = await opts.db.transaction(async (tx) => {
           await tx.execute(sql`select pg_advisory_xact_lock(hashtext(${userId}))`);
 
           const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
 
-          // Retrieve all the organizations the user is a member of
-          const memberships = await orgRepo.memberships({ userId });
-          if (memberships.length > 0) {
-            // The user is already part of at least one organization
-            return memberships;
-          }
+          // Fast existence check to keep the critical section minimal
+          const exists = await tx
+            .select({ one: sql<number>`1`.as('one') })
+            .from(organizationsMembers)
+            .where(eq(organizationsMembers.userId, userId))
+            .limit(1)
+            .execute();
+          if (exists.length > 0) {
+            // Return full memberships only when needed
+            return { memberships: await orgRepo.memberships({ userId }) };
+          }

258-327: Add a concurrency test to prove single-organization creation under parallel callbacks

Recommend an integration test that triggers two concurrent /callback flows for the same user (e.g., mock Keycloak and platform webhooks), asserting:

  • Exactly one organizations row is created.
  • Exactly one organizationsMembers row for (user, org) exists.
  • Webhook emitted once.
  • No deadlocks or long-held transactions.

I can scaffold this test with mocked services and a barrier to synchronize the two requests—say the second starts while the first is mid-flow—to validate the advisory lock + two-phase onboarding.
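
A self-contained sketch of what such a test asserts, using an in-memory per-key mutex in place of the Postgres advisory lock. Everything here (`KeyedMutex`, `simulateConcurrentCallbacks`, the in-memory state) is hypothetical scaffolding; a real integration test would drive the actual /callback handler against a database.

```typescript
// In-memory stand-in for a per-key lock; the real flow relies on the database lock.
class KeyedMutex {
  private tails = new Map<string, Promise<void>>();

  async run<T>(key: string, fn: () => Promise<T>): Promise<T> {
    const prev = this.tails.get(key) ?? Promise.resolve();
    let release!: () => void;
    const next = new Promise<void>((resolve) => (release = resolve));
    this.tails.set(key, prev.then(() => next)); // later callers queue behind this one
    await prev;
    try {
      return await fn();
    } finally {
      release();
    }
  }
}

const tick = () => new Promise<void>((resolve) => setTimeout(resolve, 5));

// Fires two onboarding attempts for the same user at once and reports what was created.
async function simulateConcurrentCallbacks(): Promise<{ orgs: number; webhooks: number }> {
  const lock = new KeyedMutex();
  const orgOwners: string[] = []; // one entry per created organization
  let webhooks = 0;

  const onboard = (userId: string) =>
    lock.run(userId, async () => {
      if (orgOwners.includes(userId)) return; // re-check inside the lock
      await tick(); // simulated slow work that would widen the race window
      orgOwners.push(userId);
      webhooks++;
    });

  await Promise.all([onboard('user-1'), onboard('user-1')]);
  return { orgs: orgOwners.length, webhooks };
}
```

If the `lock.run` wrapper is removed, both attempts pass the membership check before either one writes, and two organizations are recorded; that failing variant is a useful sanity check that the test actually exercises the race.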


294-315: Ensure repository methods don’t open nested transactions

Both NamespaceRepository.create (controlplane/src/core/repositories/NamespaceRepository.ts:80–83) and OrganizationGroupRepository.create (controlplane/src/core/repositories/OrganizationGroupRepository.ts:12–17) unconditionally call this.db.transaction(…), which creates nested savepoints when these repos are instantiated with an existing transaction handle (tx). According to our established pattern, transactions should be orchestrated at the service layer, not inside repository methods.

• controlplane/src/core/repositories/NamespaceRepository.ts:80–83
• controlplane/src/core/repositories/OrganizationGroupRepository.ts:12–17

Suggested optional refactor:

  • Detect whether db is already a transaction handle (e.g. via duck-typing or a marker interface) and only call db.transaction when it’s the root database connection.
  • Or remove the internal db.transaction wrappers altogether and require callers to manage transactions explicitly.
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between c393048 and af7ebe1.

📒 Files selected for processing (1)
  • controlplane/src/core/controllers/auth.ts (2 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-01T13:53:54.146Z
Learnt from: wilsonrivera
PR: wundergraph/cosmo#1919
File: controlplane/src/core/repositories/OrganizationGroupRepository.ts:193-224
Timestamp: 2025-07-01T13:53:54.146Z
Learning: In the Cosmo codebase, database transactions are typically managed at the service layer (e.g., in buf services like deleteOrganizationGroup.ts), where repositories are instantiated with the transaction handle and all operations within those repositories are automatically part of the same transaction.

Applied to files:

  • controlplane/src/core/controllers/auth.ts
🧬 Code graph analysis (1)
controlplane/src/core/controllers/auth.ts (3)
controlplane/src/core/repositories/OrganizationRepository.ts (2)
  • OrganizationRepository (50-1661)
  • memberships (259-338)
controlplane/src/core/repositories/OrganizationGroupRepository.ts (1)
  • OrganizationGroupRepository (9-356)
controlplane/src/core/repositories/NamespaceRepository.ts (2)
  • NamespaceRepository (9-184)
  • DefaultNamespace (7-7)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
  • GitHub Check: build_push_image
  • GitHub Check: build_test
🔇 Additional comments (2)
controlplane/src/core/controllers/auth.ts (2)

4-4: LGTM: importing sql for advisory locks

The added sql import is appropriate and used correctly for pg_advisory_xact_lock.


329-337: LGTM: webhook enriched with user names and sent only on first-time onboarding

Conditionally emitting USER_REGISTER_SUCCESS when orgs.length === 0 preserves single emission across concurrent callbacks and adds first/last names. Looks good.

Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
controlplane/src/core/controllers/auth.ts (1)

275-317: Critical: external I/O (Keycloak) is executed inside an advisory-locked DB transaction — split into two phases.

Holding a transaction + pg_advisory_xact_lock while calling Keycloak (authenticateClient/seedGroup) and performing group import ties DB locks to an external system that cannot be rolled back. Under latency/outage this can starve the pool and increase contention. Serialize only the local DB “create org + membership + default namespace” work under the lock; do Keycloak seeding and org updates after commit in a short, idempotent follow-up tx.

Below is a minimal, localized refactor that:

  • Keeps the race fix intact (per-user advisory lock).
  • Moves Keycloak calls outside the locked transaction.
  • Returns created org context from the locked section so we can finish wiring after commit.
  • Preserves current webhook/redirect semantics by checking onboarding.created.
-        const orgs = await opts.db.transaction(async (tx) => {
+        const onboarding = await opts.db.transaction(async (tx) => {
           await tx.execute(sql`select pg_advisory_xact_lock(hashtext(${userId}))`);
 
           const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
 
           // Check if the user is already a member of at least one organization
           const existingMemberships = await tx
             .select({ one: sql<number>`1`.as('one') })
             .from(organizationsMembers)
             .where(eq(organizationsMembers.userId, userId))
             .limit(1)
             .execute();
 
-          if (existingMemberships.length > 0) {
-            return existingMemberships.length;
-          }
+          if (existingMemberships.length > 0) {
+            return { created: null };
+          }
 
-          // Authenticate on Keycloak and create the organization group
-          await opts.keycloakClient.authenticateClient();
-
           const organizationSlug = uid(8);
-          const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({
-            userID: userId,
-            organizationSlug,
-            realm: opts.keycloakRealm,
-          });
 
-          // Create the new organization and add the user as a member of the organization
-          const insertedOrg = await orgRepo.createOrganization({
+          // Create the new organization and add the user as a member of the organization (DB-only work)
+          const insertedOrg = await orgRepo.createOrganization({
             organizationName: userEmail.split('@')[0],
             organizationSlug,
             ownerID: userId,
-            kcGroupId: kcRootGroupId,
           });
 
           const orgMember = await orgRepo.addOrganizationMember({
             organizationID: insertedOrg.id,
             userID: userId,
           });
 
-          // Create the organization groups
-          const orgGroupRepo = new OrganizationGroupRepository(tx);
-
-          await orgGroupRepo.importKeycloakGroups({
-            organizationId: insertedOrg.id,
-            kcGroups: kcCreatedGroups,
-          });
-
-          const orgAdminGroup = await orgGroupRepo.byName({
-            organizationId: insertedOrg.id,
-            name: 'admin',
-          });
-
-          if (orgAdminGroup) {
-            await orgGroupRepo.addUserToGroup({
-              organizationMemberId: orgMember.id,
-              groupId: orgAdminGroup.groupId,
-            });
-          }
-
           // Create the default namespace for the organization
           const namespaceRepo = new NamespaceRepository(tx, insertedOrg.id);
           const ns = await namespaceRepo.create({
             name: DefaultNamespace,
             createdBy: userId,
           });
 
           if (!ns) {
             throw new Error(`Could not create ${DefaultNamespace} namespace`);
           }
 
-          // We return an empty even when we just created the organization, that way we can still send the
-          // user registered webhook and prompt the user to migrate
-          return 0;
+          // Return creation payload (downstream sends webhook and migrate redirect as before)
+          return {
+            created: {
+              organizationId: insertedOrg.id,
+              organizationSlug,
+              orgMemberId: orgMember.id,
+            },
+          };
         });
 
-        if (orgs === 0) {
+        // Outside the critical transaction: perform Keycloak seeding and group wiring idempotently
+        if (onboarding.created) {
+          await opts.keycloakClient.authenticateClient();
+          const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({
+            userID: userId,
+            organizationSlug: onboarding.created.organizationSlug,
+            realm: opts.keycloakRealm,
+          });
+
+          await opts.db.transaction(async (tx) => {
+            const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
+            const orgGroupRepo = new OrganizationGroupRepository(tx);
+
+            await orgRepo.updateOrganization({
+              id: onboarding.created.organizationId,
+              kcGroupId: kcRootGroupId,
+            });
+
+            await orgGroupRepo.importKeycloakGroups({
+              organizationId: onboarding.created.organizationId,
+              kcGroups: kcCreatedGroups,
+            });
+
+            const orgAdminGroup = await orgGroupRepo.byName({
+              organizationId: onboarding.created.organizationId,
+              name: 'admin',
+            });
+            if (orgAdminGroup) {
+              await orgGroupRepo.addUserToGroup({
+                organizationMemberId: onboarding.created.orgMemberId,
+                groupId: orgAdminGroup.groupId,
+              });
+            }
+          });
+        }
+
+        if (onboarding.created) {
           // Send a notification to the platform that a new user has been created
           opts.platformWebhooks.send(PlatformEventName.USER_REGISTER_SUCCESS, {
             user_id: userId,
             user_email: userEmail,
             user_first_name: firstName,
             user_last_name: lastName,
           });
         }
@@
-        } else if (orgs === 0) {
+        } else if (onboarding.created) {
           res.redirect(opts.webBaseUrl + '?migrate=true');

Optional hardening: wrap the post-tx Keycloak block in try/catch and log failures; consider a background repair job if updating kcGroupId/importing groups fails after org creation.

Do you want me to open a follow-up PR with this refactor and an integration test that fires two concurrent callbacks to prove single org creation?

Also applies to: 329-332, 334-335, 366-366

🧹 Nitpick comments (3)
controlplane/src/core/controllers/auth.ts (3)

271-273: Nit: numeric sentinel is obscure — consider a clearer shape.

Returning existingMemberships.length vs 0 makes orgs a numeric sentinel. For readability, prefer returning a boolean (created or not) or a small object so downstream code reads intent clearly. If you keep the current shape, at least rename orgs to membershipCount.

-        const orgs = await opts.db.transaction(async (tx) => {
+        const membershipCount = await opts.db.transaction(async (tx) => {
@@
-          if (existingMemberships.length > 0) {
-            return existingMemberships.length;
-          }
+          if (existingMemberships.length > 0) return existingMemberships.length;
@@
-        if (orgs === 0) {
+        if (membershipCount === 0) {
@@
-        } else if (orgs === 0) {
+        } else if (membershipCount === 0) {
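
A discriminated union is one way to get the "clearer shape" mentioned above. The following is only a sketch: `OnboardingOutcome`, `describeOutcome`, and the `kind` tags are hypothetical names that do not appear in the PR.

```typescript
// Hypothetical result type for the onboarding transaction; all names are illustrative.
type OnboardingOutcome =
  | { kind: 'lock-not-acquired' }
  | { kind: 'already-member' }
  | { kind: 'created'; organizationId: string; orgMemberId: string };

// Maps each outcome to the action the callback handler would take next.
function describeOutcome(outcome: OnboardingOutcome): string {
  switch (outcome.kind) {
    case 'lock-not-acquired':
      return 'respond 429';
    case 'already-member':
      return 'continue sign-in';
    case 'created':
      return `send webhook and redirect with ?migrate=true (org ${outcome.organizationId})`;
  }
}
```

With this shape the downstream checks read as `outcome.kind === 'created'` rather than comparing against 0 or -1, and the compiler flags any unhandled case.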

318-327: Avoid nested transactions in NamespaceRepository.create when already inside a transaction.

You instantiate NamespaceRepository with tx (good), but create() starts its own transaction internally, which nests a transaction (a savepoint in Postgres) inside the outer one. Postgres handles subtransactions fine, but the extra savepoint is unnecessary and adds work while the advisory lock is held. Prefer a version of create that operates on the provided tx without starting a new one.

Using the learning from PR #1919 (transactions centralized at service layer), consider refactoring NamespaceRepository.create:

// repositories/NamespaceRepository.ts
export class NamespaceRepository {
  constructor(private db: PostgresJsDatabase<typeof schema>, private organizationId: string) {}

  // New: operate within an existing tx (no new transaction)
  public async createInTx(tx: PostgresJsDatabase<typeof schema>, data: { name: string; createdBy: string }) {
    const ns = await tx
      .insert(schema.namespaces)
      .values({ name: data.name, organizationId: this.organizationId, createdBy: data.createdBy })
      .returning();

    if (ns.length === 0) return;
    await tx.insert(schema.namespaceConfig).values({ namespaceId: ns[0].id });
    return ns[0];
  }
}

Then call createInTx(tx, ...) here instead of create(...).


329-332: Stale comment: it says “return an empty” but returns 0.

Update the comment to avoid confusion for future maintainers.

-          // We return an empty even when we just created the organization, that way we can still send the
-          // user registered webhook and prompt the user to migrate
-          return 0;
+          // Return a sentinel that indicates onboarding occurred (0 interpreted as "created")
+          return 0;
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between af7ebe1 and 1e29919.

📒 Files selected for processing (1)
  • controlplane/src/core/controllers/auth.ts (3 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-07-01T13:53:54.146Z
Learnt from: wilsonrivera
PR: wundergraph/cosmo#1919
File: controlplane/src/core/repositories/OrganizationGroupRepository.ts:193-224
Timestamp: 2025-07-01T13:53:54.146Z
Learning: In the Cosmo codebase, database transactions are typically managed at the service layer (e.g., in buf services like deleteOrganizationGroup.ts), where repositories are instantiated with the transaction handle and all operations within those repositories are automatically part of the same transaction.

Applied to files:

  • controlplane/src/core/controllers/auth.ts
🧬 Code graph analysis (1)
controlplane/src/core/controllers/auth.ts (4)
controlplane/src/core/repositories/OrganizationRepository.ts (1)
  • OrganizationRepository (50-1661)
controlplane/src/db/schema.ts (1)
  • organizationsMembers (1318-1339)
controlplane/src/core/repositories/OrganizationGroupRepository.ts (1)
  • OrganizationGroupRepository (9-356)
controlplane/src/core/repositories/NamespaceRepository.ts (2)
  • NamespaceRepository (9-184)
  • DefaultNamespace (7-7)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (2)
  • GitHub Check: build_test
  • GitHub Check: Analyze (go)
🔇 Additional comments (2)
controlplane/src/core/controllers/auth.ts (2)

4-4: LGTM: importing sql alongside eq is correct and used below for advisory lock and existence checks.


258-269: Good use of per-user advisory lock to serialize onboarding.

Taking pg_advisory_xact_lock(hashtext(userId)) inside the transaction is a sound cross-instance guard against the “create two orgs” race. The fast existence probe via select 1 is also efficient.
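
The serialization effect described above can be illustrated without a database. The sketch below is an in-memory stand-in, not Postgres: `KeyedMutex`, `onboard`, and the `memberships` map are hypothetical, and the key is the raw user id rather than `hashtext(userId)`. It only shows why a blocking per-user lock plus a membership check inside the lock yields a single creation.

```typescript
// In-memory stand-in for a blocking per-key lock. NOT Postgres; illustration only.
// Note: the map is never pruned, which is acceptable for a short demo.
class KeyedMutex {
  private tails = new Map<string, Promise<void>>();

  // Runs `task` only after every earlier task queued for the same key has finished.
  async runExclusive<T>(key: string, task: () => Promise<T>): Promise<T> {
    const previous = this.tails.get(key) ?? Promise.resolve();
    let release: () => void = () => {};
    const current = new Promise<void>((resolve) => (release = resolve));
    this.tails.set(key, previous.then(() => current));
    await previous;
    try {
      return await task();
    } finally {
      release();
    }
  }
}

type OnboardResult = 'created' | 'skipped';

const memberships = new Map<string, string[]>(); // userId -> organization ids
const mutex = new KeyedMutex();
let orgCounter = 0;

// Hypothetical onboarding step: check membership, then create, all under the lock.
async function onboard(userId: string): Promise<OnboardResult> {
  return mutex.runExclusive<OnboardResult>(userId, async () => {
    if ((memberships.get(userId) ?? []).length > 0) return 'skipped';
    await Promise.resolve(); // async gap where an unguarded second caller could interleave
    orgCounter += 1;
    memberships.set(userId, [`org-${orgCounter}`]);
    return 'created';
  });
}

// Fires two concurrent onboarding attempts for the same user.
async function demo(): Promise<OnboardResult[]> {
  return Promise.all([onboard('user-1'), onboard('user-1')]);
}
```

The second caller waits for the first, then sees the membership and skips creation. Without the lock, both callers would pass the membership check before either one writes.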

@github-actions

This PR was marked stale due to lack of activity. It will be closed in 14 days.

@github-actions github-actions bot added the Stale label Sep 25, 2025
Contributor

@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 0

♻️ Duplicate comments (1)
controlplane/src/core/controllers/auth.ts (1)

282-290: Critical: Keycloak operations still inside advisory-locked transaction (unresolved from previous review).

External Keycloak API calls (authenticateClient() and seedGroup()) remain inside the advisory-locked DB transaction, despite being flagged as addressed in previous reviews. This creates severe operational risks:

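  1. Lock contention: Keycloak latency directly extends the advisory lock hold time. Because the lock is taken with pg_try_advisory_xact_lock, concurrent onboarding attempts for the same user are rejected with 429 for as long as the Keycloak calls are in flight.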
  2. Connection pool starvation: Long-running transactions under Keycloak outages can exhaust the DB connection pool.
  3. Atomicity mismatch: Keycloak operations cannot be rolled back if the DB transaction fails, creating inconsistent state.
  4. Cascading failures: Keycloak downtime can cascade into DB unavailability.

Required fix: Split into two phases:

  • Phase 1 (inside tx + advisory lock): Check membership, create org + member, create namespace, return sentinel indicating creation.
  • Phase 2 (outside tx): Perform Keycloak operations, then in a separate short transaction update kcGroupId, import groups, add admin membership.

Apply this refactor to move Keycloak operations outside the locked transaction:

 const orgs = await opts.db.transaction(async (tx) => {
   const advisoryLockRows = await tx.execute(
     sql`select pg_try_advisory_xact_lock(hashtext(${userId})) as acquired`
   );

   if (!advisoryLockRows?.[0]?.acquired) {
-    // We need to identify when we failed to acquire the lock because another request already acquired it
     return -1;
   }

   const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);

-  // Check if the user is already a member of at least one organization
   const existingMemberships = await tx
     .select({ one: sql<number>`1`.as('one') })
     .from(organizationsMembers)
     .where(eq(organizationsMembers.userId, userId))
     .limit(1)
     .execute();

   if (existingMemberships.length > 0) {
     return existingMemberships.length;
   }

-  // Authenticate on Keycloak and create the organization group
-  await opts.keycloakClient.authenticateClient();
-
-  const organizationSlug = uid(8);
-  const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({
-    userID: userId,
-    organizationSlug,
-    realm: opts.keycloakRealm,
-  });
-
-  // Create the new organization and add the user as a member of the organization
+  const organizationSlug = uid(8);
   const insertedOrg = await orgRepo.createOrganization({
     organizationName: userEmail.split('@')[0],
     organizationSlug,
     ownerID: userId,
-    kcGroupId: kcRootGroupId,
   });

   const orgMember = await orgRepo.addOrganizationMember({
     organizationID: insertedOrg.id,
     userID: userId,
   });

-  // Create the organization groups
-  const orgGroupRepo = new OrganizationGroupRepository(tx);
-
-  await orgGroupRepo.importKeycloakGroups({
-    organizationId: insertedOrg.id,
-    kcGroups: kcCreatedGroups,
-  });
-
-  const orgAdminGroup = await orgGroupRepo.byName({
-    organizationId: insertedOrg.id,
-    name: 'admin',
-  });
-
-  if (orgAdminGroup) {
-    await orgGroupRepo.addUserToGroup({
-      organizationMemberId: orgMember.id,
-      groupId: orgAdminGroup.groupId,
-    });
-  }
-
   // Create the default namespace for the organization
   const namespaceRepo = new NamespaceRepository(tx, insertedOrg.id);
   const ns = await namespaceRepo.create({
     name: DefaultNamespace,
     createdBy: userId,
   });

   if (!ns) {
     throw new Error(`Could not create ${DefaultNamespace} namespace`);
   }

-  // We return an empty even when we just created the organization, that way we can still send the
-  // user registered webhook and prompt the user to migrate
-  return 0;
+  // Return payload indicating org was created
+  return { created: { orgId: insertedOrg.id, orgMemberId: orgMember.id, organizationSlug } };
 });

 if (orgs === -1) {
-  // We failed to acquire the lock, so we need to retry the request
   await res.code(429).send('Slow down');
   return;
 }

-if (orgs === 0) {
+// Phase 2: Keycloak operations outside the locked transaction
+if (typeof orgs === 'object' && orgs.created) {
+  await opts.keycloakClient.authenticateClient();
+  const [kcRootGroupId, kcCreatedGroups] = await opts.keycloakClient.seedGroup({
+    userID: userId,
+    organizationSlug: orgs.created.organizationSlug,
+    realm: opts.keycloakRealm,
+  });
+
+  // Short follow-up transaction to wire up Keycloak data
+  await opts.db.transaction(async (tx) => {
+    const orgRepo = new OrganizationRepository(req.log, tx, opts.defaultBillingPlanId);
+    const orgGroupRepo = new OrganizationGroupRepository(tx);
+
+    await orgRepo.updateOrganization({
+      id: orgs.created.orgId,
+      kcGroupId: kcRootGroupId,
+    });
+
+    await orgGroupRepo.importKeycloakGroups({
+      organizationId: orgs.created.orgId,
+      kcGroups: kcCreatedGroups,
+    });
+
+    const orgAdminGroup = await orgGroupRepo.byName({
+      organizationId: orgs.created.orgId,
+      name: 'admin',
+    });
+
+    if (orgAdminGroup) {
+      await orgGroupRepo.addUserToGroup({
+        organizationMemberId: orgs.created.orgMemberId,
+        groupId: orgAdminGroup.groupId,
+      });
+    }
+  });
+
   // Send a notification to the platform that a new user has been created
   opts.platformWebhooks.send(PlatformEventName.USER_REGISTER_SUCCESS, {
     user_id: userId,
     user_email: userEmail,
     user_first_name: firstName,
     user_last_name: lastName,
   });
 }
 
 // ... JWT creation and session cookie logic ...
 
 if (redirectURL) {
   if (redirectURL.startsWith(opts.webBaseUrl)) {
     res.redirect(redirectURL);
   } else {
     res.redirect(opts.webBaseUrl);
   }
-} else if (orgs === 0) {
+} else if (typeof orgs === 'object' && orgs.created) {
   res.redirect(opts.webBaseUrl + '?migrate=true');
 } else {
   res.redirect(opts.webBaseUrl);
 }

Optional hardening: Wrap the Phase 2 Keycloak block in try/catch, log failures, and consider a deferred repair job for eventual consistency if Keycloak operations fail after org creation.
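
The hardening idea could look roughly like the sketch below. Everything here is hypothetical: `runPhaseTwo`, `RepairTask`, and the array-based queue are stand-ins for whatever logging and job mechanism the project would actually use, and `wireKeycloak` represents the Keycloak seeding plus the short follow-up transaction.

```typescript
// Hypothetical shapes; none of these names come from the PR.
interface RepairTask {
  organizationId: string;
  reason: string;
}

interface Logger {
  error(message: string): void;
}

// Runs the post-transaction Keycloak wiring. On failure it logs and queues a repair
// task instead of failing the sign-in request, leaving the fix to a later job.
async function runPhaseTwo(
  organizationId: string,
  wireKeycloak: () => Promise<void>,
  repairQueue: RepairTask[],
  log: Logger,
): Promise<boolean> {
  try {
    await wireKeycloak();
    return true;
  } catch (err) {
    const reason = err instanceof Error ? err.message : String(err);
    log.error(`Keycloak wiring failed for organization ${organizationId}: ${reason}`);
    repairQueue.push({ organizationId, reason });
    return false;
  }
}
```

Whether sign-in should continue when this returns `false` is a product decision; the sketch only shows that the failure is recorded rather than lost.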

🧹 Nitpick comments (1)
controlplane/src/core/controllers/auth.ts (1)

341-345: Improve lock failure response format.

The 429 response is appropriate for lock contention, but the plain string body should be a structured JSON response for better client handling.

Apply this improvement:

 if (orgs === -1) {
-  // We failed to acquire the lock, so we need to retry the request
-  await res.code(429).send('Slow down');
+  return res.code(429).send({ 
+    error: 'Another onboarding request is in progress for this user. Please retry.',
+    code: 'ONBOARDING_IN_PROGRESS'
+  });
-  return;
 }
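
To show what "better client handling" might mean in practice, here is a small sketch of a client-side check against the suggested body. The `error` and `code` fields and the `ONBOARDING_IN_PROGRESS` value come from the suggestion above; `OnboardingErrorBody` and `isOnboardingInProgress` are hypothetical names, and the sketch assumes a programmatic client that has already parsed the JSON body.

```typescript
// Body shape taken from the suggested 429 response above.
interface OnboardingErrorBody {
  error: string;
  code: string;
}

// Narrowing helper: true only for a 429 whose body carries the ONBOARDING_IN_PROGRESS code.
function isOnboardingInProgress(status: number, body: unknown): body is OnboardingErrorBody {
  if (status !== 429 || typeof body !== 'object' || body === null) return false;
  const candidate = body as Record<string, unknown>;
  return typeof candidate.error === 'string' && candidate.code === 'ONBOARDING_IN_PROGRESS';
}
```

A plain string body such as 'Slow down' fails this check, which is the practical difference between the two response formats.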
📜 Review details

📥 Commits

Reviewing files that changed from the base of the PR and between 1e29919 and d0561df.

📒 Files selected for processing (1)
  • controlplane/src/core/controllers/auth.ts (3 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
controlplane/src/core/controllers/auth.ts (4)
controlplane/src/core/repositories/OrganizationRepository.ts (1)
  • OrganizationRepository (50-1661)
controlplane/src/db/schema.ts (1)
  • organizationsMembers (1387-1408)
controlplane/src/core/repositories/OrganizationGroupRepository.ts (1)
  • OrganizationGroupRepository (9-356)
controlplane/src/core/repositories/NamespaceRepository.ts (2)
  • NamespaceRepository (9-184)
  • DefaultNamespace (7-7)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: Analyze (javascript-typescript)
  • GitHub Check: Analyze (go)
  • GitHub Check: build_push_image
🔇 Additional comments (3)
controlplane/src/core/controllers/auth.ts (3)

4-4: LGTM: Import addition is correct.

The sql import is needed for the advisory lock and inline SQL queries introduced in this PR.


258-266: Advisory lock pattern is correct, but see critical issue below.

The use of pg_try_advisory_xact_lock(hashtext(userId)) correctly prevents concurrent onboarding for the same user. The lock is automatically released when the transaction commits or rolls back.

However, note the critical issue flagged below regarding Keycloak operations inside this locked transaction.
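
The non-blocking behaviour of a try-lock, as opposed to a blocking lock, can be sketched in memory as follows. This is not Postgres: `TryLock` and `handleCallback` are hypothetical, the key is the raw user id rather than `hashtext(userId)`, and the `finally` block stands in for the automatic release at transaction end.

```typescript
// In-memory stand-in for a non-blocking per-key lock. NOT Postgres; illustration only.
class TryLock {
  private held = new Set<string>();

  // Runs `task` if the key is free; otherwise returns 'not-acquired' immediately.
  async run<T>(key: string, task: () => Promise<T>): Promise<T | 'not-acquired'> {
    if (this.held.has(key)) return 'not-acquired';
    this.held.add(key);
    try {
      return await task();
    } finally {
      this.held.delete(key); // stands in for release at commit or rollback
    }
  }
}

const lock = new TryLock();

// Hypothetical handler: maps the lock outcome to an HTTP status code.
async function handleCallback(userId: string): Promise<number> {
  const outcome = await lock.run(userId, async () => {
    await Promise.resolve(); // simulated onboarding work
    return 'done' as const;
  });
  return outcome === 'not-acquired' ? 429 : 200;
}
```

Two overlapping calls for the same user produce one success and one 429, while a later call succeeds once the first has released the key. The longer the guarded task runs, the wider the window in which callers are rejected.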


270-280: LGTM: Efficient membership check inside the lock.

The lightweight query avoids the heavy memberships() method and correctly checks for existing memberships inside the advisory lock, preventing TOCTOU races.

@github-actions github-actions bot removed the Stale label Oct 1, 2025
@wilsonrivera wilsonrivera merged commit 32483b8 into main Oct 1, 2025
9 checks passed
@wilsonrivera wilsonrivera deleted the wilson/eng-7670-organization-created-twice branch October 1, 2025 19:55
@coderabbitai coderabbitai bot mentioned this pull request Nov 27, 2025
5 tasks