diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..ce81892 --- /dev/null +++ b/.env.example @@ -0,0 +1 @@ +DATABASE_URL= diff --git a/Cargo.lock b/Cargo.lock index 403fde2..cdd0ce8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -28,9 +28,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.36" +version = "1.0.38" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68803225a7b13e47191bab76f2687382b60d259e8cf37f6e1893658b84bb9479" +checksum = "afddf7f520a80dbf76e6f50a35bca42a2331ef227a28b3b6dc5c2e2338d114b1" [[package]] name = "arrayvec" @@ -38,6 +38,17 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "async-trait" +version = "0.1.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d3a45e77e34375a7923b1e8febb049bb011f064714a8e17a1a616fef01da13d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "atoi" version = "0.4.0" @@ -202,9 +213,12 @@ checksum = "ad1f8e949d755f9d79112b5bb46938e0ef9d3804a0b16dfab13aafcaa5f0fa72" name = "cafecoder-rs" version = "0.1.0" dependencies = [ + "anyhow", + "async-trait", "bollard", "chrono", "dotenv", + "futures", "hyper 0.14.1", "rand 0.8.0", "sqlx", @@ -448,9 +462,9 @@ checksum = "0ba62103ce691c2fd80fbae2213dfdda9ce60804973ac6b6e97de818ea7f52c8" [[package]] name = "futures" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9b3b0c040a1fe6529d30b3c5944b280c7f0dcb2930d2c3062bca967b602583d0" +checksum = "da9052a1a50244d8d5aa9bf55cbc2fb6f357c86cc52e46c62ed390a7180cf150" dependencies = [ "futures-channel", "futures-core", @@ -463,9 +477,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64" +checksum = "f2d31b7ec7efab6eefc7c57233bb10b847986139d88cc2f5a02a1ae6871a1846" dependencies = [ "futures-core", "futures-sink", @@ -473,15 +487,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748" +checksum = "79e5145dde8da7d1b3892dad07a9c98fc04bc39892b1ecc9692cf53e2b780a65" [[package]] name = "futures-executor" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4caa2b2b68b880003057c1dd49f1ed937e38f22fcf6c212188a121f08cf40a65" +checksum = "e9e59fdc009a4b3096bf94f740a0f2424c082521f20a9b08c5c07c48d90fd9b9" dependencies = [ "futures-core", "futures-task", @@ -490,15 +504,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb" +checksum = "28be053525281ad8259d47e4de5de657b25e7bac113458555bb4b70bc6870500" [[package]] name = "futures-macro" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77408a692f1f97bcc61dc001d752e00643408fbc922e4d634c655df50d595556" +checksum = "c287d25add322d9f9abdcdc5927ca398917996600182178774032e9f8258fedd" dependencies = [ "proc-macro-hack", "proc-macro2", @@ -508,24 +522,24 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d" +checksum = "caf5c69029bda2e743fddd0582d1083951d65cc9539aebf8812f36c3491342d6" [[package]] name = "futures-task" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d" +checksum = "13de07eb8ea81ae445aca7b69f5f7bf15d7bf4912d8ca37d6645c77ae8a58d86" dependencies = [ "once_cell", ] [[package]] name = "futures-util" -version = "0.3.8" +version = "0.3.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2" +checksum = "632a8cd0f2a4b3fdea1657f08bde063848c3bd00f9bbf6e256b8be78802e624b" dependencies = [ "futures-channel", "futures-core", @@ -534,7 +548,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project 1.0.2", + "pin-project-lite 0.2.4", "pin-utils", "proc-macro-hack", "proc-macro-nested", @@ -1206,9 +1220,9 @@ checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b" [[package]] name = "pin-project-lite" -version = "0.2.0" +version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b063f57ec186e6140e2b8b6921e5f1bd89c7356dda5b33acc5401203ca6131c" +checksum = "439697af366c49a6d0a010c56a0d97685bc140ce0d377b13a2ea2aa42d64a827" [[package]] name = "pin-utils" @@ -1864,7 +1878,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f4bfdcbd00fa893ac0549b38aa27080636a0104b0d0c38475a99439405e1df8" dependencies = [ "autocfg 1.0.1", - "pin-project-lite 0.2.0", + "pin-project-lite 0.2.4", ] [[package]] @@ -1918,7 +1932,7 @@ checksum = "9f47026cdc4080c07e49b37087de021820269d996f581aac150ef9e5583eefe3" dependencies = [ "cfg-if 1.0.0", "log", - "pin-project-lite 0.2.0", + "pin-project-lite 0.2.4", "tracing-core", ] diff --git a/Cargo.toml b/Cargo.toml index 9e34a0a..99e1b46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,3 +15,6 @@ rand = "0.8.0" # TODO(magurotuna): Upgrade tokio to v1 after sqlx gets compatible with that tokio = { version = "0.2.4", features = [ "full" ] } hyper = "0.14" +anyhow = "1.0.37" +async-trait = "0.1.42" +futures = "0.3.12" diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..934884d --- /dev/null +++ b/src/config.rs @@ -0,0 +1,10 @@ +use anyhow::Result; + +pub struct Config { + pub database_url: String, +} + +pub fn load_config() -> Result { + let database_url = dotenv::var("DATABASE_URL")?; + Ok(Config { database_url }) +} diff --git a/src/db.rs b/src/db.rs index 8dbc3ee..1bcf74d 100644 --- a/src/db.rs +++ b/src/db.rs @@ -1,12 +1,10 @@ -use dotenv::dotenv; +use crate::config::Config; +use anyhow::Result; use sqlx::{mysql::MySqlPool, MySql}; -use std::env; -pub async fn new_pool() -> Result, sqlx::Error> { - dotenv().ok(); - let url = env::var("DATABASE_URL").expect("DATABASE_URL must be set"); - - let pool = MySqlPool::connect(&url).await?; +pub type DbPool = sqlx::Pool; +pub async fn new_pool(config: &Config) -> Result { + let pool = MySqlPool::connect(&config.database_url).await?; Ok(pool) } diff --git a/src/docker_lib.rs b/src/docker_lib.rs deleted file mode 100644 index fb08eb5..0000000 --- a/src/docker_lib.rs +++ /dev/null @@ -1,68 +0,0 @@ -use bollard::{ - container::{Config, CreateContainerOptions, RemoveContainerOptions}, - errors::Error, - models::{ContainerCreateResponse, HostConfig}, -}; - -const IMAGE: &str = "cafecoder"; - -// see https://github.com/fussybeaver/bollard - -#[derive(Clone)] -pub struct Docker { - docker: bollard::Docker, - pub container_name: String, - pub ip_address: String, -} - -impl Docker { - pub fn new() -> Result { - let docker = bollard::Docker::connect_with_unix_defaults()?; - Ok(Docker { - docker, - container_name: String::new(), - ip_address: String::new(), - }) - } - - pub async fn container_create(&mut self, name: &str) -> Result { - let options = Some(CreateContainerOptions { name }); - let config = Config { - image: Some(IMAGE), - host_config: Some(HostConfig { - memory: Some(2_147_483_648_i64), - pids_limit: Some(512_i64), - ..Default::default() - }), - ..Default::default() - }; - - self.container_name = name.to_string(); - - let inspect = self - .docker - .inspect_container(&self.container_name, None) - .await?; - - let network_settings = inspect - .network_settings - .expect("couldn't get network_settings"); - - self.ip_address = network_settings - .ip_address - .expect("couldn't get IP address"); - - self.docker.create_container(options, config).await - } - - pub async fn container_remove(&self) -> Result<(), Error> { - let options = RemoveContainerOptions { - force: true, - ..Default::default() - }; - - self.docker - .remove_container(&self.container_name, Some(options)) - .await - } -} diff --git a/src/error.rs b/src/error.rs deleted file mode 100644 index 057c2eb..0000000 --- a/src/error.rs +++ /dev/null @@ -1,24 +0,0 @@ -#[derive(Debug)] -pub enum Error { - SqlxError { e: sqlx::Error }, - BollardError { e: bollard::errors::Error }, - TokioJoinError { e: tokio::task::JoinError }, -} - -impl From for Error { - fn from(e: sqlx::Error) -> Self { - Error::SqlxError { e } - } -} - -impl From for Error { - fn from(e: bollard::errors::Error) -> Self { - Error::BollardError { e } - } -} - -impl From for Error { - fn from(e: tokio::task::JoinError) -> Self { - Error::TokioJoinError { e } - } -} diff --git a/src/judge.rs b/src/judge.rs deleted file mode 100644 index e69de29..0000000 diff --git a/src/lib.rs b/src/lib.rs deleted file mode 100644 index 0d64580..0000000 --- a/src/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -pub mod db; -pub mod docker_lib; -pub mod error; -pub mod models; -pub mod utils; diff --git a/src/main.rs b/src/main.rs index cb24ba4..d9236cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,77 +1,33 @@ -use cafecoder_rs::{db, docker_lib, error::Error, models::Submits, utils}; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, - thread::sleep, - time::Duration, -}; -use tokio::task; - -const MAX_THREADS: i32 = 2; - -#[tokio::main] -async fn main() -> Result<(), Error> { - let pool = db::new_pool().await?; - let now = Arc::new(Mutex::new(0)); - #[allow(unused_mut, unused)] - let mut json_map: HashMap = HashMap::new(); - - // let mut handles = Vec::new(); - - #[allow(clippy::never_loop)] - loop { - #[allow(unused)] - let submits: Vec = sqlx::query_as( - r#" - SELECT * FROM submits - WHERE status = 'WJ' OR status = 'WR' AND deleted_at IS NULL - ORDER BY updated_at ASC - LIMIT 2 - "#, - ) - .fetch_all(&pool) - .await?; - - let submits: Vec = Vec::new(); - - #[allow(unused)] - for submit in submits { - while *now.lock().expect("couldn't lock {now}") < MAX_THREADS {} - *now.lock().expect("couldn't lock {now}") += 1; - - let now = now.clone(); - - let _: task::JoinHandle> = task::spawn(async move { - let docker = Arc::new(Mutex::new(docker_lib::Docker::new()?)); - let mut rt = tokio::runtime::Runtime::new().unwrap(); - - let docker_ = docker.clone(); - let thread_result: Result<(), Error> = task::spawn(async move { - let name = utils::gen_rand_string(32).await; - let mut rt = tokio::runtime::Runtime::new().unwrap(); - - #[allow(unused)] - let container = rt.block_on(async { - docker_.lock().unwrap().container_create(&name).await - })?; - - rt.block_on(async { docker_.lock().unwrap().container_remove().await })?; - - Ok(()) - }) - .await?; - thread_result?; - - *now.lock().expect("couldn't lock {now}") += 1; - rt.block_on(async move { docker.lock().unwrap().container_remove().await })?; - - Ok(()) - }); - } - - sleep(Duration::from_secs(329329)); - break; +mod config; +mod db; +mod models; +mod repository; +mod task; +mod utils; + +use anyhow::Result; +use futures::future::join_all; +use std::sync::Arc; + +// TODO(magurotuna): ここの値も要検討 +const JOB_THREADS: usize = 3; + +// TODO(magurotuna): スレッド数指定を柔軟に行うため、Tokio の RuntimeBuilder を使うよう書き換える +#[tokio::main(core_threads = 4)] +async fn main() -> Result<()> { + let config = config::load_config()?; + let db_conn = Arc::new(db::new_pool(&config).await?); + let docker_conn = Arc::new(bollard::Docker::connect_with_unix_defaults()?); + + let mut handles = Vec::new(); + for _ in 0..JOB_THREADS { + let db = Arc::clone(&db_conn); + let docker = Arc::clone(&docker_conn); + let handle = tokio::spawn(task::gen_job(db, docker)); + handles.push(handle); } + join_all(handles).await; + Ok(()) } diff --git a/src/models.rs b/src/models.rs index a3c2dc6..0ef0b7c 100644 --- a/src/models.rs +++ b/src/models.rs @@ -1,7 +1,7 @@ use chrono::NaiveDateTime; #[derive(Debug, sqlx::FromRow)] -pub struct Submits { +pub struct Submit { pub id: i64, pub user_id: i32, pub problem_id: i64, diff --git a/src/repository.rs b/src/repository.rs new file mode 100644 index 0000000..2eb6d61 --- /dev/null +++ b/src/repository.rs @@ -0,0 +1,27 @@ +use crate::db::DbPool; +use crate::models::Submit; +use anyhow::Result; +use async_trait::async_trait; + +#[async_trait] +pub trait SubmitRepository { + async fn get_submits(&self) -> Result>; +} + +#[async_trait] +impl SubmitRepository for DbPool { + async fn get_submits(&self) -> Result> { + let submits = sqlx::query_as( + r#" + SELECT * FROM submits + WHERE status = 'WJ' OR status = 'WR' AND deleted_at IS NULL + ORDER BY updated_at ASC + LIMIT 2 + "#, + ) + .fetch_all(self) + .await?; + + Ok(submits) + } +} diff --git a/src/task.rs b/src/task.rs new file mode 100644 index 0000000..ff53d7f --- /dev/null +++ b/src/task.rs @@ -0,0 +1,146 @@ +// TODO: 実装が終わったらこのallowディレクティブを削除する +#![allow(unused)] + +use crate::db::DbPool; +use crate::models::Submit; +use anyhow::{bail, Result}; +use bollard::container::{Config, CreateContainerOptions, RemoveContainerOptions}; +use bollard::models::HostConfig; +use bollard::service::ContainerCreateResponse; +use bollard::Docker; +use futures::future::FutureExt; +use futures::stream::{self, StreamExt}; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::delay_for; + +// submit が取得できなかったときの次の取得までの間隔 +const INTERVAL: Duration = Duration::from_secs(1); + +pub async fn gen_job(db_conn: Arc, docker_conn: Arc) { + // この `task` が 1 実行単位 + let task = || { + let db_conn = Arc::clone(&db_conn); + let docker_conn = Arc::clone(&docker_conn); + async move { + let task = JudgeTask::new(db_conn, docker_conn); + + // 提出を取得 + let submit = match task.fetch_submit().await { + Ok(s) => s, + Err(_) => { + // TODO(magurotuna): 提出が取得できなかった場合は 1 秒待って次の実行に移る + delay_for(INTERVAL).await; + bail!("Couldn't find an unjudged submit."); + } + }; + + // コンテナ作成 + // TODO(magurotuna): コンテナ名をちゃんとする UUIDを発行? + let container_name = "DUMMY_NAME"; + let container = task.create_container(container_name).await?; + + ///////////////////////////////////////////////////////////////// + // + // ここでコンテナに対するリクエストなどの処理を行う + // + ///////////////////////////////////////////////////////////////// + + // コンテナを削除 + task.remove_container(container_name).await?; + // ジャッジ終了としてDBを更新 + task.save_as_finished(submit.id).await?; + + Ok(()) + } + }; + + // stream::unfold をすることで、1 実行単位である `task` を延々と繰り返すような Stream を作る + let mut stream = stream::unfold((), move |_| { + // カッコが続いて見づらくなるので Unit に置き換えて多少見やすくなるようにしている + type Unit = (); + const UNIT: () = (); + fn mapper(task_result: Result) -> Option<(Result, Unit)> { + Some((task_result, UNIT)) + } + task().map(mapper) + }) + .boxed(); + + while let Some(_task_result) = stream.next().await { + // 1回1回の task の実行結果を使って何かやりたければここに書く + // ログ出力とか? + } +} + +/// 1つの submit に対するジャッジの処理を担当する +struct JudgeTask { + db_conn: Arc, + docker_conn: Arc, +} + +impl JudgeTask { + fn new(db_conn: Arc, docker_conn: Arc) -> Self { + Self { + db_conn, + docker_conn, + } + } + + /// 未ジャッジの提出のうち、もっとも古いもの1件を取得する。 + /// その1件のステータスを「ジャッジ中」にする + async fn fetch_submit(&self) -> Result { + todo!() + } + + /// Docker コンテナを指定された名前で立ち上げる + async fn create_container(&self, name: &str) -> Result<(ContainerCreateResponse, String)> { + const IMAGE: &str = "cafecoder"; + let options = Some(CreateContainerOptions { name }); + let config = Config { + image: Some(IMAGE), + host_config: Some(HostConfig { + memory: Some(2_147_483_648_i64), + pids_limit: Some(512_i64), + ..Default::default() + }), + ..Default::default() + }; + + let inspect = self.docker_conn.inspect_container(name, None).await?; + + let network_settings = inspect + .network_settings + .expect("couldn't get network_settings"); + let ip_addr = network_settings + .ip_address + .expect("couldn't get IP address"); + + let res = self.docker_conn.create_container(options, config).await?; + Ok((res, ip_addr)) + } + + /// ジャッジの進捗をDBに保存する + /// TODO: 引数に追加情報が必要だと思うので、必要に応じて追加する + async fn save_progress(&self, submit_id: i64) -> Result<()> { + todo!() + } + + /// Docker コンテナを削除する + async fn remove_container(&self, name: &str) -> Result<()> { + let options = RemoveContainerOptions { + force: true, + ..Default::default() + }; + + self.docker_conn + .remove_container(name, Some(options)) + .await?; + Ok(()) + } + + /// ジャッジが正常に終了した旨を `submit_id` のレコードに保存する + async fn save_as_finished(&self, submit_id: i64) -> Result<()> { + todo!() + } +} diff --git a/src/utils.rs b/src/utils.rs index d407577..dab4493 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,9 +1,11 @@ use rand::{distributions::Alphanumeric, prelude::*}; +#[allow(unused)] pub fn print_type_of(_: &T) { println!("{}", std::any::type_name::()) } +#[allow(unused)] pub async fn gen_rand_string(digits: usize) -> String { rand::thread_rng() .sample_iter(&Alphanumeric)