Skip to content

Commit

Permalink
feat(feature): scheduler support
Browse files Browse the repository at this point in the history
feat(marco): scheduler proc macro support
  • Loading branch information
rcoplo committed May 12, 2023
1 parent dda3d45 commit e637217
Show file tree
Hide file tree
Showing 11 changed files with 222 additions and 10 deletions.
3 changes: 3 additions & 0 deletions proc_qq/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ tokio-socks = { version = "0.5", optional = true }
trust-dns-resolver = { version = "0.22", optional = true }
url = { version = "2.3", optional = true }

tokio-cron-scheduler = {version = "0.9.4",optional = true}

[target.'cfg(any(target_os = "windows",target_os = "linux",target_os = "macos"))'.dependencies]
opener = "0.5"

Expand All @@ -44,3 +46,4 @@ pop_window_slider = ["dep:wry"]
console_qr = ["dep:rqrr", "dep:image"]
connect_handler = []
proxy = ["connect_handler", "dep:tokio-socks", "dep:trust-dns-resolver", "dep:url"]
scheduler = ["tokio-cron-scheduler"]
36 changes: 31 additions & 5 deletions proc_qq/src/client.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::handler::EventSender;
use crate::DeviceSource::{JsonFile, JsonString};
use crate::{
Authentication, ClientHandler, DeviceLockVerification, DeviceSource, EventResultHandler,
Module, SessionStore, ShowQR, ShowSlider,
};
use crate::{Authentication, ClientHandler, DeviceLockVerification, DeviceSource, EventResultHandler, Module, SessionStore, ShowQR, ShowSlider};
use anyhow::{anyhow, Context, Result};
use bytes::{Buf, BufMut, Bytes, BytesMut};
use futures::future::BoxFuture;
Expand Down Expand Up @@ -33,13 +30,20 @@ use std::path::Path;
#[cfg(feature = "connect_handler")]
use std::pin::Pin;

#[cfg(feature = "scheduler")]
use crate::{
SchedulerJob,SchedulerHandler
};

/// 客户端
pub struct Client {
pub rq_client: Arc<ricq::Client>,
pub authentication: Authentication,
pub session_store: Arc<Option<Box<dyn SessionStore + Sync + Send>>>,
pub(crate) modules: Arc<Vec<Module>>,
pub(crate) result_handlers: Arc<Vec<EventResultHandler>>,
#[cfg(feature = "scheduler")]
pub(crate) scheduler: Vec<SchedulerJob>,
pub show_qr: ShowQR,
pub show_slider: ShowSlider,
pub shutting: bool,
Expand Down Expand Up @@ -120,6 +124,17 @@ pub async fn run_client(c: Arc<Client>) -> Result<()> {
login.await?;
}
}

}
#[cfg(feature = "scheduler")]
pub async fn run_scheduler(client: Arc<Client>) -> Result<()>{
let scheduler_job = client.scheduler.clone();
let handler = SchedulerHandler {
client,
scheduler_job,
};
handler.start().await?;
Ok(())
}

pub async fn run_client_once(client: Arc<Client>) -> Result<()> {
Expand Down Expand Up @@ -535,6 +550,8 @@ pub struct ClientBuilder {
session_store: Arc<Option<Box<dyn SessionStore + Sync + Send>>>,
modules_vec: Arc<Vec<Module>>,
result_handlers_vec: Arc<Vec<EventResultHandler>>,
#[cfg(feature = "scheduler")]
scheduler: Vec<SchedulerJob>,
show_qr: Option<ShowQR>,
show_slider: Option<ShowSlider>,
device_lock_verification: Option<DeviceLockVerification>,
Expand All @@ -553,6 +570,8 @@ impl ClientBuilder {
session_store: Arc::new(None),
modules_vec: Arc::new(vec![]),
result_handlers_vec: Arc::new(vec![]),
#[cfg(feature = "scheduler")]
scheduler:vec![],
show_qr: None,
show_slider: None,
device_lock_verification: None,
Expand All @@ -573,7 +592,12 @@ impl ClientBuilder {
self.result_handlers_vec = e.into();
self
}

/// 设置定时任务
#[cfg(feature = "scheduler")]
pub fn scheduler<S :Into<Vec<SchedulerJob>>>(mut self, s: S) ->Self{
self.scheduler = s.into();
self
}
/// 设置显示二维码的方式
pub fn show_rq<E: Into<Option<ShowQR>>>(mut self, show_qr: E) -> Self {
self.show_qr = show_qr.into();
Expand Down Expand Up @@ -642,6 +666,8 @@ impl ClientBuilder {
session_store: self.session_store.clone(),
modules: self.modules_vec.clone(),
result_handlers: self.result_handlers_vec.clone(),
#[cfg(feature = "scheduler")]
scheduler: self.scheduler.clone(),
show_qr: if self.show_qr.is_some() {
self.show_qr.clone().unwrap()
} else {
Expand Down
11 changes: 11 additions & 0 deletions proc_qq/src/handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ mod events;
mod processes;
mod results;

#[cfg(feature = "scheduler")]
mod scheduler;
#[cfg(feature = "scheduler")]
pub use scheduler::*;

pub(crate) struct ClientHandler {
pub(crate) modules: Arc<Vec<Module>>,
pub(crate) result_handlers: Arc<Vec<EventResultHandler>>,
Expand Down Expand Up @@ -352,6 +357,12 @@ impl Handler for ClientHandler {
}
}

#[derive(Clone)]
pub struct SchedulerJob {
pub name: String,
pub handles: Vec<Arc<Box<dyn ScheduledJobHandler>>>,
}

pub struct Module {
pub id: String,
pub name: String,
Expand Down
39 changes: 39 additions & 0 deletions proc_qq/src/handler/scheduler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio_cron_scheduler::Job;
use crate::{Client, SchedulerJob};


pub struct SchedulerHandler {
pub(crate) client: Arc<Client>,
pub(crate) scheduler_job: Vec<SchedulerJob>,
}

impl SchedulerHandler {

/// 启动定时任务执行器
pub async fn start(&self) -> anyhow::Result<()>{
let scheduler = tokio_cron_scheduler::JobScheduler::new().await?;
let client = self.client.clone();
let scheduler_job = self.scheduler_job.clone();
for job in scheduler_job {
tracing::info!("Add {} Job",job.name);
for job in job.handles {
let _teak = Arc::clone(&job);
let client = Arc::clone(&client);
let job = Job::new_async(_teak.cron().as_str(), move |_, _| _teak.call(client.clone()))?;
scheduler.add(job).await?;
}
}
scheduler.start().await?;
Ok(())
}

}

pub trait ScheduledJobHandler: Sync + Send {
fn cron(&self) -> String;
fn call(&self, bot: Arc<Client>) -> Pin<Box<dyn Future<Output = ()> + Send + 'static>>;
}

101 changes: 101 additions & 0 deletions proc_qq_codegen/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use syn::punctuated::Punctuated;
use syn::spanned::Spanned;
use syn::{parse_macro_input, Expr, FnArg, Meta, NestedMeta, Token};


use crate::event_arg::*;

mod bot_command;
Expand Down Expand Up @@ -500,3 +501,103 @@ pub fn event_fn(args: TokenStream, input: TokenStream) -> TokenStream {
}
emit!(result)
}

#[proc_macro_error]
#[proc_macro_attribute]
pub fn scheduler_job(args: TokenStream, input: TokenStream) -> TokenStream {

// 获取#[scheduler_job]的参数
let attrs = parse_macro_input!(args as syn::AttributeArgs);
// 获取方法
let method = parse_macro_input!(input as syn::ItemFn);
// 判断是否为async方法
if method.sig.asyncness.is_none() {
abort!(&method.sig.span(), "必须是async方法");
}
// 判断事件
let sig_params = &method.sig.inputs;
let bot_params = match sig_params.first() {
None => abort!(&sig_params.span(), "需要Arc<proc_qq::Client>作为参数"),
Some(bot) => {
match bot {
FnArg::Receiver(_) => abort!(&sig_params.span(), "第一个参数不能是self"),
FnArg::Typed(t) => t
}
}
};
let block = &method.block;
let mut cron = String::new();
match attrs.first() {
None => { abort!(&method.span(), "cron参数是必须的!") }
Some(nm) => {
if let NestedMeta::Meta(meta) = nm {
if let Meta::NameValue(nv) = meta {
if nv.path.segments.len() != 1 {
abort!(&nv.path.span(), "表达式有且只能有一个片段");
}
let ident = &nv.path.segments.first().unwrap().ident;
let ident_name = nv.path.segments.first().unwrap().ident.to_string();
match ident_name.as_str() {
"cron" => match &nv.lit {
syn::Lit::Str(value) => {
cron.push_str(&value.value());
}
_ => abort!(&ident.span(), "cron只支持字符串类型参数值"),
},
_ => abort!(&ident.span(), "不支持的参数名称"),
}
}
} else {
abort!(&nm.span(), "必须要一个参数")
}
}
}
let ident = method.sig.ident;
let bot_params_pat = bot_params.pat.as_ref();
let bot_params_ty = bot_params.ty.as_ref();
quote!(
#[allow(non_camel_case_types)]
#[derive(Clone)]
pub struct #ident;

impl ::proc_qq::ScheduledJobHandler for #ident{
fn cron(&self) -> String {
#cron.to_owned()
}

fn call(&self, #bot_params_pat: #bot_params_ty) -> ::std::pin::Pin<Box<dyn ::std::future::Future<Output = ()> + Send + 'static>> {
let r = self.clone();
Box::pin(async move{
r.raw(#bot_params_pat).await;
})
}
}
impl #ident {
async fn raw(&self,#bot_params_pat: #bot_params_ty) #block
}
).into()
}

#[proc_macro_error]
#[proc_macro]
pub fn scheduler(input: TokenStream) -> TokenStream {
let params = parse_macro_input!(input as ModuleParams);
if params.expressions.len() < 1 {
abort!(params.span, "参数数量不足")
}

let name = syn::parse_str::<Expr>(&params.expressions[0]).expect("name 解析错误");
let mut handle_builder = String::new();
for i in 1..params.expressions.len() {
handle_builder.push_str(&format!("::std::sync::Arc::new(Box::new({})),", params.expressions[i]));
}

let handle_invoker =
syn::parse_str::<Expr>(&format!("vec![{handle_builder}]")).expect("handle invoker解析错误");
TokenStream::from(quote! {
::proc_qq::SchedulerJob {
name: #name.to_owned(),
handles: #handle_invoker,
}
})
}
2 changes: 1 addition & 1 deletion proc_qq_examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ name = "input-uin-password"
path = "src/input_uin_password/main.rs"

[dependencies]
proc_qq = { path = "../proc_qq" }
proc_qq = { path = "../proc_qq" ,features = ["scheduler"]}
anyhow = "1.0"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
Expand Down
8 changes: 6 additions & 2 deletions proc_qq_examples/src/input_uin_password/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::sync::Arc;
use proc_qq::re_exports::async_trait::async_trait;
use proc_qq::re_exports::ricq::version::ANDROID_PHONE;
use proc_qq::*;
use proc_qq_examples::hello_module;
use proc_qq_examples::{hello_module, scheduler_handlers};
use proc_qq_examples::init_tracing_subscriber;
use proc_qq_examples::result_handlers;

Expand All @@ -23,10 +23,14 @@ async fn main() {
}))
.modules(vec![hello_module::module()])
.result_handlers(vec![result_handlers::on_result {}.into()])
.scheduler(vec![scheduler_handlers::scheduler()])
.build()
.await
.unwrap();
run_client(Arc::new(client)).await.unwrap();
let client = Arc::new(client);
let copy = client.clone();
run_scheduler(copy).await.unwrap();
run_client(client).await.unwrap();
}

struct InputUinPassword;
Expand Down
1 change: 1 addition & 0 deletions proc_qq_examples/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use tracing_subscriber::util::SubscriberInitExt;

pub mod hello_module;
pub mod result_handlers;
pub mod scheduler_handlers;

pub fn init_tracing_subscriber() {
tracing_subscriber::registry()
Expand Down
7 changes: 6 additions & 1 deletion proc_qq_examples/src/password_login/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use proc_qq_examples::hello_module;
use proc_qq_examples::init_tracing_subscriber;
use proc_qq_examples::result_handlers;
use proc_qq_examples::scheduler_handlers;

#[tokio::main]
async fn main() {
Expand All @@ -19,8 +20,12 @@ async fn main() {
}))
.modules(vec![hello_module::module()])
.result_handlers(vec![result_handlers::on_result {}.into()])
.scheduler(vec![scheduler_handlers::scheduler()])
.build()
.await
.unwrap();
run_client(Arc::new(client)).await.unwrap();
let client = Arc::new(client);
let copy = client.clone();
run_scheduler(copy).await.unwrap();
run_client(client).await.unwrap();
}
7 changes: 6 additions & 1 deletion proc_qq_examples/src/qr_login/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::sync::Arc;
use proc_qq_examples::hello_module;
use proc_qq_examples::init_tracing_subscriber;
use proc_qq_examples::result_handlers;
use proc_qq_examples::scheduler_handlers;

#[tokio::main]
async fn main() {
Expand All @@ -17,8 +18,12 @@ async fn main() {
.session_store(FileSessionStore::boxed("session.token"))
.modules(vec![hello_module::module()])
.result_handlers(vec![result_handlers::on_result {}.into()])
.scheduler(vec![scheduler_handlers::scheduler()])
.build()
.await
.unwrap();
run_client(Arc::new(client)).await.unwrap();
let client = Arc::new(client);
let copy = client.clone();
run_scheduler(copy).await.unwrap();
run_client(client).await.unwrap();
}
17 changes: 17 additions & 0 deletions proc_qq_examples/src/scheduler_handlers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use std::sync::Arc;
use proc_qq::{Client, ClientTrait, scheduler, scheduler_job, SchedulerJob};

/// 每1分钟获取一次 bot uin
#[scheduler_job(cron = "0 0/1 * * * ?")]
async fn handle_scheduler(c:Arc<Client>) {
let bot_uin = c.bot_uin().await;
println!("{}", bot_uin);
}

/// scheduler
pub fn scheduler() -> SchedulerJob {
scheduler!(
"hello_jobs",
handle_scheduler
)
}

0 comments on commit e637217

Please sign in to comment.