Init, test send task
qiaoruntao committed May 21, 2023
0 parents commit 6320150
Showing 12 changed files with 469 additions and 0 deletions.
32 changes: 32 additions & 0 deletions .github/workflows/ci.yml
@@ -0,0 +1,32 @@
name: Cargo Build & Test
on:
  push:
  pull_request:

env:
  CARGO_TERM_COLOR: always
  MongoStr: mongodb://127.0.0.1:27017/test

jobs:
  build_and_test:
    name: Rust project - latest
    runs-on: ubuntu-latest
    strategy:
      matrix:
        toolchain:
          - stable
          # - beta
          # - nightly
        mongodb-version: [ '6.0' ]
    steps:
      - uses: actions/checkout@v3
      - name: Start MongoDB
        uses: supercharge/mongodb-github-action@1.8.0
        with:
          mongodb-version: ${{ matrix.mongodb-version }}
          mongodb-replica-set: rs1
      # - run: mongodb --eval "db.serverStatus()"
      - run: rustup update ${{ matrix.toolchain }} && rustup default ${{ matrix.toolchain }}
      - run: cargo build --verbose
      - run: cargo test --verbose

2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
/target
/Cargo.lock
16 changes: 16 additions & 0 deletions Cargo.toml
@@ -0,0 +1,16 @@
[package]
name = "mscheduler"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-trait = "0"
mongodb = { version = "2", features = [] }
serde = "1"
tokio = { version = "1", features = ["signal"] }
tracing = { version = "0", features = ["log"] }

[dev-dependencies]
serde_json = "1"
tokio = { version = "1", features = ["rt-multi-thread"] }
90 changes: 90 additions & 0 deletions README.md
@@ -0,0 +1,90 @@
| Feature | Bull | Agenda |
|:---------------------------|:---------------:|:------:|
| Backend | redis | mongo |
| Priorities |||
| Concurrency |||
| Delayed jobs |||
| Global events |||
| Rate Limiter || |
| Pause/Resume |||
| Sandboxed worker |||
| Repeatable jobs |||
| Atomic ops || ~ |
| Persistence |||
| UI |||
| REST API | ||
| Central (Scalable) Queue | ||
| Supports long running jobs | ||
| Optimized for | Jobs / Messages | Jobs |
| Gracefully stop            |                 |        |
| Remote stop                |                 |        |
| Multiple workers           |                 |        |

## Workflow

1. Send a task
    1. Idempotent

## Worker states

1. Running
   A worker enters the running state immediately after successfully occupying a task, and sets the next refresh time.
   If the time since the last refresh exceeds worker_timeout_ms, the worker is considered timed out.
2. Failed
   The worker actively sets its execution state to fail. If the worker is no longer in the worker list when the task fails, setting the state fails.
3. Succeeded
   The worker actively sets its execution state to success. If the worker is no longer in the worker list when the task succeeds, setting the state fails.
4. Timed out
   A state derived from worker_timeout_ms.
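
A minimal sketch (not part of this commit) of how these rules could be expressed against the `TaskWorkerState` fields defined in `src/tasker/task.rs`, assuming `ping_expire_time` marks the refresh deadline:

```rust
use mongodb::bson::DateTime;

use crate::tasker::task::TaskWorkerState;

// hypothetical helper, not part of this commit
pub enum WorkerState {
    Running,
    Success,
    Fail,
    Timeout,
}

pub fn classify_worker(state: &TaskWorkerState, now: DateTime) -> WorkerState {
    if state.success_time.is_some() {
        WorkerState::Success
    } else if state.fail_time.is_some() {
        WorkerState::Fail
    } else if matches!(state.ping_expire_time, Some(expire) if expire < now) {
        // the refresh deadline derived from worker_timeout_ms has passed
        WorkerState::Timeout
    } else {
        WorkerState::Running
    }
}
```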

## Task states

The task state is determined by the current worker list.

1. Not started
   The list is empty.
2. Running
   The list contains at least one running worker.
3. Succeeded
   The list contains only successful workers.
4. Failed
   The list contains only failed workers.
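
Under the same assumptions, the task-level state could be derived from the worker list as sketched below; mixed success/failure lists, which the rules above leave unspecified, fall through to failed here:

```rust
use mongodb::bson::DateTime;

use crate::tasker::task::TaskState;

// hypothetical helper, not part of this commit
pub enum TaskRunState {
    NotStarted,
    Running,
    Success,
    Fail,
}

pub fn classify_task(task: &TaskState, now: DateTime) -> TaskRunState {
    let workers = &task.worker_states;
    if workers.is_empty() {
        TaskRunState::NotStarted
    } else if workers.iter().any(|w| {
        // a worker is still running if it has not finished and has not timed out
        w.success_time.is_none()
            && w.fail_time.is_none()
            && matches!(w.ping_expire_time, Some(expire) if expire >= now)
    }) {
        TaskRunState::Running
    } else if workers.iter().all(|w| w.success_time.is_some()) {
        TaskRunState::Success
    } else {
        TaskRunState::Fail
    }
}
```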

## Provided operations

1. Send a task (idempotent)
   Look up a task with the given key that is not currently running.
   Not found => setOnInsert
   Found => set nothing; clean up failed/successful workers depending on the configuration.
2. Occupy a task (multiple workers)
   Filter: check whether specific worker_ids are designated, the task state is not success, the task has not already been handled by this worker (it must not be in the worker list), and the task can accept more workers.
   Sort: highest priority first, and prefer tasks that designate this worker_id.
   Not found => do nothing.
   Found => add this worker's own entry and filter the worker list (remove timed-out running workers). A query sketch follows this list.
3. Maintain a task (ping)
   Filter: same key, same worker id.
   Found => update the corresponding expire time.
   Not found => stop the current task.
4. Task succeeded
   Try to update the task state to success.
5. Task failed
   The worker actively reports the failure and tries to update the task state to failed.
6. Unexpected task error
   Retry according to the option settings and update the retry count; if the count exceeds the limit, mark the task as failed. Retries happen only locally, and the state is updated to failed once all retries have been exhausted.
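
A rough sketch of the occupy filter and sort described in item 2. The consumer in this commit is still a stub, so the field usage here is an assumption based on the `Task`, `TaskState`, and `TaskOption` definitions, and the success/concurrency checks are omitted for brevity:

```rust
use mongodb::bson::{doc, DateTime, Document};
use mongodb::options::FindOneAndUpdateOptions;

// hypothetical sketch, not part of this commit
fn build_occupy_query(worker_id: &str, worker_version: u32, now: DateTime) -> (Document, FindOneAndUpdateOptions) {
    let filter = doc! {
        // the task has not already been handled by this worker
        "task_state.worker_states.worker_id": { "$ne": worker_id },
        // either no specific workers are designated, or this worker is listed
        "$or": [
            { "task_option.specific_worker_ids": { "$size": 0 } },
            { "task_option.specific_worker_ids": worker_id },
        ],
        // this worker's version is high enough
        "task_option.min_worker_version": { "$lte": worker_version },
        // the task is due to run
        "task_state.start_time": { "$lte": now },
    };
    let options = FindOneAndUpdateOptions::builder()
        // highest priority first
        .sort(doc! { "task_option.priority": -1 })
        .build();
    (filter, options)
}
```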

## Effects of configuration changes

1. specific_worker_ids changed
   If the current worker is no longer allowed to run the task after the update, stop the task.
2. ping_interval_ms changed
   Takes effect at the next ping.
3. Other parameters changed
   Running tasks are not affected; the change takes effect the next time the task is occupied.

## Implementation notes

### Sending a task

Just write it to the collection directly.

### Consuming a task

The core question is when to next try to occupy a task:

1. Compute the next attempt time next_try_time at startup.
2. Use a change stream to keep next_try_time up to date in real time (see the sketch below).
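
A rough sketch of the change-stream side (not part of this commit). It assumes a `futures-util` dependency for `StreamExt`, which Cargo.toml does not declare yet, and relies on the replica set that CI already sets up:

```rust
use futures_util::stream::StreamExt;
use mongodb::bson::{doc, Document};
use mongodb::Collection;

// hypothetical sketch, not part of this commit
async fn watch_task_changes(collection: Collection<Document>) -> mongodb::error::Result<()> {
    // only react to changes that can move a task's start time
    let pipeline = [doc! { "$match": { "operationType": { "$in": ["insert", "update", "replace"] } } }];
    let mut stream = collection.watch(pipeline, None).await?;
    while let Some(event) = stream.next().await.transpose()? {
        // recompute the local next_try_time from the changed task here
        tracing::info!("task collection changed: {:?}", event.operation_type);
    }
    Ok(())
}
```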
1 change: 1 addition & 0 deletions src/lib.rs
@@ -0,0 +1 @@
pub mod tasker;
22 changes: 22 additions & 0 deletions src/tasker/consumer.rs
@@ -0,0 +1,22 @@
pub struct TaskConsumerFunc {}

impl TaskConsumerFunc {
    pub async fn consumer() {}
}

pub struct TaskConsumerConfig {
    // specifies this worker's version, used to choose which tasks to run
    worker_version: Option<u32>,
    // specifies this worker's id, used to remotely control worker behavior; can also be used to choose which tasks to run
    worker_id: Option<String>,
    // whether this worker should keep trying to accept tasks
    allow_consume: bool,
}

pub struct TaskConsumer {}

impl TaskConsumer {
    pub async fn update_config(&self, config: TaskConsumerConfig) {}
    pub async fn start(&self) {}
}
8 changes: 8 additions & 0 deletions src/tasker/error.rs
@@ -0,0 +1,8 @@
#[derive(Debug)]
pub enum MSchedulerError {
    // failed to add the task (nothing was inserted or matched)
    AddTaskFailed,
    // MongoDB returned an error that could not be handled
    MongoDbError(mongodb::error::Error),
}

pub type MResult<T> = Result<T, MSchedulerError>;
5 changes: 5 additions & 0 deletions src/tasker/mod.rs
@@ -0,0 +1,5 @@
pub mod producer;
pub mod consumer;
pub mod error;
pub mod task;
pub mod task_common;
122 changes: 122 additions & 0 deletions src/tasker/producer.rs
@@ -0,0 +1,122 @@
use mongodb::bson::{Bson, DateTime, doc, to_bson, to_document};
use mongodb::Collection;
use mongodb::options::UpdateOptions;
use serde::Serialize;

use crate::tasker::error::{MResult, MSchedulerError};
use crate::tasker::task::{Task, TaskOption, TaskState};

pub struct TaskProducer<T, K> {
    task_collection: Collection<Task<T, K>>,
}

#[derive(Debug, Clone)]
pub struct SendTaskOption {
    // whether to update an existing task's params
    pub update_existing_params: bool,
    // specify a custom task start time
    pub run_time: Option<DateTime>,
    // clean up an existing task's successful worker states
    pub clean_success: bool,
    // clean up an existing task's failed worker states
    pub clean_failed: bool,
    // TODO: more options in task_option
}

impl Default for SendTaskOption {
    fn default() -> Self {
        SendTaskOption {
            update_existing_params: false,
            run_time: None,
            clean_success: false,
            clean_failed: false,
        }
    }
}

pub struct SendTaskResult {
    insert_new: bool,
    update_existing: bool,
}

impl<T: Serialize, K: Serialize> TaskProducer<T, K> {
    pub fn create(collection: Collection<Task<T, K>>) -> MResult<TaskProducer<T, K>> {
        Ok(TaskProducer {
            task_collection: collection,
        })
    }

    /// send a task
    pub async fn send_task(&self, key: impl AsRef<str>, params: Option<T>, option: Option<SendTaskOption>) -> MResult<SendTaskResult> {
        let send_option = option.unwrap_or_default();

        let query = doc! { "key": key.as_ref() };
        let now = DateTime::now();
        let start_time = send_option.run_time.unwrap_or(now);
        let task_state = TaskState {
            create_time: now,
            start_time,
            worker_states: vec![],
        };
        let task_option = TaskOption {
            priority: 0,
            concurrent_worker_cnt: 1,
            ping_interval_ms: 30_000,
            worker_timeout_ms: 60_000,
            min_worker_version: 0,
            specific_worker_ids: vec![],
            max_unexpected_retries: 3,
            unexpected_retry_delay_ms: 10_000,
        };
        let mut update_part = doc! {
            "$setOnInsert": doc! {
                "key": key.as_ref(),
                "task_state.create_time": now,
                "task_state.start_time": start_time,
                "task_state.worker_states": Bson::Array(vec![]),
                "task_option": to_document(&task_option).unwrap(),
            },
        };
        let mut updates = vec![];
        // decide where to put params; they can appear in either $setOnInsert or $set
        if send_option.update_existing_params {
            updates.push(("params", to_bson(&params).unwrap()));
        } else {
            let set_on_insert_part = update_part.get_mut("$setOnInsert").unwrap();
            let set_on_insert_doc = set_on_insert_part.as_document_mut().unwrap();
            set_on_insert_doc.insert("params", to_bson(&params).unwrap());
        }
        // update the task run time if one is specified
        if send_option.run_time.is_some() {
            let set_on_insert_part = update_part.get_mut("$setOnInsert").unwrap();
            let set_on_insert_doc = set_on_insert_part.as_document_mut().unwrap();
            let task_state_part = set_on_insert_doc.remove("task_state.start_time").unwrap();
            updates.push(("task_state.start_time", task_state_part));
        }
        if !updates.is_empty() {
            let mut document = doc! {};
            for update in updates {
                document.insert(update.0, update.1);
            }
            update_part.insert("$set", document);
        }
        let mut update_options = UpdateOptions::default();
        update_options.upsert = Some(true);

        match self.task_collection.update_one(query, update_part, Some(update_options)).await {
            Ok(v) => {
                if v.upserted_id.is_some() {
                    Ok(SendTaskResult { insert_new: true, update_existing: false })
                } else if v.matched_count == 1 {
                    // TODO: check modified count
                    Ok(SendTaskResult { insert_new: false, update_existing: true })
                } else {
                    Err(MSchedulerError::AddTaskFailed)
                }
            }
            Err(e) => {
                Err(MSchedulerError::MongoDbError(e))
            }
        }
    }
}
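
For reference, a minimal usage sketch of `send_task` (not part of this commit): the database and collection names are placeholders, the URI mirrors the `MongoStr` value used in CI, and it assumes tokio's `macros` and `rt-multi-thread` features are enabled.

```rust
use mongodb::{Client, Collection};
use mscheduler::tasker::producer::{SendTaskOption, TaskProducer};
use mscheduler::tasker::task::Task;

#[tokio::main]
async fn main() -> mongodb::error::Result<()> {
    let client = Client::with_uri_str("mongodb://127.0.0.1:27017/test").await?;
    // placeholder database/collection names
    let collection: Collection<Task<String, String>> = client.database("test").collection("tasks");

    let producer = TaskProducer::create(collection).expect("failed to create producer");

    // the first send inserts the task; repeating it with the same key only touches params
    let mut option = SendTaskOption::default();
    option.update_existing_params = true;
    producer
        .send_task("demo-task", Some("demo params".to_string()), Some(option))
        .await
        .expect("failed to send task");
    Ok(())
}
```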
60 changes: 60 additions & 0 deletions src/tasker/task.rs
@@ -0,0 +1,60 @@
use mongodb::bson::DateTime;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
pub struct TaskWorkerState {
    // worker identifier
    pub worker_id: String,
    // how many unexpected retries have occurred
    pub unexpected_retry_cnt: u32,
    // after this time other workers may take over this task
    pub ping_expire_time: Option<DateTime>,
    // when this task succeeded
    pub success_time: Option<DateTime>,
    // when this task failed
    pub fail_time: Option<DateTime>,
}

#[derive(Deserialize, Serialize)]
pub struct TaskState {
    // when this task was created
    pub create_time: DateTime,
    // when this task should run
    pub start_time: DateTime,
    // states of the workers that are running this task
    pub worker_states: Vec<TaskWorkerState>,
}

#[derive(Deserialize, Serialize)]
pub struct TaskOption {
    // priority; higher values are picked first
    pub priority: u32,
    // how many workers are allowed to run this task concurrently
    pub concurrent_worker_cnt: u32,
    // the interval at which a worker refreshes its running state
    pub ping_interval_ms: u32,
    // after this much time without a refresh, other workers can replace a timed-out worker
    pub worker_timeout_ms: u32,
    // minimum worker version required to run this task
    pub min_worker_version: u32,
    // only allow these workers to run this task
    pub specific_worker_ids: Vec<String>,
    // how many unexpected errors are allowed to occur
    pub max_unexpected_retries: u32,
    // retry delay after an unexpected error occurs
    pub unexpected_retry_delay_ms: u32,
}

#[derive(Deserialize, Serialize)]
pub struct Task<T, K> {
    // identifier
    pub key: String,
    // task running state
    pub task_state: TaskState,
    // task running options
    pub task_option: TaskOption,
    // task parameters
    pub params: Option<T>,
    // task return values
    pub returns: Option<K>,
}
