diff --git a/proc_qq/Cargo.toml b/proc_qq/Cargo.toml index 4cf2692..0c7bc26 100644 --- a/proc_qq/Cargo.toml +++ b/proc_qq/Cargo.toml @@ -29,6 +29,8 @@ tokio-socks = { version = "0.5", optional = true } trust-dns-resolver = { version = "0.22", optional = true } url = { version = "2.3", optional = true } +tokio-cron-scheduler = {version = "0.9.4",optional = true} + [target.'cfg(any(target_os = "windows",target_os = "linux",target_os = "macos"))'.dependencies] opener = "0.5" @@ -44,3 +46,4 @@ pop_window_slider = ["dep:wry"] console_qr = ["dep:rqrr", "dep:image"] connect_handler = [] proxy = ["connect_handler", "dep:tokio-socks", "dep:trust-dns-resolver", "dep:url"] +scheduler = ["tokio-cron-scheduler"] \ No newline at end of file diff --git a/proc_qq/src/client.rs b/proc_qq/src/client.rs index ee3118f..758238c 100644 --- a/proc_qq/src/client.rs +++ b/proc_qq/src/client.rs @@ -1,9 +1,6 @@ use crate::handler::EventSender; use crate::DeviceSource::{JsonFile, JsonString}; -use crate::{ - Authentication, ClientHandler, DeviceLockVerification, DeviceSource, EventResultHandler, - Module, SessionStore, ShowQR, ShowSlider, -}; +use crate::{Authentication, ClientHandler, DeviceLockVerification, DeviceSource, EventResultHandler, Module, SessionStore, ShowQR, ShowSlider}; use anyhow::{anyhow, Context, Result}; use bytes::{Buf, BufMut, Bytes, BytesMut}; use futures::future::BoxFuture; @@ -33,6 +30,11 @@ use std::path::Path; #[cfg(feature = "connect_handler")] use std::pin::Pin; +#[cfg(feature = "scheduler")] +use crate::{ + SchedulerJob,SchedulerHandler +}; + /// 客户端 pub struct Client { pub rq_client: Arc, @@ -40,6 +42,8 @@ pub struct Client { pub session_store: Arc>>, pub(crate) modules: Arc>, pub(crate) result_handlers: Arc>, + #[cfg(feature = "scheduler")] + pub(crate) scheduler: Vec, pub show_qr: ShowQR, pub show_slider: ShowSlider, pub shutting: bool, @@ -120,6 +124,17 @@ pub async fn run_client(c: Arc) -> Result<()> { login.await?; } } + +} +#[cfg(feature = "scheduler")] +pub async fn run_scheduler(client: Arc) -> Result<()>{ + let scheduler_job = client.scheduler.clone(); + let handler = SchedulerHandler { + client, + scheduler_job, + }; + handler.start().await?; + Ok(()) } pub async fn run_client_once(client: Arc) -> Result<()> { @@ -535,6 +550,8 @@ pub struct ClientBuilder { session_store: Arc>>, modules_vec: Arc>, result_handlers_vec: Arc>, + #[cfg(feature = "scheduler")] + scheduler: Vec, show_qr: Option, show_slider: Option, device_lock_verification: Option, @@ -553,6 +570,8 @@ impl ClientBuilder { session_store: Arc::new(None), modules_vec: Arc::new(vec![]), result_handlers_vec: Arc::new(vec![]), + #[cfg(feature = "scheduler")] + scheduler:vec![], show_qr: None, show_slider: None, device_lock_verification: None, @@ -573,7 +592,12 @@ impl ClientBuilder { self.result_handlers_vec = e.into(); self } - + /// 设置定时任务 + #[cfg(feature = "scheduler")] + pub fn scheduler>>(mut self, s: S) ->Self{ + self.scheduler = s.into(); + self + } /// 设置显示二维码的方式 pub fn show_rq>>(mut self, show_qr: E) -> Self { self.show_qr = show_qr.into(); @@ -642,6 +666,8 @@ impl ClientBuilder { session_store: self.session_store.clone(), modules: self.modules_vec.clone(), result_handlers: self.result_handlers_vec.clone(), + #[cfg(feature = "scheduler")] + scheduler: self.scheduler.clone(), show_qr: if self.show_qr.is_some() { self.show_qr.clone().unwrap() } else { diff --git a/proc_qq/src/handler/mod.rs b/proc_qq/src/handler/mod.rs index a637f72..ae47d9e 100644 --- a/proc_qq/src/handler/mod.rs +++ b/proc_qq/src/handler/mod.rs @@ -15,6 +15,11 @@ mod events; mod processes; mod results; +#[cfg(feature = "scheduler")] +mod scheduler; +#[cfg(feature = "scheduler")] +pub use scheduler::*; + pub(crate) struct ClientHandler { pub(crate) modules: Arc>, pub(crate) result_handlers: Arc>, @@ -352,6 +357,12 @@ impl Handler for ClientHandler { } } +#[derive(Clone)] +pub struct SchedulerJob { + pub name: String, + pub handles: Vec>>, +} + pub struct Module { pub id: String, pub name: String, diff --git a/proc_qq/src/handler/scheduler.rs b/proc_qq/src/handler/scheduler.rs new file mode 100644 index 0000000..18a8d3f --- /dev/null +++ b/proc_qq/src/handler/scheduler.rs @@ -0,0 +1,39 @@ +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use tokio_cron_scheduler::Job; +use crate::{Client, SchedulerJob}; + + +pub struct SchedulerHandler { + pub(crate) client: Arc, + pub(crate) scheduler_job: Vec, +} + +impl SchedulerHandler { + + /// 启动定时任务执行器 + pub async fn start(&self) -> anyhow::Result<()>{ + let scheduler = tokio_cron_scheduler::JobScheduler::new().await?; + let client = self.client.clone(); + let scheduler_job = self.scheduler_job.clone(); + for job in scheduler_job { + tracing::info!("Add {} Job",job.name); + for job in job.handles { + let _teak = Arc::clone(&job); + let client = Arc::clone(&client); + let job = Job::new_async(_teak.cron().as_str(), move |_, _| _teak.call(client.clone()))?; + scheduler.add(job).await?; + } + } + scheduler.start().await?; + Ok(()) + } + +} + +pub trait ScheduledJobHandler: Sync + Send { + fn cron(&self) -> String; + fn call(&self, bot: Arc) -> Pin + Send + 'static>>; +} + diff --git a/proc_qq_codegen/src/lib.rs b/proc_qq_codegen/src/lib.rs index 0f18f41..bb696ea 100644 --- a/proc_qq_codegen/src/lib.rs +++ b/proc_qq_codegen/src/lib.rs @@ -9,6 +9,7 @@ use syn::punctuated::Punctuated; use syn::spanned::Spanned; use syn::{parse_macro_input, Expr, FnArg, Meta, NestedMeta, Token}; + use crate::event_arg::*; mod bot_command; @@ -500,3 +501,103 @@ pub fn event_fn(args: TokenStream, input: TokenStream) -> TokenStream { } emit!(result) } + +#[proc_macro_error] +#[proc_macro_attribute] +pub fn scheduler_job(args: TokenStream, input: TokenStream) -> TokenStream { + + // 获取#[scheduler_job]的参数 + let attrs = parse_macro_input!(args as syn::AttributeArgs); + // 获取方法 + let method = parse_macro_input!(input as syn::ItemFn); + // 判断是否为async方法 + if method.sig.asyncness.is_none() { + abort!(&method.sig.span(), "必须是async方法"); + } + // 判断事件 + let sig_params = &method.sig.inputs; + let bot_params = match sig_params.first() { + None => abort!(&sig_params.span(), "需要Arc作为参数"), + Some(bot) => { + match bot { + FnArg::Receiver(_) => abort!(&sig_params.span(), "第一个参数不能是self"), + FnArg::Typed(t) => t + } + } + }; + let block = &method.block; + let mut cron = String::new(); + match attrs.first() { + None => { abort!(&method.span(), "cron参数是必须的!") } + Some(nm) => { + if let NestedMeta::Meta(meta) = nm { + if let Meta::NameValue(nv) = meta { + if nv.path.segments.len() != 1 { + abort!(&nv.path.span(), "表达式有且只能有一个片段"); + } + let ident = &nv.path.segments.first().unwrap().ident; + let ident_name = nv.path.segments.first().unwrap().ident.to_string(); + match ident_name.as_str() { + "cron" => match &nv.lit { + syn::Lit::Str(value) => { + cron.push_str(&value.value()); + } + _ => abort!(&ident.span(), "cron只支持字符串类型参数值"), + }, + _ => abort!(&ident.span(), "不支持的参数名称"), + } + } + } else { + abort!(&nm.span(), "必须要一个参数") + } + } + } + let ident = method.sig.ident; + let bot_params_pat = bot_params.pat.as_ref(); + let bot_params_ty = bot_params.ty.as_ref(); + quote!( + #[allow(non_camel_case_types)] + #[derive(Clone)] + pub struct #ident; + + impl ::proc_qq::ScheduledJobHandler for #ident{ + fn cron(&self) -> String { + #cron.to_owned() + } + + fn call(&self, #bot_params_pat: #bot_params_ty) -> ::std::pin::Pin + Send + 'static>> { + let r = self.clone(); + Box::pin(async move{ + r.raw(#bot_params_pat).await; + }) + } + } + impl #ident { + async fn raw(&self,#bot_params_pat: #bot_params_ty) #block + } + ).into() +} + +#[proc_macro_error] +#[proc_macro] +pub fn scheduler(input: TokenStream) -> TokenStream { + let params = parse_macro_input!(input as ModuleParams); + if params.expressions.len() < 1 { + abort!(params.span, "参数数量不足") + } + + let name = syn::parse_str::(¶ms.expressions[0]).expect("name 解析错误"); + let mut handle_builder = String::new(); + for i in 1..params.expressions.len() { + handle_builder.push_str(&format!("::std::sync::Arc::new(Box::new({})),", params.expressions[i])); + } + + let handle_invoker = + syn::parse_str::(&format!("vec![{handle_builder}]")).expect("handle invoker解析错误"); + TokenStream::from(quote! { + ::proc_qq::SchedulerJob { + name: #name.to_owned(), + handles: #handle_invoker, + } + }) +} \ No newline at end of file diff --git a/proc_qq_examples/Cargo.toml b/proc_qq_examples/Cargo.toml index 3df9d01..02daf2d 100644 --- a/proc_qq_examples/Cargo.toml +++ b/proc_qq_examples/Cargo.toml @@ -16,7 +16,7 @@ name = "input-uin-password" path = "src/input_uin_password/main.rs" [dependencies] -proc_qq = { path = "../proc_qq" } +proc_qq = { path = "../proc_qq" ,features = ["scheduler"]} anyhow = "1.0" tokio = { version = "1", features = ["full"] } tracing = "0.1" diff --git a/proc_qq_examples/src/input_uin_password/main.rs b/proc_qq_examples/src/input_uin_password/main.rs index b6cb9b7..13e94ed 100644 --- a/proc_qq_examples/src/input_uin_password/main.rs +++ b/proc_qq_examples/src/input_uin_password/main.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use proc_qq::re_exports::async_trait::async_trait; use proc_qq::re_exports::ricq::version::ANDROID_PHONE; use proc_qq::*; -use proc_qq_examples::hello_module; +use proc_qq_examples::{hello_module, scheduler_handlers}; use proc_qq_examples::init_tracing_subscriber; use proc_qq_examples::result_handlers; @@ -23,10 +23,14 @@ async fn main() { })) .modules(vec![hello_module::module()]) .result_handlers(vec![result_handlers::on_result {}.into()]) + .scheduler(vec![scheduler_handlers::scheduler()]) .build() .await .unwrap(); - run_client(Arc::new(client)).await.unwrap(); + let client = Arc::new(client); + let copy = client.clone(); + run_scheduler(copy).await.unwrap(); + run_client(client).await.unwrap(); } struct InputUinPassword; diff --git a/proc_qq_examples/src/lib.rs b/proc_qq_examples/src/lib.rs index 45d19df..9f93578 100644 --- a/proc_qq_examples/src/lib.rs +++ b/proc_qq_examples/src/lib.rs @@ -4,6 +4,7 @@ use tracing_subscriber::util::SubscriberInitExt; pub mod hello_module; pub mod result_handlers; +pub mod scheduler_handlers; pub fn init_tracing_subscriber() { tracing_subscriber::registry() diff --git a/proc_qq_examples/src/password_login/main.rs b/proc_qq_examples/src/password_login/main.rs index 28b993a..26a7f9c 100644 --- a/proc_qq_examples/src/password_login/main.rs +++ b/proc_qq_examples/src/password_login/main.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use proc_qq_examples::hello_module; use proc_qq_examples::init_tracing_subscriber; use proc_qq_examples::result_handlers; +use proc_qq_examples::scheduler_handlers; #[tokio::main] async fn main() { @@ -19,8 +20,12 @@ async fn main() { })) .modules(vec![hello_module::module()]) .result_handlers(vec![result_handlers::on_result {}.into()]) + .scheduler(vec![scheduler_handlers::scheduler()]) .build() .await .unwrap(); - run_client(Arc::new(client)).await.unwrap(); + let client = Arc::new(client); + let copy = client.clone(); + run_scheduler(copy).await.unwrap(); + run_client(client).await.unwrap(); } diff --git a/proc_qq_examples/src/qr_login/main.rs b/proc_qq_examples/src/qr_login/main.rs index bb102fc..9c2e662 100644 --- a/proc_qq_examples/src/qr_login/main.rs +++ b/proc_qq_examples/src/qr_login/main.rs @@ -5,6 +5,7 @@ use std::sync::Arc; use proc_qq_examples::hello_module; use proc_qq_examples::init_tracing_subscriber; use proc_qq_examples::result_handlers; +use proc_qq_examples::scheduler_handlers; #[tokio::main] async fn main() { @@ -17,8 +18,12 @@ async fn main() { .session_store(FileSessionStore::boxed("session.token")) .modules(vec![hello_module::module()]) .result_handlers(vec![result_handlers::on_result {}.into()]) + .scheduler(vec![scheduler_handlers::scheduler()]) .build() .await .unwrap(); - run_client(Arc::new(client)).await.unwrap(); + let client = Arc::new(client); + let copy = client.clone(); + run_scheduler(copy).await.unwrap(); + run_client(client).await.unwrap(); } diff --git a/proc_qq_examples/src/scheduler_handlers.rs b/proc_qq_examples/src/scheduler_handlers.rs new file mode 100644 index 0000000..1235e34 --- /dev/null +++ b/proc_qq_examples/src/scheduler_handlers.rs @@ -0,0 +1,17 @@ +use std::sync::Arc; +use proc_qq::{Client, ClientTrait, scheduler, scheduler_job, SchedulerJob}; + +/// 每1分钟获取一次 bot uin +#[scheduler_job(cron = "0 0/1 * * * ?")] +async fn handle_scheduler(c:Arc) { + let bot_uin = c.bot_uin().await; + println!("{}", bot_uin); +} + +/// scheduler +pub fn scheduler() -> SchedulerJob { + scheduler!( + "hello_jobs", + handle_scheduler + ) +} \ No newline at end of file