Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 7 additions & 4 deletions apps/labrinth/src/database/models/charge_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,13 +260,15 @@ impl DBCharge {
let charge_type = ChargeType::Subscription.as_str();
let res = select_charges_with_predicate!(
r#"
INNER JOIN users_subscriptions us ON us.id = charges.subscription_id
WHERE
charge_type = $1 AND
charges.charge_type = $1 AND
(
(status = 'cancelled' AND due < NOW()) OR
(status = 'expiring' AND due < NOW()) OR
(status = 'failed' AND last_attempt < NOW() - INTERVAL '2 days')
(charges.status = 'cancelled' AND charges.due < NOW()) OR
(charges.status = 'expiring' AND charges.due < NOW()) OR
(charges.status = 'failed' AND charges.last_attempt < NOW() - INTERVAL '2 days')
)
AND us.status = 'provisioned'
"#,
charge_type
)
Expand Down Expand Up @@ -321,6 +323,7 @@ impl DBCharge {
AND COALESCE(tax_last_updated, '-infinity' :: TIMESTAMPTZ) < NOW() - INTERVAL '1 day'
AND u.email IS NOT NULL
AND due - INTERVAL '7 days' > NOW()
AND due - INTERVAL '14 days' < NOW() -- Due between 7 and 14 days from now
ORDER BY COALESCE(tax_last_updated, '-infinity' :: TIMESTAMPTZ) ASC
FOR NO KEY UPDATE SKIP LOCKED
LIMIT $1
Expand Down
3 changes: 2 additions & 1 deletion apps/labrinth/src/database/models/notification_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ impl NotificationBuilder {
ids.notification_id,
ids.user_id,
ids.date_available,
COALESCE(SUM(pv.amount), 0.0) sum
FLOOR(COALESCE(SUM(pv.amount), 0.0) * 100) :: BIGINT sum -- Convert to cents
FROM UNNEST($1::bigint[], $2::bigint[], $3::timestamptz[]) AS ids(notification_id, user_id, date_available)
LEFT JOIN payouts_values pv ON pv.user_id = ids.user_id AND pv.date_available = ids.date_available
GROUP BY ids.user_id, ids.notification_id, ids.date_available
Expand All @@ -81,6 +81,7 @@ impl NotificationBuilder {
'amount', to_jsonb(sum)
) body
FROM period_payouts
WHERE sum > 0
",
&notification_ids[..],
&users_raw_ids[..],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ const TEMPLATES_NAMESPACE: &str = "notifications_templates";
const TEMPLATES_HTML_DATA_NAMESPACE: &str = "notifications_templates_html_data";
const HTML_DATA_CACHE_EXPIRY: i64 = 60 * 15; // 15 minutes

#[derive(Clone, Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]

pub struct NotificationTemplate {
pub id: i64,
pub channel: NotificationChannel,
Expand Down
2 changes: 1 addition & 1 deletion apps/labrinth/src/models/v2/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ pub enum LegacyNotificationBody {
new_owner_organization_id: Option<OrganizationId>,
},
PayoutAvailable {
amount: f64,
amount: u64,
date_available: DateTime<Utc>,
},
Unknown,
Expand Down
15 changes: 13 additions & 2 deletions apps/labrinth/src/models/v3/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ pub struct Notification {
pub actions: Vec<NotificationAction>,
}

#[derive(Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
#[derive(Debug, Serialize, Deserialize, Clone, Copy, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum NotificationType {
// If adding a notification type, add a variant in `NotificationBody` of the same name!
Expand Down Expand Up @@ -114,6 +114,17 @@ impl NotificationType {
"email_changed" => NotificationType::EmailChanged,
"payment_failed" => NotificationType::PaymentFailed,
"tax_notification" => NotificationType::TaxNotification,
"payout_available" => NotificationType::PayoutAvailable,
"moderation_message_received" => {
NotificationType::ModerationMessageReceived
}
"report_status_updated" => NotificationType::ReportStatusUpdated,
"report_submitted" => NotificationType::ReportSubmitted,
"project_status_approved" => {
NotificationType::ProjectStatusApproved
}
"project_status_neutral" => NotificationType::ProjectStatusNeutral,
"project_transferred" => NotificationType::ProjectTransferred,
"unknown" => NotificationType::Unknown,
_ => NotificationType::Unknown,
}
Expand Down Expand Up @@ -223,7 +234,7 @@ pub enum NotificationBody {
},
PayoutAvailable {
date_available: DateTime<Utc>,
amount: f64,
amount: u64,
},
Unknown,
}
Expand Down
19 changes: 9 additions & 10 deletions apps/labrinth/src/queue/billing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,8 @@ pub async fn index_subscriptions(
}

for mut c in charges {
processed_charges += 1;

let payment_intent_id = c
.payment_platform_id
.as_deref()
Expand Down Expand Up @@ -465,8 +467,6 @@ pub async fn index_subscriptions(
c.tax_drift_loss = Some(drift);
c.tax_platform_id = Some(tax_platform_id);
c.upsert(&mut txn).await?;

processed_charges += 1;
}

txn.commit().await?;
Expand All @@ -477,14 +477,17 @@ pub async fn index_subscriptions(
}
}

let tax_charges_index_handle = tokio::spawn(anrok_api_operations(
anrok_api_operations(
pool.clone(),
redis.clone(),
stripe_client.clone(),
anrok_client.clone(),
));
)
.await;

let res = async {
info!("Gathering charges to unprovision");

let mut transaction = pool.begin().await?;
let mut clear_cache_users = Vec::new();

Expand Down Expand Up @@ -539,6 +542,8 @@ pub async fn index_subscriptions(
.await?;

for charge in all_charges {
info!("Indexing charge '{}'", to_base62(charge.id.0 as u64));

let Some(subscription) = all_subscriptions
.iter_mut()
.find(|x| Some(x.id) == charge.subscription_id)
Expand Down Expand Up @@ -664,12 +669,6 @@ pub async fn index_subscriptions(
warn!("Error indexing subscriptions: {:?}", e);
}

if let Err(error) = tax_charges_index_handle.await
&& error.is_panic()
{
std::panic::resume_unwind(error.into_panic());
}

info!("Done indexing subscriptions");
}

Expand Down
2 changes: 1 addition & 1 deletion apps/labrinth/src/queue/email.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ impl EmailQueue {
let update_next_attempt =
status == NotificationDeliveryStatus::Pending;

let mut delivery = deliveries.remove(idx);
let mut delivery = deliveries.swap_remove(idx);
delivery.status = status;
delivery.next_attempt += if update_next_attempt {
chrono::Duration::seconds(EMAIL_RETRY_DELAY_SECONDS)
Expand Down
2 changes: 1 addition & 1 deletion apps/labrinth/src/queue/email/templates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ async fn collect_template_variables(

map.insert(
PAYOUTAVAILABLE_AMOUNT,
format!("USD${:.2}", (amount * 100.0) as i64),
format!("USD${:.2}", *amount as f64 / 100.0),
);

Ok(map)
Expand Down
4 changes: 3 additions & 1 deletion apps/labrinth/src/queue/payouts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use sqlx::PgPool;
use sqlx::postgres::PgQueryResult;
use std::collections::HashMap;
use tokio::sync::RwLock;
use tracing::error;
use tracing::{error, info};

pub struct PayoutsQueue {
credential: RwLock<Option<PayPalCredentials>>,
Expand Down Expand Up @@ -1091,6 +1091,8 @@ pub async fn index_payouts_notifications(
pool: &PgPool,
redis: &RedisPool,
) -> Result<(), ApiError> {
info!("Updating payout notifications");

let mut transaction = pool.begin().await?;

payouts_values_notifications::synchronize_future_payout_values(
Expand Down