diff --git a/src/sqlite3_to_mysql/cli.py b/src/sqlite3_to_mysql/cli.py index 0b3df7c..f06276e 100644 --- a/src/sqlite3_to_mysql/cli.py +++ b/src/sqlite3_to_mysql/cli.py @@ -41,6 +41,15 @@ help="Transfer only these specific tables (space separated table names). " "Implies --without-foreign-keys which inhibits the transfer of foreign keys.", ) +@click.option( + "-e", + "--exclude-sqlite-tables", + type=tuple, + cls=OptionEatAll, + help="Transfer all tables except these specific tables (space separated table names). " + "Implies --without-foreign-keys which inhibits the transfer of foreign keys. " + "Can not be used together with --sqlite-tables.", +) @click.option("-X", "--without-foreign-keys", is_flag=True, help="Do not transfer foreign keys.") @click.option( "-W", @@ -136,6 +145,7 @@ def cli( sqlite_file: t.Union[str, "os.PathLike[t.Any]"], sqlite_tables: t.Optional[t.Sequence[str]], + exclude_sqlite_tables: t.Optional[t.Sequence[str]], without_foreign_keys: bool, ignore_duplicate_keys: bool, mysql_user: str, @@ -182,10 +192,17 @@ def cli( "There is nothing to do. Exiting..." ) + if sqlite_tables and exclude_sqlite_tables: + raise click.ClickException( + "Error: Both -t/--sqlite-tables and -e/--exclude-sqlite-tables options are set. " + "Please use only one of them." + ) + SQLite3toMySQL( sqlite_file=sqlite_file, sqlite_tables=sqlite_tables or tuple(), - without_foreign_keys=without_foreign_keys or (sqlite_tables is not None and len(sqlite_tables) > 0), + exclude_sqlite_tables=exclude_sqlite_tables or tuple(), + without_foreign_keys=without_foreign_keys or bool(sqlite_tables) or bool(exclude_sqlite_tables), mysql_user=mysql_user, mysql_password=mysql_password or prompt_mysql_password, mysql_database=mysql_database, diff --git a/src/sqlite3_to_mysql/transporter.py b/src/sqlite3_to_mysql/transporter.py index 7c0864b..e9d05da 100644 --- a/src/sqlite3_to_mysql/transporter.py +++ b/src/sqlite3_to_mysql/transporter.py @@ -109,7 +109,15 @@ def __init__(self, **kwargs: Unpack[SQLite3toMySQLParams]): self._sqlite_tables = kwargs.get("sqlite_tables") or tuple() - self._without_foreign_keys = bool(self._sqlite_tables) or bool(kwargs.get("without_foreign_keys", False)) + self._exclude_sqlite_tables = kwargs.get("exclude_sqlite_tables") or tuple() + + if bool(self._sqlite_tables) and bool(self._exclude_sqlite_tables): + raise ValueError("Please provide either sqlite_tables or exclude_sqlite_tables, not both") + + if bool(self._sqlite_tables) or bool(self._exclude_sqlite_tables): + self._without_foreign_keys = True + else: + self._without_foreign_keys = bool(kwargs.get("without_foreign_keys", False)) self._mysql_ssl_disabled = bool(kwargs.get("mysql_ssl_disabled", False)) @@ -857,16 +865,23 @@ def _transfer_table_data(self, sql: str, total_records: int = 0) -> None: def transfer(self) -> None: """The primary and only method with which we transfer all the data.""" - if len(self._sqlite_tables) > 0: + if len(self._sqlite_tables) > 0 or len(self._exclude_sqlite_tables) > 0: # transfer only specific tables + specific_tables: t.Sequence[str] = ( + self._exclude_sqlite_tables if len(self._exclude_sqlite_tables) > 0 else self._sqlite_tables + ) + self._sqlite_cur.execute( - f""" + """ SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%' - AND name IN({("?, " * len(self._sqlite_tables)).rstrip(" ,")}) - """, - self._sqlite_tables, + AND name {exclude} IN ({placeholders}) + """.format( + exclude="NOT" if len(self._exclude_sqlite_tables) > 0 else "", + placeholders=", ".join("?" * len(specific_tables)), + ), + specific_tables, ) else: # transfer all tables diff --git a/src/sqlite3_to_mysql/types.py b/src/sqlite3_to_mysql/types.py index 3717877..84a79a3 100644 --- a/src/sqlite3_to_mysql/types.py +++ b/src/sqlite3_to_mysql/types.py @@ -22,6 +22,7 @@ class SQLite3toMySQLParams(TypedDict): sqlite_file: t.Union[str, "os.PathLike[t.Any]"] sqlite_tables: t.Optional[t.Sequence[str]] + exclude_sqlite_tables: t.Optional[t.Sequence[str]] without_foreign_keys: t.Optional[bool] mysql_user: t.Optional[str] mysql_password: t.Optional[t.Union[str, bool]] @@ -52,6 +53,7 @@ class SQLite3toMySQLAttributes: _sqlite_file: t.Union[str, "os.PathLike[t.Any]"] _sqlite_tables: t.Sequence[str] + _exclude_sqlite_tables: t.Sequence[str] _without_foreign_keys: bool _mysql_user: str _mysql_password: t.Optional[str] diff --git a/tests/func/sqlite3_to_mysql_test.py b/tests/func/sqlite3_to_mysql_test.py index bb887cb..bbab6a8 100644 --- a/tests/func/sqlite3_to_mysql_test.py +++ b/tests/func/sqlite3_to_mysql_test.py @@ -458,32 +458,56 @@ def test_transfer_transfers_all_tables_in_sqlite_file( @pytest.mark.transfer @pytest.mark.parametrize( - "chunk, with_rowid, mysql_insert_method, ignore_duplicate_keys", + "chunk, with_rowid, mysql_insert_method, ignore_duplicate_keys, exclude_tables", [ - (None, False, "IGNORE", False), - (None, False, "IGNORE", True), - (None, False, "UPDATE", True), - (None, False, "UPDATE", False), - (None, False, "DEFAULT", True), - (None, False, "DEFAULT", False), - (None, True, "IGNORE", False), - (None, True, "IGNORE", True), - (None, True, "UPDATE", True), - (None, True, "UPDATE", False), - (None, True, "DEFAULT", True), - (None, True, "DEFAULT", False), - (10, False, "IGNORE", False), - (10, False, "IGNORE", True), - (10, False, "UPDATE", True), - (10, False, "UPDATE", False), - (10, False, "DEFAULT", True), - (10, False, "DEFAULT", False), - (10, True, "IGNORE", False), - (10, True, "IGNORE", True), - (10, True, "UPDATE", True), - (10, True, "UPDATE", False), - (10, True, "DEFAULT", True), - (10, True, "DEFAULT", False), + (None, False, "IGNORE", False, False), + (None, False, "IGNORE", False, True), + (None, False, "IGNORE", True, False), + (None, False, "IGNORE", True, True), + (None, False, "UPDATE", True, False), + (None, False, "UPDATE", True, True), + (None, False, "UPDATE", False, False), + (None, False, "UPDATE", False, True), + (None, False, "DEFAULT", True, False), + (None, False, "DEFAULT", True, True), + (None, False, "DEFAULT", False, False), + (None, False, "DEFAULT", False, True), + (None, True, "IGNORE", False, False), + (None, True, "IGNORE", False, True), + (None, True, "IGNORE", True, False), + (None, True, "IGNORE", True, True), + (None, True, "UPDATE", True, False), + (None, True, "UPDATE", True, True), + (None, True, "UPDATE", False, False), + (None, True, "UPDATE", False, True), + (None, True, "DEFAULT", True, False), + (None, True, "DEFAULT", True, True), + (None, True, "DEFAULT", False, False), + (None, True, "DEFAULT", False, True), + (10, False, "IGNORE", False, False), + (10, False, "IGNORE", False, True), + (10, False, "IGNORE", True, False), + (10, False, "IGNORE", True, True), + (10, False, "UPDATE", True, False), + (10, False, "UPDATE", True, True), + (10, False, "UPDATE", False, False), + (10, False, "UPDATE", False, True), + (10, False, "DEFAULT", True, False), + (10, False, "DEFAULT", True, True), + (10, False, "DEFAULT", False, False), + (10, False, "DEFAULT", False, True), + (10, True, "IGNORE", False, False), + (10, True, "IGNORE", False, True), + (10, True, "IGNORE", True, False), + (10, True, "IGNORE", True, True), + (10, True, "UPDATE", True, False), + (10, True, "UPDATE", True, True), + (10, True, "UPDATE", False, False), + (10, True, "UPDATE", False, True), + (10, True, "DEFAULT", True, False), + (10, True, "DEFAULT", True, True), + (10, True, "DEFAULT", False, False), + (10, True, "DEFAULT", False, True), ], ) def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_file( @@ -498,6 +522,7 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi with_rowid: bool, mysql_insert_method: str, ignore_duplicate_keys: bool, + exclude_tables: bool, ) -> None: sqlite_engine: Engine = create_engine( f"sqlite:///{sqlite_database}", @@ -513,9 +538,13 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi random_sqlite_tables: t.List[str] = sample(sqlite_tables, table_number) random_sqlite_tables.sort() + remaining_tables: t.List[str] = list(set(sqlite_tables) - set(random_sqlite_tables)) + remaining_tables.sort() + proc: SQLite3toMySQL = SQLite3toMySQL( # type: ignore[call-arg] sqlite_file=sqlite_database, - sqlite_tables=random_sqlite_tables, + sqlite_tables=None if exclude_tables else random_sqlite_tables, + exclude_sqlite_tables=random_sqlite_tables if exclude_tables else None, mysql_user=mysql_credentials.user, mysql_password=mysql_credentials.password, mysql_host=mysql_credentials.host, @@ -528,6 +557,16 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi ) caplog.set_level(logging.DEBUG) proc.transfer() + assert all( + message in [record.message for record in caplog.records] + for message in set( + [ + f"Transferring table {table}" + for table in (remaining_tables if exclude_tables else random_sqlite_tables) + ] + + ["Done!"] + ) + ) assert all(record.levelname == "INFO" for record in caplog.records) assert not any(record.levelname == "ERROR" for record in caplog.records) out, err = capsys.readouterr() @@ -542,10 +581,13 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi mysql_tables: t.List[str] = mysql_inspect.get_table_names() """ Test if both databases have the same table names """ - assert random_sqlite_tables == mysql_tables + if exclude_tables: + assert remaining_tables == mysql_tables + else: + assert random_sqlite_tables == mysql_tables """ Test if all the tables have the same column names """ - for table_name in random_sqlite_tables: + for table_name in remaining_tables if exclude_tables else random_sqlite_tables: column_names: t.List[t.Any] = [column["name"] for column in sqlite_inspect.get_columns(table_name)] if with_rowid: column_names.insert(0, "rowid") @@ -558,7 +600,7 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi for index in (chain.from_iterable(mysql_inspect.get_indexes(table_name) for table_name in mysql_tables)) ) - for table_name in random_sqlite_tables: + for table_name in remaining_tables if exclude_tables else random_sqlite_tables: sqlite_indices: t.List[ReflectedIndex] = sqlite_inspect.get_indexes(table_name) if with_rowid: sqlite_indices.insert( @@ -580,7 +622,7 @@ def test_transfer_specific_tables_transfers_only_specified_tables_from_sqlite_fi mysql_results: t.List[t.Tuple[t.Tuple[t.Any, ...], ...]] = [] meta: MetaData = MetaData() - for table_name in random_sqlite_tables: + for table_name in remaining_tables if exclude_tables else random_sqlite_tables: sqlite_table: Table = Table(table_name, meta, autoload_with=sqlite_engine) sqlite_stmt: Select = select(sqlite_table) sqlite_result: t.List[Row[t.Any]] = list(sqlite_cnx.execute(sqlite_stmt).fetchall()) diff --git a/tests/func/test_cli.py b/tests/func/test_cli.py index a80b999..a1db3bb 100644 --- a/tests/func/test_cli.py +++ b/tests/func/test_cli.py @@ -629,6 +629,40 @@ def test_invalid_mysql_collation( assert "Error: Invalid value for '--mysql-collation'" in result.output assert "invalid_collation" in result.output + def test_specific_tables_include_and_exclude_are_mutually_exclusive_options( + self, + cli_runner: CliRunner, + sqlite_database: str, + mysql_credentials: MySQLCredentials, + ) -> None: + """Test that specifying both include and exclude tables raises an error.""" + result: Result = cli_runner.invoke( + sqlite3mysql, + [ + "-f", + sqlite_database, + "-t", + "table1 table2", + "-e", + "table3 table4", + "-d", + mysql_credentials.database, + "-u", + mysql_credentials.user, + "--mysql-password", + mysql_credentials.password, + "-h", + mysql_credentials.host, + "-P", + str(mysql_credentials.port), + ], + ) + assert result.exit_code > 0 + assert ( + "Error: Both -t/--sqlite-tables and -e/--exclude-sqlite-tables options are set. " + "Please use only one of them." + ) in result.output + def test_transfer_specific_tables_only( self, cli_runner: CliRunner,