diff --git a/Cargo.lock b/Cargo.lock index b0a6945..f1894ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1886,6 +1886,7 @@ dependencies = [ "futures", "handlebars", "prometheus", + "rand 0.8.5", "redis", "reqwest", "serde", diff --git a/services/api/Cargo.toml b/services/api/Cargo.toml index 8a9771b..dd7a2db 100644 --- a/services/api/Cargo.toml +++ b/services/api/Cargo.toml @@ -11,6 +11,7 @@ chrono = { version = "0.4", features = ["serde"] } futures = "0.3" handlebars = "5.1" prometheus = "0.13" +rand = "0.8" redis = { version = "0.25", features = ["tokio-comp", "connection-manager", "streams"] } reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls"] } serde = { version = "1", features = ["derive"] } diff --git a/services/api/database/migrations/009_enhance_waitlist_entries.sql b/services/api/database/migrations/009_enhance_waitlist_entries.sql new file mode 100644 index 0000000..c75aea8 --- /dev/null +++ b/services/api/database/migrations/009_enhance_waitlist_entries.sql @@ -0,0 +1,32 @@ +-- Add additional fields to waitlist_entries table for enhanced functionality +ALTER TABLE waitlist_entries +ADD COLUMN IF NOT EXISTS name VARCHAR(255), +ADD COLUMN IF NOT EXISTS role VARCHAR(50), +ADD COLUMN IF NOT EXISTS referral_code VARCHAR(50) UNIQUE, +ADD COLUMN IF NOT EXISTS referred_by_code VARCHAR(50), +ADD COLUMN IF NOT EXISTS position INTEGER, +ADD COLUMN IF NOT EXISTS invited_at TIMESTAMPTZ, +ADD COLUMN IF NOT EXISTS invitation_accepted_at TIMESTAMPTZ; + +-- Create index for referral tracking +CREATE INDEX IF NOT EXISTS idx_waitlist_entries_referral_code +ON waitlist_entries (referral_code); + +CREATE INDEX IF NOT EXISTS idx_waitlist_entries_referred_by +ON waitlist_entries (referred_by_code); + +CREATE INDEX IF NOT EXISTS idx_waitlist_entries_position +ON waitlist_entries (position); + +-- Create referral stats table +CREATE TABLE IF NOT EXISTS waitlist_referrals ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + referrer_code VARCHAR(50) NOT NULL, + referral_count INTEGER NOT NULL DEFAULT 0, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + UNIQUE(referrer_code) +); + +CREATE INDEX IF NOT EXISTS idx_waitlist_referrals_code +ON waitlist_referrals (referrer_code); diff --git a/services/api/src/db.rs b/services/api/src/db.rs index 9dfb37b..408ba0b 100644 --- a/services/api/src/db.rs +++ b/services/api/src/db.rs @@ -564,3 +564,257 @@ impl Database { Ok(analytics) } } + + // Waitlist management + pub async fn waitlist_get_by_email( + &self, + normalized_email: &str, + ) -> anyhow::Result> { + let row = sqlx::query( + "SELECT id, email, name, role, status, source, referral_code, referred_by_code, + position, priority_score, joined_at, invited_at, invitation_accepted_at, + converted_at, created_at, updated_at + FROM waitlist_entries + WHERE email = $1", + ) + .bind(normalized_email) + .fetch_optional(&self.pool) + .await?; + + if let Some(row) = row { + return Ok(Some(crate::waitlist::WaitlistEntry { + id: row.try_get("id")?, + email: row.try_get("email")?, + name: row.try_get("name")?, + role: row.try_get("role")?, + status: row.try_get("status")?, + source: row.try_get("source")?, + referral_code: row.try_get("referral_code")?, + referred_by_code: row.try_get("referred_by_code")?, + position: row.try_get("position")?, + priority_score: row.try_get("priority_score")?, + joined_at: row.try_get("joined_at")?, + invited_at: row.try_get("invited_at")?, + invitation_accepted_at: row.try_get("invitation_accepted_at")?, + converted_at: row.try_get("converted_at")?, + created_at: row.try_get("created_at")?, + updated_at: row.try_get("updated_at")?, + })); + } + + Ok(None) + } + + pub async fn waitlist_create_entry( + &self, + email: &str, + name: Option<&str>, + role: Option<&str>, + source: Option<&str>, + referral_code: &str, + referred_by_code: Option<&str>, + ) -> anyhow::Result { + // Get current max position + let position_row = sqlx::query( + "SELECT COALESCE(MAX(position), 0) + 1 as next_position FROM waitlist_entries" + ) + .fetch_one(&self.pool) + .await?; + let position: i32 = position_row.try_get("next_position")?; + + // Calculate priority score based on referral + let mut priority_score = 0; + if let Some(ref_code) = referred_by_code { + // Increment referral count for referrer + let _ = sqlx::query( + "INSERT INTO waitlist_referrals (referrer_code, referral_count) + VALUES ($1, 1) + ON CONFLICT (referrer_code) DO UPDATE SET + referral_count = waitlist_referrals.referral_count + 1, + updated_at = NOW()" + ) + .bind(ref_code) + .execute(&self.pool) + .await; + + priority_score = 10; // Bonus for being referred + } + + let row = sqlx::query( + "INSERT INTO waitlist_entries + (email, name, role, status, source, referral_code, referred_by_code, position, priority_score) + VALUES ($1, $2, $3, 'pending', $4, $5, $6, $7, $8) + RETURNING id, email, name, role, status, source, referral_code, referred_by_code, + position, priority_score, joined_at, invited_at, invitation_accepted_at, + converted_at, created_at, updated_at", + ) + .bind(email) + .bind(name) + .bind(role) + .bind(source) + .bind(referral_code) + .bind(referred_by_code) + .bind(position) + .bind(priority_score) + .fetch_one(&self.pool) + .await?; + + Ok(crate::waitlist::WaitlistEntry { + id: row.try_get("id")?, + email: row.try_get("email")?, + name: row.try_get("name")?, + role: row.try_get("role")?, + status: row.try_get("status")?, + source: row.try_get("source")?, + referral_code: row.try_get("referral_code")?, + referred_by_code: row.try_get("referred_by_code")?, + position: row.try_get("position")?, + priority_score: row.try_get("priority_score")?, + joined_at: row.try_get("joined_at")?, + invited_at: row.try_get("invited_at")?, + invitation_accepted_at: row.try_get("invitation_accepted_at")?, + converted_at: row.try_get("converted_at")?, + created_at: row.try_get("created_at")?, + updated_at: row.try_get("updated_at")?, + }) + } + + pub async fn waitlist_get_stats(&self) -> anyhow::Result { + let row = sqlx::query( + "SELECT + COUNT(*)::BIGINT as total_entries, + COUNT(*) FILTER (WHERE status = 'pending')::BIGINT as pending_entries, + COUNT(*) FILTER (WHERE status = 'invited')::BIGINT as invited_entries, + COUNT(*) FILTER (WHERE invitation_accepted_at IS NOT NULL)::BIGINT as accepted_entries, + (SELECT COALESCE(SUM(referral_count), 0)::BIGINT FROM waitlist_referrals) as total_referrals + FROM waitlist_entries" + ) + .fetch_one(&self.pool) + .await?; + + Ok(crate::waitlist::WaitlistStats { + total_entries: row.try_get("total_entries")?, + pending_entries: row.try_get("pending_entries")?, + invited_entries: row.try_get("invited_entries")?, + accepted_entries: row.try_get("accepted_entries")?, + total_referrals: row.try_get("total_referrals")?, + }) + } + + pub async fn waitlist_get_all_for_export(&self) -> anyhow::Result> { + let rows = sqlx::query( + "SELECT + w.email, w.name, w.role, w.status, w.position, w.referral_code, + w.joined_at, w.invited_at, w.invitation_accepted_at, + COALESCE(r.referral_count, 0) as referral_count + FROM waitlist_entries w + LEFT JOIN waitlist_referrals r ON w.referral_code = r.referrer_code + ORDER BY w.position ASC" + ) + .fetch_all(&self.pool) + .await?; + + let mut entries = Vec::new(); + for row in rows { + entries.push(crate::waitlist::WaitlistExportEntry { + email: row.try_get("email")?, + name: row.try_get("name")?, + role: row.try_get("role")?, + status: row.try_get("status")?, + position: row.try_get("position")?, + referral_code: row.try_get("referral_code")?, + referral_count: row.try_get("referral_count")?, + joined_at: row.try_get("joined_at")?, + invited_at: row.try_get("invited_at")?, + invitation_accepted_at: row.try_get("invitation_accepted_at")?, + }); + } + + Ok(entries) + } + + pub async fn waitlist_invite_by_positions(&self, positions: Vec) -> anyhow::Result { + let result = sqlx::query( + "UPDATE waitlist_entries + SET status = 'invited', invited_at = NOW(), updated_at = NOW() + WHERE position = ANY($1) AND status = 'pending' + RETURNING id" + ) + .bind(&positions) + .fetch_all(&self.pool) + .await?; + + Ok(result.len() as i32) + } + + pub async fn waitlist_invite_top_n(&self, count: i32) -> anyhow::Result> { + let rows = sqlx::query( + "UPDATE waitlist_entries + SET status = 'invited', invited_at = NOW(), updated_at = NOW() + WHERE id IN ( + SELECT id FROM waitlist_entries + WHERE status = 'pending' + ORDER BY priority_score DESC, position ASC + LIMIT $1 + ) + RETURNING id, email, name, role, status, source, referral_code, referred_by_code, + position, priority_score, joined_at, invited_at, invitation_accepted_at, + converted_at, created_at, updated_at" + ) + .bind(count) + .fetch_all(&self.pool) + .await?; + + let mut entries = Vec::new(); + for row in rows { + entries.push(crate::waitlist::WaitlistEntry { + id: row.try_get("id")?, + email: row.try_get("email")?, + name: row.try_get("name")?, + role: row.try_get("role")?, + status: row.try_get("status")?, + source: row.try_get("source")?, + referral_code: row.try_get("referral_code")?, + referred_by_code: row.try_get("referred_by_code")?, + position: row.try_get("position")?, + priority_score: row.try_get("priority_score")?, + joined_at: row.try_get("joined_at")?, + invited_at: row.try_get("invited_at")?, + invitation_accepted_at: row.try_get("invitation_accepted_at")?, + converted_at: row.try_get("converted_at")?, + created_at: row.try_get("created_at")?, + updated_at: row.try_get("updated_at")?, + }); + } + + Ok(entries) + } + + pub async fn waitlist_mark_invitation_accepted(&self, email: &str) -> anyhow::Result { + let result = sqlx::query( + "UPDATE waitlist_entries + SET invitation_accepted_at = NOW(), updated_at = NOW() + WHERE email = $1 AND status = 'invited' AND invitation_accepted_at IS NULL" + ) + .bind(email) + .execute(&self.pool) + .await?; + + Ok(result.rows_affected() > 0) + } + + pub async fn waitlist_get_referral_count(&self, referral_code: &str) -> anyhow::Result { + let row = sqlx::query( + "SELECT COALESCE(referral_count, 0) as count FROM waitlist_referrals WHERE referrer_code = $1" + ) + .bind(referral_code) + .fetch_optional(&self.pool) + .await?; + + if let Some(row) = row { + Ok(row.try_get("count")?) + } else { + Ok(0) + } + } +} diff --git a/services/api/src/handlers.rs b/services/api/src/handlers.rs index 1e49783..b332905 100644 --- a/services/api/src/handlers.rs +++ b/services/api/src/handlers.rs @@ -736,3 +736,359 @@ pub async fn sendgrid_webhook( ) -> Result { sendgrid_webhook_handler(State(Arc::new(state.webhook_handler.clone())), Json(events)).await } + +// Waitlist handlers + +pub async fn waitlist_join( + State(state): State>, + headers: HeaderMap, + Json(payload): Json, +) -> Result { + let ip = client_ip(&headers); + let allowed = state + .newsletter_rate_limiter + .allow(&ip, 5, Duration::from_secs(15 * 60)) + .await; + + if !allowed { + return Ok(( + StatusCode::TOO_MANY_REQUESTS, + Json(crate::waitlist::WaitlistJoinResponse { + success: false, + position: 0, + referral_code: String::new(), + message: "Too many requests, please try again later.".to_string(), + }), + )); + } + + let email = match normalized_email(&payload.email) { + Some(value) => value, + None => { + return Ok(( + StatusCode::BAD_REQUEST, + Json(crate::waitlist::WaitlistJoinResponse { + success: false, + position: 0, + referral_code: String::new(), + message: "Invalid email address.".to_string(), + }), + )); + } + }; + + if is_disposable_email(&email) { + return Ok(( + StatusCode::BAD_REQUEST, + Json(crate::waitlist::WaitlistJoinResponse { + success: false, + position: 0, + referral_code: String::new(), + message: "Disposable emails are not allowed.".to_string(), + }), + )); + } + + // Check if already on waitlist + if let Some(existing) = state + .db + .waitlist_get_by_email(&email) + .await + .map_err(into_api_error)? + { + return Ok(( + StatusCode::CONFLICT, + Json(crate::waitlist::WaitlistJoinResponse { + success: false, + position: existing.position, + referral_code: existing.referral_code, + message: "Email already on waitlist.".to_string(), + }), + )); + } + + // Validate role if provided + let role = payload.user_role.as_deref().and_then(|r| { + let normalized = r.trim().to_lowercase(); + match normalized.as_str() { + "trader" | "developer" | "institution" => Some(normalized), + _ => None, + } + }); + + // Validate referral code if provided + let referred_by = if let Some(ref_code) = payload.referral_code.as_deref() { + let code = ref_code.trim().to_uppercase(); + if !code.is_empty() { + // Check if referral code exists + let count = state + .db + .waitlist_get_referral_count(&code) + .await + .map_err(into_api_error)?; + if count >= 0 { + Some(code) + } else { + None + } + } else { + None + } + } else { + None + }; + + // Generate unique referral code + let mut referral_code = crate::waitlist::generate_referral_code(); + let mut attempts = 0; + while attempts < 10 { + if state + .db + .waitlist_get_referral_count(&referral_code) + .await + .map_err(into_api_error)? + == 0 + { + break; + } + referral_code = crate::waitlist::generate_referral_code(); + attempts += 1; + } + + let name = payload.name.as_deref().map(|n| { + n.trim() + .chars() + .take(255) + .collect::() + }); + + let entry = state + .db + .waitlist_create_entry( + &email, + name.as_deref(), + role.as_deref(), + Some("direct"), + &referral_code, + referred_by.as_deref(), + ) + .await + .map_err(into_api_error)?; + + // Queue confirmation email + let template_data = serde_json::json!({ + "email": email, + "position": entry.position, + "referral_code": entry.referral_code, + "referral_url": format!("{}/waitlist?ref={}", state.config.base_url.trim_end_matches('/'), entry.referral_code) + }); + + state + .email_queue + .enqueue( + crate::email::types::EmailJobType::WaitlistConfirmation, + &email, + "waitlist_confirmation", + template_data, + 0, + ) + .await + .map_err(into_api_error)?; + + tracing::info!( + "[waitlist] new entry email={} position={} referral_code={} ip={}", + email, + entry.position, + entry.referral_code, + ip + ); + + Ok(( + StatusCode::OK, + Json(crate::waitlist::WaitlistJoinResponse { + success: true, + position: entry.position, + referral_code: entry.referral_code, + message: "Successfully joined the waitlist!".to_string(), + }), + )) +} + +pub async fn waitlist_stats( + State(state): State>, +) -> Result { + let stats = state + .db + .waitlist_get_stats() + .await + .map_err(into_api_error)?; + + Ok((StatusCode::OK, Json(stats))) +} + +pub async fn waitlist_export( + State(state): State>, +) -> Result { + let entries = state + .db + .waitlist_get_all_for_export() + .await + .map_err(into_api_error)?; + + // Convert to CSV + let mut csv = String::from("email,name,role,status,position,referral_code,referral_count,joined_at,invited_at,invitation_accepted_at\n"); + + for entry in entries { + csv.push_str(&format!( + "{},{},{},{},{},{},{},{},{},{}\n", + entry.email, + entry.name.unwrap_or_default(), + entry.role.unwrap_or_default(), + entry.status, + entry.position, + entry.referral_code, + entry.referral_count, + entry.joined_at.to_rfc3339(), + entry.invited_at.map(|d| d.to_rfc3339()).unwrap_or_default(), + entry.invitation_accepted_at.map(|d| d.to_rfc3339()).unwrap_or_default() + )); + } + + Ok(( + StatusCode::OK, + [(axum::http::header::CONTENT_TYPE, "text/csv")], + csv, + )) +} + +pub async fn waitlist_batch_invite( + State(state): State>, + Json(payload): Json, +) -> Result { + let invited_entries = if let Some(positions) = payload.positions { + if positions.is_empty() { + return Ok(( + StatusCode::BAD_REQUEST, + Json(crate::waitlist::BatchInviteResponse { + success: false, + invited_count: 0, + message: "No positions provided.".to_string(), + }), + )); + } + + let count = state + .db + .waitlist_invite_by_positions(positions) + .await + .map_err(into_api_error)?; + + Vec::new() // We don't need the entries for position-based invites + } else if let Some(count) = payload.count { + if count <= 0 || count > 1000 { + return Ok(( + StatusCode::BAD_REQUEST, + Json(crate::waitlist::BatchInviteResponse { + success: false, + invited_count: 0, + message: "Count must be between 1 and 1000.".to_string(), + }), + )); + } + + state + .db + .waitlist_invite_top_n(count) + .await + .map_err(into_api_error)? + } else { + return Ok(( + StatusCode::BAD_REQUEST, + Json(crate::waitlist::BatchInviteResponse { + success: false, + invited_count: 0, + message: "Either 'count' or 'positions' must be provided.".to_string(), + }), + )); + }; + + let invited_count = invited_entries.len() as i32; + + // Queue invitation emails for each invited user + for entry in invited_entries { + let template_data = serde_json::json!({ + "email": entry.email, + "name": entry.name.unwrap_or_else(|| "there".to_string()), + "dashboard_url": format!("{}/dashboard", state.config.base_url.trim_end_matches('/')), + "help_url": format!("{}/help", state.config.base_url.trim_end_matches('/')), + }); + + let _ = state + .email_queue + .enqueue( + crate::email::types::EmailJobType::WelcomeEmail, + &entry.email, + "welcome_email", + template_data, + 1, // Higher priority for invitations + ) + .await; + } + + tracing::info!("[waitlist] batch invite sent to {} users", invited_count); + + Ok(( + StatusCode::OK, + Json(crate::waitlist::BatchInviteResponse { + success: true, + invited_count, + message: format!("Successfully invited {} users.", invited_count), + }), + )) +} + +#[derive(Debug, Clone, Deserialize)] +pub struct WaitlistAcceptanceRequest { + pub email: String, +} + +pub async fn waitlist_track_acceptance( + State(state): State>, + Json(payload): Json, +) -> Result { + let Some(email) = normalized_email(&payload.email) else { + return Ok(( + StatusCode::BAD_REQUEST, + Json(NewsletterResponse { + success: false, + message: "Invalid email address.".to_string(), + }), + )); + }; + + let updated = state + .db + .waitlist_mark_invitation_accepted(&email) + .await + .map_err(into_api_error)?; + + if !updated { + return Ok(( + StatusCode::NOT_FOUND, + Json(NewsletterResponse { + success: false, + message: "No pending invitation found.".to_string(), + }), + )); + } + + tracing::info!("[waitlist] invitation accepted email={}", email); + + Ok(( + StatusCode::OK, + Json(NewsletterResponse { + success: true, + message: "Invitation acceptance tracked.".to_string(), + }), + )) +} diff --git a/services/api/src/main.rs b/services/api/src/main.rs index a35593d..606745b 100644 --- a/services/api/src/main.rs +++ b/services/api/src/main.rs @@ -6,6 +6,7 @@ mod email; mod handlers; mod metrics; mod newsletter; +mod waitlist; use std::sync::Arc; @@ -156,6 +157,27 @@ async fn main() -> anyhow::Result<()> { "/webhooks/sendgrid", post(handlers::sendgrid_webhook), ) + // Waitlist endpoints + .route( + "/api/v1/waitlist/join", + post(handlers::waitlist_join), + ) + .route( + "/api/v1/waitlist/stats", + get(handlers::waitlist_stats), + ) + .route( + "/api/v1/waitlist/export", + get(handlers::waitlist_export), + ) + .route( + "/api/v1/waitlist/invite", + post(handlers::waitlist_batch_invite), + ) + .route( + "/api/v1/waitlist/acceptance", + post(handlers::waitlist_track_acceptance), + ) .layer(TraceLayer::new_for_http()) .with_state(state); diff --git a/services/api/src/waitlist.rs b/services/api/src/waitlist.rs new file mode 100644 index 0000000..9df6f92 --- /dev/null +++ b/services/api/src/waitlist.rs @@ -0,0 +1,91 @@ +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WaitlistEntry { + pub id: Uuid, + pub email: String, + pub name: Option, + pub role: Option, + pub status: String, + pub source: Option, + pub referral_code: String, + pub referred_by_code: Option, + pub position: i32, + pub priority_score: i32, + pub joined_at: DateTime, + pub invited_at: Option>, + pub invitation_accepted_at: Option>, + pub converted_at: Option>, + pub created_at: DateTime, + pub updated_at: DateTime, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WaitlistJoinRequest { + pub email: String, + pub name: Option, + #[serde(rename = "role")] + pub user_role: Option, + #[serde(rename = "referralCode")] + pub referral_code: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WaitlistJoinResponse { + pub success: bool, + pub position: i32, + #[serde(rename = "referralCode")] + pub referral_code: String, + pub message: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WaitlistExportEntry { + pub email: String, + pub name: Option, + pub role: Option, + pub status: String, + pub position: i32, + pub referral_code: String, + pub referral_count: i32, + pub joined_at: DateTime, + pub invited_at: Option>, + pub invitation_accepted_at: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WaitlistStats { + pub total_entries: i64, + pub pending_entries: i64, + pub invited_entries: i64, + pub accepted_entries: i64, + pub total_referrals: i64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BatchInviteRequest { + pub count: Option, + pub positions: Option>, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct BatchInviteResponse { + pub success: bool, + pub invited_count: i32, + pub message: String, +} + +pub fn generate_referral_code() -> String { + use rand::Rng; + const CHARSET: &[u8] = b"ABCDEFGHJKLMNPQRSTUVWXYZ23456789"; + let mut rng = rand::thread_rng(); + + (0..8) + .map(|_| { + let idx = rng.gen_range(0..CHARSET.len()); + CHARSET[idx] as char + }) + .collect() +}