diff --git a/apps/labrinth/.sqlx/query-1dacc8ebab576d595a1a5b6a44f1c00ed9709a76cc3a65f48e710d4e73129114.json b/apps/labrinth/.sqlx/query-1dacc8ebab576d595a1a5b6a44f1c00ed9709a76cc3a65f48e710d4e73129114.json deleted file mode 100644 index 75aa8478d2..0000000000 --- a/apps/labrinth/.sqlx/query-1dacc8ebab576d595a1a5b6a44f1c00ed9709a76cc3a65f48e710d4e73129114.json +++ /dev/null @@ -1,16 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n WITH\n period_payouts AS (\n SELECT\n ids.notification_id,\n ids.user_id,\n ids.date_available,\n COALESCE(SUM(pv.amount), 0.0) sum\n FROM UNNEST($1::bigint[], $2::bigint[], $3::timestamptz[]) AS ids(notification_id, user_id, date_available)\n LEFT JOIN payouts_values pv ON pv.user_id = ids.user_id AND pv.date_available = ids.date_available\n GROUP BY ids.user_id, ids.notification_id, ids.date_available\n )\n INSERT INTO notifications (\n id, user_id, body\n )\n SELECT\n notification_id id,\n user_id,\n JSONB_BUILD_OBJECT(\n 'type', 'payout_available',\n 'date_available', to_jsonb(date_available),\n 'amount', to_jsonb(sum)\n ) body\n FROM period_payouts\n ", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Int8Array", - "Int8Array", - "TimestamptzArray" - ] - }, - "nullable": [] - }, - "hash": "1dacc8ebab576d595a1a5b6a44f1c00ed9709a76cc3a65f48e710d4e73129114" -} diff --git a/apps/labrinth/.sqlx/query-7910f8c982de4c9c8f3b40cecda741d6d43c3084b7f347e5da31c353cc9bae53.json b/apps/labrinth/.sqlx/query-7910f8c982de4c9c8f3b40cecda741d6d43c3084b7f347e5da31c353cc9bae53.json new file mode 100644 index 0000000000..d245c5434b --- /dev/null +++ b/apps/labrinth/.sqlx/query-7910f8c982de4c9c8f3b40cecda741d6d43c3084b7f347e5da31c353cc9bae53.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "\n WITH\n period_payouts AS (\n SELECT\n ids.notification_id,\n ids.user_id,\n ids.date_available,\n FLOOR(COALESCE(SUM(pv.amount), 0.0) * 100) :: BIGINT sum -- Convert to cents\n FROM UNNEST($1::bigint[], $2::bigint[], $3::timestamptz[]) AS ids(notification_id, user_id, date_available)\n LEFT JOIN payouts_values pv ON pv.user_id = ids.user_id AND pv.date_available = ids.date_available\n GROUP BY ids.user_id, ids.notification_id, ids.date_available\n )\n INSERT INTO notifications (\n id, user_id, body\n )\n SELECT\n notification_id id,\n user_id,\n JSONB_BUILD_OBJECT(\n 'type', 'payout_available',\n 'date_available', to_jsonb(date_available),\n 'amount', to_jsonb(sum)\n ) body\n FROM period_payouts\n WHERE sum > 0\n ", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Int8Array", + "Int8Array", + "TimestamptzArray" + ] + }, + "nullable": [] + }, + "hash": "7910f8c982de4c9c8f3b40cecda741d6d43c3084b7f347e5da31c353cc9bae53" +} diff --git a/apps/labrinth/.sqlx/query-6ca75db4bd4260c888a8f30fa4da6ac919a7bedf6f07d9d66a9793bf0f7171dd.json b/apps/labrinth/.sqlx/query-9f0c73fabe99d9891faaebdd3518b362437dcdcef9cd9a68b950fba61218bb4d.json similarity index 84% rename from apps/labrinth/.sqlx/query-6ca75db4bd4260c888a8f30fa4da6ac919a7bedf6f07d9d66a9793bf0f7171dd.json rename to apps/labrinth/.sqlx/query-9f0c73fabe99d9891faaebdd3518b362437dcdcef9cd9a68b950fba61218bb4d.json index 2ac429c9bf..f5dc4759fb 100644 --- a/apps/labrinth/.sqlx/query-6ca75db4bd4260c888a8f30fa4da6ac919a7bedf6f07d9d66a9793bf0f7171dd.json +++ b/apps/labrinth/.sqlx/query-9f0c73fabe99d9891faaebdd3518b362437dcdcef9cd9a68b950fba61218bb4d.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n charges.id, charges.user_id, charges.price_id, charges.amount, charges.currency_code, charges.status, charges.due, charges.last_attempt,\n charges.charge_type, charges.subscription_id, charges.tax_amount, charges.tax_platform_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n charges.subscription_interval AS \"subscription_interval?\",\n charges.payment_platform,\n charges.payment_platform_id AS \"payment_platform_id?\",\n charges.parent_charge_id AS \"parent_charge_id?\",\n charges.net AS \"net?\",\n\t\t\t\tcharges.tax_last_updated AS \"tax_last_updated?\",\n\t\t\t\tcharges.tax_drift_loss AS \"tax_drift_loss?\"\n FROM charges\n \n WHERE\n charge_type = $1 AND\n (\n (status = 'cancelled' AND due < NOW()) OR\n (status = 'expiring' AND due < NOW()) OR\n (status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days')\n )\n ", + "query": "\n SELECT\n charges.id, charges.user_id, charges.price_id, charges.amount, charges.currency_code, charges.status, charges.due, charges.last_attempt,\n charges.charge_type, charges.subscription_id, charges.tax_amount, charges.tax_platform_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n charges.subscription_interval AS \"subscription_interval?\",\n charges.payment_platform,\n charges.payment_platform_id AS \"payment_platform_id?\",\n charges.parent_charge_id AS \"parent_charge_id?\",\n charges.net AS \"net?\",\n\t\t\t\tcharges.tax_last_updated AS \"tax_last_updated?\",\n\t\t\t\tcharges.tax_drift_loss AS \"tax_drift_loss?\"\n FROM charges\n \n INNER JOIN users_subscriptions us ON us.id = charges.subscription_id\n WHERE\n charges.charge_type = $1 AND\n (\n (charges.status = 'cancelled' AND charges.due < NOW()) OR\n (charges.status = 'expiring' AND charges.due < NOW()) OR\n (charges.status = 'failed' AND charges.last_attempt < NOW() - INTERVAL '2 days')\n )\n AND us.status = 'provisioned'\n ", "describe": { "columns": [ { @@ -126,5 +126,5 @@ true ] }, - "hash": "6ca75db4bd4260c888a8f30fa4da6ac919a7bedf6f07d9d66a9793bf0f7171dd" + "hash": "9f0c73fabe99d9891faaebdd3518b362437dcdcef9cd9a68b950fba61218bb4d" } diff --git a/apps/labrinth/.sqlx/query-f3f819d5761dcd562d13f98826fd2827b25fbdad4898100cf242b21ce8e9713a.json b/apps/labrinth/.sqlx/query-cd18ae8abe81a159a134923957f4cd6d0ba3b1bcbc89df349cf7b5b1897603b8.json similarity index 91% rename from apps/labrinth/.sqlx/query-f3f819d5761dcd562d13f98826fd2827b25fbdad4898100cf242b21ce8e9713a.json rename to apps/labrinth/.sqlx/query-cd18ae8abe81a159a134923957f4cd6d0ba3b1bcbc89df349cf7b5b1897603b8.json index f958998859..458f19f968 100644 --- a/apps/labrinth/.sqlx/query-f3f819d5761dcd562d13f98826fd2827b25fbdad4898100cf242b21ce8e9713a.json +++ b/apps/labrinth/.sqlx/query-cd18ae8abe81a159a134923957f4cd6d0ba3b1bcbc89df349cf7b5b1897603b8.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT\n charges.id, charges.user_id, charges.price_id, charges.amount, charges.currency_code, charges.status, charges.due, charges.last_attempt,\n charges.charge_type, charges.subscription_id, charges.tax_amount, charges.tax_platform_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n charges.subscription_interval AS \"subscription_interval?\",\n charges.payment_platform,\n charges.payment_platform_id AS \"payment_platform_id?\",\n charges.parent_charge_id AS \"parent_charge_id?\",\n charges.net AS \"net?\",\n\t\t\t\tcharges.tax_last_updated AS \"tax_last_updated?\",\n\t\t\t\tcharges.tax_drift_loss AS \"tax_drift_loss?\"\n FROM charges\n \n\t\t\tINNER JOIN users u ON u.id = charges.user_id\n\t\t\tWHERE\n\t\t\t status = 'open'\n\t\t\t AND COALESCE(tax_last_updated, '-infinity' :: TIMESTAMPTZ) < NOW() - INTERVAL '1 day'\n\t\t\t AND u.email IS NOT NULL\n\t\t\t AND due - INTERVAL '7 days' > NOW()\n\t\t\tORDER BY COALESCE(tax_last_updated, '-infinity' :: TIMESTAMPTZ) ASC\n\t\t\tFOR NO KEY UPDATE SKIP LOCKED\n\t\t\tLIMIT $1\n\t\t\t", + "query": "\n SELECT\n charges.id, charges.user_id, charges.price_id, charges.amount, charges.currency_code, charges.status, charges.due, charges.last_attempt,\n charges.charge_type, charges.subscription_id, charges.tax_amount, charges.tax_platform_id,\n -- Workaround for https://github.com/launchbadge/sqlx/issues/3336\n charges.subscription_interval AS \"subscription_interval?\",\n charges.payment_platform,\n charges.payment_platform_id AS \"payment_platform_id?\",\n charges.parent_charge_id AS \"parent_charge_id?\",\n charges.net AS \"net?\",\n\t\t\t\tcharges.tax_last_updated AS \"tax_last_updated?\",\n\t\t\t\tcharges.tax_drift_loss AS \"tax_drift_loss?\"\n FROM charges\n \n\t\t\tINNER JOIN users u ON u.id = charges.user_id\n\t\t\tWHERE\n\t\t\t status = 'open'\n\t\t\t AND COALESCE(tax_last_updated, '-infinity' :: TIMESTAMPTZ) < NOW() - INTERVAL '1 day'\n\t\t\t AND u.email IS NOT NULL\n\t\t\t AND due - INTERVAL '7 days' > NOW()\n AND due - INTERVAL '14 days' < NOW() -- Due between 7 and 14 days from now\n\t\t\tORDER BY COALESCE(tax_last_updated, '-infinity' :: TIMESTAMPTZ) ASC\n\t\t\tFOR NO KEY UPDATE SKIP LOCKED\n\t\t\tLIMIT $1\n\t\t\t", "describe": { "columns": [ { @@ -126,5 +126,5 @@ true ] }, - "hash": "f3f819d5761dcd562d13f98826fd2827b25fbdad4898100cf242b21ce8e9713a" + "hash": "cd18ae8abe81a159a134923957f4cd6d0ba3b1bcbc89df349cf7b5b1897603b8" } diff --git a/apps/labrinth/src/database/models/charge_item.rs b/apps/labrinth/src/database/models/charge_item.rs index 1ade9d375d..b619e3addc 100644 --- a/apps/labrinth/src/database/models/charge_item.rs +++ b/apps/labrinth/src/database/models/charge_item.rs @@ -260,13 +260,15 @@ impl DBCharge { let charge_type = ChargeType::Subscription.as_str(); let res = select_charges_with_predicate!( r#" + INNER JOIN users_subscriptions us ON us.id = charges.subscription_id WHERE - charge_type = $1 AND + charges.charge_type = $1 AND ( - (status = 'cancelled' AND due < NOW()) OR - (status = 'expiring' AND due < NOW()) OR - (status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days') + (charges.status = 'cancelled' AND charges.due < NOW()) OR + (charges.status = 'expiring' AND charges.due < NOW()) OR + (charges.status = 'failed' AND charges.last_attempt < NOW() - INTERVAL '2 days') ) + AND us.status = 'provisioned' "#, charge_type ) @@ -321,6 +323,7 @@ impl DBCharge { AND COALESCE(tax_last_updated, '-infinity' :: TIMESTAMPTZ) < NOW() - INTERVAL '1 day' AND u.email IS NOT NULL AND due - INTERVAL '7 days' > NOW() + AND due - INTERVAL '14 days' < NOW() -- Due between 7 and 14 days from now ORDER BY COALESCE(tax_last_updated, '-infinity' :: TIMESTAMPTZ) ASC FOR NO KEY UPDATE SKIP LOCKED LIMIT $1 diff --git a/apps/labrinth/src/database/models/notification_item.rs b/apps/labrinth/src/database/models/notification_item.rs index deabbdf0d4..aad147844f 100644 --- a/apps/labrinth/src/database/models/notification_item.rs +++ b/apps/labrinth/src/database/models/notification_item.rs @@ -64,7 +64,7 @@ impl NotificationBuilder { ids.notification_id, ids.user_id, ids.date_available, - COALESCE(SUM(pv.amount), 0.0) sum + FLOOR(COALESCE(SUM(pv.amount), 0.0) * 100) :: BIGINT sum -- Convert to cents FROM UNNEST($1::bigint[], $2::bigint[], $3::timestamptz[]) AS ids(notification_id, user_id, date_available) LEFT JOIN payouts_values pv ON pv.user_id = ids.user_id AND pv.date_available = ids.date_available GROUP BY ids.user_id, ids.notification_id, ids.date_available @@ -81,6 +81,7 @@ impl NotificationBuilder { 'amount', to_jsonb(sum) ) body FROM period_payouts + WHERE sum > 0 ", ¬ification_ids[..], &users_raw_ids[..], diff --git a/apps/labrinth/src/database/models/notifications_template_item.rs b/apps/labrinth/src/database/models/notifications_template_item.rs index 563b465822..c4687dfae1 100644 --- a/apps/labrinth/src/database/models/notifications_template_item.rs +++ b/apps/labrinth/src/database/models/notifications_template_item.rs @@ -7,7 +7,8 @@ const TEMPLATES_NAMESPACE: &str = "notifications_templates"; const TEMPLATES_HTML_DATA_NAMESPACE: &str = "notifications_templates_html_data"; const HTML_DATA_CACHE_EXPIRY: i64 = 60 * 15; // 15 minutes -#[derive(Clone, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] + pub struct NotificationTemplate { pub id: i64, pub channel: NotificationChannel, diff --git a/apps/labrinth/src/models/v2/notifications.rs b/apps/labrinth/src/models/v2/notifications.rs index 56bf0d7f76..6ed5301778 100644 --- a/apps/labrinth/src/models/v2/notifications.rs +++ b/apps/labrinth/src/models/v2/notifications.rs @@ -136,7 +136,7 @@ pub enum LegacyNotificationBody { new_owner_organization_id: Option, }, PayoutAvailable { - amount: f64, + amount: u64, date_available: DateTime, }, Unknown, diff --git a/apps/labrinth/src/models/v3/notifications.rs b/apps/labrinth/src/models/v3/notifications.rs index 19b3040951..137f158543 100644 --- a/apps/labrinth/src/models/v3/notifications.rs +++ b/apps/labrinth/src/models/v3/notifications.rs @@ -27,7 +27,7 @@ pub struct Notification { pub actions: Vec, } -#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)] +#[derive(Debug, Serialize, Deserialize, Clone, Copy, Eq, PartialEq)] #[serde(rename_all = "snake_case")] pub enum NotificationType { // If adding a notification type, add a variant in `NotificationBody` of the same name! @@ -114,6 +114,17 @@ impl NotificationType { "email_changed" => NotificationType::EmailChanged, "payment_failed" => NotificationType::PaymentFailed, "tax_notification" => NotificationType::TaxNotification, + "payout_available" => NotificationType::PayoutAvailable, + "moderation_message_received" => { + NotificationType::ModerationMessageReceived + } + "report_status_updated" => NotificationType::ReportStatusUpdated, + "report_submitted" => NotificationType::ReportSubmitted, + "project_status_approved" => { + NotificationType::ProjectStatusApproved + } + "project_status_neutral" => NotificationType::ProjectStatusNeutral, + "project_transferred" => NotificationType::ProjectTransferred, "unknown" => NotificationType::Unknown, _ => NotificationType::Unknown, } @@ -223,7 +234,7 @@ pub enum NotificationBody { }, PayoutAvailable { date_available: DateTime, - amount: f64, + amount: u64, }, Unknown, } diff --git a/apps/labrinth/src/queue/billing.rs b/apps/labrinth/src/queue/billing.rs index 4cff820c05..8200ecf0d6 100644 --- a/apps/labrinth/src/queue/billing.rs +++ b/apps/labrinth/src/queue/billing.rs @@ -343,6 +343,8 @@ pub async fn index_subscriptions( } for mut c in charges { + processed_charges += 1; + let payment_intent_id = c .payment_platform_id .as_deref() @@ -465,8 +467,6 @@ pub async fn index_subscriptions( c.tax_drift_loss = Some(drift); c.tax_platform_id = Some(tax_platform_id); c.upsert(&mut txn).await?; - - processed_charges += 1; } txn.commit().await?; @@ -477,14 +477,17 @@ pub async fn index_subscriptions( } } - let tax_charges_index_handle = tokio::spawn(anrok_api_operations( + anrok_api_operations( pool.clone(), redis.clone(), stripe_client.clone(), anrok_client.clone(), - )); + ) + .await; let res = async { + info!("Gathering charges to unprovision"); + let mut transaction = pool.begin().await?; let mut clear_cache_users = Vec::new(); @@ -539,6 +542,8 @@ pub async fn index_subscriptions( .await?; for charge in all_charges { + info!("Indexing charge '{}'", to_base62(charge.id.0 as u64)); + let Some(subscription) = all_subscriptions .iter_mut() .find(|x| Some(x.id) == charge.subscription_id) @@ -664,12 +669,6 @@ pub async fn index_subscriptions( warn!("Error indexing subscriptions: {:?}", e); } - if let Err(error) = tax_charges_index_handle.await - && error.is_panic() - { - std::panic::resume_unwind(error.into_panic()); - } - info!("Done indexing subscriptions"); } diff --git a/apps/labrinth/src/queue/email.rs b/apps/labrinth/src/queue/email.rs index 5336cf289f..0259918af3 100644 --- a/apps/labrinth/src/queue/email.rs +++ b/apps/labrinth/src/queue/email.rs @@ -236,7 +236,7 @@ impl EmailQueue { let update_next_attempt = status == NotificationDeliveryStatus::Pending; - let mut delivery = deliveries.remove(idx); + let mut delivery = deliveries.swap_remove(idx); delivery.status = status; delivery.next_attempt += if update_next_attempt { chrono::Duration::seconds(EMAIL_RETRY_DELAY_SECONDS) diff --git a/apps/labrinth/src/queue/email/templates.rs b/apps/labrinth/src/queue/email/templates.rs index c29078d6ec..02a9907807 100644 --- a/apps/labrinth/src/queue/email/templates.rs +++ b/apps/labrinth/src/queue/email/templates.rs @@ -559,7 +559,7 @@ async fn collect_template_variables( map.insert( PAYOUTAVAILABLE_AMOUNT, - format!("USD${:.2}", (amount * 100.0) as i64), + format!("USD${:.2}", *amount as f64 / 100.0), ); Ok(map) diff --git a/apps/labrinth/src/queue/payouts.rs b/apps/labrinth/src/queue/payouts.rs index 86704e12f3..88feaaea28 100644 --- a/apps/labrinth/src/queue/payouts.rs +++ b/apps/labrinth/src/queue/payouts.rs @@ -24,7 +24,7 @@ use sqlx::PgPool; use sqlx::postgres::PgQueryResult; use std::collections::HashMap; use tokio::sync::RwLock; -use tracing::error; +use tracing::{error, info}; pub struct PayoutsQueue { credential: RwLock>, @@ -1091,6 +1091,8 @@ pub async fn index_payouts_notifications( pool: &PgPool, redis: &RedisPool, ) -> Result<(), ApiError> { + info!("Updating payout notifications"); + let mut transaction = pool.begin().await?; payouts_values_notifications::synchronize_future_payout_values(