Skip to content
49 changes: 47 additions & 2 deletions apps/labrinth/src/database/models/notifications_template_item.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
use crate::database::models::DatabaseError;
use crate::database::redis::RedisPool;
use crate::models::v3::notifications::{NotificationChannel, NotificationType};
use crate::routes::ApiError;
use serde::{Deserialize, Serialize};

const TEMPLATES_NAMESPACE: &str = "notifications_templates";
const TEMPLATES_HTML_DATA_NAMESPACE: &str = "notifications_templates_html_data";
const TEMPLATES_DYNAMIC_HTML_NAMESPACE: &str =
"notifications_templates_dynamic_html";

const HTML_DATA_CACHE_EXPIRY: i64 = 60 * 15; // 15 minutes
const TEMPLATES_CACHE_EXPIRY: i64 = 60 * 30; // 30 minutes

#[derive(Debug, Clone, Serialize, Deserialize)]

pub struct NotificationTemplate {
pub id: i64,
pub channel: NotificationChannel,
Expand Down Expand Up @@ -75,7 +79,7 @@ impl NotificationTemplate {
TEMPLATES_NAMESPACE,
channel.as_str(),
&templates,
None,
Some(TEMPLATES_CACHE_EXPIRY),
)
.await?;

Expand Down Expand Up @@ -111,3 +115,44 @@ impl NotificationTemplate {
.await
}
}

pub async fn get_or_set_cached_dynamic_html<F>(
redis: &RedisPool,
key: &str,
get: impl FnOnce() -> F,
) -> Result<String, ApiError>
where
F: Future<Output = Result<String, ApiError>>,
{
#[derive(Debug, Clone, Serialize, Deserialize)]
struct HtmlBody {
html: String,
}

let mut redis_conn = redis.connect().await?;
if let Some(body) = redis_conn
.get_deserialized_from_json::<HtmlBody>(
TEMPLATES_DYNAMIC_HTML_NAMESPACE,
key,
)
.await?
{
return Ok(body.html);
}

drop(redis_conn);

let cached = HtmlBody { html: get().await? };
let mut redis_conn = redis.connect().await?;

redis_conn
.set_serialized_to_json(
TEMPLATES_DYNAMIC_HTML_NAMESPACE,
key,
&cached,
Some(HTML_DATA_CACHE_EXPIRY),
)
.await?;

Ok(cached.html)
}
15 changes: 15 additions & 0 deletions apps/labrinth/src/models/v2/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ pub enum LegacyNotificationBody {
amount: u64,
date_available: DateTime<Utc>,
},
Custom {
key: String,
title: String,
body_md: String,
},
Unknown,
}

Expand Down Expand Up @@ -217,6 +222,7 @@ impl LegacyNotification {
NotificationBody::PayoutAvailable { .. } => {
Some("payout_available".to_string())
}
NotificationBody::Custom { .. } => Some("custom".to_string()),
NotificationBody::LegacyMarkdown {
notification_type, ..
} => notification_type.clone(),
Expand Down Expand Up @@ -378,6 +384,15 @@ impl LegacyNotification {
service,
currency,
},
NotificationBody::Custom {
title,
body_md,
key,
} => LegacyNotificationBody::Custom {
title,
body_md,
key,
},
NotificationBody::PaymentFailed { amount, service } => {
LegacyNotificationBody::PaymentFailed { amount, service }
}
Expand Down
15 changes: 15 additions & 0 deletions apps/labrinth/src/models/v3/notifications.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pub enum NotificationType {
ProjectStatusNeutral,
ProjectTransferred,
PayoutAvailable,
Custom,
Unknown,
}

Expand Down Expand Up @@ -89,6 +90,7 @@ impl NotificationType {
NotificationType::ProjectStatusApproved => {
"project_status_approved"
}
NotificationType::Custom => "custom",
NotificationType::ProjectStatusNeutral => "project_status_neutral",
NotificationType::ProjectTransferred => "project_transferred",
NotificationType::Unknown => "unknown",
Expand Down Expand Up @@ -125,6 +127,7 @@ impl NotificationType {
}
"project_status_neutral" => NotificationType::ProjectStatusNeutral,
"project_transferred" => NotificationType::ProjectTransferred,
"custom" => NotificationType::Custom,
"unknown" => NotificationType::Unknown,
_ => NotificationType::Unknown,
}
Expand Down Expand Up @@ -236,6 +239,11 @@ pub enum NotificationBody {
date_available: DateTime<Utc>,
amount: u64,
},
Custom {
key: String,
title: String,
body_md: String,
},
Unknown,
}

Expand Down Expand Up @@ -313,6 +321,7 @@ impl NotificationBody {
NotificationBody::PayoutAvailable { .. } => {
NotificationType::PayoutAvailable
}
NotificationBody::Custom { .. } => NotificationType::Custom,
NotificationBody::Unknown => NotificationType::Unknown,
}
}
Expand Down Expand Up @@ -557,6 +566,12 @@ impl From<DBNotification> for Notification {
"#".to_string(),
vec![],
),
NotificationBody::Custom { title, .. } => (
"Notification".to_string(),
title.clone(),
"#".to_string(),
vec![],
),
NotificationBody::Unknown => {
("".to_string(), "".to_string(), "#".to_string(), vec![])
}
Expand Down
49 changes: 38 additions & 11 deletions apps/labrinth/src/queue/email.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::database::models::notifications_deliveries_item::DBNotificationDelive
use crate::database::models::notifications_template_item::NotificationTemplate;
use crate::database::models::user_item::DBUser;
use crate::database::redis::RedisPool;
use crate::models::notifications::NotificationBody;
use crate::models::notifications::{NotificationBody, NotificationType};
use crate::models::v3::notifications::{
NotificationChannel, NotificationDeliveryStatus,
};
Expand All @@ -20,6 +20,7 @@ use sqlx::PgPool;
use std::sync::Arc;
use thiserror::Error;
use tokio::sync::Mutex as TokioMutex;
use tokio::sync::Semaphore;
use tracing::{error, info, instrument, warn};

const EMAIL_RETRY_DELAY_SECONDS: i64 = 10;
Expand Down Expand Up @@ -187,13 +188,19 @@ impl EmailQueue {

// For all notifications we collected, fill out the template
// and send it via SMTP in parallel.

let mut futures = FuturesUnordered::new();

// Some email notifications should still be processed sequentially. This is to avoid cache stampede in the
// case that processing the email can be heavy. For example, custom emails always make a POST request to modrinth.com,
// which, while not necessarily slow, is subject to heavy rate limiting.
let sequential_processing = Arc::new(Semaphore::new(1));

for notification in notifications {
let this = self.clone();
let transport = Arc::clone(&transport);

let seq = Arc::clone(&sequential_processing);

futures.push(async move {
let mut txn = this.pg.begin().await?;

Expand All @@ -214,15 +221,35 @@ impl EmailQueue {
));
};

this.send_one_with_transport(
&mut txn,
transport,
notification.body,
notification.user_id,
mailbox,
)
.await
.map(|status| (notification.id, status))
// For the cache stampede reasons mentioned above, we process custom emails exclusively sequentially.
// This could cause unnecessary slowness if we're sending a lot of custom emails with the same key in one go,
// and the cache is already populated (thus the sequential processing would not be needed).
let maybe_permit = if notification.body.notification_type()
== NotificationType::Custom
{
Some(
seq.acquire()
.await
.expect("Semaphore should never be closed"),
)
} else {
None
};

let result = this
.send_one_with_transport(
&mut txn,
transport,
notification.body,
notification.user_id,
mailbox,
)
.await
.map(|status| (notification.id, status));

drop(maybe_permit);

result
});
}

Expand Down
Loading