Skip to content
Merged
19 changes: 18 additions & 1 deletion src/sqlite3_to_mysql/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@
help="Transfer only these specific tables (space separated table names). "
"Implies --without-foreign-keys which inhibits the transfer of foreign keys.",
)
@click.option(
"-e",
"--exclude-sqlite-tables",
type=tuple,
cls=OptionEatAll,
help="Transfer all tables except these specific tables (space separated table names). "
"Implies --without-foreign-keys which inhibits the transfer of foreign keys. "
"Can not be used together with --sqlite-tables.",
)
@click.option("-X", "--without-foreign-keys", is_flag=True, help="Do not transfer foreign keys.")
@click.option(
"-W",
Expand Down Expand Up @@ -136,6 +145,7 @@
def cli(
sqlite_file: t.Union[str, "os.PathLike[t.Any]"],
sqlite_tables: t.Optional[t.Sequence[str]],
exclude_sqlite_tables: t.Optional[t.Sequence[str]],
without_foreign_keys: bool,
ignore_duplicate_keys: bool,
mysql_user: str,
Expand Down Expand Up @@ -182,10 +192,17 @@ def cli(
"There is nothing to do. Exiting..."
)

if sqlite_tables and exclude_sqlite_tables:
raise click.ClickException(
"Error: Both -t/--sqlite-tables and -e/--exclude-sqlite-tables options are set. "
"Please use only one of them."
)

SQLite3toMySQL(
sqlite_file=sqlite_file,
sqlite_tables=sqlite_tables or tuple(),
without_foreign_keys=without_foreign_keys or (sqlite_tables is not None and len(sqlite_tables) > 0),
exclude_sqlite_tables=exclude_sqlite_tables or tuple(),
without_foreign_keys=without_foreign_keys or bool(sqlite_tables) or bool(exclude_sqlite_tables),
mysql_user=mysql_user,
mysql_password=mysql_password or prompt_mysql_password,
mysql_database=mysql_database,
Expand Down
27 changes: 21 additions & 6 deletions src/sqlite3_to_mysql/transporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,15 @@ def __init__(self, **kwargs: Unpack[SQLite3toMySQLParams]):

self._sqlite_tables = kwargs.get("sqlite_tables") or tuple()

self._without_foreign_keys = bool(self._sqlite_tables) or bool(kwargs.get("without_foreign_keys", False))
self._exclude_sqlite_tables = kwargs.get("exclude_sqlite_tables") or tuple()

if bool(self._sqlite_tables) and bool(self._exclude_sqlite_tables):
raise ValueError("Please provide either sqlite_tables or exclude_sqlite_tables, not both")

if bool(self._sqlite_tables) or bool(self._exclude_sqlite_tables):
self._without_foreign_keys = True
else:
self._without_foreign_keys = bool(kwargs.get("without_foreign_keys", False))

self._mysql_ssl_disabled = bool(kwargs.get("mysql_ssl_disabled", False))

Expand Down Expand Up @@ -857,16 +865,23 @@ def _transfer_table_data(self, sql: str, total_records: int = 0) -> None:

def transfer(self) -> None:
"""The primary and only method with which we transfer all the data."""
if len(self._sqlite_tables) > 0:
if len(self._sqlite_tables) > 0 or len(self._exclude_sqlite_tables) > 0:
# transfer only specific tables
specific_tables: t.Sequence[str] = (
self._exclude_sqlite_tables if len(self._exclude_sqlite_tables) > 0 else self._sqlite_tables
)

self._sqlite_cur.execute(
f"""
"""
SELECT name FROM sqlite_master
WHERE type='table'
AND name NOT LIKE 'sqlite_%'
AND name IN({("?, " * len(self._sqlite_tables)).rstrip(" ,")})
""",
self._sqlite_tables,
AND name {exclude} IN ({placeholders})
""".format(
exclude="NOT" if len(self._exclude_sqlite_tables) > 0 else "",
placeholders=", ".join("?" * len(specific_tables)),
),
specific_tables,
)
else:
# transfer all tables
Expand Down
2 changes: 2 additions & 0 deletions src/sqlite3_to_mysql/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class SQLite3toMySQLParams(TypedDict):

sqlite_file: t.Union[str, "os.PathLike[t.Any]"]
sqlite_tables: t.Optional[t.Sequence[str]]
exclude_sqlite_tables: t.Optional[t.Sequence[str]]
without_foreign_keys: t.Optional[bool]
mysql_user: t.Optional[str]
mysql_password: t.Optional[t.Union[str, bool]]
Expand Down Expand Up @@ -52,6 +53,7 @@ class SQLite3toMySQLAttributes:

_sqlite_file: t.Union[str, "os.PathLike[t.Any]"]
_sqlite_tables: t.Sequence[str]
_exclude_sqlite_tables: t.Sequence[str]
_without_foreign_keys: bool
_mysql_user: str
_mysql_password: t.Optional[str]
Expand Down
102 changes: 72 additions & 30 deletions tests/func/sqlite3_to_mysql_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,32 +458,56 @@ def test_transfer_transfers_all_tables_in_sqlite_file(

@pytest.mark.transfer
@pytest.mark.parametrize(
"chunk, with_rowid, mysql_insert_method, ignore_duplicate_keys",
"chunk, with_rowid, mysql_insert_method, ignore_duplicate_keys, exclude_tables",
[
(None, False, "IGNORE", False),
(None, False, "IGNORE", True),
(None, False, "UPDATE", True),
(None, False, "UPDATE", False),
(None, False, "DEFAULT", True),
(None, False, "DEFAULT", False),
(None, True, "IGNORE", False),
(None, True, "IGNORE", True),
(None, True, "UPDATE", True),
(None, True, "UPDATE", False),
(None, True, "DEFAULT", True),
(None, True, "DEFAULT", False),
(10, False, "IGNORE", False),
(10, False, "IGNORE", True),
(10, False, "UPDATE", True),
(10, False, "UPDATE", False),
(10, False, "DEFAULT", True),
(10, False, "DEFAULT", False),
(10, True, "IGNORE", False),
(10, True, "IGNORE", True),
(10, True, "UPDATE", True),
(10, True, "UPDATE", False),
(10, True, "DEFAULT", True),
(10, True, "DEFAULT", False),
(None, False, "IGNORE", False, False),
(None, False, "IGNORE", False, True),
(None, False, "IGNORE", True, False),
(None, False, "IGNORE", True, True),
(None, False, "UPDATE", True, False),
(None, False, "UPDATE", True, True),
(None, False, "UPDATE", False, False),
(None, False, "UPDATE", False, True),
(None, False, "DEFAULT", True, False),
(None, False, "DEFAULT", True, True),
(None, False, "DEFAULT", False, False),
(None, False, "DEFAULT", False, True),
(None, True, "IGNORE", False, False),
(None, True, "IGNORE", False, True),
(None, True, "IGNORE", True, False),
(None, True, "IGNORE", True, True),
(None, True, "UPDATE", True, False),
(None, True, "UPDATE", True, True),
(None, True, "UPDATE", False, False),
(None, True, "UPDATE", False, True),
(None, True, "DEFAULT", True, False),
(None, True, "DEFAULT", True, True),
(None, True, "DEFAULT", False, False),
(None, True, "DEFAULT", False, True),
(10, False, "IGNORE", False, False),
(10, False, "IGNORE", False, True),
(10, False, "IGNORE", True, False),
(10, False, "IGNORE", True, True),
(10, False, "UPDATE", True, False),
(10, False, "UPDATE", True, True),
(10, False, "UPDATE", False, False),
(10, False, "UPDATE", False, True),
(10, False, "DEFAULT", True, False),
(10, False, "DEFAULT", True, True),
(10, False, "DEFAULT", False, False),
(10, False, "DEFAULT", False, True),
(10, True, "IGNORE", False, False),
(10, True, "IGNORE", False, True),
(10, True, "IGNORE", True, False),
(10, True, "IGNORE", True, True),
(10, True, "UPDATE", True, False),
(10, True, "UPDATE", True, True),
(10, True, "UPDATE", False, False),
(10, True, "UPDATE", False, True),
(10, True, "DEFAULT", True, False),
(10, True, "DEFAULT", True, True),
(10, True, "DEFAULT", False, False),
(10, True, "DEFAULT", False, True),
],
)
def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_file(
Expand All @@ -498,6 +522,7 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi
with_rowid: bool,
mysql_insert_method: str,
ignore_duplicate_keys: bool,
exclude_tables: bool,
) -> None:
sqlite_engine: Engine = create_engine(
f"sqlite:///{sqlite_database}",
Expand All @@ -513,9 +538,13 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi
random_sqlite_tables: t.List[str] = sample(sqlite_tables, table_number)
random_sqlite_tables.sort()

remaining_tables: t.List[str] = list(set(sqlite_tables) - set(random_sqlite_tables))
remaining_tables.sort()

proc: SQLite3toMySQL = SQLite3toMySQL( # type: ignore[call-arg]
sqlite_file=sqlite_database,
sqlite_tables=random_sqlite_tables,
sqlite_tables=None if exclude_tables else random_sqlite_tables,
exclude_sqlite_tables=random_sqlite_tables if exclude_tables else None,
mysql_user=mysql_credentials.user,
mysql_password=mysql_credentials.password,
mysql_host=mysql_credentials.host,
Expand All @@ -528,6 +557,16 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi
)
caplog.set_level(logging.DEBUG)
proc.transfer()
assert all(
message in [record.message for record in caplog.records]
for message in set(
[
f"Transferring table {table}"
for table in (remaining_tables if exclude_tables else random_sqlite_tables)
]
+ ["Done!"]
)
)
assert all(record.levelname == "INFO" for record in caplog.records)
assert not any(record.levelname == "ERROR" for record in caplog.records)
out, err = capsys.readouterr()
Expand All @@ -542,10 +581,13 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi
mysql_tables: t.List[str] = mysql_inspect.get_table_names()

""" Test if both databases have the same table names """
assert random_sqlite_tables == mysql_tables
if exclude_tables:
assert remaining_tables == mysql_tables
else:
assert random_sqlite_tables == mysql_tables

""" Test if all the tables have the same column names """
for table_name in random_sqlite_tables:
for table_name in remaining_tables if exclude_tables else random_sqlite_tables:
column_names: t.List[t.Any] = [column["name"] for column in sqlite_inspect.get_columns(table_name)]
if with_rowid:
column_names.insert(0, "rowid")
Expand All @@ -558,7 +600,7 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi
for index in (chain.from_iterable(mysql_inspect.get_indexes(table_name) for table_name in mysql_tables))
)

for table_name in random_sqlite_tables:
for table_name in remaining_tables if exclude_tables else random_sqlite_tables:
sqlite_indices: t.List[ReflectedIndex] = sqlite_inspect.get_indexes(table_name)
if with_rowid:
sqlite_indices.insert(
Expand All @@ -580,7 +622,7 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi
mysql_results: t.List[t.Tuple[t.Tuple[t.Any, ...], ...]] = []

meta: MetaData = MetaData()
for table_name in random_sqlite_tables:
for table_name in remaining_tables if exclude_tables else random_sqlite_tables:
sqlite_table: Table = Table(table_name, meta, autoload_with=sqlite_engine)
sqlite_stmt: Select = select(sqlite_table)
sqlite_result: t.List[Row[t.Any]] = list(sqlite_cnx.execute(sqlite_stmt).fetchall())
Expand Down
34 changes: 34 additions & 0 deletions tests/func/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,40 @@ def test_invalid_mysql_collation(
assert "Error: Invalid value for '--mysql-collation'" in result.output
assert "invalid_collation" in result.output

def test_specific_tables_include_and_exclude_are_mutually_exclusive_options(
self,
cli_runner: CliRunner,
sqlite_database: str,
mysql_credentials: MySQLCredentials,
) -> None:
"""Test that specifying both include and exclude tables raises an error."""
result: Result = cli_runner.invoke(
sqlite3mysql,
[
"-f",
sqlite_database,
"-t",
"table1 table2",
"-e",
"table3 table4",
"-d",
mysql_credentials.database,
"-u",
mysql_credentials.user,
"--mysql-password",
mysql_credentials.password,
"-h",
mysql_credentials.host,
"-P",
str(mysql_credentials.port),
],
)
assert result.exit_code > 0
assert (
"Error: Both -t/--sqlite-tables and -e/--exclude-sqlite-tables options are set. "
"Please use only one of them."
) in result.output

def test_transfer_specific_tables_only(
self,
cli_runner: CliRunner,
Expand Down