Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a job scheduler. #7

Open
scott-wilson opened this issue Dec 30, 2022 · 3 comments
Open

Add a job scheduler. #7

scott-wilson opened this issue Dec 30, 2022 · 3 comments
Labels
lang-c Issues related to C lang-cpp Issues related to C++ lang-python Issues related to Python lang-rust Issues related to Rust

Comments

@scott-wilson
Copy link
Owner

The scheduler is responsible for running the checks and getting the results.

A simple scheduler would look something like this in Python:

# Instantiate each check once, then hand them to pychecks one at a time.
my_checks = [CheckThingA(), CheckThingB(), CheckThingC()]

for check in my_checks:
    # Checks run sequentially; each iteration overwrites `result`.
    result = pychecks.run(check)

However, depending on the context, we might run the checks in multiple threads, async tasks, or processes. Alternatively, we could use a DAG scheduler so that a job runs only if its parents pass.

@scott-wilson scott-wilson added lang-rust Issues related to Rust lang-python Issues related to Python lang-c Issues related to C lang-cpp Issues related to C++ labels Jan 9, 2023
@QuincyForbes
Copy link

import asyncio
import croniter
import datetime
import logging
import random
import time

class Check:
    def __init__(self, name, cron, dependencies=None, max_retries=0, retry_delay=60):
        """
        A class representing a single check to be performed.

        Args:
            name (str): The name of the check.
            cron (str): A string in cron format that determines when the check should be run.
            dependencies (list of Check, optional): Other checks that must succeed before this check can run.
            max_retries (int, optional): The maximum number of times to retry the check if it fails.
                The check is therefore attempted at most ``max_retries + 1`` times.
            retry_delay (int, optional): The number of seconds to wait between retries.
        """
        self.name = name
        self.cron = cron
        # copy so the caller's list is not aliased by this check
        self.dependencies = list(dependencies or [])
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        # number of failed attempts so far
        self.current_retries = 0
        # None = not attempted yet, True = last attempt succeeded, False = last attempt failed.
        # Initialised here because dependents read it in should_run() before this check may have run.
        self.success = None
        # time of the last successful run; None until the check first succeeds
        self.last_run = None

    async def run(self):
        """
        Perform the check and return the result.

        This is a placeholder: it sleeps for 1-3 seconds and then returns a
        string. Real checks should override it and raise an exception on
        failure, which is what makes run_with_retry() retry.
        """
        # simulate a delay to make the check take some time
        await asyncio.sleep(random.randint(1, 3))
        return f"{self.name} check result"

    def should_run(self):
        """
        Determine whether the check should be attempted now.

        Returns False once the retry budget is spent (the first attempt plus
        ``max_retries`` retries) or if any dependency has not succeeded
        (including dependencies that have not run yet). Otherwise, a check
        that has never succeeded is due immediately, and a check that has
        succeeded before is due once the next cron tick after its last
        successful run has passed.
        """
        if self.current_retries > self.max_retries:
            # retry budget exhausted
            return False
        if not all(dep.success for dep in self.dependencies):
            # a dependency failed or has not run yet
            return False
        if self.last_run is None:
            return True
        # get_next() from *now* is always in the future; anchor on the last run instead
        next_run = croniter.croniter(self.cron, self.last_run).get_next(datetime.datetime)
        return next_run <= datetime.datetime.now()

    async def run_with_retry(self):
        """
        Run the check, retrying after ``retry_delay`` seconds on failure.

        Stops after the first success, once the retry budget is exhausted, or
        as soon as should_run() returns False (e.g. a dependency did not succeed).

        Returns:
            Check: This check, so whoever awaits it can inspect ``success``,
            ``current_retries`` and ``last_run``.
        """
        while self.should_run():
            try:
                result = await self.run()
            except Exception as e:
                self.success = False
                self.current_retries += 1
                if self.current_retries > self.max_retries:
                    # no retry left: don't sleep or claim we are retrying
                    logging.error(
                        f"{self.name} check failed ({e}). Giving up after {self.current_retries} attempt(s)."
                    )
                    break
                logging.error(f"{self.name} check failed ({e}). Retrying in {self.retry_delay} seconds...")
                await asyncio.sleep(self.retry_delay)
            else:
                self.success = True
                self.last_run = datetime.datetime.now()
                logging.info(f"{self.name} check succeeded: {result}")
                # stop after success; otherwise the loop would re-run the check forever
                break
        return self

class Scheduler:
    def __init__(self, checks):
        """
        A class representing a scheduler for running checks.

        Args:
            checks (list of Check): The list of checks to be run.
        """
        self.checks = list(checks)
        # map each check name to the checks it depends on (kept for inspection;
        # scheduling reads each check's own ``dependencies``)
        self.dependencies = {c.name: c.dependencies for c in self.checks}

    def _ensure_acyclic(self):
        """Raise ValueError if the dependencies among the scheduled checks form a cycle."""
        managed = set(self.checks)
        # check -> "visiting" while on the DFS stack, "done" once fully explored
        state = {}

        def visit(check):
            if state.get(check) == "done":
                return
            if state.get(check) == "visiting":
                raise ValueError(f"dependency cycle involving check {check.name!r}")
            state[check] = "visiting"
            for dep in check.dependencies:
                # only checks run by this scheduler can cause a wait (and thus a deadlock)
                if dep in managed:
                    visit(dep)
            state[check] = "done"

        for check in self.checks:
            visit(check)

    async def run_checks(self):
        """
        Run the checks concurrently with asyncio, starting each check only
        after every dependency that this scheduler also runs has finished.

        Returns:
            list of Check: The scheduled checks, in the order they were given.

        Raises:
            ValueError: If the dependencies among the checks form a cycle
                (waiting on them would never finish).
        """
        self._ensure_acyclic()
        tasks = {}

        async def run_when_ready(check):
            # All tasks are created below before any of them gets to execute,
            # so ``tasks`` is complete by the time this body runs.
            for dep in check.dependencies:
                task = tasks.get(dep)
                if task is not None:
                    await task
            return await check.run_with_retry()

        for check in self.checks:
            if check not in tasks:
                tasks[check] = asyncio.ensure_future(run_when_ready(check))
        await asyncio.gather(*tasks.values())
        return self.checks

    def start(self):
        """
        Set up logging and run all checks to completion on a fresh event loop.
        """
        # configure logging
        logging.basicConfig(level=logging.INFO)
        # asyncio.run creates a new event loop, runs the checks and always closes
        # the loop; asyncio.get_event_loop() is deprecated for this use when no
        # loop is running.
        asyncio.run(self.run_checks())

@QuincyForbes
Copy link

use chrono::{DateTime, Utc};
use cron::Schedule;
use futures::future::join_all;
use std::collections::{HashMap, HashSet};
use std::error::Error;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

/// A single check plus the bookkeeping the scheduler needs to run it.
struct Check {
    /// Name of the check; printed in the log lines.
    name: String,
    /// Cron schedule controlling when the check runs.
    cron: Schedule,
    /// Checks that must have succeeded before this one may run.
    dependencies: Vec<Arc<Mutex<Check>>>,
    /// Maximum number of retries after a failed attempt.
    max_retries: u32,
    /// Delay between retries, in seconds.
    retry_delay: u64,
    /// Number of failed attempts so far.
    current_retries: u32,
    /// Whether the most recent attempt succeeded; dependents read this
    /// before deciding whether they may run.
    success: bool,
}

impl Check {
    async fn run(&self) -> String {
        // perform the check and return the result
        tokio::time::sleep(Duration::from_secs(rand::random::<u64>() % 3 + 1)).await;
        format!("{} check result", self.name)
    }

    fn should_run(&self) -> bool {
        if self.current_retries >= self.max_retries {
            // don't retry the check if it has already exceeded the maximum number of retries
            return false;
        }
        for dep in &self.dependencies {
            if let Ok(dep) = dep.lock() {
                if !dep.success {
                    // don't run the check if any of its dependencies have failed
                    return false;
                }
            }
        }
        // use chrono and cron to determine whether the check should run based on its cron schedule
        self.cron.upcoming(Utc).next().unwrap_or_else(|| Utc.timestamp(0, 0)) <= Utc::now()
    }

    async fn run_with_retry(&mut self) {
        // run the check with retries if it fails
        while self.should_run() {
            match self.run().await {
                Ok(result) => {
                    self.success = true;
                    println!("{} check succeeded: {}", self.name, result);
                }
                Err(e) => {
                    self.success = false;
                    self.current_retries += 1;
                    println!(
                        "{} check failed ({:?}). Retrying in {} seconds...",
                        self.name, e, self.retry_delay
                    );
                    tokio::time::sleep(Duration::from_secs(self.retry_delay)).await;
                }
            }
        }
    }
}

/// Owns the checks to run together with, per check name, the shared handles
/// of the checks it depends on.
struct Scheduler {
    /// Every managed check, behind `Arc<Mutex<_>>` so futures can lock it.
    checks: Vec<Arc<Mutex<Check>>>,
    /// Check name -> handles of the checks it depends on.
    dependencies: HashMap<String, Vec<Arc<Mutex<Check>>>>,
}

impl Scheduler {
    fn new(checks: Vec<Check>) -> Result<Scheduler, Box<dyn Error>> {
        let checks = checks
            .into_iter()
            .map(|c| Arc::new(Mutex::new(c)))
            .collect();
        let dependencies = checks
            .iter()
            .map(|c| (c.lock().unwrap().name.clone(), vec![]))
            .collect();
        let mut scheduler = Scheduler {
            checks,
            dependencies,
        };
        scheduler.update_dependencies()?;
        Ok(scheduler)
    }

    fn update_dependencies(&mut self) -> Result<(), Box<dyn Error>> {
        for check in &self.checks {
            let mut check = check.lock().unwrap();
            check.dependencies = self
                .dependencies
                .get(&check.name)
                .unwrap_or(&vec![])
                .clone();
        }
        Ok(())
    }

    async fn run_checks(&mut self) -> Result<(), Box<dyn Error>> {
        let mut futures = vec![];
        for check in &self.checks {
            let check = check.clone();
            let fut = async move {
                let mut check = check.lock().unwrap();
                check.run_with_retry().await;
            };
            futures.push(fut);
        }
        join
        all(futures).await;
        Ok(())
    }

    fn start(&mut self) -> Result<(), Box<dyn Error>> {
        // start the scheduler and run the checks
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()?;
        rt.block_on(self.run_checks())?;
        Ok(())
    }
}

fn main() -> Result<(), Box<dyn Error>> {
    let checks = vec![
        Check {
            name: "CheckThingA".into(),
            cron: "*/5 * * * * * *".parse()?,
            dependencies: vec![],
            max_retries: 3,
            retry_delay: 1,
            current_retries: 0,
            success: false,
        },
        Check {
            name: "CheckThingB".into(),
            cron: "*/10 * * * * * *".parse()?,
            dependencies: vec![Arc::new(Mutex::new(checks[0].clone()))],
            max_retries: 2,
            retry_delay: 2,
            current_retries: 0,
            success: false,
        },
        Check {
            name: "CheckThingC".into(),
            cron: "*/15 * * * * * *".parse()?,
            dependencies: vec![Arc::new(Mutex::new(checks[0].clone())), Arc::new(Mutex::new(checks[1].clone()))],
            max_retries: 1,
            retry_delay: 3,
            current_retries: 0,
            success: false,
        },
    ];
    let mut scheduler = Scheduler::new(checks)?;
    scheduler.start()?;
    Ok(())
}

@scott-wilson
Copy link
Owner Author

Hey, could you make this into a pull request? This looks like it may work, but it would be easier to review as a PR.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
lang-c Issues related to C lang-cpp Issues related to C++ lang-python Issues related to Python lang-rust Issues related to Rust
Projects
None yet
Development

No branches or pull requests

2 participants