# Create Job


## by api


In [None]:
import schedule
import time


def job():
    """Print a fixed status line; serves as the scheduled callable."""
    message = "I'm working..."
    print(message)


# Register the same callable at five cadences: every 3 seconds, minutes,
# hours, days and weeks. Each do() call adds a separate job.
schedule.every(3).seconds.do(job)
schedule.every(3).minutes.do(job)
schedule.every(3).hours.do(job)
schedule.every(3).days.do(job)
schedule.every(3).weeks.do(job)
# Poll once per second; run_pending() executes whichever jobs are due.
# This loop never exits on its own.
while True:
    schedule.run_pending()
    time.sleep(1)

## by decorator


In [None]:
from schedule import every, repeat, run_pending
import time


@repeat(every(3).seconds)
def job():
    """Print a fixed line; registered through ``@repeat`` on a 3-second interval."""
    text = "I am a scheduled job"
    print(text)


# Poll once per second so that due jobs get executed; never exits on its own.
while True:
    run_pending()
    time.sleep(1)

## pass arguments to a job


In [None]:
import schedule
import time


def greet(name):
    """Print ``Hello`` followed by *name*, separated by a single space."""
    greeting = "Hello"
    print(greeting, name)


# Extra keyword arguments given to do() are forwarded to greet() on each run.
schedule.every(2).seconds.do(greet, name="Alice")

# Poll once per second so that due jobs get executed; never exits on its own.
while True:
    schedule.run_pending()
    time.sleep(1)

In [None]:
import schedule
import time


@schedule.repeat(schedule.every(4).seconds, "World")
@schedule.repeat(schedule.every(8).seconds, "Mars")
def hello(planet):
    """Print ``Hello`` followed by *planet*.

    Registered twice through ``@schedule.repeat``: every 4 seconds with
    "World" and every 8 seconds with "Mars" as the argument.
    """
    greeting = "Hello"
    print(greeting, planet)


# Poll once per second so that due jobs get executed; never exits on its own.
while True:
    schedule.run_pending()
    time.sleep(1)

## random intervals job


In [None]:
import schedule
import time


def my_job():
    """Print ``Foo``; the callable used for the random-interval example."""
    output = "Foo"
    print(output)


# Run at a randomly chosen interval of between 2 and 5 seconds.
schedule.every(2).to(5).seconds.do(my_job)

# Poll once per second so that due jobs get executed; never exits on its own.
while True:
    schedule.run_pending()
    time.sleep(1)

## run until


In [None]:
import schedule
from datetime import datetime, timedelta, time


def job():
    """Print ``Boo``; the callable used for the ``until`` examples."""
    output = "Boo"
    print(output)


# NOTE: until() raises schedule.ScheduleValueError when the deadline already
# lies in the past. The time-of-day examples therefore only work when this
# cell is executed before that time of day.

# Run the job until 18:30 today
schedule.every(1).hours.until("18:30").do(job)

# Run the job until 2030-01-01 18:33
schedule.every(1).hours.until("2030-01-01 18:33").do(job)

# Run the job for the next 8 hours
schedule.every(1).hours.until(timedelta(hours=8)).do(job)

# Run the job until 11:33:42 today
schedule.every(1).hours.until(time(11, 33, 42)).do(job)

# Run the job until a specific datetime (it must lie in the future; the
# previous 2020 value always raised ScheduleValueError)
schedule.every(1).hours.until(datetime(2030, 5, 17, 11, 36, 20)).do(job)

## Run Once


In [None]:
import schedule
import time


def job_that_executes_once():
    """Print a line, then ask the scheduler to drop this job.

    Returns:
        ``schedule.CancelJob``; returning it from a job unschedules that job.
    """
    message = "once run,once bloom"
    print(message)
    return schedule.CancelJob


# Scheduled every 5 seconds, but the job returns schedule.CancelJob, so it is
# removed from the scheduler after its first run.
schedule.every(5).seconds.do(job_that_executes_once)

# Poll once per second; the loop itself keeps running after the job is gone.
while True:
    schedule.run_pending()
    time.sleep(1)

## run all jobs now, regardless of their schedule


In [2]:
import schedule


def job_1():
    """Print ``Foo``; first of the two jobs run by ``run_all``."""
    output = "Foo"
    print(output)


def job_2():
    """Print ``Bar``; second of the two jobs run by ``run_all``."""
    output = "Bar"
    print(output)


# Both jobs are scheduled for one specific weekday and time of day...
schedule.every().monday.at("12:40").do(job_1)
schedule.every().tuesday.at("16:40").do(job_2)

# ...but run_all() executes every registered job immediately, regardless of
# when each one is next due.
schedule.run_all()

Hello
Hello
Foo
Bar


# Get Job


## get all


In [2]:
import schedule


# The job function is called with no arguments unless extras are passed to
# do(), so the lambda must not declare a required parameter (the previous
# `lambda x:` would raise TypeError as soon as the job actually ran).
job = schedule.every().day.at("22:30").do(lambda: print("gogo"))
# get_jobs() without a tag returns every job on the default scheduler.
print(len(schedule.get_jobs()))
print(schedule.get_jobs())

1
[Every 1 day at 22:30:00 do <lambda>() (last run: [never], next run: 2024-09-06 22:30:00)]


## get by job tag


In [1]:
import schedule


def greet(name):
    """Print a greeting of the form ``Hello <name>``."""
    print(f"Hello {name}")


# tag() attaches one or more labels to a job; positional arguments after the
# function in do() are forwarded to greet() on each run.
schedule.every().day.do(greet, "Andrea").tag("daily-tasks", "friend")
schedule.every().hour.do(greet, "John").tag("hourly-tasks", "friend")
schedule.every().hour.do(greet, "Monica").tag("hourly-tasks", "customer")
schedule.every().day.do(greet, "Derek").tag("daily-tasks", "guest")

# Passing a tag to get_jobs() returns only the jobs carrying that tag.
friends = schedule.get_jobs("friend")
# Bare expression: shown as the cell's output when run in a notebook.
friends

[Every 1 day do greet('Andrea') (last run: [never], next run: 2024-09-07 11:12:53),
 Every 1 hour do greet('John') (last run: [never], next run: 2024-09-06 12:12:53)]

## get next execution

获取下一个作业计划运行前的秒数


In [None]:
import schedule
import time


def job():
    """Print ``Hello``; the callable used for the ``idle_seconds`` example."""
    output = "Hello"
    print(output)


# Two jobs on different periods, so the wait until the next run varies.
schedule.every(15).seconds.do(job)
schedule.every(5).seconds.do(job)

while True:
    # Seconds until the next job is due; None when no jobs are registered.
    n = schedule.idle_seconds()
    print(n)
    if n is None:
        # Nothing left to run, so stop polling.
        break
    if n > 0:
        # Sleep exactly until the next job becomes due.
        time.sleep(n)
    schedule.run_pending()

# Cancel Job


## Cancel One


In [1]:
import schedule


def some_task():
    """Print ``Hello world``; the callable scheduled and then cancelled below."""
    message = "Hello world"
    print(message)


# do() returns the Job object, which is the handle needed to cancel it later.
job = schedule.every().day.at("22:30").do(some_task)
print(type(job))
# Job count before cancelling...
print(len(schedule.get_jobs()))
# Unschedule this one job from the default scheduler.
schedule.cancel_job(job)
# ...and after: the cancelled job is no longer listed.
print(len(schedule.get_jobs()))

<class 'schedule.Job'>
1
0


## Cancel All


In [None]:
import schedule


def greet(name):
    """Print a greeting of the form ``Hello <name>``."""
    print(f"Hello {name}")


# greet() requires a name, so supply it through do(); without it the job
# would raise TypeError the first time it actually ran.
schedule.every().second.do(greet, "Alice")

# Remove every job from the default scheduler.
schedule.clear()

# Exception Handling

1. 任何被 schedule 执行的 job 抛出异常后，都会打断其他 job 的执行，因此需要捕获 job 的异常
2. 抛出异常的 job 本身也不会继续定时执行


In [None]:
import functools
import time
import schedule


def catch_exceptions(cancel_on_failure=False):
    """Build a decorator that keeps a failing job from propagating its error.

    The wrapped function is called with its original arguments. If it raises
    an ``Exception``, the traceback is printed to stdout instead of being
    re-raised.

    Args:
        cancel_on_failure: When true, a failing call returns
            ``schedule.CancelJob``; otherwise a failing call returns ``None``.

    Returns:
        A decorator that wraps a job function with this error handling.
    """

    def catch_exceptions_decorator(job_func):
        @functools.wraps(job_func)
        def wrapper(*args, **kwargs):
            try:
                outcome = job_func(*args, **kwargs)
            except Exception:
                import traceback

                print(traceback.format_exc())
                # Only touch schedule.CancelJob when cancellation was requested.
                outcome = schedule.CancelJob if cancel_on_failure else None
            return outcome

        return wrapper

    return catch_exceptions_decorator


@catch_exceptions(cancel_on_failure=True)
def bad_task():
    """Always fail with ``ZeroDivisionError`` to exercise the decorator."""
    numerator, denominator = 1, 0
    return numerator / denominator


# bad_task always raises; its decorator prints the traceback and returns
# schedule.CancelJob, so the error does not propagate out of run_pending().
schedule.every(5).seconds.do(bad_task)

# Note the order: this loop sleeps first, then runs whatever is due.
while True:
    time.sleep(1)
    schedule.run_pending()

# Multiple schedulers


In [None]:
import schedule


def fooJob():
    """Print ``Foo``; the callable shared by both schedulers below."""
    output = "Foo"
    print(output)


# Two explicit Scheduler instances, used instead of the module-level default.
scheduler1 = schedule.Scheduler()
scheduler2 = schedule.Scheduler()

# Jobs are registered on the instance they are created from.
# NOTE: nothing here calls run_pending(); each scheduler would need its own
# scheduler1.run_pending() / scheduler2.run_pending() calls to execute jobs.
scheduler1.every().hour.do(fooJob)
scheduler2.every().second.do(fooJob)