From ae64d828df9debe61283ca7aa436bdbf0b49d0e7 Mon Sep 17 00:00:00 2001 From: Matt Riedemann Date: Fri, 20 Feb 2015 09:26:10 -0800 Subject: [PATCH] allow dropping fkeys with sqlite This implements the ability to drop foreign keys with sqlite. It's basically the same implementation used for dropping unique constraints so the common code is refactored. The existing FKey test that was skipping sqlite is no longer skipped to show this works. Change-Id: Idaaf4229e34af4c21c3bcead4b4e22491d24238e Closes-Bug: #1423955 --- migrate/changeset/databases/sqlite.py | 53 +++++++++++++++------- migrate/tests/changeset/test_constraint.py | 2 +- 2 files changed, 37 insertions(+), 18 deletions(-) diff --git a/migrate/changeset/databases/sqlite.py b/migrate/changeset/databases/sqlite.py index 8a5ba90d..92d42f25 100644 --- a/migrate/changeset/databases/sqlite.py +++ b/migrate/changeset/databases/sqlite.py @@ -11,6 +11,7 @@ import re from sqlalchemy.databases import sqlite as sa_base +from sqlalchemy.schema import ForeignKeyConstraint from sqlalchemy.schema import UniqueConstraint from migrate import exceptions @@ -29,8 +30,24 @@ def _not_supported(self, op): class SQLiteHelper(SQLiteCommon): - def _get_unique_constraints(self, table): - """Retrieve information about existing unique constraints of the table + def _filter_columns(self, cols, table): + """Splits the string of columns and returns those only in the table. + + :param cols: comma-delimited string of table columns + :param table: the table to check + :return: list of columns in the table + """ + columns = [] + for c in cols.split(","): + if c in table.columns: + # There was a bug in reflection of SQLite columns with + # reserved identifiers as names (SQLite can return them + # wrapped with double quotes), so strip double quotes. + columns.extend(c.strip(' "')) + return columns + + def _get_constraints(self, table): + """Retrieve information about existing constraints of the table This feature is needed for recreate_table() to work properly. """ @@ -48,19 +65,21 @@ def _get_unique_constraints(self, table): constraints = [] for name, cols in re.findall(UNIQUE_PATTERN, data): # Filter out any columns that were dropped from the table. - columns = [] - for c in cols.split(","): - if c in table.columns: - # There was a bug in reflection of SQLite columns with - # reserved identifiers as names (SQLite can return them - # wrapped with double quotes), so strip double quotes. - columns.extend(c.strip(' "')) + columns = self._filter_columns(cols, table) if columns: constraints.extend(UniqueConstraint(*columns, name=name)) + + FKEY_PATTERN = "CONSTRAINT (\w+) FOREIGN KEY \(([^\)]+)\)" + for name, cols in re.findall(FKEY_PATTERN, data): + # Filter out any columns that were dropped from the table. + columns = self._filter_columns(cols, table) + if columns: + constraints.extend(ForeignKeyConstraint(*columns, name=name)) + return constraints def recreate_table(self, table, column=None, delta=None, - omit_uniques=None): + omit_constraints=None): table_name = self.preparer.format_table(table) # we remove all indexes so as not to have @@ -68,13 +87,13 @@ def recreate_table(self, table, column=None, delta=None, for index in table.indexes: index.drop() - # reflect existing unique constraints - for uc in self._get_unique_constraints(table): - table.append_constraint(uc) - # omit given unique constraints when creating a new table if required + # reflect existing constraints + for constraint in self._get_constraints(table): + table.append_constraint(constraint) + # omit given constraints when creating a new table if required table.constraints = set([ cons for cons in table.constraints - if omit_uniques is None or cons.name not in omit_uniques + if omit_constraints is None or cons.name not in omit_constraints ]) self.append('ALTER TABLE %s RENAME TO migration_tmp' % table_name) @@ -182,13 +201,13 @@ def visit_migrate_primary_key_constraint(self, constraint): self.execute() def visit_migrate_foreign_key_constraint(self, *p, **k): - self._not_supported('ALTER TABLE DROP CONSTRAINT') + self.recreate_table(p[0].table, omit_constraints=[p[0].name]) def visit_migrate_check_constraint(self, *p, **k): self._not_supported('ALTER TABLE DROP CONSTRAINT') def visit_migrate_unique_constraint(self, *p, **k): - self.recreate_table(p[0].table, omit_uniques=[p[0].name]) + self.recreate_table(p[0].table, omit_constraints=[p[0].name]) # TODO: technically primary key is a NOT NULL + UNIQUE constraint, should add NOT NULL to index diff --git a/migrate/tests/changeset/test_constraint.py b/migrate/tests/changeset/test_constraint.py index 6421206a..1faf4ccf 100644 --- a/migrate/tests/changeset/test_constraint.py +++ b/migrate/tests/changeset/test_constraint.py @@ -71,7 +71,7 @@ def _define_pk(self, *cols): self.assertTrue(isinstance(self.table.primary_key, schema.PrimaryKeyConstraint)) return pk - @fixture.usedb(not_supported='sqlite') + @fixture.usedb() def test_define_fk(self): """FK constraints can be defined, created, and dropped""" # FK target must be unique