Add database schema update and database migration logic (#520)
* add db migration logic and a test for it

* make Job and JobDefinition records extendable

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* make updated_job_model a fixture

* add return types to test_orm fixtures

* refactor update_db_schema logic into a separate function

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* make initial_db return a tuple

* improve naming clarity

* remove a level of indentation in update_db_schema

* Ignore nullability and default values during the db migration, document the fact via comments

* improve update_db_schema according to comments

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
andrii-i and pre-commit-ci[bot] committed May 28, 2024
1 parent d690ac8 commit 5b55901
Showing 3 changed files with 113 additions and 4 deletions.
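
For orientation, the sketch below shows the behaviour this commit adds, assuming a scheduler database created by an older release at a hypothetical path: when create_tables runs, the database gains the newer package_input_folder and packaged_files columns as nullable columns instead of failing on the out-of-date schema.

from sqlalchemy import create_engine, inspect

from jupyter_scheduler.orm import create_tables

# Hypothetical path to a scheduler database created before these columns existed.
db_url = "sqlite:///scheduler.sqlite"

# create_tables() now runs update_db_schema() first, so columns that exist on the
# ORM models but not in the database are added via ALTER TABLE before create_all().
create_tables(db_url=db_url)

columns = {col["name"] for col in inspect(create_engine(db_url)).get_columns("jobs")}
assert "package_input_folder" in columns and "packaged_files" in columns
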
2 changes: 1 addition & 1 deletion conftest.py
@@ -2,7 +2,7 @@

import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import Session, sessionmaker
from sqlalchemy.orm import sessionmaker

from jupyter_scheduler.orm import Base
from jupyter_scheduler.scheduler import Scheduler
42 changes: 39 additions & 3 deletions jupyter_scheduler/orm.py
@@ -1,11 +1,11 @@
import json
import os
from sqlite3 import OperationalError
from uuid import uuid4

import sqlalchemy.types as types
from sqlalchemy import Boolean, Column, Integer, String, create_engine
from sqlalchemy import Boolean, Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base, declarative_mixin, registry, sessionmaker
from sqlalchemy.sql import text

from jupyter_scheduler.models import EmailNotifications, Status
from jupyter_scheduler.utils import get_utc_timestamp
@@ -85,12 +85,15 @@ class CommonColumns:
output_filename_template = Column(String(256))
update_time = Column(Integer, default=get_utc_timestamp, onupdate=get_utc_timestamp)
create_time = Column(Integer, default=get_utc_timestamp)
# All new columns added to this table must be nullable to ensure compatibility during database migrations.
# Any default values specified for new columns will be ignored during the migration process.
package_input_folder = Column(Boolean)
packaged_files = Column(JsonType, default=[])


class Job(CommonColumns, Base):
__tablename__ = "jobs"
__table_args__ = {"extend_existing": True}
job_id = Column(String(36), primary_key=True, default=generate_uuid)
job_definition_id = Column(String(36))
status = Column(String(64), default=Status.STOPPED)
@@ -100,20 +103,53 @@ class Job(CommonColumns, Base):
url = Column(String(256), default=generate_jobs_url)
pid = Column(Integer)
idempotency_token = Column(String(256))
# All new columns added to this table must be nullable to ensure compatibility during database migrations.
# Any default values specified for new columns will be ignored during the migration process.


class JobDefinition(CommonColumns, Base):
__tablename__ = "job_definitions"
__table_args__ = {"extend_existing": True}
job_definition_id = Column(String(36), primary_key=True, default=generate_uuid)
schedule = Column(String(256))
timezone = Column(String(36))
url = Column(String(256), default=generate_job_definitions_url)
create_time = Column(Integer, default=get_utc_timestamp)
active = Column(Boolean, default=True)
# All new columns added to this table must be nullable to ensure compatibility during database migrations.
# Any default values specified for new columns will be ignored during the migration process.


def create_tables(db_url, drop_tables=False):
def update_db_schema(engine, Base):
inspector = inspect(engine)
alter_statements = []

for table_name, model in Base.metadata.tables.items():
if not inspector.has_table(table_name):
continue
columns_db = inspector.get_columns(table_name)
columns_db_names = {col["name"] for col in columns_db}

for column_model_name, column_model in model.c.items():
if column_model_name in columns_db_names:
continue
column_type = str(column_model.type.compile(dialect=engine.dialect))
alter_statement = text(
f"ALTER TABLE {table_name} ADD COLUMN {column_model_name} {column_type} NULL"
)
alter_statements.append(alter_statement)

if not alter_statements:
return
with engine.connect() as connection:
for alter_statement in alter_statements:
connection.execute(alter_statement)


def create_tables(db_url, drop_tables=False, Base=Base):
engine = create_engine(db_url)
update_db_schema(engine, Base)

try:
if drop_tables:
Base.metadata.drop_all(engine)
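
To make the generated DDL concrete, here is a small sketch of what the loop in update_db_schema builds for a single missing column; the table and column names are placeholders, and the compiled type string depends on the engine dialect:

from sqlalchemy import Integer, create_engine
from sqlalchemy.sql import text

engine = create_engine("sqlite://")  # throwaway in-memory engine, for illustration only

# For a model column "new_column" of type Integer that is missing from the existing
# "jobs" table, update_db_schema builds a statement equivalent to this one. The
# column is added as nullable and any model-level default is intentionally ignored.
column_type = Integer().compile(dialect=engine.dialect)  # "INTEGER" on SQLite
alter_statement = text(f"ALTER TABLE jobs ADD COLUMN new_column {column_type} NULL")
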
73 changes: 73 additions & 0 deletions jupyter_scheduler/tests/test_orm.py
@@ -0,0 +1,73 @@
from typing import Type

import pytest
from sqlalchemy import Column, Integer, String, inspect
from sqlalchemy.orm import DeclarativeMeta, sessionmaker

from jupyter_scheduler.orm import (
create_session,
create_tables,
declarative_base,
generate_uuid,
)


@pytest.fixture
def initial_db(jp_scheduler_db_url) -> tuple[Type[DeclarativeMeta], sessionmaker, str]:
TestBase = declarative_base()

class MockInitialJob(TestBase):
__tablename__ = "jobs"
job_id = Column(String(36), primary_key=True, default=generate_uuid)
runtime_environment_name = Column(String(256), nullable=False)
input_filename = Column(String(256), nullable=False)

initial_job = MockInitialJob(runtime_environment_name="abc", input_filename="input.ipynb")

create_tables(db_url=jp_scheduler_db_url, Base=TestBase)

Session = create_session(jp_scheduler_db_url)
session = Session()

session.add(initial_job)
session.commit()
job_id = initial_job.job_id
session.close()

return (TestBase, Session, job_id)


@pytest.fixture
def updated_job_model(initial_db) -> Type[DeclarativeMeta]:
TestBase = initial_db[0]

class MockUpdatedJob(TestBase):
__tablename__ = "jobs"
__table_args__ = {"extend_existing": True}
job_id = Column(String(36), primary_key=True, default=generate_uuid)
runtime_environment_name = Column(String(256), nullable=False)
input_filename = Column(String(256), nullable=False)
new_column = Column("new_column", Integer)

return MockUpdatedJob


def test_create_tables_with_new_column(jp_scheduler_db_url, initial_db, updated_job_model):
TestBase, Session, initial_job_id = initial_db

session = Session()
initial_columns = {col["name"] for col in inspect(session.bind).get_columns("jobs")}
assert "new_column" not in initial_columns
session.close()

JobModel = updated_job_model
create_tables(db_url=jp_scheduler_db_url, Base=TestBase)

session = Session()
updated_columns = {col["name"] for col in inspect(session.bind).get_columns("jobs")}
assert "new_column" in updated_columns

updated_job = session.query(JobModel).filter(JobModel.job_id == initial_job_id).one()
assert hasattr(updated_job, "new_column")
assert updated_job.runtime_environment_name == "abc"
assert updated_job.input_filename == "input.ipynb"
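
Since Job and JobDefinition now set extend_existing=True, the same pattern the test uses is available to downstream code: redefine the jobs table with extra columns and let create_tables migrate an existing database. A minimal sketch, where the external_run_id column and the database path are illustrative assumptions:

from sqlalchemy import Column, String

from jupyter_scheduler.orm import Base, create_tables, generate_uuid


class ExtendedJob(Base):
    __tablename__ = "jobs"
    __table_args__ = {"extend_existing": True}
    job_id = Column(String(36), primary_key=True, default=generate_uuid)
    # Hypothetical downstream column: it must be nullable, and any default set here
    # is ignored when the column is added to an existing database.
    external_run_id = Column(String(64))


create_tables(db_url="sqlite:///scheduler.sqlite", Base=Base)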
