**Montréal-Python 57: POTM (Project of the Month)**

APScheduler: Advanced Python Scheduler


Monday April 14th 2016




**What is APScheduler**

APScheduler is a task scheduling and management system written in Python.

Think of it as a cron/at daemon running inside your application is not far off, but APScheduler also provides management and monitoring of jobs, and much more.


**The author**

Alex Grönholm, the main author of APScheduler. is self employed as a contractor. 

He is living in Finland.

** Why **

Alex built APScheduler while he was working on a Java project which was relying on Quartz (https://quartz-scheduler.org/) 

<img width="400px" src="https://raw.githubusercontent.com/mlhamel/mp57.apscheduler/master/6018_3568.jpeg" />

He wanted to migrate everything to Python, but the only scheduling alternative on Python was Celery

<img width="400px" src="https://raw.githubusercontent.com/mlhamel/mp57.apscheduler/master/celery2.jpeg">




Celery is a distributed task queue with basic scheduling capabilities, while APScheduler is a full featured scheduler with basic task queuing capabilities. 

APScheduler tend to be easier to setup and to integrate with an existing solution.



**How to Install**

``` bash
$ pip install apscheduler
```

**How does it work**

**Interval**

In [4]:
from apscheduler.schedulers.background import BackgroundScheduler

def myfunc():
    pass

def myfunc2():
    pass

scheduler = BackgroundScheduler()
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')

**Cron-like**

In [None]:
scheduler.add_job(myfunc, trigger='cron', minute='*', second="*/5")
scheduler.add_job(myfunc2, trigger='cron', day_of_week="2")
scheduler.add_job(myfunc2, trigger='cron', day_of_week="sat,sun")

# More informations at http://apscheduler.readthedocs.org/en/latest/modules/triggers/cron.html?highlight=cron

**Date-based**

In [None]:
sched.add_job(myfunc, 'date', run_date=date(2009, 11, 6), args=['text'])

**Thread and Process Pool**

APScheduler could also be used to run long running task in the background

In [7]:
from pytz import utc
from apscheduler.executors.pool import ProcessPoolExecutor
from apscheduler.schedulers.background import BackgroundScheduler

executors = {
    'default': {'type': 'threadpool', 'max_workers': 20},
    'processpool': ProcessPoolExecutor(max_workers=5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

def my_producer_job():
    while True:
        pass # produce stuff and save it the database
        
def my_consumer_job():
    pass # read from the database
        
                
scheduler = BackgroundScheduler()

producer = scheduler.add_job(my_producer_job, name="producer", executor="default")
consumer = scheduler.add_job(my_consumer_job, name="consumer", executor="processpool")

scheduler.configure(executors=executors, job_defaults=job_defaults, timezone=utc)

scheduler.start()

**Advanced functionalities**

In [10]:
# event listener
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

def my_listener(event):
    if event.exception:
        print('The job crashed :(')
    else:
        print('The job worked :)')

scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

In [16]:
# custom trigger

import random

from apscheduler.triggers.base import BaseTrigger

def myfunc():
    pass


class MyTrigger(BaseTrigger):
    def get_next_fire_time(self, previous_fire_time, now):
        if previous_fire_time is None:
            return
        random_value = random.randint(1, 10)
        if (now - previous_fire_time).seconds > random_value:
            return datetime.datetime(now.replace(minute=random_value))
        
        
trigger = MyTrigger()

job = scheduler.add_job(myfunc, MyTrigger(), id="my_job_id")

In [17]:
# Limiting the number of concurrently executing instances

job.modify(max_instances=6)

In [None]:
# reschedule an existing job

scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

In [None]:
# to get all the running jobs

scheduler.get_jobs()

In [None]:
# shuting down the scheduler

scheduler.shutdown()

# or more violently

scheduler.shutdown(wait=False)

**Jobstores**

By default, the memory jobstore is used but you can always choose a different one like.

In [None]:
from datetime import datetime, timedelta

from apscheduler.schedulers.blocking import BlockingScheduler

def alarm(time):
    print('Alarm! This alarm was scheduled at %s.' % time)
    
scheduler = BlockingScheduler()
scheduler.add_jobstore('sqlalchemy', url='sqlite:///example.sqlite')

scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
scheduler.start()

``` sql
CREATE TABLE apscheduler_jobs (
	id VARCHAR(191) NOT NULL,
	next_run_time FLOAT,
	job_state BLOB NOT NULL,
	PRIMARY KEY (id)
);

CREATE INDEX ix_apscheduler_jobs_next_run_time 
ON apscheduler_jobs(next_run_time);
```

**Thanks**

Mathieu Leduc-Hamel<br/>
@mlhamel<br/>
mathieu@montrealpython.org<br/>
mathieu.lh@shopify.com<br/>

More informations about APScheduler and sources of this presentation:

* https://github.com/mlhamel/mp57.apscheduler
* https://apscheduler.readthedocs.org/en/latest/
* https://github.com/agronholm/apscheduler
* https://github.com/asphalt-framework/asphalt