When using a multiple database setup, alembic does not detect added foreign keys #1443
-
I have alembic set up with multiple postgres databases. I am by no means an expert, so I followed this guide: https://medium.com/pythonistas/managing-multiple-databases-migrations-with-alembic-10025a4b3ab3. They are tracked by a single env.py file.

I am experiencing some strange issues. When adding a foreign key column to a table, the new column is detected, but the foreign key constraint is not; I've had to manually add the foreign key constraint to the migration file every time. This only happens when adding the foreign key to a preexisting table. If creating a new table with a foreign key, the new foreign key is correctly detected and added.

I've tried to debug what's happening in the autogenerate step. I've tried explicitly naming my foreign key constraints, and also auto naming, according to this guide: https://alembic.sqlalchemy.org/en/latest/naming.html. Neither seemed to make alembic pick it up. I'm assuming this issue is a peculiarity of my multi-DB setup, but I just can't see where exactly it's being caused. Any insights would be welcome!

Here is my env.py file:

```python
import asyncio
from logging.config import fileConfig
import alembic_postgresql_enum # noqa: F401
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from app.database.models import Base
import os
from dotenv import load_dotenv
load_dotenv()
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
section = config.config_ini_section
config.set_section_option(section, "DB_USER", os.environ.get("DB_USER"))
config.set_section_option(section, "DB_PASSWORD", os.environ.get("DB_PASS_ENCODED"))
config.set_section_option(section, "DB_HOST", os.environ.get("DB_HOST"))
config.set_section_option(section, "DEV_DATABASE_NAME", "dev")
config.set_section_option(section, "LIVE_DATABASE_NAME", "live")
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import mymodel
# target_metadata = mymodel.Base.metadata
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
#
def get_url():
    return (
        f"postgresql+asyncpg://{os.environ.get('DB_USER')}:"
        f"{os.environ.get('DB_PASS_ENCODED')}@{os.environ.get('DB_HOST')}"
    )


db_url = get_url()
# the active config ini section is the db name that we have chosen
db_name = config.config_ini_section
config.set_main_option("sqlalchemy.url", f"{db_url}/{db_name}")
def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
    def include_object(object, name, type_, reflected, compare_to):
        # skip foreign keys whose reflected and metadata targets differ
        # only by the "<db_name>." schema prefix, i.e. treat them as equal
        if (
            type_ == "foreign_key_constraint"
            and compare_to
            and (
                compare_to.elements[0].target_fullname
                == db_name + "." + object.elements[0].target_fullname
                or db_name + "." + compare_to.elements[0].target_fullname
                == object.elements[0].target_fullname
            )
        ):
            return False

        # Make sure we don't drop the spatial_ref_sys table
        if type_ == "table" and name == "spatial_ref_sys":
            return False

        if type_ == "table":
            if object.schema == db_name or object.schema is None:
                return True
            elif object.table.schema == db_name or object.table.schema is None:
                return True
            else:
                return False

        # include brand-new columns
        if type_ == "column" and compare_to is None:
            return True

    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        include_object=include_object,
    )

    with context.begin_transaction():
        context.run_migrations()
async def run_async_migrations() -> None:
    """In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode."""
    asyncio.run(run_async_migrations())
if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
```
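For context, the explicit constraint naming I tried follows the convention pattern from the alembic naming guide linked above, roughly like this (a simplified sketch of my Base, assuming the standard convention from the docs rather than my exact models file):

```python
from sqlalchemy import MetaData
from sqlalchemy.orm import DeclarativeBase

# naming convention taken from the alembic naming guide; the exact
# patterns in my project may differ slightly
convention = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}


class Base(DeclarativeBase):
    metadata = MetaData(naming_convention=convention)
```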
-
what kind of "foreign keys" are these? how do these foreign keys relate to "multiple databases"? what does "multiple databases" mean here?
-
Thanks for the comments. I'm pretty sure it is related to my custom include_object, but I'm not sure exactly what's causing it. The issue is present on both databases, and it occurs when I add a foreign key constraint to a preexisting column. For example, if I start with models like this:

```python
class Table1(Base):
    __tablename__ = "table_1"

    id: Mapped[Optional[int]] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()
    field_1: Mapped[int] = mapped_column()


class Table2(Base):
    __tablename__ = "table_2"

    id: Mapped[Optional[int]] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()
    field_2: Mapped[int] = mapped_column()
```

and change the models to this:

```python
class Table1(Base):
    __tablename__ = "table_1"

    id: Mapped[Optional[int]] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column(ForeignKey("table_2.name"))
    field_1: Mapped[int] = mapped_column()


class Table2(Base):
    __tablename__ = "table_2"

    id: Mapped[Optional[int]] = mapped_column(primary_key=True)
    name: Mapped[str] = mapped_column()
    field_2: Mapped[int] = mapped_column()
```

This change is not detected by alembic and produces an empty migration file. If I create a new table from scratch with a foreign key, it is correctly detected.

@CaselIT I've tried commenting out the part relating to the foreign keys in my custom include_object:

```python
def do_run_migrations(connection: Connection) -> None:
    def include_object(object, name, type_, reflected, compare_to):
        # if (
        #     type_ == "foreign_key_constraint"
        #     and compare_to
        #     and (
        #         compare_to.elements[0].target_fullname
        #         == db_name + "." + object.elements[0].target_fullname
        #         or db_name + "." + compare_to.elements[0].target_fullname
        #         == object.elements[0].target_fullname
        #     )
        # ):
        #     return False

        # Make sure we don't drop the spatial_ref_sys table
        if type_ == "table" and name == "spatial_ref_sys":
            return False

        if type_ == "table":
            if object.schema == db_name or object.schema is None:
                return True
            elif object.table.schema == db_name or object.table.schema is None:
                return True
            else:
                return False

        if type_ == "column" and compare_to is None:
            return True

    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        include_object=include_object,
    )

    with context.begin_transaction():
        context.run_migrations()
```

So I guess I might need to specifically include the foreign keys in my include_object, but I'm not sure how to go about that. I did struggle a bit to understand the documentation for include_object.
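For reference, the kind of thing I keep adding to the otherwise-empty migration by hand is roughly this (the constraint name here is just illustrative, not what alembic would generate):

```python
from alembic import op


def upgrade() -> None:
    # added by hand because autogenerate produced an empty migration
    op.create_foreign_key(
        "fk_table_1_name_table_2",  # hypothetical constraint name
        "table_1",
        "table_2",
        ["name"],
        ["name"],
    )
```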
Note that every time you return a falsy value from include_object it will be treated as "skip", same as returning False.

So you probably need a `return True` for the new objects (those that have `compare_to=None`) after your initial check, otherwise you exclude all of the new ones. Alternatively you may also consider a `return True` at the end of your include_object, otherwise you are excluding everything that's not matched by your conditions; for example all indexes, unique and other constraints seem to be skipped by your include function.
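Putting that advice together, a corrected include_object might look roughly like this. This is an untested sketch: it keeps the cross-database foreign key exclusion and the spatial_ref_sys guard from the original, simplifies the table branch, and ends with the suggested blanket return True so that nothing falls through to an implicit None:

```python
def include_object(object, name, type_, reflected, compare_to):
    # never create or drop the PostGIS spatial_ref_sys table
    if type_ == "table" and name == "spatial_ref_sys":
        return False

    # the cross-database FK exclusion only makes sense when there is an
    # existing counterpart to compare against (compare_to is not None)
    if (
        type_ == "foreign_key_constraint"
        and compare_to
        and (
            compare_to.elements[0].target_fullname
            == db_name + "." + object.elements[0].target_fullname
            or db_name + "." + compare_to.elements[0].target_fullname
            == object.elements[0].target_fullname
        )
    ):
        return False

    # only emit tables that belong to the active database/schema
    if type_ == "table":
        return object.schema == db_name or object.schema is None

    # everything else -- including brand-new foreign key constraints,
    # indexes and unique constraints (which have compare_to=None) -- is
    # included explicitly; falling through would return None, which
    # autogenerate treats as "skip"
    return True
```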