From 0749bc50934b4666608cf59d886ce79cddd69619 Mon Sep 17 00:00:00 2001 From: Harold John Date: Mon, 27 Apr 2026 11:32:13 +0000 Subject: [PATCH] fix(#485-#488): sync API_SPEC, add email DLQ, validate templates at startup #485: Correct error code values in API_SPEC.md to match #[repr(u32)] enum (100-148). Expand events table with 7 missing events (ref_dist, mkt_prune, upg_init, upg_vote, upg_exec, upg_rej, mkt_state), fix OracleResultSet data payload to include oracle_id, correct topic layouts for mon_reset/rep_set/ dep_set. Bump spec version to 1.1.0. #486: Exponential backoff already present in mark_failed (2^n * 60s). #487: Add EMAIL_DEAD_LETTER_KEY Redis sorted set. mark_failed now pushes permanently-failed jobs to the DLQ. Add list_dead_letter() and requeue_dead_letter() methods. Expose via admin API: GET /api/v1/email/queue/dead-letter POST /api/v1/email/queue/dead-letter/:job_id/requeue QueueStats gains dead_letter field. Audit middleware covers new routes. #488: EmailTemplateEngine::new() calls validate_all_templates() which renders all 4 templates with representative fixture data. Invalid syntax or missing variables now cause a startup error instead of a runtime send failure. --- API_SPEC.md | 162 ++++++++++++++------------- services/api/src/audit_middleware.rs | 5 + services/api/src/email/queue.rs | 59 +++++++++- services/api/src/email/templates.rs | 41 ++++++- services/api/src/handlers.rs | 29 +++++ services/api/src/main.rs | 8 ++ 6 files changed, 227 insertions(+), 77 deletions(-) diff --git a/API_SPEC.md b/API_SPEC.md index b855b71..4d028e4 100644 --- a/API_SPEC.md +++ b/API_SPEC.md @@ -1,6 +1,7 @@ # PredictIQ Contract API Specification > Reflects the on-chain implementation as of the current `contracts/predict-iq` source. +> **Spec version:** 1.1.0 — updated 2026-04-27 (issues #485: error code values corrected to match `#[repr(u32)]` enum; events table expanded with missing events and corrected topic layouts) --- @@ -341,56 +342,55 @@ Filters by `Active | PendingResolution | Disputed | Resolved | Cancelled`. Itera | Code | Value | Description | |------|-------|-------------| -| `AlreadyInitialized` | 1 | Contract already initialized | -| `NotAuthorized` | 2 | Caller lacks required authorization | -| `GuardianNotSet` | 3 | Guardian account not configured | -| `MarketNotFound` | 4 | No market with the given ID | -| `MarketClosed` | 5 | Market deadline has passed | -| `MarketStillActive` | 6 | Market is still accepting bets | -| `MarketNotActive` | 7 | Market is not in Active state | -| `MarketNotResolved` | 8 | Market has not been resolved yet | -| `MarketNotDisputed` | 9 | Market is not in Disputed state | -| `MarketNotPendingResolution` | 10 | Market is not in PendingResolution state | -| `CannotChangeOutcome` | 11 | Outcome is already finalized | -| `InvalidDeadline` | 12 | Deadline is in the past or malformed | -| `DeadlinePassed` | 13 | Action attempted after deadline | -| `ResolutionDeadlinePassed` | 14 | Resolution deadline has elapsed | -| `ResolutionNotReady` | 15 | Conditions for resolution not yet met | -| `GracePeriodActive` | 16 | Grace period has not elapsed | -| `MarketIdOverflow` | 17 | Market ID counter overflowed | -| `MarketIdCollision` | 18 | Market ID already in use | -| `InvalidOutcome` | 19 | Outcome index out of range | -| `TooManyOutcomes` | 20 | Exceeds `MAX_OUTCOMES_PER_MARKET` (32) | -| `InvalidBetAmount` | 21 | Bet amount is zero or below minimum | -| `InsufficientBalance` | 22 | Caller token balance too low | -| `InsufficientDeposit` | 23 | Creation deposit not met | -| `InvalidAmount` | 24 | Generic invalid amount | -| `BetNotFound` | 25 | No bet record for this bettor/market | -| `NoWinnings` | 26 | Bettor did not back the winning outcome | -| `AlreadyClaimed` | 27 | Winnings or refund already claimed | -| `OracleFailure` | 28 | Oracle cross-contract call failed | -| `StalePrice` | 29 | Price feed `publish_time` older than `max_staleness_seconds` | -| `ConfidenceTooLow` | 30 | Oracle confidence interval exceeds `max_confidence_bps` | -| `InvalidTimestamp` | 31 | Timestamp value is invalid | -| `AssetClawedBack` | 32 | SAC token clawback reduced contract balance unexpectedly | -| `TransferFailed` | 33 | SAC token transfer failed programmatically | -| `DisputeWindowClosed` | 34 | Dispute window has expired | -| `DisputeWindowStillOpen` | 35 | Dispute window has not yet closed | -| `AlreadyVoted` | 36 | Address has already cast a vote | -| `InsufficientVotes` | 37 | Not enough votes to proceed | -| `InsufficientVotingWeight` | 38 | Voter's governance token balance too low | -| `NoMajorityReached` | 39 | No outcome reached the 60% majority threshold | -| `GovernanceTokenNotSet` | 40 | Governance token address not configured | -| `TimelockActive` | 41 | Upgrade timelock has not elapsed | -| `UpgradeNotInitiated` | 42 | No pending upgrade to act on | -| `AlreadyVotedOnUpgrade` | 43 | Address already voted on this upgrade | -| `UpgradeAlreadyPending` | 44 | An upgrade proposal is already pending | -| `UpgradeHashInCooldown` | 45 | This wasm hash is in the 7-day cooldown period | -| `ParentMarketNotResolved` | 46 | Conditional market's parent is not yet resolved | -| `ParentMarketInvalidOutcome` | 47 | Parent market resolved to a different outcome | -| `ContractPaused` | 48 | Contract is paused via circuit breaker | -| `InvalidReferrer` | 49 | Referrer address is invalid or self-referral | -| `VotingPeriodNotElapsed` | 50 | Admin fallback called before 72-hour voting window elapsed | +| `AlreadyInitialized` | 100 | Contract already initialized | +| `NotAuthorized` | 101 | Caller lacks required authorization | +| `MarketNotFound` | 102 | No market with the given ID | +| `MarketClosed` | 103 | Market deadline has passed | +| `MarketStillActive` | 104 | Market is still accepting bets | +| `InvalidOutcome` | 105 | Outcome index out of range | +| `InvalidBetAmount` | 106 | Bet amount is zero or below minimum | +| `InsufficientBalance` | 107 | Caller token balance too low | +| `OracleFailure` | 108 | Oracle cross-contract call failed | +| `CircuitBreakerOpen` | 109 | Circuit breaker is open; operation blocked | +| `DisputeWindowClosed` | 110 | Dispute window has expired | +| `VotingNotStarted` | 111 | Voting period has not begun | +| `VotingEnded` | 112 | Voting period has already ended | +| `AlreadyVoted` | 113 | Address has already cast a vote | +| `FeeTooHigh` | 114 | Proposed fee exceeds allowed maximum | +| `MarketNotActive` | 115 | Market is not in Active state | +| `DeadlinePassed` | 116 | Action attempted after deadline | +| `CannotChangeOutcome` | 117 | Outcome is already finalized | +| `MarketNotDisputed` | 118 | Market is not in Disputed state | +| `MarketNotPendingResolution` | 119 | Market is not in PendingResolution state | +| `AdminNotSet` | 120 | Admin account not configured | +| `ContractPaused` | 121 | Contract is paused via circuit breaker | +| `GuardianNotSet` | 122 | Guardian account not configured | +| `TooManyOutcomes` | 123 | Exceeds `MAX_OUTCOMES_PER_MARKET` (32) | +| `TooManyWinners` | 124 | Exceeds maximum push-payout winner threshold | +| `PayoutModeNotSupported` | 125 | Requested payout mode is not supported | +| `InsufficientDeposit` | 126 | Creation deposit not met | +| `TimelockActive` | 127 | Upgrade timelock has not elapsed | +| `UpgradeNotInitiated` | 128 | No pending upgrade to act on | +| `InsufficientVotes` | 129 | Not enough votes to proceed | +| `AlreadyVotedOnUpgrade` | 130 | Address already voted on this upgrade | +| `InvalidWasmHash` | 131 | Provided wasm hash is invalid | +| `UpgradeFailed` | 132 | Upgrade execution failed | +| `ParentMarketNotResolved` | 133 | Conditional market's parent is not yet resolved | +| `ParentMarketInvalidOutcome` | 134 | Parent market resolved to a different outcome | +| `ResolutionNotReady` | 135 | Conditions for resolution not yet met | +| `DisputeWindowStillOpen` | 136 | Dispute window has not yet closed | +| `NoMajorityReached` | 137 | No outcome reached the 60% majority threshold | +| `StalePrice` | 138 | Price feed `publish_time` older than `max_staleness_seconds` | +| `ConfidenceTooLow` | 139 | Oracle confidence interval exceeds `max_confidence_bps` | +| `InsufficientVotingWeight` | 140 | Voter's governance token balance too low | +| `MarketNotCancelled` | 141 | Market is not in Cancelled state | +| `BetNotFound` | 142 | No bet record for this bettor/market | +| `UpgradeAlreadyPending` | 143 | An upgrade proposal is already pending | +| `UpgradeHashInCooldown` | 144 | This wasm hash is in the 7-day cooldown period | +| `InvalidAmount` | 145 | Generic invalid amount | +| `GovernanceTokenNotSet` | 146 | Governance token address not configured | +| `MarketNotResolved` | 147 | Market has not been resolved yet | +| `InvalidDeadline` | 148 | Deadline is in the past or malformed | --- @@ -401,28 +401,40 @@ All events follow the topic layout: - **Topic 1:** `market_id: u64` (primary indexer key; `0` for contract-level events) - **Topic 2:** Triggering address -| Event | Topic Symbol | Data Payload | -|-------|-------------|--------------| -| MarketCreated | `mkt_creat` | `(description: String, num_outcomes: u32, deadline: u64)` | -| BetPlaced | `bet_place` | `(outcome: u32, amount: i128)` | -| DisputeFiled | `disp_file` | `new_deadline: u64` | -| ResolutionFinalized | `resolv_fx` | `(winning_outcome: u32, total_payout: i128)` | -| RewardsClaimed | `reward_fx` | `(amount: i128, token_address: Address, is_refund: bool)` | -| VoteCast | `vote_cast` | `(outcome: u32, weight: i128)` | -| CircuitBreakerTriggered | `cb_state` | `state: String` | -| OracleResultSet | `oracle_ok` | `outcome: u32` | -| OracleResolved | `orcl_res` | `outcome: u32` | -| MarketFinalized | `mkt_final` | `winning_outcome: u32` | -| DisputeResolved | `disp_res` | `winning_outcome: u32` | -| MarketCancelled (admin) | `mkt_cncl` | `()` | -| MarketCancelledVote (community) | `mk_cn_vt` | `()` | -| ReferralReward | `ref_rwrd` | `amount: i128` | -| ReferralClaimed | `ref_claim` | `amount: i128` | -| CircuitBreakerAuto | `cb_auto` | `error_count: u32` | -| MonitoringStateReset | `mon_reset` | `(previous_error_count: u32, previous_last_observation: u64)` | -| FeeCollected | `fee_colct` | `amount: i128` | -| AdminFallbackResolution | `adm_fbk` | `winning_outcome: u32` | -| CreatorReputationSet | `rep_set` | `(old_score: u32, new_score: u32)` | -| CreationDepositSet | `dep_set` | `(old_amount: i128, new_amount: i128)` | - -> **Note:** `MonitoringStateReset`, `CircuitBreakerTriggered`, `CircuitBreakerAuto`, and `FeeCollected` use `market_id = 0` and the contract address as Topic 2. `CreatorReputationSet` uses `(symbol, creator)` with no `market_id`. `CreationDepositSet` uses `(symbol,)` only. +| Event | Topic Symbol | Topics | Data Payload | +|-------|-------------|--------|--------------| +| MarketCreated | `mkt_creat` | `(mkt_creat, market_id, creator)` | `(description: String, num_outcomes: u32, deadline: u64)` | +| BetPlaced | `bet_place` | `(bet_place, market_id, bettor)` | `(outcome: u32, amount: i128)` | +| DisputeFiled | `disp_file` | `(disp_file, market_id, disciplinarian)` | `new_deadline: u64` | +| ResolutionFinalized | `resolv_fx` | `(resolv_fx, market_id, resolver)` | `(winning_outcome: u32, total_payout: i128)` | +| RewardsClaimed | `reward_fx` | `(reward_fx, market_id, claimer)` | `(amount: i128, token_address: Address, is_refund: bool)` | +| VoteCast | `vote_cast` | `(vote_cast, market_id, voter)` | `(outcome: u32, weight: i128)` | +| CircuitBreakerTriggered | `cb_state` | `(cb_state, 0, contract_address)` | `state: String` | +| OracleResultSet | `oracle_ok` | `(oracle_ok, market_id, oracle_source)` | `(oracle_id: u32, outcome: u32)` | +| OracleResolved | `orcl_res` | `(orcl_res, market_id, oracle_address)` | `outcome: u32` | +| MarketFinalized | `mkt_final` | `(mkt_final, market_id, resolver)` | `winning_outcome: u32` | +| DisputeResolved | `disp_res` | `(disp_res, market_id, resolver)` | `winning_outcome: u32` | +| MarketCancelled (admin) | `mkt_cncl` | `(mkt_cncl, market_id, admin)` | `()` | +| MarketCancelledVote (community) | `mk_cn_vt` | `(mk_cn_vt, market_id, resolver)` | `()` | +| ReferralReward | `ref_rwrd` | `(ref_rwrd, market_id, referrer)` | `amount: i128` | +| ReferralClaimed | `ref_claim` | `(ref_claim, market_id, claimer)` | `amount: i128` | +| ReferralDistribution | `ref_dist` | `(ref_dist, market_id, token)` | `()` | +| CircuitBreakerAuto | `cb_auto` | `(cb_auto, 0, contract_address)` | `error_count: u32` | +| FeeCollected | `fee_colct` | `(fee_colct, 0, contract_address)` | `amount: i128` | +| AdminFallbackResolution | `adm_fbk` | `(adm_fbk, market_id, admin)` | `winning_outcome: u32` | +| CreatorReputationSet | `rep_set` | `(rep_set, creator)` | `(old_score: u32, new_score: u32)` | +| CreationDepositSet | `dep_set` | `(dep_set,)` | `(old_amount: i128, new_amount: i128)` | +| MonitoringStateReset | `mon_reset` | `(mon_reset, resetter)` | `(previous_error_count: u32, previous_last_observation: u64)` | +| MarketPruned | `mkt_prune` | `(mkt_prune, market_id)` | `pruned_at: u64` | +| UpgradeInitiated | `upg_init` | `(upg_init, initiator)` | `wasm_hash: BytesN<32>` | +| UpgradeVoted | `upg_vote` | `(upg_vote, voter)` | `vote_for: bool` | +| UpgradeExecuted | `upg_exec` | `(upg_exec, executor)` | `wasm_hash: BytesN<32>` | +| UpgradeRejected | `upg_rej` | `(upg_rej,)` | `wasm_hash: BytesN<32>` | +| MarketStateChanged | `mkt_state` | `(mkt_state, market_id)` | `(old_status: String, new_status: String, timestamp: u64)` | + +> **Notes:** +> - `CircuitBreakerTriggered`, `CircuitBreakerAuto`, and `FeeCollected` use `market_id = 0` and the contract address as Topic 2. +> - `CreatorReputationSet` uses `(symbol, creator)` with no `market_id`. +> - `CreationDepositSet` uses `(symbol,)` only. +> - `MonitoringStateReset` uses `(symbol, resetter)` with no `market_id`. +> - `OracleResultSet` data includes `oracle_id` to identify which oracle source reported the result (multi-oracle support). diff --git a/services/api/src/audit_middleware.rs b/services/api/src/audit_middleware.rs index 76ef81e..4aa3800 100644 --- a/services/api/src/audit_middleware.rs +++ b/services/api/src/audit_middleware.rs @@ -108,6 +108,11 @@ fn parse_admin_action(path: &str, method: &axum::http::Method) -> (String, Strin ("view_email_analytics".to_string(), "email_analytics".to_string(), None) } else if path.contains("/email/queue/stats") { ("view_queue_stats".to_string(), "email_queue".to_string(), None) + } else if path.contains("/email/queue/dead-letter") && path.contains("/requeue") { + let job_id = path.split('/').nth_back(1).map(|s| s.to_string()); + ("requeue_dead_letter".to_string(), "email_queue".to_string(), job_id) + } else if path.contains("/email/queue/dead-letter") { + ("list_dead_letter".to_string(), "email_queue".to_string(), None) } else if path.contains("/audit/logs") { ("query_audit_logs".to_string(), "audit_log".to_string(), None) } else { diff --git a/services/api/src/email/queue.rs b/services/api/src/email/queue.rs index 1e1ed57..e525f8d 100644 --- a/services/api/src/email/queue.rs +++ b/services/api/src/email/queue.rs @@ -13,6 +13,7 @@ use crate::email::types::{EmailJobStatus, EmailJobType}; const EMAIL_QUEUE_KEY: &str = "email:queue"; const EMAIL_PROCESSING_KEY: &str = "email:processing"; const EMAIL_RETRY_KEY: &str = "email:retry"; +const EMAIL_DEAD_LETTER_KEY: &str = "email:dead_letter"; #[derive(Clone)] pub struct EmailQueue { @@ -153,11 +154,18 @@ impl EmailQueue { error ); } else { - // Max attempts reached, mark as permanently failed + // Max attempts reached — mark as permanently failed and move to dead-letter set. self.db .email_update_job_status(job_id, EmailJobStatus::Failed.as_str(), Some(error)) .await?; + let mut conn = self.cache.manager.clone(); + let failed_at = chrono::Utc::now().timestamp() as f64; + let _: () = conn + .zadd(EMAIL_DEAD_LETTER_KEY, job_id.to_string(), failed_at) + .await + .context("Failed to add job to dead-letter set")?; + tracing::error!( "Email job {} permanently failed after {} attempts: {}", job_id, @@ -207,6 +215,48 @@ impl EmailQueue { Ok(count) } + /// List all job IDs currently in the dead-letter set (oldest-failed first). + pub async fn list_dead_letter(&self) -> Result> { + let mut conn = self.cache.manager.clone(); + let items: Vec = conn + .zrange(EMAIL_DEAD_LETTER_KEY, 0isize, -1isize) + .await + .context("Failed to list dead-letter set")?; + + items + .iter() + .map(|s| Uuid::parse_str(s).context("Invalid UUID in dead-letter set")) + .collect() + } + + /// Move a job from the dead-letter set back to the main queue for reprocessing. + pub async fn requeue_dead_letter(&self, job_id: Uuid) -> Result { + let mut conn = self.cache.manager.clone(); + + let removed: usize = conn + .zrem(EMAIL_DEAD_LETTER_KEY, job_id.to_string()) + .await + .context("Failed to remove job from dead-letter set")?; + + if removed == 0 { + return Ok(false); + } + + // Reset DB status so the worker will pick it up again. + self.db + .email_update_job_status(job_id, crate::email::types::EmailJobStatus::Pending.as_str(), None) + .await?; + + let score = chrono::Utc::now().timestamp() as f64; + let _: () = conn + .zadd(EMAIL_QUEUE_KEY, job_id.to_string(), score) + .await + .context("Failed to re-enqueue dead-letter job")?; + + tracing::info!("Requeued dead-letter email job: {}", job_id); + Ok(true) + } + /// Get queue statistics pub async fn get_stats(&self) -> Result { let mut conn = self.cache.manager.clone(); @@ -226,10 +276,16 @@ impl EmailQueue { .await .context("Failed to get retry count")?; + let dead_letter: usize = conn + .zcard(EMAIL_DEAD_LETTER_KEY) + .await + .context("Failed to get dead-letter count")?; + Ok(QueueStats { pending, processing, retry, + dead_letter, }) } @@ -362,4 +418,5 @@ pub struct QueueStats { pub pending: usize, pub processing: usize, pub retry: usize, + pub dead_letter: usize, } diff --git a/services/api/src/email/templates.rs b/services/api/src/email/templates.rs index ae4cd70..40430b7 100644 --- a/services/api/src/email/templates.rs +++ b/services/api/src/email/templates.rs @@ -33,7 +33,46 @@ impl EmailTemplateEngine { include_str!("../../templates/welcome_email.html"), )?; - Ok(Self { handlebars }) + let engine = Self { handlebars }; + + // Validate all templates at startup by rendering with representative data. + // This catches missing/misspelled variable references before the first send. + engine.validate_all_templates()?; + + Ok(engine) + } + + /// Render each registered template with representative data to catch syntax + /// errors and missing variable references at startup rather than at send time. + fn validate_all_templates(&self) -> Result<()> { + let fixtures: &[(&str, Value)] = &[ + ("newsletter_confirmation", serde_json::json!({ + "confirm_url": "https://example.com/confirm?token=startup-check", + "email": "startup@example.com" + })), + ("waitlist_confirmation", serde_json::json!({ + "email": "startup@example.com" + })), + ("contact_form_auto_response", serde_json::json!({ + "name": "Startup Check", + "subject": "Startup Check", + "message": "Startup validation render." + })), + ("welcome_email", serde_json::json!({ + "name": "Startup Check", + "dashboard_url": "https://example.com/dashboard", + "help_url": "https://example.com/help", + "unsubscribe_url": "https://example.com/unsubscribe" + })), + ]; + + for (name, data) in fixtures { + self.handlebars + .render(name, data) + .with_context(|| format!("Template validation failed for '{name}': invalid syntax or missing variable"))?; + } + + Ok(()) } pub fn render(&self, template_name: &str, data: &Value) -> Result { diff --git a/services/api/src/handlers.rs b/services/api/src/handlers.rs index b1991fe..c03f128 100644 --- a/services/api/src/handlers.rs +++ b/services/api/src/handlers.rs @@ -1008,6 +1008,35 @@ pub async fn email_queue_stats( Ok((StatusCode::OK, Json(stats))) } +pub async fn email_dead_letter_list( + State(state): State>, +) -> Result { + let ids = state + .email_queue + .list_dead_letter() + .await + .map_err(into_api_error)?; + + Ok((StatusCode::OK, Json(serde_json::json!({ "jobs": ids, "count": ids.len() })))) +} + +pub async fn email_dead_letter_requeue( + State(state): State>, + Path(job_id): Path, +) -> Result { + let requeued = state + .email_queue + .requeue_dead_letter(job_id) + .await + .map_err(into_api_error)?; + + if requeued { + Ok((StatusCode::OK, Json(serde_json::json!({ "requeued": true, "job_id": job_id })))) + } else { + Err(ApiError::not_found(format!("Job {job_id} not found in dead-letter set"))) + } +} + pub async fn sendgrid_webhook( State(state): State>, headers: HeaderMap, diff --git a/services/api/src/main.rs b/services/api/src/main.rs index 5f36d70..b4cde09 100644 --- a/services/api/src/main.rs +++ b/services/api/src/main.rs @@ -255,6 +255,14 @@ async fn main() -> anyhow::Result<()> { "/api/v1/email/queue/stats", get(handlers::email_queue_stats), ) + .route( + "/api/v1/email/queue/dead-letter", + get(handlers::email_dead_letter_list), + ) + .route( + "/api/v1/email/queue/dead-letter/:job_id/requeue", + post(handlers::email_dead_letter_requeue), + ) .route( "/api/v1/audit/logs", get(handlers::audit_logs),