paused jobs are not restored after app restart #107

Closed
technikamateur opened this issue Apr 28, 2019 · 19 comments

@technikamateur

Hi,
I am trying to build my own little Flask app to manage the cleaning of specific folders on my home server.
I use the SQLAlchemy jobstore to persist my jobs, so I created a file containing different methods for different jobs; all of them use the interval trigger.
Now I want to be able to disable the cleaning of some folders. I wrote a little web app which does this fine, but the behavior of APScheduler is a bit strange:
when I disable a job, the next run time in the database is set to NULL, but if I restart my server, all paused jobs get a new next run time.
In my opinion, the job state (paused/not paused) should be persisted across restarts. Is there a way to get this behavior?

@vale981

vale981 commented May 17, 2019

To elaborate a little:

  • jobs have to be created before they can be loaded from the jobstore (see the ordering sketch below)
  • if they are created as recurring jobs and were stored in the paused state, that state is not restored when they are loaded from the jobstore
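
For instance, a minimal ordering sketch (the dirkules module names are only illustrative here; the point is the ordering):

from dirkules import tasks   # define/import the job functions first

scheduler.init_app(app)
scheduler.start()            # only now can stored jobs be reconstituted from the jobstore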

@viniciuschiele
Owner

Hi,

Sorry for the delay.

Are you adding the jobs every time the app runs? If so, are you using the flag replace_existing=True?

@vale981

vale981 commented May 19, 2019

Hi, thanks for the reply.
I think he had to add the jobs before he could load them from the jobstore. As far as I know, he didn't use the replace_existing flag.

@viniciuschiele
Owner

Would you be able to provide an example code that replicates the issue?

I'm trying to replicate the issue using the advanced example with a persisted SQLite database, like SQLAlchemyJobStore(url='sqlite:///tmp.db'), and when I restart the app the task is kept as paused.

The only way I was able to reproduce the issue was by adding the jobs every time the app runs with the flag replace_existing=True, which overrides the job every time the app runs.

@technikamateur
Author

technikamateur commented May 19, 2019

Here is an example:
init.py

import dirkules.config as config

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_apscheduler import APScheduler

app = Flask(__name__)
app.config.from_object(config)
db = SQLAlchemy(app)

import dirkules.models
db.create_all()

scheduler = APScheduler()
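# tasks must be imported before scheduler.start(), otherwise the stored jobs cannot resolve their functions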
from dirkules import tasks
scheduler.init_app(app)
scheduler.start()

import dirkules.views

tasks.py

from dirkules import scheduler
import datetime

@scheduler.task('interval', id='refresh_disks', seconds=2, next_run_time=datetime.datetime.now())
def refresh_disks():
    print("Executed")

config.py

import os
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
#from apscheduler.jobstores.memory import MemoryJobStore

baseDir = os.path.abspath(os.path.dirname(__file__))
staticDir = os.path.join(baseDir, 'static')

SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(baseDir, 'dirkules.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False

# SCHEDULER_JOB_DEFAULTS applies per job: max_instances: 1 means at most one instance of each job runs at a time.
# SCHEDULER_EXECUTORS is global: one thread pool with max_workers: 3 is shared by all jobs.

SCHEDULER_JOBSTORES = {'default': SQLAlchemyJobStore(url='sqlite:///' + os.path.join(baseDir, 'dirkules.db'))}
#SCHEDULER_JOBSTORES = {'default': MemoryJobStore()}

SCHEDULER_EXECUTORS = {'default': {'type': 'threadpool', 'max_workers': 3}}

SCHEDULER_JOB_DEFAULTS = {'coalesce': False, 'max_instances': 1}

SCHEDULER_API_ENABLED = True

views.py

from dirkules import app, scheduler
from flask import render_template

@app.route('/', methods=['GET'])
def index():
    scheduler.pause_job("refresh_disks")
    return render_template('index.html')

If I put the line from dirkules import tasks after scheduler.start(), I get the following error message, because the job function cannot be resolved while the stored job is being reconstituted.

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/apscheduler/jobstores/sqlalchemy.py", line 141, in _get_jobs
    jobs.append(self._reconstitute_job(row.job_state))
  File "/usr/local/lib/python3.6/dist-packages/apscheduler/jobstores/sqlalchemy.py", line 128, in _reconstitute_job
    job.__setstate__(job_state)
  File "/usr/local/lib/python3.6/dist-packages/apscheduler/job.py", line 272, in __setstate__
    self.func = ref_to_obj(self.func_ref)
  File "/usr/local/lib/python3.6/dist-packages/apscheduler/util.py", line 292, in ref_to_obj
    raise LookupError('Error resolving reference %s: error looking up object' % ref)
LookupError: Error resolving reference dirkules.tasks:refresh_disks: error looking up object
[2019-05-19 17:07:40 +0200] [4078] [INFO] Booting worker with pid: 4078
[2019-05-19 17:08:11 +0200] [4005] [CRITICAL] WORKER TIMEOUT (pid:4078)
[2019-05-19 17:08:11 +0200] [4078] [INFO] Worker exiting (pid: 4078)
Unable to restore job "refresh_disks" -- removing it
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/apscheduler/util.py", line 289, in ref_to_obj
    obj = getattr(obj, name)
AttributeError: module 'dirkules.tasks' has no attribute 'refresh_disks'

Maybe next_run_time overrides the paused state?

@viniciuschiele
Owner

Thanks for the code above.

When using the decorator @scheduler.task, the task gets replaced every time you run the app; this is out of my control, as that logic comes from the APScheduler library.
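
One possible workaround (a minimal sketch; it assumes only APScheduler's standard get_job/add_job methods, which flask-apscheduler exposes, plus the job parameters used in this thread) is to register the job only when the jobstore does not already contain it:

if scheduler.get_job('refresh_disks') is None:
    # first run: insert the job; later runs leave the stored copy untouched,
    # including a paused job's next_run_time=None
    scheduler.add_job(id='refresh_disks',
                      func='dirkules.tasks:refresh_disks',
                      trigger='interval', seconds=2)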

@technikamateur
Author

Thanks a lot for your help so far.
What's the recommended way to do it right? Should I create the job inside config.py, as shown in your advanced example?

@viniciuschiele
Owner

The only difference when creating the job inside config.py is that you can control the flag replace_existing=True|False; the default is False.

Using replace_existing=False raises an error if the job has already been inserted into the database.

A workaround would be to change the SQLAlchemyJobStore to ignore the conflict error.

from apscheduler.jobstores.base import ConflictingIdError
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

class SQLJobStore(SQLAlchemyJobStore):
    def add_job(self, job):
        try:
            super().add_job(job)
        except ConflictingIdError:
            pass

SCHEDULER_JOBSTORES = {'default': SQLJobStore(url='sqlite:///' + os.path.join(baseDir, 'dirkules.db'))}
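
With this store, the first run inserts the job, and every later run raises ConflictingIdError inside add_job and keeps the stored copy, so a paused job stays paused across restarts.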

@technikamateur
Author

Hi, just another little question @viniciuschiele:
I followed the example in advanced.py, but I am getting the following message on shutdown:
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140146139821824 and this is thread id 140146260215616.
Can you tell me what I did wrong? The only thing I changed from advanced.py is that I moved the job function to a different file named tasks.py. That shouldn't cause this issue, should it?

My job definition inside config.py looks like the following:

JOBS = [
    {
        'id': 'refresh_disks',
        'func': 'dirkules.tasks:refresh_disks',
        'trigger': 'interval',
        'next_run_time': datetime.datetime.now(),
        'replace_existing': True,
        'seconds': 3600
    }
]

@viniciuschiele
Owner

viniciuschiele commented Aug 18, 2019

I'm not able to reproduce the error. I know that SQLite doesn't allow multi-threaded access, so for some reason your code is shutting down the scheduler in a different thread.

Try disabling the check for multi-threads to see what happens.

SCHEDULER_JOBSTORES = {
    'default': SQLAlchemyJobStore(url='sqlite:///test.db', engine_options={
        'connect_args': {'check_same_thread': False}
    })
}
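
connect_args is passed straight through to sqlite3.connect, so check_same_thread=False turns off exactly the same-thread enforcement that raises the ProgrammingError above; note that this silences the symptom rather than fixing which thread touches the connection.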

@technikamateur
Author

technikamateur commented Aug 18, 2019

I would like to find out why this happens for me, since you are not able to reproduce it...
Here is my config:
config.py

import os
import datetime
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

# from apscheduler.jobstores.memory import MemoryJobStore

baseDir = os.path.abspath(os.path.dirname(__file__))
staticDir = os.path.join(baseDir, 'static')

SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(baseDir, 'dirkules.db')
SQLALCHEMY_TRACK_MODIFICATIONS = False

# SCHEDULER_JOB_DEFAULTS applies per job: max_instances: 1 means at most one instance of each job runs at a time.
# SCHEDULER_EXECUTORS is global: one thread pool with max_workers: 3 is shared by all jobs.

SCHEDULER_JOBSTORES = {'default': SQLAlchemyJobStore(url='sqlite:///' + os.path.join(baseDir, 'dirkules_tasks.db'))}
# SCHEDULER_JOBSTORES = {'default': MemoryJobStore()}

SCHEDULER_EXECUTORS = {'default': {'type': 'threadpool', 'max_workers': 3}}

SCHEDULER_JOB_DEFAULTS = {'coalesce': False, 'max_instances': 1}

SCHEDULER_API_ENABLED = True

# should not be here in final version
SECRET_KEY = b'gf3iz3V!R83@Ny!ri'

JOBS = [
    {
        'id': 'refresh_disks',
        'func': 'dirkules.tasks:refresh_disks',
        'trigger': 'interval',
        'next_run_time': datetime.datetime.now(),
        'replace_existing': True,
        'seconds': 3600
    }
]

init.py

import dirkules.config as config

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_apscheduler import APScheduler

app = Flask(__name__)
app.config.from_object(config)
db = SQLAlchemy(app)

import dirkules.models

# create db if not exists
db.create_all()
# start scheduler
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()
# import views
import dirkules.views

tasks.py

import dirkules.manager.driveManager as drive_man


def refresh_disks():
    drive_man.get_drives()
    print("Drives haha refreshed")

And finally the get_drives function used in tasks.py; it reads the drives and stores them in dirkules.db:

def get_drives():
    current_time = datetime.datetime.now()
    drive_dict = hardware_drives.get_all_drives()
    for drive in drive_dict:
        drive_obj = Drive(
            drive.get("name"), drive.get("model"), drive.get("serial"),
            drive.get("size"), drive.get("rota"), drive.get("rm"),
            drive.get("hotplug"), drive.get("state"), drive.get("smart"), current_time)
        db.session.add(drive_obj)
        db.session.commit()

Could the problem be that the job accesses a different database than the one used by the jobstore? With MemoryJobStore everything works fine.

Thanks for your help!

@viniciuschiele
Owner

Your code above works just fine for me.

Do you see this error when the task runs or when you shut down the scheduler?

@technikamateur
Author

Only on shutdown (using Ctrl+C). Here is the whole message; maybe it is more helpful.

Drives haha refreshed
^CException during reset or similar
Traceback (most recent call last):
  File "/home/daniel/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 680, in _finalize_fairy
    fairy._reset(pool)
  File "/home/daniel/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 867, in _reset
    pool._dialect.do_rollback(self)
  File "/home/daniel/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 502, in do_rollback
    dbapi_connection.rollback()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140400320026368 and this is thread id 140400438904640.
Exception closing connection <sqlite3.Connection object at 0x7fb17e9fec70>
Traceback (most recent call last):
  File "/home/daniel/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 680, in _finalize_fairy
    fairy._reset(pool)
  File "/home/daniel/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 867, in _reset
    pool._dialect.do_rollback(self)
  File "/home/daniel/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 502, in do_rollback
    dbapi_connection.rollback()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140400320026368 and this is thread id 140400438904640.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/daniel/.local/lib/python3.6/site-packages/sqlalchemy/pool/base.py", line 270, in _close_connection
    self._dialect.do_close(connection)
  File "/home/daniel/.local/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 508, in do_close
    dbapi_connection.close()
sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140400320026368 and this is thread id 140400438904640.

You could just clone my repo. I'm using gunicorn3 to start the project. But that shouldn't cause the issue.

@technikamateur
Author

technikamateur commented Aug 30, 2019

I just took a look at the threads using threading.enumerate().
After startup, the following threads are running:

[<_MainThread(MainThread, started 140396238772032)>, <Thread(APScheduler, started daemon 140396127033088)>, <Thread(ThreadPoolExecutor-0_0, started daemon 140396118640384)>]

On shutdown:

sqlite3.ProgrammingError: SQLite objects created in a thread can only be used in that same thread. The object was created in thread id 140396118640384 and this is thread id 140396238772032.

The main thread is trying to handle an object that was created in the ThreadPoolExecutor thread.

If I enable DEBUG logging using:

import logging

logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

I see the following in my trace:

ERROR:sqlalchemy.pool.impl.NullPool:Exception closing connection <sqlite3.Connection object at 0x7f35dcebde30>

i.e. the connection-close failure is now reported through the sqlalchemy.pool.impl.NullPool logger instead of appearing only as the bare "Exception closing connection" message.

@technikamateur
Author

Adding db.session.close() to the get_drives method solves the problem, but I don't know exactly why.

@viniciuschiele
Owner

The error happens because, in the last lines, you leave a session uncommitted when the variable old_drives is empty. Then, when you shut down the app, another thread tries to roll back this session, and SQLite doesn't allow that.

So instead of this:

    old_drives = db.session.query(Drive).filter(Drive.last_update != current_time).all()
    if old_drives:
        for drive in old_drives:
            drive.missing = True
            db.session.commit()
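    # when old_drives is empty, neither commit nor rollback runs and the session is left open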

Use this:

    old_drives = db.session.query(Drive).filter(Drive.last_update != current_time).all()
    if old_drives:
        for drive in old_drives:
            drive.missing = True
    db.session.commit()

This error also happens when an unhandled exception occurs within the task, leaving the session uncommitted, so to prevent it you need a try/except that rolls back the session.

try:
    ...
except Exception:
    db.session.rollback()
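
Applied to the task from this thread, a sketch could look like this (the wrapping is only a suggestion; refresh_disks, drive_man.get_drives and db.session are the names from the code above):

def refresh_disks():
    try:
        drive_man.get_drives()
        db.session.commit()       # success: release the session
    except Exception:
        db.session.rollback()     # failure: nothing is left uncommitted
        raise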

@technikamateur
Author

Okay! Thanks a lot for your help!
Maybe a dumb question, but why do I need to commit the session? I only performed a query, without modifying anything, when old_drives is empty.

@viniciuschiele
Owner

Well, you always have to either commit or roll back a session, even for plain queries; those actions release the session's state and resources.

Flask-SQLAlchemy already does that for you after a request: it calls .rollback() if you haven't committed, so you don't need to worry about it there.

Within tasks (when using APScheduler), sessions are kept alive if you don't call .rollback() or .commit(), and when the app shuts down, the main thread will try to close all open sessions.

SQLite is effectively single-threaded: only the thread that created a session's connection is allowed to close it. That is why you get this error; the thread that creates the sessions is not the main one.
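
A way to make that systematic (a sketch; any_task is a made-up name, and db.session.remove() is the standard teardown method of Flask-SQLAlchemy's scoped session, nothing specific to this project) is to dispose of the session at the end of every task, in the same worker thread that created it:

def any_task():
    try:
        db.session.query(Drive).count()   # any query binds a session/connection to this thread
        db.session.commit()
    finally:
        # drop the scoped session here so no SQLite object
        # is left for the main thread to clean up at shutdown
        db.session.remove()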

I hope that helps.

@technikamateur
Author

Thanks for your explanation!
