Skip to content

Commit

Permalink
staging, linter: linter dispatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
xtexChooser committed Jan 31, 2024
1 parent dc81118 commit df1bcfa
Show file tree
Hide file tree
Showing 10 changed files with 249 additions and 30 deletions.
8 changes: 6 additions & 2 deletions src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ use std::{

use anyhow::{bail, Result};
use lru::LruCache;
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use tokio::sync::Notify;

pub use mwbot::Bot as MwBot;
pub use mwbot::Page as MwPage;

use tracing::info;

use crate::{db::DatabaseManager, site, web};
use crate::{db::DatabaseManager, linter::LinterState, site, web};

pub const USER_AGENT: &str = concat!(
env!("CARGO_PKG_NAME"),
Expand All @@ -31,6 +31,8 @@ pub struct App {
pub linter_notify: Notify,
pub resync_pages_notify: Notify,
pub login_lru: RwLock<LruCache<String, web::auth::AuthResult>>,
pub linter_selector_mutex: Mutex<()>,
pub linters: RwLock<Vec<Arc<RwLock<LinterState>>>>,
}

static GLOBAL_APP: OnceLock<Arc<App>> = OnceLock::new();
Expand All @@ -43,6 +45,8 @@ impl App {
linter_notify: Notify::new(),
resync_pages_notify: Notify::new(),
login_lru: RwLock::new(LruCache::new(NonZeroUsize::new(30).unwrap())),
linter_selector_mutex: Mutex::new(()),
linters: RwLock::new(Vec::new()),
}))
}

Expand Down
8 changes: 4 additions & 4 deletions src/db/model/page.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@ pub struct Model {
pub title: String,
#[sea_orm(column_type = "Timestamp")]
pub last_checked: DateTimeUtc,
#[sea_orm(column_type = "Timestamp", nullable)]
#[sea_orm(column_type = "Timestamp", nullable, default_value = "None")]
pub need_check: Option<DateTimeUtc>,
#[sea_orm(column_type = "Unsigned", default = 0)]
#[sea_orm(column_type = "Unsigned", default_value = "0")]
pub check_errors: u32,
#[sea_orm(column_type = "Unsigned", default = 0)]
#[sea_orm(column_type = "Unsigned", default_value = "0")]
pub issues: u32,
#[sea_orm(column_type = "Unsigned", default = 0)]
#[sea_orm(column_type = "Unsigned", default_value = "0")]
pub suggests: u32,
}

Expand Down
2 changes: 1 addition & 1 deletion src/db/model/rcsyncer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub struct Model {
pub id: Uuid,
#[sea_orm(column_type = "Timestamp")]
pub last_synced_at: DateTimeUtc,
#[sea_orm(column_type = "Unsigned", default = 0)]
#[sea_orm(column_type = "Unsigned", default_value = 0)]
pub last_rc_id: u32,
}

Expand Down
4 changes: 2 additions & 2 deletions src/db/model/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ pub struct Model {
pub salt: String,
#[sea_orm(column_type = "String(None)")]
pub modrinth_id: String,
#[sea_orm(column_type = "Boolean", default = false)]
#[sea_orm(column_type = "Boolean", default_value = "false")]
pub sysop: bool,
#[sea_orm(column_type = "Timestamp", nullable, default = None)]
#[sea_orm(column_type = "Timestamp", nullable, default_value = "None")]
pub blocked: Option<DateTimeUtc>,
}

Expand Down
114 changes: 108 additions & 6 deletions src/linter/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,117 @@
use crate::app::App;
use std::{
env,
sync::{Arc, LazyLock},
};

pub async fn run_linter() {
let app = App::get();
use anyhow::{bail, Result};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use tokio::task::JoinSet;
use tracing::{error, info_span, Instrument};
use uuid::Uuid;

use crate::{app::App, page::Page};

pub static LINTER_WORKERS: LazyLock<u32> = LazyLock::new(|| {
env::var("SPOCK_LINTER_WORKERS")
.ok()
.and_then(|s| s.parse::<u32>().ok())
.unwrap_or(5)
});

#[derive(Debug, Serialize, Deserialize, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct LinterState {
pub page: Option<Uuid>,
}

impl LinterState {
pub fn new() -> Self {
Self { page: None }
}
}

pub async fn run_linters() {
let app = App::get();
let _ = app.mwbot("zh").await.unwrap();
let _ = app.mwbot("en").await.unwrap();

let mut handles = JoinSet::new();
for _ in 0..*LINTER_WORKERS {
let state = Arc::new(RwLock::new(LinterState::new()));
app.linters.write().push(state.clone());
handles.spawn(run_linter(state));
}

loop {
tokio::select! {
_ = app.linter_notify.notified()=>{},
_ = tokio::time::sleep(std::time::Duration::from_secs(120))=>{}
tokio::time::sleep(std::time::Duration::from_secs(120)).await;
match Page::count_for_check().await.unwrap_or(0) {
0 => {}
1 => app.linter_notify.notify_one(),
_ => app.linter_notify.notify_waiters(),
}
}
}

async fn select_page(state: &RwLock<LinterState>) -> Result<Option<Page>> {
let app = App::get();
let _ = app.linter_selector_mutex.lock();
let other_pages = app
.linters
.read()
.iter()
.filter_map(|l| l.read().page.to_owned())
.collect::<Vec<_>>();
let page = Page::find_for_check()
.await?
.into_iter()
.filter(|s| !other_pages.contains(s.id()))
.next();
if let Some(page) = &page {
state.write().page = Some(page.id().to_owned());
}
Ok(page)
}

pub async fn run_linter(state: Arc<RwLock<LinterState>>) {
let app = App::get();
loop {
app.linter_notify.notified().await;
loop {
assert!(state.read().page.is_none());
let page = select_page(&*state).await;
match page {
Err(error) => error!(%error,"error selecting page for linting"),
Ok(Some(page)) => {
let title = page.title().to_owned();
async {
let start_time = page
.check_requested_time()
.expect("select_page returned a page that is not requested for check");
match do_lint(page.id().to_owned()).await {
Ok((issues, suggests)) => {
if let Err(error) =
page.set_checked(start_time, issues, suggests).await
{
error!(%error, "failed to mark page as checked");
}
}
Err(error) => {
error!(%error, %page, "failed to check page");
if let Err(error) = page.defer_check().await {
error!(%error, "failed to defer checking page");
}
}
}
}
.instrument(info_span!("lint_page", page = title))
.await
}
Ok(None) => {}
}
}
}
}

pub async fn do_lint(id: Uuid) -> Result<(u32, u32)> {
bail!("not implemented")
}
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ async fn main() -> Result<()> {
tokio::spawn(rcsyncer::run_rc_syncer());
tokio::spawn(db::run_sqlite_interval_optimizer());

linter::run_linter().await;
linter::run_linters().await;

Ok(())
}
3 changes: 3 additions & 0 deletions src/mcw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,6 @@ pub const SYNC_RC: phf::Set<&str> = phf_set!["zh", "en"];
pub const SYNC_RC_PEROID: u64 = 30;

pub const SQLITE_INTERVAL_OPTIMIZE_PEROID: u64 = 60 * 60 * 24;

pub const LINTER_MAX_RETRIES: u32 = 5;
pub const LINTER_RETRY_DELAY: i64 = 10 * 60;
86 changes: 77 additions & 9 deletions src/page.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
use std::collections::BTreeSet;
use std::{collections::BTreeSet, fmt::Display};

use anyhow::{anyhow, bail, Result};
use chrono::{DateTime, Utc};
use chrono::{DateTime, Duration, Utc};
use mwbot::generators::{AllPages, Generator};
use sea_orm::{prelude::*, ActiveValue, IntoActiveModel, QuerySelect};
use sea_orm::{prelude::*, ActiveValue, Condition, IntoActiveModel, QuerySelect};
use tokio::sync::Mutex;
use tracing::{error, info, info_span, trace, Instrument};
use uuid::Uuid;

use crate::{
app::App,
db::{self},
linter::LINTER_WORKERS,
site,
};

Expand All @@ -29,6 +30,12 @@ impl Ord for Page {
}
}

impl Display for Page {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.0.id.fmt(f)
}
}

static CREATION_LOCK: Mutex<()> = Mutex::const_new(());

impl Page {
Expand Down Expand Up @@ -111,7 +118,11 @@ impl Page {
}

pub fn check_requested_time(&self) -> Option<DateTime<Utc>> {
self.0.need_check
self.0.need_check.map(|t|t)
}

pub fn check_errors(&self) -> u32 {
self.0.check_errors
}

pub fn issues_count(&self) -> u32 {
Expand All @@ -125,8 +136,9 @@ impl Page {
pub async fn mark_check(self) -> Result<()> {
let mut model = self.0.into_active_model();
model.need_check = ActiveValue::Set(Some(Utc::now()));
model.check_errors = ActiveValue::Set(0);
model.update(&*db::get()).await?;
App::get().linter_notify.notify_waiters();
App::get().linter_notify.notify_one();
Ok(())
}

Expand All @@ -136,23 +148,51 @@ impl Page {
issues: u32,
suggests: u32,
) -> Result<()> {
if self.check_requested_time() != Some(start_time) {
let requested_time = self.check_requested_time();
let drop_result = if requested_time != Some(start_time) {
info!(
lang = self.lang(),
title = self.title(),
"drop check result for page to be checked again"
);
return Ok(());
}
true
} else {
info!(
lang = self.lang(),
title = self.title(),
time = %(Utc::now() - start_time),
issues,
suggests,
"succeeded checking page"
);
false
};
let mut model = self.0.into_active_model();
model.last_checked = ActiveValue::Set(Utc::now());
model.need_check = ActiveValue::Set(None);
if !drop_result {
model.need_check = ActiveValue::Set(None);
}
model.check_errors = ActiveValue::Set(0);
model.issues = ActiveValue::Set(issues);
model.suggests = ActiveValue::Set(suggests);
model.update(&*db::get()).await?;
Ok(())
}

pub async fn defer_check(self) -> Result<()> {
let check_time = self.check_requested_time();
let check_errors = self.check_errors();
let mut model = self.0.into_active_model();
model.need_check = ActiveValue::Set(Some(
check_time
.ok_or_else(|| anyhow!("trying to defer a page not requested for checking"))?
+ Duration::seconds(site::LINTER_RETRY_DELAY),
));
model.check_errors = ActiveValue::Set(check_errors + 1);
model.update(&*db::get()).await?;
Ok(())
}

pub async fn delete(self) -> Result<()> {
self.0.into_active_model().delete(&*db::get()).await?;
Ok(())
Expand All @@ -170,6 +210,34 @@ impl Page {
App::get().linter_notify.notify_waiters();
Ok(())
}

pub async fn count_for_check() -> Result<u64> {
Ok(db::page::Entity::find()
.filter(
Condition::all()
.add(db::page::Column::NeedCheck.is_not_null())
.add(db::page::Column::NeedCheck.lte(Utc::now()))
.add(db::page::Column::CheckErrors.lt(site::LINTER_MAX_RETRIES)),
)
.count(&*db::get())
.await?)
}

pub async fn find_for_check() -> Result<Vec<Self>> {
Ok(db::page::Entity::find()
.filter(
Condition::all()
.add(db::page::Column::NeedCheck.is_not_null())
.add(db::page::Column::NeedCheck.lte(Utc::now()))
.add(db::page::Column::CheckErrors.lt(site::LINTER_MAX_RETRIES)),
)
.limit(Some(*LINTER_WORKERS as u64 * 2))
.all(&*db::get())
.await?
.into_iter()
.map(|p| Self(p))
.collect())
}
}

pub async fn sync_all_pages(lang: &str) -> Result<()> {
Expand Down
Loading

0 comments on commit df1bcfa

Please sign in to comment.