diff --git a/pyproject.toml b/pyproject.toml index acde399..b74cc0f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -46,6 +46,7 @@ dependencies = [ "python-dateutil>=2.9.0.post0", "python-slugify>=7.0.0", "simplejson>=3.19.0", + "sqlglot>=27.27.0", "tqdm>=4.65.0", "tabulate", "typing-extensions; python_version < \"3.11\"", diff --git a/requirements_dev.txt b/requirements_dev.txt index 986342f..1050037 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -17,6 +17,7 @@ simplejson>=3.19.1 types-simplejson sqlalchemy>=2.0.0 sqlalchemy-utils +sqlglot>=27.27.0 types-sqlalchemy-utils tox tqdm>=4.65.0 diff --git a/src/mysql_to_sqlite3/cli.py b/src/mysql_to_sqlite3/cli.py index 1c7ffe0..c97249e 100644 --- a/src/mysql_to_sqlite3/cli.py +++ b/src/mysql_to_sqlite3/cli.py @@ -142,6 +142,12 @@ ) @click.option("-l", "--log-file", type=click.Path(), help="Log file") @click.option("--json-as-text", is_flag=True, help="Transfer JSON columns as TEXT.") +@click.option( + "-T", + "--mysql-views-as-tables", + is_flag=True, + help="Materialize MySQL VIEWs as SQLite tables (legacy behavior).", +) @click.option( "-V", "--vacuum", @@ -182,6 +188,7 @@ def cli( chunk: int, log_file: t.Union[str, "os.PathLike[t.Any]"], json_as_text: bool, + mysql_views_as_tables: bool, vacuum: bool, use_buffered_cursors: bool, quiet: bool, @@ -230,6 +237,7 @@ def cli( mysql_ssl_disabled=skip_ssl, chunk=chunk, json_as_text=json_as_text, + views_as_views=not mysql_views_as_tables, vacuum=vacuum, buffered=use_buffered_cursors, log_file=log_file, diff --git a/src/mysql_to_sqlite3/transporter.py b/src/mysql_to_sqlite3/transporter.py index 9a537b0..1778d6a 100644 --- a/src/mysql_to_sqlite3/transporter.py +++ b/src/mysql_to_sqlite3/transporter.py @@ -15,6 +15,8 @@ from mysql.connector import CharacterSet, errorcode from mysql.connector.abstracts import MySQLConnectionAbstract from mysql.connector.types import RowItemType +from sqlglot import exp, parse_one +from sqlglot.errors import ParseError from tqdm import tqdm, trange @@ -120,6 +122,8 @@ def __init__(self, **kwargs: Unpack[MySQLtoSQLiteParams]) -> None: self._quiet = bool(kwargs.get("quiet", False)) + self._views_as_views = bool(kwargs.get("views_as_views", True)) + self._sqlite_strict = bool(kwargs.get("sqlite_strict", False)) self._logger = self._setup_logger(log_file=kwargs.get("log_file") or None, quiet=self._quiet) @@ -637,6 +641,7 @@ def _create_table(self, table_name: str, attempting_reconnect: bool = False) -> if not attempting_reconnect: self._logger.warning("Connection to MySQL server lost.\nAttempting to reconnect.") self._create_table(table_name, True) + return else: self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.") raise @@ -650,6 +655,130 @@ def _create_table(self, table_name: str, attempting_reconnect: bool = False) -> self._logger.error("SQLite failed creating table %s: %s", table_name, err) raise + @staticmethod + def _mysql_viewdef_to_sqlite( + view_select_sql: str, + view_name: str, + schema_name: t.Optional[str] = None, + keep_schema: bool = False, + ) -> str: + """ + Convert a MySQL VIEW_DEFINITION (a SELECT ...) to a SQLite CREATE VIEW statement. + + If keep_schema is False and schema_name is provided, strip qualifiers like `example`.table. + If keep_schema is True, you must ATTACH the SQLite database as that schema name before using the view. + """ + # Normalize whitespace and avoid double semicolons in output + cleaned_sql = view_select_sql.strip().rstrip(";") + + try: + tree = parse_one(cleaned_sql, read="mysql") + except (ParseError, ValueError, Exception): # pylint: disable=W0718 + # Fallback: return a basic CREATE VIEW using the original SELECT + return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS\n{cleaned_sql};' + + if not keep_schema and schema_name: + # Remove schema qualifiers that match schema_name + for tbl in tree.find_all(exp.Table): + db = tbl.args.get("db") + if db and db.name.strip('`"') == schema_name: + tbl.set("db", None) + + sqlite_select = tree.sql(dialect="sqlite") + return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS\n{sqlite_select};' + + def _build_create_view_sql(self, view_name: str) -> str: + """Build a CREATE VIEW statement for SQLite from a MySQL VIEW definition.""" + # Try to obtain the view definition from information_schema.VIEWS + definition: t.Optional[str] = None + try: + self._mysql_cur_dict.execute( + """ + SELECT VIEW_DEFINITION AS `definition` + FROM information_schema.VIEWS + WHERE TABLE_SCHEMA = %s + AND TABLE_NAME = %s + """, + (self._mysql_database, view_name), + ) + row: t.Optional[t.Dict[str, RowItemType]] = self._mysql_cur_dict.fetchone() + if row is not None and row.get("definition") is not None: + val = row["definition"] + if isinstance(val, bytes): + try: + definition = val.decode() + except UnicodeDecodeError: + definition = str(val) + else: + definition = t.cast(str, val) + except mysql.connector.Error: + # Fall back to SHOW CREATE VIEW below + definition = None + + if not definition: + # Fallback: use SHOW CREATE VIEW and extract the SELECT part + try: + # Escape backticks in the MySQL view name for safe interpolation + safe_view_name = view_name.replace("`", "``") + self._mysql_cur.execute(f"SHOW CREATE VIEW `{safe_view_name}`") + res = self._mysql_cur.fetchone() + if res and len(res) >= 2: + create_stmt = res[1] + if isinstance(create_stmt, bytes): + try: + create_stmt_str = create_stmt.decode() + except UnicodeDecodeError: + create_stmt_str = str(create_stmt) + else: + create_stmt_str = t.cast(str, create_stmt) + # Extract the SELECT ... part after AS (supporting newlines) + m = re.search(r"\bAS\b\s*(.*)$", create_stmt_str, re.IGNORECASE | re.DOTALL) + if m: + definition = m.group(1).strip().rstrip(";") + else: + # As a last resort, try to use the full statement replacing the prefix + # Not ideal, but better than failing outright + idx = create_stmt_str.upper().find(" AS ") + if idx != -1: + definition = create_stmt_str[idx + 4 :].strip().rstrip(";") + except mysql.connector.Error: + pass + + if not definition: + raise sqlite3.Error(f"Unable to fetch definition for MySQL view '{view_name}'") + + return self._mysql_viewdef_to_sqlite( + view_name=view_name, + view_select_sql=definition, + schema_name=self._mysql_database, + ) + + def _create_view(self, view_name: str, attempting_reconnect: bool = False) -> None: + try: + if attempting_reconnect: + self._mysql.reconnect() + sql = self._build_create_view_sql(view_name) + self._sqlite_cur.execute(sql) + self._sqlite.commit() + except mysql.connector.Error as err: + if err.errno == errorcode.CR_SERVER_LOST: + if not attempting_reconnect: + self._logger.warning("Connection to MySQL server lost.\nAttempting to reconnect.") + self._create_view(view_name, True) + return + else: + self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.") + raise + self._logger.error( + "MySQL failed reading view definition from view %s: %s", + view_name, + err, + ) + raise + except sqlite3.Error as err: + self._logger.error("SQLite failed creating view %s: %s", view_name, err) + raise + def _transfer_table_data( self, table_name: str, sql: str, total_records: int = 0, attempting_reconnect: bool = False ) -> None: @@ -693,6 +822,7 @@ def _transfer_table_data( total_records=total_records, attempting_reconnect=True, ) + return else: self._logger.warning("Connection to MySQL server lost.\nReconnection attempt aborted.") raise @@ -720,7 +850,7 @@ def transfer(self) -> None: self._mysql_cur_prepared.execute( """ - SELECT TABLE_NAME + SELECT TABLE_NAME, TABLE_TYPE FROM information_schema.TABLES WHERE TABLE_SCHEMA = SCHEMA() AND TABLE_NAME {exclude} IN ({placeholders}) @@ -730,25 +860,49 @@ def transfer(self) -> None: ), specific_tables, ) - tables: t.Iterable[RowItemType] = (row[0] for row in self._mysql_cur_prepared.fetchall()) + tables: t.Iterable[t.Tuple[str, str]] = ( + ( + str(row[0].decode() if isinstance(row[0], (bytes, bytearray)) else row[0]), + str(row[1].decode() if isinstance(row[1], (bytes, bytearray)) else row[1]), + ) + for row in self._mysql_cur_prepared.fetchall() + ) else: # transfer all tables self._mysql_cur.execute( """ - SELECT TABLE_NAME + SELECT TABLE_NAME, TABLE_TYPE FROM information_schema.TABLES WHERE TABLE_SCHEMA = SCHEMA() """ ) - tables = (row[0].decode() for row in self._mysql_cur.fetchall()) # type: ignore[union-attr] + + def _coerce_row(row: t.Any) -> t.Tuple[str, str]: + try: + # Row like (name, type) + name = row[0].decode() if isinstance(row[0], (bytes, bytearray)) else row[0] + ttype = ( + row[1].decode() + if (isinstance(row, (list, tuple)) and len(row) > 1 and isinstance(row[1], (bytes, bytearray))) + else (row[1] if (isinstance(row, (list, tuple)) and len(row) > 1) else "BASE TABLE") + ) + return str(name), str(ttype) + except (TypeError, IndexError, UnicodeDecodeError): + # Fallback: treat as a single value name when row is not a 2-tuple or decoding fails + name = row.decode() if isinstance(row, (bytes, bytearray)) else str(row) + return name, "BASE TABLE" + + tables = (_coerce_row(row) for row in self._mysql_cur.fetchall()) try: # turn off foreign key checking in SQLite while transferring data self._sqlite_cur.execute("PRAGMA foreign_keys=OFF") - for table_name in tables: + for table_name, table_type in tables: if isinstance(table_name, bytes): table_name = table_name.decode() + if isinstance(table_type, bytes): + table_type = table_type.decode() self._logger.info( "%s%sTransferring table %s", @@ -761,10 +915,13 @@ def transfer(self) -> None: self._current_chunk_number = 0 if not self._without_tables: - # create the table - self._create_table(table_name) # type: ignore[arg-type] + # create the table or view + if table_type == "VIEW" and self._views_as_views: + self._create_view(table_name) # type: ignore[arg-type] + else: + self._create_table(table_name) # type: ignore[arg-type] - if not self._without_data: + if not self._without_data and not (table_type == "VIEW" and self._views_as_views): # get the size of the data if self._limit_rows > 0: # limit to the requested number of rows diff --git a/src/mysql_to_sqlite3/types.py b/src/mysql_to_sqlite3/types.py index eb0d488..8fe99b2 100644 --- a/src/mysql_to_sqlite3/types.py +++ b/src/mysql_to_sqlite3/types.py @@ -44,6 +44,7 @@ class MySQLtoSQLiteParams(TypedDict): without_tables: t.Optional[bool] without_data: t.Optional[bool] without_foreign_keys: t.Optional[bool] + views_as_views: t.Optional[bool] class MySQLtoSQLiteAttributes: @@ -81,6 +82,7 @@ class MySQLtoSQLiteAttributes: _vacuum: bool _without_data: bool _without_foreign_keys: bool + _views_as_views: bool # Tracking of SQLite index names and counters to ensure uniqueness when prefixing is disabled _seen_sqlite_index_names: t.Set[str] _sqlite_index_name_counters: t.Dict[str, int] diff --git a/tests/unit/test_cli_views_flag.py b/tests/unit/test_cli_views_flag.py new file mode 100644 index 0000000..8dec9a1 --- /dev/null +++ b/tests/unit/test_cli_views_flag.py @@ -0,0 +1,68 @@ +import typing as t + +import pytest +from click.testing import CliRunner + +from mysql_to_sqlite3.cli import cli as mysql2sqlite + + +class TestCLIViewsFlag: + def test_mysql_views_as_tables_flag_is_threaded(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Ensure --mysql-views-as-tables reaches MySQLtoSQLite as views_as_views=False (legacy materialization).""" + received_kwargs: t.Dict[str, t.Any] = {} + + class FakeConverter: + def __init__(self, **kwargs: t.Any) -> None: + received_kwargs.update(kwargs) + + def transfer(self) -> None: # pragma: no cover - nothing to do + return None + + # Patch the converter used by the CLI + monkeypatch.setattr("mysql_to_sqlite3.cli.MySQLtoSQLite", FakeConverter) + + runner = CliRunner() + result = runner.invoke( + mysql2sqlite, + [ + "-f", + "out.sqlite3", + "-d", + "db", + "-u", + "user", + "--mysql-views-as-tables", + ], + ) + assert result.exit_code == 0 + assert received_kwargs.get("views_as_views") is False + + def test_mysql_views_as_tables_short_flag_is_threaded(self, monkeypatch: pytest.MonkeyPatch) -> None: + """Ensure -T (short for --mysql-views-as-tables) reaches MySQLtoSQLite as views_as_views=False.""" + received_kwargs: t.Dict[str, t.Any] = {} + + class FakeConverter: + def __init__(self, **kwargs: t.Any) -> None: + received_kwargs.update(kwargs) + + def transfer(self) -> None: # pragma: no cover - nothing to do + return None + + # Patch the converter used by the CLI + monkeypatch.setattr("mysql_to_sqlite3.cli.MySQLtoSQLite", FakeConverter) + + runner = CliRunner() + result = runner.invoke( + mysql2sqlite, + [ + "-f", + "out.sqlite3", + "-d", + "db", + "-u", + "user", + "-T", + ], + ) + assert result.exit_code == 0 + assert received_kwargs.get("views_as_views") is False diff --git a/tests/unit/test_create_and_transfer_reconnect.py b/tests/unit/test_create_and_transfer_reconnect.py new file mode 100644 index 0000000..ac8fdcd --- /dev/null +++ b/tests/unit/test_create_and_transfer_reconnect.py @@ -0,0 +1,75 @@ +import sqlite3 +from unittest.mock import MagicMock, patch + +import mysql.connector +import pytest +from mysql.connector import errorcode + +from mysql_to_sqlite3.transporter import MySQLtoSQLite + + +def test_create_table_reconnect_on_server_lost_then_success() -> None: + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + inst = MySQLtoSQLite() # type: ignore[call-arg] + + # Patch dependencies + inst._mysql = MagicMock() + inst._sqlite = MagicMock() + inst._sqlite_cur = MagicMock() + inst._logger = MagicMock() + + # First call to build SQL raises CR_SERVER_LOST; second returns a valid SQL + err = mysql.connector.Error(msg="lost", errno=errorcode.CR_SERVER_LOST) + + inst._build_create_table_sql = MagicMock(side_effect=[err, 'CREATE TABLE IF NOT EXISTS "t" ("id" INTEGER);']) + + inst._create_table("t") + + # Reconnect should have been attempted once + inst._mysql.reconnect.assert_called_once() + # executescript should have been called once with the returned SQL + inst._sqlite_cur.executescript.assert_called_once() + inst._sqlite.commit.assert_called_once() + + +def test_create_table_sqlite_error_is_logged_and_raised() -> None: + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + inst = MySQLtoSQLite() # type: ignore[call-arg] + + inst._mysql = MagicMock() + inst._sqlite = MagicMock() + inst._sqlite_cur = MagicMock() + inst._logger = MagicMock() + + inst._build_create_table_sql = MagicMock(return_value='CREATE TABLE "t" ("id" INTEGER);') + inst._sqlite_cur.executescript.side_effect = sqlite3.Error("broken") + + with pytest.raises(sqlite3.Error): + inst._create_table("t") + + inst._logger.error.assert_called() + + +def test_transfer_table_data_reconnect_on_server_lost_then_success() -> None: + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + inst = MySQLtoSQLite() # type: ignore[call-arg] + + inst._mysql = MagicMock() + inst._mysql_cur = MagicMock() + inst._sqlite = MagicMock() + inst._sqlite_cur = MagicMock() + inst._logger = MagicMock() + inst._quiet = True + inst._chunk_size = None + + # First fetchall raises CR_SERVER_LOST; second returns rows + err = mysql.connector.Error(msg="lost", errno=errorcode.CR_SERVER_LOST) + inst._mysql_cur.fetchall.side_effect = [err, [(1,), (2,)]] + + inst._sqlite_cur.executemany = MagicMock() + + inst._transfer_table_data(table_name="t", sql="INSERT INTO t VALUES (?)", total_records=2) + + inst._mysql.reconnect.assert_called_once() + inst._sqlite_cur.executemany.assert_called_once() + inst._sqlite.commit.assert_called_once() diff --git a/tests/unit/test_create_view_reconnect_and_errors.py b/tests/unit/test_create_view_reconnect_and_errors.py new file mode 100644 index 0000000..cf67edd --- /dev/null +++ b/tests/unit/test_create_view_reconnect_and_errors.py @@ -0,0 +1,46 @@ +import sqlite3 +from unittest.mock import MagicMock, patch + +import mysql.connector +import pytest +from mysql.connector import errorcode + +from mysql_to_sqlite3.transporter import MySQLtoSQLite + + +def test_create_view_reconnect_on_server_lost_then_success() -> None: + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + inst = MySQLtoSQLite() # type: ignore[call-arg] + + inst._mysql = MagicMock() + inst._sqlite = MagicMock() + inst._sqlite_cur = MagicMock() + inst._logger = MagicMock() + + # First build fails with CR_SERVER_LOST, second returns valid SQL + err = mysql.connector.Error(msg="lost", errno=errorcode.CR_SERVER_LOST) + inst._build_create_view_sql = MagicMock(side_effect=[err, 'CREATE VIEW "v" AS SELECT 1;']) + + inst._create_view("v") + + inst._mysql.reconnect.assert_called_once() + inst._sqlite_cur.execute.assert_called_once() + inst._sqlite.commit.assert_called_once() + + +def test_create_view_sqlite_error_is_logged_and_raised() -> None: + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + inst = MySQLtoSQLite() # type: ignore[call-arg] + + inst._mysql = MagicMock() + inst._sqlite = MagicMock() + inst._sqlite_cur = MagicMock() + inst._logger = MagicMock() + + inst._build_create_view_sql = MagicMock(return_value='CREATE VIEW "v" AS SELECT 1;') + inst._sqlite_cur.execute.side_effect = sqlite3.Error("broken") + + with pytest.raises(sqlite3.Error): + inst._create_view("v") + + inst._logger.error.assert_called() diff --git a/tests/unit/test_indices_prefix_and_uniqueness.py b/tests/unit/test_indices_prefix_and_uniqueness.py new file mode 100644 index 0000000..c0b5c01 --- /dev/null +++ b/tests/unit/test_indices_prefix_and_uniqueness.py @@ -0,0 +1,86 @@ +from unittest.mock import MagicMock, patch + +from mysql_to_sqlite3.transporter import MySQLtoSQLite + + +def _make_instance_with_mocks(): + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + instance = MySQLtoSQLite() # type: ignore[call-arg] + instance._mysql_cur_dict = MagicMock() + instance._mysql_database = "db" + instance._sqlite_json1_extension_enabled = False + instance._collation = "BINARY" + instance._prefix_indices = False + instance._without_tables = False + instance._without_foreign_keys = True + instance._logger = MagicMock() + instance._sqlite_strict = False + # Track index names for uniqueness + instance._seen_sqlite_index_names = set() + instance._sqlite_index_name_counters = {} + return instance + + +def test_build_create_table_sql_prefix_indices_true_prefixes_index_names() -> None: + inst = _make_instance_with_mocks() + inst._prefix_indices = True + + # SHOW COLUMNS + inst._mysql_cur_dict.fetchall.side_effect = [ + [ + {"Field": "id", "Type": "INT", "Null": "NO", "Default": None, "Key": "PRI", "Extra": ""}, + {"Field": "name", "Type": "VARCHAR(10)", "Null": "YES", "Default": None, "Key": "", "Extra": ""}, + ], + # STATISTICS rows + [ + { + "name": "idx_name", + "primary": 0, + "unique": 0, + "auto_increment": 0, + "columns": "name", + "types": "VARCHAR(10)", + } + ], + ] + # TABLE collision check -> 0 + inst._mysql_cur_dict.fetchone.return_value = {"count": 0} + + sql = inst._build_create_table_sql("users") + + # With prefix_indices=True, the index name should be prefixed with table name + assert 'CREATE INDEX IF NOT EXISTS "users_idx_name" ON "users" ("name");' in sql + + +def test_build_create_table_sql_collision_renamed_and_uniqueness_suffix() -> None: + inst = _make_instance_with_mocks() + inst._prefix_indices = False + + # Pre-mark an index name as already used globally to force suffixing + inst._seen_sqlite_index_names.add("dup") + + # SHOW COLUMNS + inst._mysql_cur_dict.fetchall.side_effect = [ + [ + {"Field": "id", "Type": "INT", "Null": "NO", "Default": None, "Key": "", "Extra": ""}, + ], + # STATISTICS rows + [ + { + "name": "dup", # collides globally + "primary": 0, + "unique": 1, + "auto_increment": 0, + "columns": "id", + "types": "INT", + } + ], + ] + # TABLE collision check -> 1 so we also prefix with table name before uniqueness + inst._mysql_cur_dict.fetchone.return_value = {"count": 1} + + sql = inst._build_create_table_sql("accounts") + + # Proposed becomes accounts_dup, and since dup already used, unique name stays accounts_dup (no clash) + # or if accounts_dup was in the seen set, it would become accounts_dup_2. We only asserted the presence of accounts_ prefix. + assert 'CREATE UNIQUE INDEX IF NOT EXISTS "accounts_dup" ON "accounts" ("id");' in sql diff --git a/tests/unit/test_transporter.py b/tests/unit/test_transporter.py index 3c9d4fa..2618045 100644 --- a/tests/unit/test_transporter.py +++ b/tests/unit/test_transporter.py @@ -10,6 +10,34 @@ class TestMySQLtoSQLiteTransporter: + def test_transfer_creates_view_when_flag_enabled(self) -> None: + """When views_as_views is True, encountering a MySQL VIEW should create a SQLite VIEW and skip data transfer.""" + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + instance = MySQLtoSQLite() + # Configure minimal attributes used by transfer() + instance._mysql_tables = [] + instance._exclude_mysql_tables = [] + instance._mysql_cur = MagicMock() + # All-tables branch returns one VIEW + instance._mysql_cur.fetchall.return_value = [(b"my_view", b"VIEW")] + instance._sqlite_cur = MagicMock() + instance._without_data = False + instance._without_tables = False + instance._views_as_views = True + instance._vacuum = False + instance._logger = MagicMock() + + # Spy on methods to ensure correct calls + instance._create_view = MagicMock() + instance._create_table = MagicMock() + instance._transfer_table_data = MagicMock() + + instance.transfer() + + instance._create_view.assert_called_once_with("my_view") + instance._create_table.assert_not_called() + instance._transfer_table_data.assert_not_called() + def test_decode_column_type_with_string(self) -> None: """Test _decode_column_type with string input.""" assert MySQLtoSQLite._decode_column_type("VARCHAR") == "VARCHAR" @@ -398,3 +426,29 @@ def test_translate_default_from_mysql_to_sqlite_bytes(self) -> None: """Test _translate_default_from_mysql_to_sqlite with bytes default.""" result = MySQLtoSQLite._translate_default_from_mysql_to_sqlite(b"abc", column_type="BLOB") assert result.startswith("DEFAULT x'") + + +def test_transfer_coerce_row_fallback_non_subscriptable() -> None: + """Ensure transfer() handles rows that are not indexable by using fallback path in _coerce_row.""" + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + instance = MySQLtoSQLite() + # Configure minimal attributes used by transfer() + instance._mysql_tables = [] + instance._exclude_mysql_tables = [] + instance._mysql_cur = MagicMock() + # Return a non-subscriptable row (int) to trigger the except fallback branch + instance._mysql_cur.fetchall.return_value = [123] + instance._sqlite_cur = MagicMock() + # Skip creating tables/data to keep the test isolated + instance._without_data = True + instance._without_tables = True + instance._views_as_views = True + instance._vacuum = False + instance._logger = MagicMock() + + instance.transfer() + + # Confirm the logger received a transfer message with the coerced table name "123" + # The info call is like: ("%s%sTransferring table %s", prefix1, prefix2, table_name) + called_with_123 = any(call.args and call.args[-1] == "123" for call in instance._logger.info.call_args_list) + assert called_with_123 diff --git a/tests/unit/test_types_and_defaults_extra.py b/tests/unit/test_types_and_defaults_extra.py new file mode 100644 index 0000000..3317601 --- /dev/null +++ b/tests/unit/test_types_and_defaults_extra.py @@ -0,0 +1,74 @@ +import sqlite3 +import typing as t + +import pytest + +from mysql_to_sqlite3.sqlite_utils import CollatingSequences +from mysql_to_sqlite3.transporter import MySQLtoSQLite + + +class TestTypesAndDefaultsExtra: + def test_valid_column_type_and_length(self) -> None: + # COLUMN_PATTERN should match the type name without length + m = MySQLtoSQLite._valid_column_type("varchar(255)") + assert m is not None + assert m.group(0).lower() == "varchar" + # No parenthesis -> no length suffix + assert MySQLtoSQLite._column_type_length("int") == "" + # With parenthesis -> returns the (N) + assert MySQLtoSQLite._column_type_length("nvarchar(42)") == "(42)" + + def test_data_type_collation_sequence(self) -> None: + # Collation applies to textual affinity types only + assert ( + MySQLtoSQLite._data_type_collation_sequence(collation=CollatingSequences.NOCASE, column_type="VARCHAR(10)") + == f"COLLATE {CollatingSequences.NOCASE}" + ) + assert ( + MySQLtoSQLite._data_type_collation_sequence(collation=CollatingSequences.NOCASE, column_type="INTEGER") + == "" + ) + + @pytest.mark.parametrize( + "default,expected", + [ + ("curtime()", "DEFAULT CURRENT_TIME"), + ("curdate()", "DEFAULT CURRENT_DATE"), + ("now()", "DEFAULT CURRENT_TIMESTAMP"), + ], + ) + def test_translate_default_common_keywords(self, default: str, expected: str) -> None: + assert MySQLtoSQLite._translate_default_from_mysql_to_sqlite(default) == expected + + def test_translate_default_charset_introducer_str_hex_and_bin(self) -> None: + # DEFAULT_GENERATED with charset introducer and hex (escaped as in MySQL) + s = "_utf8mb4 X\\'41\\'" # hex for 'A' + out = MySQLtoSQLite._translate_default_from_mysql_to_sqlite( + s, column_type="BLOB", column_extra="DEFAULT_GENERATED" + ) + assert out == "DEFAULT x'41'" + # DEFAULT_GENERATED with charset introducer and binary literal (escaped) + s2 = "_utf8mb4 b\\'01000001\\'" # binary for 'A' + out2 = MySQLtoSQLite._translate_default_from_mysql_to_sqlite( + s2, column_type="BLOB", column_extra="DEFAULT_GENERATED" + ) + assert out2 == "DEFAULT 'A'" + + def test_translate_default_charset_introducer_bytes(self) -> None: + # Escaped form in bytes + s = b"_utf8mb4 x\\'41\\'" + out = MySQLtoSQLite._translate_default_from_mysql_to_sqlite( + s, column_type="BLOB", column_extra="DEFAULT_GENERATED" + ) + assert out == "DEFAULT x'41'" + + def test_translate_default_bool_non_boolean_type(self) -> None: + # When column_type is not BOOLEAN, booleans become '1'/'0' + assert MySQLtoSQLite._translate_default_from_mysql_to_sqlite(True, column_type="INTEGER") == "DEFAULT '1'" + assert MySQLtoSQLite._translate_default_from_mysql_to_sqlite(False, column_type="INTEGER") == "DEFAULT '0'" + + def test_translate_default_bool_boolean_type(self, monkeypatch: pytest.MonkeyPatch) -> None: + # Ensure SQLite version satisfies condition + monkeypatch.setattr(sqlite3, "sqlite_version", "3.40.0") + assert MySQLtoSQLite._translate_default_from_mysql_to_sqlite(True, column_type="BOOLEAN") == "DEFAULT(TRUE)" + assert MySQLtoSQLite._translate_default_from_mysql_to_sqlite(False, column_type="BOOLEAN") == "DEFAULT(FALSE)" diff --git a/tests/unit/test_views_build_paths_extra.py b/tests/unit/test_views_build_paths_extra.py new file mode 100644 index 0000000..44dc760 --- /dev/null +++ b/tests/unit/test_views_build_paths_extra.py @@ -0,0 +1,58 @@ +import sqlite3 +from unittest.mock import MagicMock, patch + +import pytest + +from mysql_to_sqlite3.transporter import MySQLtoSQLite + + +def _inst_with_mysql_dict(): + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + inst = MySQLtoSQLite() # type: ignore[call-arg] + inst._mysql_cur_dict = MagicMock() + inst._mysql_cur = MagicMock() + inst._mysql_database = "db" + return inst + + +def test_build_create_view_sql_information_schema_bytes_decode_failure_falls_back( + monkeypatch: pytest.MonkeyPatch, +) -> None: + inst = _inst_with_mysql_dict() + + # information_schema returns bytes that fail to decode in UTF-8 + bad_bytes = b"\xff\xfe\xfa" + inst._mysql_cur_dict.fetchone.return_value = {"definition": bad_bytes} + + captured = {} + + def fake_converter(*, view_select_sql: str, view_name: str, schema_name: str, keep_schema: bool = False) -> str: + captured["view_select_sql"] = view_select_sql + captured["view_name"] = view_name + captured["schema_name"] = schema_name + captured["keep_schema"] = keep_schema + return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS SELECT 1;' + + monkeypatch.setattr(MySQLtoSQLite, "_mysql_viewdef_to_sqlite", staticmethod(fake_converter)) + + sql = inst._build_create_view_sql("v_strange") + + # Converter was invoked with the string representation of the undecodable bytes + assert captured["view_name"] == "v_strange" + assert captured["schema_name"] == "db" + assert isinstance(captured["view_select_sql"], str) + # And a CREATE VIEW statement was produced + assert sql.startswith('CREATE VIEW IF NOT EXISTS "v_strange" AS') + assert sql.strip().endswith(";") + + +def test_build_create_view_sql_raises_when_no_definition_available() -> None: + inst = _inst_with_mysql_dict() + + # information_schema path -> None + inst._mysql_cur_dict.fetchone.return_value = None + # SHOW CREATE VIEW returns None + inst._mysql_cur.fetchone.return_value = None + + with pytest.raises(sqlite3.Error): + inst._build_create_view_sql("missing_view") diff --git a/tests/unit/test_views_create_view.py b/tests/unit/test_views_create_view.py new file mode 100644 index 0000000..6205739 --- /dev/null +++ b/tests/unit/test_views_create_view.py @@ -0,0 +1,68 @@ +import typing as t +from unittest.mock import MagicMock, patch + +import mysql.connector +from mysql.connector import errorcode + +from mysql_to_sqlite3.transporter import MySQLtoSQLite + + +def test_show_create_view_fallback_handles_newline_and_backticks(monkeypatch: "t.Any") -> None: + """ + Force the SHOW CREATE VIEW fallback path and verify: + - The executed SQL escapes backticks in the view name. + - The regex extracts the SELECT when it follows "AS\n" (across newline). + - The extracted SELECT (without trailing semicolon) is passed to _mysql_viewdef_to_sqlite. + """ + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + instance = MySQLtoSQLite() # type: ignore[call-arg] + + # Make information_schema path return None so fallback is used + instance._mysql_cur_dict = MagicMock() + instance._mysql_cur_dict.execute.return_value = None + instance._mysql_cur_dict.fetchone.return_value = None + + # Prepare SHOW CREATE VIEW return value with AS followed by newline + create_stmt = ( + "CREATE ALGORITHM=UNDEFINED DEFINER=`user`@`%` SQL SECURITY DEFINER " "VIEW `we``ird` AS\nSELECT 1 AS `x`;" + ) + executed_sql: t.List[str] = [] + + def capture_execute(sql: str) -> None: + executed_sql.append(sql) + + instance._mysql_cur = MagicMock() + instance._mysql_cur.execute.side_effect = capture_execute + instance._mysql_cur.fetchone.return_value = ("we`ird", create_stmt) + + # Capture the definition passed to _mysql_viewdef_to_sqlite and return a dummy SQL + captured: t.Dict[str, str] = {} + + def fake_mysql_viewdef_to_sqlite( + *, view_select_sql: str, view_name: str, schema_name: t.Optional[str] = None, keep_schema: bool = False + ) -> str: + captured["select"] = view_select_sql + captured["view_name"] = view_name + captured["schema_name"] = schema_name or "" + captured["keep_schema"] = str(keep_schema) + return 'CREATE VIEW IF NOT EXISTS "dummy" AS SELECT 1;' + + monkeypatch.setattr(MySQLtoSQLite, "_mysql_viewdef_to_sqlite", staticmethod(fake_mysql_viewdef_to_sqlite)) + + instance._mysql_database = "db" + + # Build the SQL (triggers fallback path) + sql = instance._build_create_view_sql("we`ird") + + # Assert backticks in the view name were escaped in the SHOW CREATE VIEW statement + assert executed_sql and executed_sql[0] == "SHOW CREATE VIEW `we``ird`" + + # The resulting SQL is our fake output + assert sql.startswith('CREATE VIEW IF NOT EXISTS "dummy" AS') + + # Ensure the extracted SELECT excludes the trailing semicolon and spans newlines + assert captured["select"] == "SELECT 1 AS `x`" + # Check view_name was threaded unchanged to the converter + assert captured["view_name"] == "we`ird" + # Schema name also provided + assert captured["schema_name"] == "db" diff --git a/tests/unit/test_views_sqlglot.py b/tests/unit/test_views_sqlglot.py new file mode 100644 index 0000000..0338f2d --- /dev/null +++ b/tests/unit/test_views_sqlglot.py @@ -0,0 +1,54 @@ +import re + +import pytest + +from mysql_to_sqlite3.transporter import MySQLtoSQLite + + +class TestViewsSqlglot: + def test_mysql_viewdef_to_sqlite_strips_schema_and_transpiles(self) -> None: + mysql_select = "SELECT `u`.`id`, `u`.`name` FROM `db`.`users` AS `u` WHERE `u`.`id` > 1" + sql = MySQLtoSQLite._mysql_viewdef_to_sqlite( + view_select_sql=mysql_select, + view_name="v_users", + schema_name="db", + ) + assert sql.startswith('CREATE VIEW IF NOT EXISTS "v_users" AS') + # Ensure schema qualifier was removed + assert '"db".' not in sql + assert "`db`." not in sql + # Ensure it targets sqlite dialect (identifiers quoted with ") + assert 'FROM "users"' in sql + # Ends with single semicolon + assert re.search(r";\s*$", sql) is not None + + def test_mysql_viewdef_to_sqlite_parse_fallback(self, monkeypatch: pytest.MonkeyPatch) -> None: + # Force parse_one to raise so we hit the fallback path + from sqlglot.errors import ParseError + + def boom(*args, **kwargs): + raise ParseError("boom") + + monkeypatch.setattr("mysql_to_sqlite3.transporter.parse_one", boom) + + sql_in = "SELECT 1" + out = MySQLtoSQLite._mysql_viewdef_to_sqlite( + view_select_sql=sql_in, + view_name="v1", + schema_name="db", + ) + assert out.startswith('CREATE VIEW IF NOT EXISTS "v1" AS') + assert "SELECT 1" in out + assert out.strip().endswith(";") + + def test_mysql_viewdef_to_sqlite_keep_schema_true_preserves_qualifiers(self) -> None: + mysql_select = "SELECT `u`.`id` FROM `db`.`users` AS `u`" + sql = MySQLtoSQLite._mysql_viewdef_to_sqlite( + view_select_sql=mysql_select, + view_name="v_users", + schema_name="db", + keep_schema=True, + ) + # Should not strip the schema when keep_schema=True + assert "`db`." in sql or '"db".' in sql + assert sql.strip().endswith(";")