diff --git a/apps/labrinth/.sqlx/query-f2525e9be3b90fc0c42c8333ca795ff0b6eb1d3c4350d8e025d39d927d4547fc.json b/apps/labrinth/.sqlx/query-f2525e9be3b90fc0c42c8333ca795ff0b6eb1d3c4350d8e025d39d927d4547fc.json deleted file mode 100644 index 7b19133922..0000000000 --- a/apps/labrinth/.sqlx/query-f2525e9be3b90fc0c42c8333ca795ff0b6eb1d3c4350d8e025d39d927d4547fc.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n UPDATE users\n SET badges = $1\n WHERE (id = $2)\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "f2525e9be3b90fc0c42c8333ca795ff0b6eb1d3c4350d8e025d39d927d4547fc" -} diff --git a/apps/labrinth/src/queue/billing.rs b/apps/labrinth/src/queue/billing.rs index 8200ecf0d6..ae3e474f54 100644 --- a/apps/labrinth/src/queue/billing.rs +++ b/apps/labrinth/src/queue/billing.rs @@ -29,647 +29,444 @@ use futures::stream::{FuturesUnordered, StreamExt}; use sqlx::PgPool; use std::collections::HashSet; use std::str::FromStr; +use std::time::Instant; use stripe::{self, Currency}; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, warn}; -pub async fn index_subscriptions( - pool: PgPool, - redis: RedisPool, - stripe_client: stripe::Client, - anrok_client: anrok::Client, -) { - info!("Indexing subscriptions"); - - async fn anrok_api_operations( - pool: PgPool, - redis: RedisPool, - stripe_client: stripe::Client, - anrok_client: anrok::Client, - ) { - let then = std::time::Instant::now(); - let result = update_tax_amounts( - pool.clone(), - redis.clone(), - anrok_client.clone(), - stripe_client.clone(), - 100, - ) - .await; +/// Updates charges which need to have their tax amount updated. This is done within a timer to avoid reaching +/// Anrok API limits. +/// +/// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up +/// to the specified limit of processed charges. +async fn update_tax_amounts( + pg: &PgPool, + redis: &RedisPool, + anrok_client: &anrok::Client, + stripe_client: &stripe::Client, + limit: i64, +) -> Result<(), ApiError> { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - if let Err(e) = result { - warn!("Error updating tax amount on charges: {:?}", e); - } + let mut processed_charges = 0; - let result = update_tax_transactions( - pool, - redis, - anrok_client, - stripe_client, - 100, - ) - .await; + loop { + interval.tick().await; - if let Err(e) = result { - warn!("Error updating tax transactions: {:?}", e); - } + let mut txn = pg.begin().await?; - info!( - "Updating tax amounts and Anrok transactions took {:?}", - then.elapsed() - ); - } + let charges = DBCharge::get_updateable_lock(&mut *txn, 6).await?; - /// Updates charges which need to have their tax amount updated. This is done within a timer to avoid reaching - /// Anrok API limits. - /// - /// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up - /// to the specified limit of processed charges. - async fn update_tax_amounts( - pg: PgPool, - redis: RedisPool, - anrok_client: anrok::Client, - stripe_client: stripe::Client, - limit: i64, - ) -> Result<(), ApiError> { - let mut interval = - tokio::time::interval(std::time::Duration::from_secs(1)); - interval - .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - let mut processed_charges = 0; - - loop { - interval.tick().await; + if charges.is_empty() { + info!("No more charges to process"); + break Ok(()); + } - let mut txn = pg.begin().await?; + let anrok_client_ref = anrok_client.clone(); + let stripe_client_ref = stripe_client.clone(); + let pg_ref = pg.clone(); + let redis_ref = redis.clone(); - let charges = DBCharge::get_updateable_lock(&mut *txn, 6).await?; + struct ProcessedCharge { + new_tax_amount: i64, + product_name: String, + } - if charges.is_empty() { - info!("No more charges to process"); - break Ok(()); - } + let mut futures = charges + .into_iter() + .map(|charge| { + let stripe_client = stripe_client_ref.clone(); + let anrok_client = anrok_client_ref.clone(); + let pg = pg_ref.clone(); + let redis = redis_ref.clone(); - let anrok_client_ref = anrok_client.clone(); - let stripe_client_ref = stripe_client.clone(); - let pg_ref = pg.clone(); - let redis_ref = redis.clone(); + let charge_clone = charge.clone(); - struct ProcessedCharge { - new_tax_amount: i64, - product_name: String, - } + let op_fut = async move { + let tax_id = DBProductsTaxIdentifier::get_price( + charge.price_id, + &pg, + ) + .await? + .ok_or_else(|| { + DatabaseError::Database(sqlx::Error::RowNotFound) + })?; - let mut futures = charges - .into_iter() - .map(|charge| { - let stripe_client = stripe_client_ref.clone(); - let anrok_client = anrok_client_ref.clone(); - let pg = pg_ref.clone(); - let redis = redis_ref.clone(); - - let charge_clone = charge.clone(); - - let op_fut = async move { - let tax_id = DBProductsTaxIdentifier::get_price( - charge.price_id, - &pg, - ) - .await? - .ok_or_else(|| { - DatabaseError::Database(sqlx::Error::RowNotFound) + let product = + DBProduct::get_price(charge.price_id, &pg) + .await? + .ok_or_else(|| { + DatabaseError::Database( + sqlx::Error::RowNotFound, + ) })?; - let product = - DBProduct::get_price(charge.price_id, &pg) + let stripe_address = 'a: { + let stripe_customer_id = + DBUser::get_id(charge.user_id, &pg, &redis) .await? .ok_or_else(|| { - DatabaseError::Database( - sqlx::Error::RowNotFound, - ) - })?; - - let stripe_address = 'a: { - let stripe_customer_id = - DBUser::get_id(charge.user_id, &pg, &redis) - .await? - .ok_or_else(|| { - ApiError::from(DatabaseError::Database( - sqlx::Error::RowNotFound, - )) - }) - .and_then(|user| { - user.stripe_customer_id.ok_or_else( - || { - ApiError::InvalidInput( - "User has no Stripe customer ID" - .to_owned(), - ) - }, - ) - })? - .parse() - .map_err(|_| { - ApiError::InvalidInput( - "User Stripe customer ID was invalid".to_owned(), - ) - })?; - - let customer = stripe::Customer::retrieve( - &stripe_client, - &stripe_customer_id, - &["invoice_settings.default_payment_method"], - ) - .await?; - - let payment_method = customer - .invoice_settings - .and_then(|x| { - x.default_payment_method.and_then(|x| x.into_object()) + ApiError::from(DatabaseError::Database( + sqlx::Error::RowNotFound, + )) }) - .ok_or_else(|| { + .and_then(|user| { + user.stripe_customer_id.ok_or_else( + || { + ApiError::InvalidInput( + "User has no Stripe customer ID" + .to_owned(), + ) + }, + ) + })? + .parse() + .map_err(|_| { ApiError::InvalidInput( - "Customer has no default payment method!".to_string(), + "User Stripe customer ID was invalid".to_owned(), ) })?; - let stripe_address = payment_method.billing_details.address; + let customer = stripe::Customer::retrieve( + &stripe_client, + &stripe_customer_id, + &["invoice_settings.default_payment_method"], + ) + .await?; - // Attempt the default payment method's address first, then the customer's address. - match stripe_address { - Some(address) => break 'a address, - None => { - warn!("PaymentMethod had no address"); - } - }; + // A customer should have a default payment method if they have an active subscription. - customer.address.ok_or_else(|| { + let payment_method = customer + .invoice_settings + .and_then(|x| { + x.default_payment_method.and_then(|x| x.into_object()) + }) + .ok_or_else(|| { ApiError::InvalidInput( - "Couldn't get an address for the Stripe customer" - .to_owned(), + "Customer has no default payment method!".to_string(), ) - })? + })?; + + let stripe_address = payment_method.billing_details.address; + + // Attempt the default payment method's address first, then the customer's address. + match stripe_address { + Some(address) => break 'a address, + None => { + warn!("PaymentMethod had no address"); + } }; - let customer_address = - anrok::Address::from_stripe_address( - &stripe_address, - ); - - let tax_amount = anrok_client - .create_ephemeral_txn(&anrok::TransactionFields { - customer_address, - currency_code: charge.currency_code.clone(), - accounting_time: charge.due, - accounting_time_zone: - anrok::AccountingTimeZone::Utc, - line_items: vec![anrok::LineItem::new( - tax_id.tax_processor_id, - charge.amount, - )], - }) - .await? - .tax_amount_to_collect; - - Result::::Ok( - ProcessedCharge { - new_tax_amount: tax_amount, - product_name: product - .name - .unwrap_or_else(|| "Modrinth".to_owned()), - }, - ) + customer.address.ok_or_else(|| { + ApiError::InvalidInput( + "Couldn't get an address for the Stripe customer" + .to_owned(), + ) + })? }; - op_fut.then(move |res| async move { (charge_clone, res) }) - }) - .collect::>(); - - while let Some(result) = futures.next().await { - processed_charges += 1; - - let mut charge = match result { - ( - mut charge, - Ok(ProcessedCharge { - new_tax_amount, - product_name, - }), - ) => { - if new_tax_amount != charge.tax_amount { - // The price of the subscription has changed, we need to insert a notification - // for this. - - let subscription_id = - charge.subscription_id.ok_or_else(|| { - ApiError::InvalidInput( - "Charge has no subscription ID" - .to_owned(), - ) - })?; + let customer_address = + anrok::Address::from_stripe_address( + &stripe_address, + ); - NotificationBuilder { - body: NotificationBody::TaxNotification { - subscription_id: subscription_id.into(), - new_amount: charge.amount, - new_tax_amount, - old_amount: charge.amount, - old_tax_amount: charge.tax_amount, - billing_interval: charge - .subscription_interval - .unwrap_or(PriceDuration::Monthly), - due: charge.due, - service: product_name, - currency: charge.currency_code.clone(), - }, - } - .insert(charge.user_id, &mut txn, &redis) - .await?; + let tax_amount = anrok_client + .create_ephemeral_txn(&anrok::TransactionFields { + customer_address, + currency_code: charge.currency_code.clone(), + accounting_time: charge.due, + accounting_time_zone: + anrok::AccountingTimeZone::Utc, + line_items: vec![anrok::LineItem::new( + tax_id.tax_processor_id, + charge.amount, + )], + }) + .await? + .tax_amount_to_collect; + + Result::::Ok( + ProcessedCharge { + new_tax_amount: tax_amount, + product_name: product + .name + .unwrap_or_else(|| "Modrinth".to_owned()), + }, + ) + }; - charge.tax_amount = new_tax_amount; + op_fut.then(move |res| async move { (charge_clone, res) }) + }) + .collect::>(); + + while let Some(result) = futures.next().await { + processed_charges += 1; + + let mut charge = match result { + ( + mut charge, + Ok(ProcessedCharge { + new_tax_amount, + product_name, + }), + ) => { + if new_tax_amount != charge.tax_amount { + // The price of the subscription has changed, we need to insert a notification + // for this. + + let subscription_id = + charge.subscription_id.ok_or_else(|| { + ApiError::InvalidInput( + "Charge has no subscription ID".to_owned(), + ) + })?; + + NotificationBuilder { + body: NotificationBody::TaxNotification { + subscription_id: subscription_id.into(), + new_amount: charge.amount, + new_tax_amount, + old_amount: charge.amount, + old_tax_amount: charge.tax_amount, + billing_interval: charge + .subscription_interval + .unwrap_or(PriceDuration::Monthly), + due: charge.due, + service: product_name, + currency: charge.currency_code.clone(), + }, } + .insert(charge.user_id, &mut txn, redis) + .await?; - charge - } - (charge, Err(error)) => { - error!(%error, "Error indexing tax amount on charge"); - charge + charge.tax_amount = new_tax_amount; } - }; - charge.tax_last_updated = Some(Utc::now()); - charge.upsert(&mut txn).await?; - } + charge + } + (charge, Err(error)) => { + error!(%error, "Error indexing tax amount on charge"); + charge + } + }; - txn.commit().await?; + charge.tax_last_updated = Some(Utc::now()); + charge.upsert(&mut txn).await?; + } - if processed_charges >= limit { - break Ok(()); - } + txn.commit().await?; + + if processed_charges >= limit { + break Ok(()); } } +} - /// Registers Anrok transactions for charges which are missing a tax identifier. - /// - /// Same as update_tax_amounts, this is done within a timer to avoid reaching Anrok API limits. - /// - /// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up - /// to the specified limit of processed charges. - async fn update_tax_transactions( - pg: PgPool, - redis: RedisPool, - anrok_client: anrok::Client, - stripe_client: stripe::Client, - limit: i64, +/// Registers Anrok transactions for charges which are missing a tax identifier. +/// +/// Same as update_tax_amounts, this is done within a timer to avoid reaching Anrok API limits. +/// +/// The global rate limit for Anrok API operations is 10 RPS, so we run ~6 requests every second up +/// to the specified limit of processed charges. +async fn update_anrok_transactions( + pg: &PgPool, + redis: &RedisPool, + anrok_client: &anrok::Client, + stripe_client: &stripe::Client, + limit: i64, +) -> Result<(), ApiError> { + async fn process_charge( + stripe_client: &stripe::Client, + txn: &mut sqlx::PgTransaction<'_>, + redis: &RedisPool, + anrok_client: &anrok::Client, + mut c: DBCharge, ) -> Result<(), ApiError> { - let mut interval = - tokio::time::interval(std::time::Duration::from_secs(1)); - interval - .set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - let mut processed_charges = 0; - - loop { - interval.tick().await; - - let mut txn = pg.begin().await?; - - let charges = - DBCharge::get_missing_tax_identifier_lock(&mut *txn, 6).await?; + let (customer_address, tax_platform_id) = 'a: { + let (pi, tax_platform_id) = if c.type_ == ChargeType::Refund { + // the payment_platform_id should be an re or a pyr - if charges.is_empty() { - info!("No more charges to process"); - break Ok(()); - } - - for mut c in charges { - processed_charges += 1; - - let payment_intent_id = c + let refund_id: stripe::RefundId = c .payment_platform_id - .as_deref() + .as_ref() .and_then(|x| x.parse().ok()) .ok_or_else(|| { ApiError::InvalidInput( - "Charge has no or an invalid payment platform ID" + "Refund charge has no or an invalid refund ID" .to_owned(), ) })?; - let customer_address = 'a: { - let stripe_id: stripe::PaymentIntentId = c - .payment_platform_id - .as_ref() - .and_then(|x| x.parse().ok()) - .ok_or_else(|| { - ApiError::InvalidInput( - "Charge has no payment platform ID".to_owned(), - ) - })?; + let refund = stripe::Refund::retrieve( + stripe_client, + &refund_id, + &["payment_intent.payment_method"], + ) + .await?; - // Attempt retrieving the address via the payment intent's payment method - - let pi = stripe::PaymentIntent::retrieve( - &stripe_client, - &stripe_id, - &["payment_method"], - ) - .await?; - - let pi_stripe_address = pi - .payment_method - .and_then(|x| x.into_object()) - .and_then(|x| x.billing_details.address); + let pi = refund + .payment_intent + .and_then(|x| x.into_object()) + .ok_or_else(|| { + ApiError::InvalidInput( + "Refund charge has no payment intent".to_owned(), + ) + })?; - match pi_stripe_address { - Some(address) => break 'a address, - None => { - warn!("PaymentMethod had no address"); - } - }; + (pi, anrok::transaction_id_stripe_pyr(&refund_id)) + } else { + let stripe_id: stripe::PaymentIntentId = c + .payment_platform_id + .as_ref() + .and_then(|x| x.parse().ok()) + .ok_or_else(|| { + ApiError::InvalidInput( + "Charge has no payment platform ID".to_owned(), + ) + })?; - let stripe_customer_id = - DBUser::get_id(c.user_id, &pg, &redis) - .await? - .ok_or_else(|| { - ApiError::from(DatabaseError::Database( - sqlx::Error::RowNotFound, - )) - }) - .and_then(|user| { - user.stripe_customer_id.ok_or_else(|| { - ApiError::InvalidInput( - "User has no Stripe customer ID" - .to_owned(), - ) - }) - })?; + // Attempt retrieving the address via the payment intent's payment method - let customer_id = - stripe_customer_id.parse().map_err(|e| { - ApiError::InvalidInput(format!( - "Charge's Stripe customer ID was invalid ({e})" - )) - })?; + let pi = stripe::PaymentIntent::retrieve( + stripe_client, + &stripe_id, + &["payment_method"], + ) + .await?; - let customer = stripe::Customer::retrieve( - &stripe_client, - &customer_id, - &[], - ) - .await?; + let anrok_id = anrok::transaction_id_stripe_pi(&stripe_id); - customer.address.ok_or_else(|| { - ApiError::InvalidInput( - "Stripe customer had no address".to_owned(), - ) - })? - }; + (pi, anrok_id) + }; - let tax_id = - DBProductsTaxIdentifier::get_price(c.price_id, &pg) - .await? - .ok_or_else(|| { - DatabaseError::Database(sqlx::Error::RowNotFound) - })?; + let pi_stripe_address = pi + .payment_method + .and_then(|x| x.into_object()) + .and_then(|x| x.billing_details.address); + + match pi_stripe_address { + Some(address) => break 'a (address, tax_platform_id), + None => { + warn!( + "A PaymentMethod for '{:?}' has no address; falling back to the customer's address", + pi.customer.map(|x| x.id()) + ); + } + }; - let tax_platform_id = - anrok::transaction_id_stripe_pi(&payment_intent_id); - - // Note: if the tax amount that was charged to the customer is *different* than - // what it *should* be NOW, we will take on a loss here. - - let should_have_collected = anrok_client - .create_or_update_txn(&anrok::Transaction { - id: tax_platform_id.clone(), - fields: anrok::TransactionFields { - customer_address: - anrok::Address::from_stripe_address( - &customer_address, - ), - currency_code: c.currency_code.clone(), - accounting_time: c.due, - accounting_time_zone: - anrok::AccountingTimeZone::Utc, - line_items: vec![ - anrok::LineItem::new_including_tax_amount( - tax_id.tax_processor_id, - c.tax_amount + c.amount, - ), - ], - }, - }) + let stripe_customer_id = + DBUser::get_id(c.user_id, &mut **txn, redis) .await? - .tax_amount_to_collect; + .ok_or_else(|| { + ApiError::from(DatabaseError::Database( + sqlx::Error::RowNotFound, + )) + }) + .and_then(|user| { + user.stripe_customer_id.ok_or_else(|| { + ApiError::InvalidInput( + "User has no Stripe customer ID".to_owned(), + ) + }) + })?; - let drift = should_have_collected - c.tax_amount; + let customer_id = stripe_customer_id.parse().map_err(|e| { + ApiError::InvalidInput(format!( + "Charge's Stripe customer ID was invalid ({e})" + )) + })?; - c.tax_drift_loss = Some(drift); - c.tax_platform_id = Some(tax_platform_id); - c.upsert(&mut txn).await?; - } + let customer = + stripe::Customer::retrieve(stripe_client, &customer_id, &[]) + .await?; - txn.commit().await?; + let address = customer.address.ok_or_else(|| { + ApiError::InvalidInput( + format!("Could not find any address for Stripe customer of user '{}'", to_base62(c.user_id.0 as u64)) + ) + })?; - if processed_charges >= limit { - break Ok(()); - } - } - } + (address, tax_platform_id) + }; - anrok_api_operations( - pool.clone(), - redis.clone(), - stripe_client.clone(), - anrok_client.clone(), - ) - .await; + let tax_id = DBProductsTaxIdentifier::get_price(c.price_id, &mut **txn) + .await? + .ok_or_else(|| DatabaseError::Database(sqlx::Error::RowNotFound))?; - let res = async { - info!("Gathering charges to unprovision"); + // Note: if the tax amount that was charged to the customer is *different* than + // what it *should* be NOW, we will take on a loss here. - let mut transaction = pool.begin().await?; - let mut clear_cache_users = Vec::new(); - - // If an active subscription has: - // - A canceled charge due now - // - An expiring charge due now - // - A failed charge more than two days ago - // It should be unprovisioned. - let all_charges = DBCharge::get_unprovision(&pool).await?; - - let mut all_subscriptions = - user_subscription_item::DBUserSubscription::get_many( - &all_charges - .iter() - .filter_map(|x| x.subscription_id) - .collect::>() - .into_iter() - .collect::>(), - &pool, - ) - .await?; - let subscription_prices = product_item::DBProductPrice::get_many( - &all_subscriptions - .iter() - .map(|x| x.price_id) - .collect::>() - .into_iter() - .collect::>(), - &pool, - ) - .await?; - let subscription_products = product_item::DBProduct::get_many( - &subscription_prices - .iter() - .map(|x| x.product_id) - .collect::>() - .into_iter() - .collect::>(), - &pool, - ) - .await?; - let users = DBUser::get_many_ids( - &all_subscriptions - .iter() - .map(|x| x.user_id) - .collect::>() - .into_iter() - .collect::>(), - &pool, - &redis, - ) - .await?; + let should_have_collected = anrok_client + .create_or_update_txn(&anrok::Transaction { + id: tax_platform_id.clone(), + fields: anrok::TransactionFields { + customer_address: anrok::Address::from_stripe_address( + &customer_address, + ), + currency_code: c.currency_code.clone(), + accounting_time: c.due, + accounting_time_zone: anrok::AccountingTimeZone::Utc, + line_items: vec![ + anrok::LineItem::new_including_tax_amount( + tax_id.tax_processor_id, + c.tax_amount + c.amount, + ), + ], + }, + }) + .await? + .tax_amount_to_collect; - for charge in all_charges { - info!("Indexing charge '{}'", to_base62(charge.id.0 as u64)); + let drift = should_have_collected - c.tax_amount; - let Some(subscription) = all_subscriptions - .iter_mut() - .find(|x| Some(x.id) == charge.subscription_id) - else { - continue; - }; + c.tax_drift_loss = Some(drift); + c.tax_platform_id = Some(tax_platform_id); + c.upsert(txn).await?; - if subscription.status == SubscriptionStatus::Unprovisioned { - continue; - } + Ok(()) + } - let Some(product_price) = subscription_prices - .iter() - .find(|x| x.id == subscription.price_id) - else { - continue; - }; + let mut interval = tokio::time::interval(std::time::Duration::from_secs(1)); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - let Some(product) = subscription_products - .iter() - .find(|x| x.id == product_price.product_id) - else { - continue; - }; + let mut processed_charges = 0; - let Some(user) = - users.iter().find(|x| x.id == subscription.user_id) - else { - continue; - }; + loop { + interval.tick().await; - let unprovisioned = match product.metadata { - ProductMetadata::Midas => { - let badges = user.badges - Badges::MIDAS; - - sqlx::query!( - " - UPDATE users - SET badges = $1 - WHERE (id = $2) - ", - badges.bits() as i64, - user.id as DBUserId, - ) - .execute(&mut *transaction) - .await?; + let mut txn = pg.begin().await?; - true - } + let charges = + DBCharge::get_missing_tax_identifier_lock(&mut *txn, 6).await?; - ProductMetadata::Pyro { .. } - | ProductMetadata::Medal { .. } => 'server: { - let server_id = match &subscription.metadata { - Some(SubscriptionMetadata::Pyro { id, region: _ }) => { - id - } - Some(SubscriptionMetadata::Medal { id }) => id, - _ => break 'server true, - }; + if charges.is_empty() { + info!("No more charges to process"); + break Ok(()); + } - let res = reqwest::Client::new() - .post(format!( - "{}/modrinth/v0/servers/{}/suspend", - dotenvy::var("ARCHON_URL")?, - server_id - )) - .header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?) - .json(&serde_json::json!({ - "reason": if charge.status == ChargeStatus::Cancelled || charge.status == ChargeStatus::Expiring { - "cancelled" - } else { - "paymentfailed" - } - })) - .send() - .await; - - if let Err(e) = res { - warn!("Error suspending pyro server: {:?}", e); - false - } else { - true - } - } - }; + for c in charges { + processed_charges += 1; - if unprovisioned { - subscription.status = SubscriptionStatus::Unprovisioned; - subscription.upsert(&mut transaction).await?; - } + let charge_id = to_base62(c.id.0 as u64); + let user_id = to_base62(c.user_id.0 as u64); - clear_cache_users.push(user.id); - } + let result = + process_charge(stripe_client, &mut txn, redis, anrok_client, c) + .await; - crate::database::models::DBUser::clear_caches( - &clear_cache_users - .into_iter() - .map(|x| (x, None)) - .collect::>(), - &redis, - ) - .await?; - transaction.commit().await?; - - // If an offer redeemal has been processing for over 5 minutes, it should be set pending. - UserRedeemal::update_stuck_5_minutes(&pool).await?; - - // If an offer redeemal is pending, try processing it. - // Try processing it. - let pending_redeemals = UserRedeemal::get_pending(&pool, 100).await?; - for redeemal in pending_redeemals { - if let Err(error) = - try_process_user_redeemal(&pool, &redis, redeemal).await - { - warn!(%error, "Failed to process a redeemal.") + if let Err(e) = result { + warn!( + "Error processing charge '{charge_id}' for user '{user_id}': {e}" + ); } } - Ok::<(), ApiError>(()) - }; + txn.commit().await?; - if let Err(e) = res.await { - warn!("Error indexing subscriptions: {:?}", e); + if processed_charges >= limit { + break Ok(()); + } } - - info!("Done indexing subscriptions"); } /// Attempts to process a user redeemal. @@ -832,141 +629,393 @@ pub async fn try_process_user_redeemal( Ok(()) } -pub async fn index_billing( - stripe_client: stripe::Client, - anrok_client: anrok::Client, - pool: PgPool, - redis: RedisPool, -) { - info!("Indexing billing queue"); - let res = async { - // If a charge has continuously failed for more than a month, it should be cancelled - let charges_to_cancel = DBCharge::get_cancellable(&pool).await?; +pub async fn cancel_failing_charges(pool: &PgPool) -> Result<(), ApiError> { + let charges_to_cancel = DBCharge::get_cancellable(pool).await?; - for mut charge in charges_to_cancel { - charge.status = ChargeStatus::Cancelled; + for mut charge in charges_to_cancel { + charge.status = ChargeStatus::Cancelled; - let mut transaction = pool.begin().await?; - charge.upsert(&mut transaction).await?; - transaction.commit().await?; - } + let mut transaction = pool.begin().await?; + charge.upsert(&mut transaction).await?; + transaction.commit().await?; + } - // If a charge is open and due or has been attempted more than two days ago, it should be processed - let charges_to_do = DBCharge::get_chargeable(&pool).await?; + Ok(()) +} - let prices = product_item::DBProductPrice::get_many( - &charges_to_do - .iter() - .map(|x| x.price_id) - .collect::>() - .into_iter() - .collect::>(), - &pool, +pub async fn process_chargeable_charges( + pool: &PgPool, + redis: &RedisPool, + stripe_client: &stripe::Client, + anrok_client: &anrok::Client, +) -> Result<(), ApiError> { + let charges_to_do = DBCharge::get_chargeable(pool).await?; + + let prices = product_item::DBProductPrice::get_many( + &charges_to_do + .iter() + .map(|x| x.price_id) + .collect::>() + .into_iter() + .collect::>(), + pool, + ) + .await?; + + let users = crate::database::models::DBUser::get_many_ids( + &charges_to_do + .iter() + .map(|x| x.user_id) + .collect::>() + .into_iter() + .collect::>(), + pool, + redis, + ) + .await?; + + for mut charge in charges_to_do { + let Some(product_price) = + prices.iter().find(|x| x.id == charge.price_id) + else { + continue; + }; + + let Some(user) = users.iter().find(|x| x.id == charge.user_id) else { + continue; + }; + + let Ok(currency) = + Currency::from_str(&product_price.currency_code.to_lowercase()) + else { + warn!( + "Could not find currency for {}", + product_price.currency_code + ); + continue; + }; + + let user = User::from_full(user.clone()); + + let result = create_or_update_payment_intent( + pool, + redis, + stripe_client, + anrok_client, + PaymentBootstrapOptions { + user: &user, + payment_intent: None, + payment_session: PaymentSession::AutomatedRenewal, + attached_charge: AttachedCharge::UseExisting { + charge: charge.clone(), + }, + currency: CurrencyMode::Set(currency), + attach_payment_metadata: None, + }, ) - .await?; + .await; + + charge.status = ChargeStatus::Processing; + charge.last_attempt = Some(Utc::now()); + + let mut failure = false; + + match result { + Ok(PaymentBootstrapResults { + new_payment_intent, + payment_method: _, + price_id: _, + subtotal, + tax, + }) => { + if new_payment_intent.is_some() { + // The PI will automatically be confirmed + charge.amount = subtotal; + charge.tax_amount = tax; + charge.payment_platform = PaymentPlatform::Stripe; + } else { + error!( + "Payment bootstrap succeeded but no payment intent was created" + ); + failure = true; + } + } + + Err(error) => { + error!(%error, "Failed to bootstrap payment for renewal"); + failure = true; + } + }; + + if failure { + charge.status = ChargeStatus::Failed; + } + + let mut transaction = pool.begin().await?; + charge.upsert(&mut transaction).await?; + transaction.commit().await?; + } - let users = crate::database::models::DBUser::get_many_ids( - &charges_to_do + Ok(()) +} + +async fn unprovision_subscriptions( + pool: &PgPool, + redis: &RedisPool, +) -> Result<(), ApiError> { + info!("Gathering charges to unprovision"); + + let mut transaction = pool.begin().await?; + let mut clear_cache_users = Vec::new(); + + // If an active subscription has: + // - A canceled charge due now + // - An expiring charge due now + // - A failed charge more than two days ago + // It should be unprovisioned + let all_charges = DBCharge::get_unprovision(pool).await?; + + let mut all_subscriptions = + user_subscription_item::DBUserSubscription::get_many( + &all_charges .iter() - .map(|x| x.user_id) + .filter_map(|x| x.subscription_id) .collect::>() .into_iter() .collect::>(), - &pool, - &redis, + pool, ) .await?; + let subscription_prices = product_item::DBProductPrice::get_many( + &all_subscriptions + .iter() + .map(|x| x.price_id) + .collect::>() + .into_iter() + .collect::>(), + pool, + ) + .await?; + let subscription_products = product_item::DBProduct::get_many( + &subscription_prices + .iter() + .map(|x| x.product_id) + .collect::>() + .into_iter() + .collect::>(), + pool, + ) + .await?; + let users = DBUser::get_many_ids( + &all_subscriptions + .iter() + .map(|x| x.user_id) + .collect::>() + .into_iter() + .collect::>(), + pool, + redis, + ) + .await?; - for mut charge in charges_to_do { - let Some(product_price) = - prices.iter().find(|x| x.id == charge.price_id) - else { - continue; - }; + for charge in all_charges { + debug!("Unprovisioning charge '{}'", to_base62(charge.id.0 as u64)); - let Some(user) = users.iter().find(|x| x.id == charge.user_id) - else { - continue; - }; + let Some(subscription) = all_subscriptions + .iter_mut() + .find(|x| Some(x.id) == charge.subscription_id) + else { + continue; + }; - let Ok(currency) = - Currency::from_str(&product_price.currency_code.to_lowercase()) - else { - warn!( - "Could not find currency for {}", - product_price.currency_code - ); - continue; - }; + if subscription.status == SubscriptionStatus::Unprovisioned { + continue; + } - let user = User::from_full(user.clone()); - - let result = create_or_update_payment_intent( - &pool, - &redis, - &stripe_client, - &anrok_client, - PaymentBootstrapOptions { - user: &user, - payment_intent: None, - payment_session: PaymentSession::AutomatedRenewal, - attached_charge: AttachedCharge::UseExisting { - charge: charge.clone(), - }, - currency: CurrencyMode::Set( - currency, - ), - attach_payment_metadata: None, - }, - ) - .await; - - charge.status = ChargeStatus::Processing; - charge.last_attempt = Some(Utc::now()); - - let mut failure = false; - - match result { - Ok(PaymentBootstrapResults { - new_payment_intent, - payment_method: _, - price_id: _, - subtotal, - tax, - }) => { - if new_payment_intent.is_some() { - // The PI will automatically be confirmed - charge.amount = subtotal; - charge.tax_amount = tax; - charge.payment_platform = PaymentPlatform::Stripe; - } else { - error!("Payment bootstrap succeeded but no payment intent was created"); - failure = true; - } - } + let Some(product_price) = subscription_prices + .iter() + .find(|x| x.id == subscription.price_id) + else { + continue; + }; + + let Some(product) = subscription_products + .iter() + .find(|x| x.id == product_price.product_id) + else { + continue; + }; + + let Some(user) = users.iter().find(|x| x.id == subscription.user_id) + else { + continue; + }; + + let unprovisioned = match product.metadata { + ProductMetadata::Midas => { + let badges = user.badges - Badges::MIDAS; + + sqlx::query!( + " + UPDATE users + SET badges = $1 + WHERE (id = $2) + ", + badges.bits() as i64, + user.id as DBUserId, + ) + .execute(&mut *transaction) + .await?; + + true + } - Err(error) => { - error!(%error, "Failed to bootstrap payment for renewal"); - failure = true; - } - }; + ProductMetadata::Pyro { .. } + | ProductMetadata::Medal { .. } => 'server: { + let server_id = match &subscription.metadata { + Some(SubscriptionMetadata::Pyro { id, region: _ }) => id, + Some(SubscriptionMetadata::Medal { id }) => id, + _ => break 'server true, + }; - if failure { - charge.status = ChargeStatus::Failed; + let res = reqwest::Client::new() + .post(format!( + "{}/modrinth/v0/servers/{}/suspend", + dotenvy::var("ARCHON_URL")?, + server_id + )) + .header("X-Master-Key", dotenvy::var("PYRO_API_KEY")?) + .json(&serde_json::json!({ + "reason": if charge.status == ChargeStatus::Cancelled || charge.status == ChargeStatus::Expiring { + "cancelled" + } else { + "paymentfailed" + } + })) + .send() + .await; + + if let Err(e) = res { + warn!("Error suspending pyro server: {:?}", e); + false + } else { + true + } } + }; - let mut transaction = pool.begin().await?; - charge.upsert(&mut transaction).await?; - transaction.commit().await?; + if unprovisioned { + subscription.status = SubscriptionStatus::Unprovisioned; + subscription.upsert(&mut transaction).await?; } - Ok::<(), ApiError>(()) + clear_cache_users.push(user.id); } - .await; - if let Err(e) = res { - warn!("Error indexing billing queue: {:?}", e); + crate::database::models::DBUser::clear_caches( + &clear_cache_users + .into_iter() + .map(|x| (x, None)) + .collect::>(), + redis, + ) + .await?; + transaction.commit().await?; + + Ok(()) +} + +async fn process_redeemals( + pool: &PgPool, + redis: &RedisPool, +) -> Result<(), ApiError> { + // If an offer redeemal has been processing for over 5 minutes, it should be set pending. + UserRedeemal::update_stuck_5_minutes(pool).await?; + + // If an offer redeemal is pending, try processing it. + // Try processing it. + let pending_redeemals = UserRedeemal::get_pending(pool, 100).await?; + for redeemal in pending_redeemals { + if let Err(error) = + try_process_user_redeemal(pool, redis, redeemal).await + { + warn!(%error, "Failed to process a redeemal.") + } } + Ok(()) +} + +pub async fn index_billing( + stripe_client: stripe::Client, + anrok_client: anrok::Client, + pool: PgPool, + redis: RedisPool, +) { + info!("Indexing billing queue"); + + run_and_time("cancel_failing_charges", cancel_failing_charges(&pool)).await; + + run_and_time( + "process_chargeable_charges", + process_chargeable_charges( + &pool, + &redis, + &stripe_client, + &anrok_client, + ), + ) + .await; + info!("Done indexing billing queue"); } + +pub async fn index_subscriptions( + pool: PgPool, + redis: RedisPool, + stripe_client: stripe::Client, + anrok_client: anrok::Client, +) { + info!("Indexing subscriptions"); + + run_and_time( + "update_anrok_transactions", + update_anrok_transactions( + &pool, + &redis, + &anrok_client, + &stripe_client, + 250, + ), + ) + .await; + + run_and_time( + "update_tax_amounts", + update_tax_amounts(&pool, &redis, &anrok_client, &stripe_client, 100), + ) + .await; + + run_and_time("process_redeemals", process_redeemals(&pool, &redis)).await; + + run_and_time( + "unprovision_subscriptions", + unprovision_subscriptions(&pool, &redis), + ) + .await; + + info!("Done indexing subscriptions"); +} + +async fn run_and_time(name: &'static str, fut: F) +where + F: Future>, +{ + let then = Instant::now(); + + if let Err(error) = fut.await { + error!("Error in '{name}': {error}"); + } + + info!("Finished '{name}' in {}ms", then.elapsed().as_millis()); +}