Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions charts/agent-broker/templates/configmap.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ data:
max_sessions = {{ .Values.pool.maxSessions }}
session_ttl_hours = {{ .Values.pool.sessionTtlHours }}

[management]
enabled = {{ .Values.management.enabled }}
bind = "{{ .Values.management.bind }}"

[reactions]
enabled = {{ .Values.reactions.enabled }}
remove_after_reply = {{ .Values.reactions.removeAfterReply }}
Expand Down
18 changes: 18 additions & 0 deletions charts/agent-broker/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,24 @@ spec:
resources:
{{- toYaml . | nindent 12 }}
{{- end }}
{{- if .Values.management.enabled }}
ports:
- name: management
containerPort: 8090
protocol: TCP
livenessProbe:
httpGet:
path: /healthz
port: management
initialDelaySeconds: 10
periodSeconds: 15
readinessProbe:
httpGet:
path: /healthz
port: management
initialDelaySeconds: 5
periodSeconds: 10
{{- end }}
volumeMounts:
- name: config
mountPath: /etc/agent-broker
Expand Down
4 changes: 4 additions & 0 deletions charts/agent-broker/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ pool:
maxSessions: 10
sessionTtlHours: 24

management:
enabled: false
bind: "0.0.0.0:8090"

reactions:
enabled: true
removeAfterReply: false
Expand Down
4 changes: 4 additions & 0 deletions config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ working_dir = "/home/agent"
max_sessions = 10
session_ttl_hours = 24

[management]
enabled = false
bind = "0.0.0.0:8090"

[reactions]
enabled = true
remove_after_reply = false
Expand Down
16 changes: 16 additions & 0 deletions k8s/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,22 @@ spec:
key: discord-bot-token
- name: HOME
value: /home/agent
ports:
- name: management
containerPort: 8090
protocol: TCP
livenessProbe:
httpGet:
path: /healthz
port: management
initialDelaySeconds: 10
periodSeconds: 15
readinessProbe:
httpGet:
path: /healthz
port: management
initialDelaySeconds: 5
periodSeconds: 10
volumeMounts:
- name: config
mountPath: /etc/agent-broker
Expand Down
27 changes: 27 additions & 0 deletions src/acp/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,33 @@ impl SessionPool {
f(conn).await
}

pub fn max_sessions(&self) -> usize {
self.max_sessions
}

pub async fn list_sessions(&self) -> Vec<(String, bool, u64)> {
let conns = self.connections.read().await;
conns
.iter()
.map(|(id, c)| {
let idle = c.last_active.elapsed().as_secs();
(id.clone(), c.alive(), idle)
})
.collect()
}

pub async fn remove_session(&self, thread_id: &str) -> bool {
let mut conns = self.connections.write().await;
conns.remove(thread_id).is_some()
}

pub async fn remove_all_sessions(&self) -> usize {
let mut conns = self.connections.write().await;
let count = conns.len();
conns.clear();
count
}

pub async fn cleanup_idle(&self, ttl_secs: u64) {
let cutoff = Instant::now() - std::time::Duration::from_secs(ttl_secs);
let mut conns = self.connections.write().await;
Expand Down
18 changes: 18 additions & 0 deletions src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,26 @@ pub struct Config {
pub pool: PoolConfig,
#[serde(default)]
pub reactions: ReactionsConfig,
#[serde(default)]
pub management: ManagementConfig,
}

#[derive(Debug, Deserialize)]
pub struct ManagementConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_management_bind")]
pub bind: String,
}

impl Default for ManagementConfig {
fn default() -> Self {
Self { enabled: false, bind: default_management_bind() }
}
}

fn default_management_bind() -> String { "0.0.0.0:8090".into() }

#[derive(Debug, Deserialize)]
pub struct DiscordConfig {
pub bot_token: String,
Expand Down
3 changes: 3 additions & 0 deletions src/discord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use serenity::model::gateway::Ready;
use serenity::model::id::{ChannelId, MessageId};
use serenity::prelude::*;
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::sync::watch;
use tracing::{error, info};
Expand All @@ -16,6 +17,7 @@ pub struct Handler {
pub pool: Arc<SessionPool>,
pub allowed_channels: HashSet<u64>,
pub reactions_config: ReactionsConfig,
pub discord_connected: Arc<AtomicBool>,
}

#[async_trait]
Expand Down Expand Up @@ -152,6 +154,7 @@ impl EventHandler for Handler {
}

async fn ready(&self, _ctx: Context, ready: Ready) {
self.discord_connected.store(true, Ordering::Relaxed);
info!(user = %ready.user.name, "discord bot connected");
}
}
Expand Down
14 changes: 14 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ mod acp;
mod config;
mod discord;
mod format;
mod management;
mod reactions;

use serenity::prelude::*;
use std::collections::HashSet;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use tokio::time::Instant;
use tracing::info;

#[tokio::main]
Expand Down Expand Up @@ -43,10 +46,13 @@ async fn main() -> anyhow::Result<()> {
.filter_map(|s| s.parse().ok())
.collect();

let discord_connected = Arc::new(AtomicBool::new(false));

let handler = discord::Handler {
pool: pool.clone(),
allowed_channels,
reactions_config: cfg.reactions,
discord_connected: discord_connected.clone(),
};

let intents = GatewayIntents::GUILD_MESSAGES
Expand All @@ -57,6 +63,14 @@ async fn main() -> anyhow::Result<()> {
.event_handler(handler)
.await?;

// Spawn management server
let started = Instant::now();
if cfg.management.enabled {
let mgmt_pool = pool.clone();
let mgmt_dc = discord_connected.clone();
tokio::spawn(management::serve(cfg.management.bind, mgmt_pool, started, mgmt_dc));
}

// Spawn cleanup task
let cleanup_pool = pool.clone();
let cleanup_handle = tokio::spawn(async move {
Expand Down
116 changes: 116 additions & 0 deletions src/management.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use crate::acp::SessionPool;
use serde_json::json;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::time::Instant;
use tracing::{error, info};

pub async fn serve(
bind: String,
pool: Arc<SessionPool>,
started: Instant,
discord_connected: Arc<AtomicBool>,
) {
let listener = match TcpListener::bind(&bind).await {
Ok(l) => l,
Err(e) => {
error!(bind = %bind, error = %e, "management server bind failed");
return;
}
};
info!(bind = %bind, "management server listening");

loop {
let (stream, _) = match listener.accept().await {
Ok(s) => s,
Err(e) => {
error!(error = %e, "management accept error");
continue;
}
};
let pool = pool.clone();
let discord_connected = discord_connected.clone();
tokio::spawn(async move {
let (reader, mut writer) = stream.into_split();
let mut buf_reader = BufReader::new(reader);
let mut request_line = String::new();
if buf_reader.read_line(&mut request_line).await.is_err() {
return;
}

// Consume remaining headers
let mut header = String::new();
loop {
header.clear();
if buf_reader.read_line(&mut header).await.is_err() || header.trim().is_empty() {
break;
}
}

let parts: Vec<&str> = request_line.trim().splitn(3, ' ').collect();
if parts.len() < 2 {
let _ = write_response(&mut writer, 400, &json!({"error": "bad request"})).await;
return;
}
let method = parts[0];
let path = parts[1];

let (status, body) = match (method, path) {
("GET", "/healthz") => {
let uptime = started.elapsed().as_secs();
let connected = discord_connected.load(Ordering::Relaxed);
(200, json!({"status": "ok", "uptime_seconds": uptime, "discord_connected": connected}))
}
("GET", "/sessions") => {
let sessions = pool.list_sessions().await;
let list: Vec<_> = sessions
.iter()
.map(|(id, alive, idle)| {
json!({"thread_id": id, "alive": alive, "idle_seconds": idle})
})
.collect();
(200, json!({"active_sessions": sessions.len(), "max_sessions": pool.max_sessions(), "sessions": list}))
}
("DELETE", "/sessions") => {
let count = pool.remove_all_sessions().await;
(200, json!({"killed": count}))
}
("DELETE", p) if p.starts_with("/sessions/") => {
let thread_id = &p["/sessions/".len()..];
if thread_id.is_empty() {
(400, json!({"error": "missing thread_id"}))
} else if pool.remove_session(thread_id).await {
(200, json!({"killed": thread_id}))
} else {
(404, json!({"error": "session not found"}))
}
}
_ => (404, json!({"error": "not found"})),
};

let _ = write_response(&mut writer, status, &body).await;
});
}
}

async fn write_response(
writer: &mut tokio::net::tcp::OwnedWriteHalf,
status: u16,
body: &serde_json::Value,
) -> std::io::Result<()> {
let body = serde_json::to_string(body).unwrap_or_default();
let reason = match status {
200 => "OK",
400 => "Bad Request",
404 => "Not Found",
_ => "Error",
};
let resp = format!(
"HTTP/1.1 {status} {reason}\r\nContent-Type: application/json\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
body.len()
);
writer.write_all(resp.as_bytes()).await?;
writer.flush().await
}