Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 90 additions & 3 deletions src/gateway.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::adapter::{AdapterRouter, ChannelRef, ChatAdapter, MessageRef, SenderContext};
use crate::acp::ContentBlock;
use crate::config::SttConfig;
use crate::media;
use anyhow::Result;
use async_trait::async_trait;
use futures_util::{SinkExt, StreamExt};
Expand Down Expand Up @@ -44,12 +47,22 @@ struct GwSender {
is_bot: bool,
}

#[derive(Clone, Debug, Deserialize)]
struct GwAttachment {
url: String,
content_type: Option<String>,
filename: Option<String>,
size: Option<u64>,
}

#[derive(Clone, Debug, Deserialize)]
struct GwContent {
#[allow(dead_code)]
#[serde(rename = "type")]
content_type: String,
text: String,
text: Option<String>,
#[serde(default)]
attachments: Vec<GwAttachment>,
}

#[derive(Serialize)]
Expand Down Expand Up @@ -268,6 +281,7 @@ pub struct GatewayParams {
pub allowed_channels: Vec<String>,
pub allow_all_users: bool,
pub allowed_users: Vec<String>,
pub stt_config: SttConfig,
}

pub async fn run_gateway_adapter(
Expand Down Expand Up @@ -414,9 +428,82 @@ pub async fn run_gateway_adapter(

let adapter = adapter.clone();
let router = router.clone();
let prompt = event.content.text.clone();
let prompt = event.content.text.clone().unwrap_or_default();
let attachments = event.content.attachments.clone();
let stt_config = params.stt_config.clone();

tasks.spawn(async move {
let mut extra_blocks = Vec::new();
let mut text_file_bytes: u64 = 0;
let mut text_file_count: u32 = 0;
const TEXT_TOTAL_CAP: u64 = 1024 * 1024; // 1 MB total for all text file attachments
const TEXT_FILE_COUNT_CAP: u32 = 5;

for attachment in attachments {
let mime = attachment.content_type.as_deref().unwrap_or("");
let filename = attachment.filename.as_deref().unwrap_or("attachment");
let size = attachment.size.unwrap_or(0);

if media::is_audio_mime(mime) {
if stt_config.enabled {
let mime_clean = mime.split(';').next().unwrap_or(mime).trim();
if let Some(transcript) = media::download_and_transcribe(
&attachment.url,
filename,
mime_clean,
size,
&stt_config,
None,
).await {
tracing::debug!(filename = %filename, chars = transcript.len(), "voice transcript injected");
extra_blocks.insert(0, ContentBlock::Text {
text: format!("[Voice message transcript]: {transcript}"),
});
}
} else {
tracing::warn!(filename = %filename, "skipping audio attachment (STT disabled)");
}
} else if media::is_text_file(filename, attachment.content_type.as_deref()) {
if text_file_count >= TEXT_FILE_COUNT_CAP {
tracing::warn!(filename = %filename, count = text_file_count, "text file count cap reached, skipping");
continue;
}
if text_file_bytes + size > TEXT_TOTAL_CAP {
tracing::warn!(filename = %filename, total = text_file_bytes, "text attachments total exceeds 1MB cap, skipping remaining");
continue;
}
if let Some((block, actual_bytes)) = media::download_and_read_text_file(
&attachment.url,
filename,
size,
None,
).await {
text_file_bytes += actual_bytes;
text_file_count += 1;
tracing::debug!(filename = %filename, "adding text file attachment");
extra_blocks.push(block);
}
} else if let Some(block) = media::download_and_encode_image(
&attachment.url,
attachment.content_type.as_deref(),
filename,
size,
None,
).await {
tracing::debug!(url = %attachment.url, filename = %filename, "adding image attachment");
extra_blocks.push(block);
} else {
tracing::debug!(url = %attachment.url, filename = %filename, mime = %mime, "ignoring unsupported attachment type");
extra_blocks.push(ContentBlock::Text {
text: format!("[User uploaded a file: {filename} ({mime})]"),
});
}
}

if prompt.is_empty() && extra_blocks.is_empty() {
return;
}

// If supergroup with no thread_id, create a forum topic
let thread_channel = if event.channel.channel_type == "supergroup"
&& channel.thread_id.is_none()
Expand All @@ -439,7 +526,7 @@ pub async fn run_gateway_adapter(
&thread_channel,
&sender_json,
&prompt,
vec![],
extra_blocks,
&trigger_msg,
false,
)
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,7 @@ async fn main() -> anyhow::Result<()> {
allowed_channels: gw_cfg.allowed_channels,
allow_all_users: config::resolve_allow_all(gw_cfg.allow_all_users, &gw_cfg.allowed_users),
allowed_users: gw_cfg.allowed_users,
stt_config: cfg.stt.clone(),
};
Some(tokio::spawn(async move {
if let Err(e) = gateway::run_gateway_adapter(params, router, shutdown_rx).await {
Expand Down
Loading