diff --git a/src/mysql_to_sqlite3/transporter.py b/src/mysql_to_sqlite3/transporter.py index bea3f86..8ce806d 100644 --- a/src/mysql_to_sqlite3/transporter.py +++ b/src/mysql_to_sqlite3/transporter.py @@ -895,37 +895,37 @@ def _create_table(self, table_name: str, attempting_reconnect: bool = False) -> self._logger.error("SQLite failed creating table %s: %s", table_name, err) raise - @staticmethod - def _mysql_viewdef_to_sqlite( - view_select_sql: str, - view_name: str, - schema_name: t.Optional[str] = None, - keep_schema: bool = False, - ) -> str: - """ - Convert a MySQL VIEW_DEFINITION (a SELECT ...) to a SQLite CREATE VIEW statement. - - If keep_schema is False and schema_name is provided, strip qualifiers like `example`.table. - If keep_schema is True, you must ATTACH the SQLite database as that schema name before using the view. - """ + def _mysql_viewdef_to_sqlite(self, view_select_sql: str, view_name: str) -> str: + """Convert a MySQL VIEW_DEFINITION (a SELECT ...) to a SQLite CREATE VIEW statement.""" # Normalize whitespace and avoid double semicolons in output cleaned_sql = view_select_sql.strip().rstrip(";") try: - tree = parse_one(cleaned_sql, read="mysql") - except (ParseError, ValueError, Exception): # pylint: disable=W0718 - # Fallback: return a basic CREATE VIEW using the original SELECT - return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS\n{cleaned_sql};' - - if not keep_schema and schema_name: - # Remove schema qualifiers that match schema_name - for tbl in tree.find_all(exp.Table): - db = tbl.args.get("db") - if db and db.name.strip('`"') == schema_name: - tbl.set("db", None) - - sqlite_select = tree.sql(dialect="sqlite") - return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS\n{sqlite_select};' + tree: Expression = parse_one(cleaned_sql, read="mysql") + except (ParseError, ValueError, AttributeError, TypeError): + # Fallback: try to remove schema qualifiers if requested, then return + stripped_sql = cleaned_sql + # Remove qualifiers `schema`.tbl or "schema".tbl or schema.tbl + sn: str = re.escape(self._mysql_database) + for pat in (rf"`{sn}`\.", rf'"{sn}"\.', rf"\b{sn}\."): + stripped_sql = re.sub(pat, "", stripped_sql, flags=re.IGNORECASE) + view_ident = self._quote_sqlite_identifier(view_name) + return f"CREATE VIEW IF NOT EXISTS {view_ident} AS\n{stripped_sql};" + + # Remove schema qualifiers that match schema_name on tables + for tbl in tree.find_all(exp.Table): + db = tbl.args.get("db") + if db and db.name.strip('`"').lower() == self._mysql_database.lower(): + tbl.set("db", None) + # Also remove schema qualifiers on fully-qualified columns (db.table.column) + for col in tree.find_all(exp.Column): + db = col.args.get("db") + if db and db.name.strip('`"').lower() == self._mysql_database.lower(): + col.set("db", None) + + sqlite_select: str = tree.sql(dialect="sqlite") + view_ident = self._quote_sqlite_identifier(view_name) + return f"CREATE VIEW IF NOT EXISTS {view_ident} AS\n{sqlite_select};" def _build_create_view_sql(self, view_name: str) -> str: """Build a CREATE VIEW statement for SQLite from a MySQL VIEW definition.""" @@ -990,7 +990,6 @@ def _build_create_view_sql(self, view_name: str) -> str: return self._mysql_viewdef_to_sqlite( view_name=view_name, view_select_sql=definition, - schema_name=self._mysql_database, ) def _create_view(self, view_name: str, attempting_reconnect: bool = False) -> None: diff --git a/tests/unit/test_views_build_paths_extra.py b/tests/unit/test_views_build_paths_extra.py index 44dc760..9fbcb00 100644 --- a/tests/unit/test_views_build_paths_extra.py +++ b/tests/unit/test_views_build_paths_extra.py @@ -26,11 +26,9 @@ def test_build_create_view_sql_information_schema_bytes_decode_failure_falls_bac captured = {} - def fake_converter(*, view_select_sql: str, view_name: str, schema_name: str, keep_schema: bool = False) -> str: + def fake_converter(*, view_select_sql: str, view_name: str) -> str: captured["view_select_sql"] = view_select_sql captured["view_name"] = view_name - captured["schema_name"] = schema_name - captured["keep_schema"] = keep_schema return f'CREATE VIEW IF NOT EXISTS "{view_name}" AS SELECT 1;' monkeypatch.setattr(MySQLtoSQLite, "_mysql_viewdef_to_sqlite", staticmethod(fake_converter)) @@ -39,7 +37,6 @@ def fake_converter(*, view_select_sql: str, view_name: str, schema_name: str, ke # Converter was invoked with the string representation of the undecodable bytes assert captured["view_name"] == "v_strange" - assert captured["schema_name"] == "db" assert isinstance(captured["view_select_sql"], str) # And a CREATE VIEW statement was produced assert sql.startswith('CREATE VIEW IF NOT EXISTS "v_strange" AS') diff --git a/tests/unit/test_views_create_view.py b/tests/unit/test_views_create_view.py index 6205739..4d9c6f1 100644 --- a/tests/unit/test_views_create_view.py +++ b/tests/unit/test_views_create_view.py @@ -38,13 +38,9 @@ def capture_execute(sql: str) -> None: # Capture the definition passed to _mysql_viewdef_to_sqlite and return a dummy SQL captured: t.Dict[str, str] = {} - def fake_mysql_viewdef_to_sqlite( - *, view_select_sql: str, view_name: str, schema_name: t.Optional[str] = None, keep_schema: bool = False - ) -> str: + def fake_mysql_viewdef_to_sqlite(*, view_select_sql: str, view_name: str) -> str: captured["select"] = view_select_sql captured["view_name"] = view_name - captured["schema_name"] = schema_name or "" - captured["keep_schema"] = str(keep_schema) return 'CREATE VIEW IF NOT EXISTS "dummy" AS SELECT 1;' monkeypatch.setattr(MySQLtoSQLite, "_mysql_viewdef_to_sqlite", staticmethod(fake_mysql_viewdef_to_sqlite)) @@ -64,5 +60,3 @@ def fake_mysql_viewdef_to_sqlite( assert captured["select"] == "SELECT 1 AS `x`" # Check view_name was threaded unchanged to the converter assert captured["view_name"] == "we`ird" - # Schema name also provided - assert captured["schema_name"] == "db" diff --git a/tests/unit/test_views_sqlglot.py b/tests/unit/test_views_sqlglot.py index 0338f2d..d53e64e 100644 --- a/tests/unit/test_views_sqlglot.py +++ b/tests/unit/test_views_sqlglot.py @@ -1,4 +1,5 @@ import re +from unittest.mock import patch import pytest @@ -8,10 +9,13 @@ class TestViewsSqlglot: def test_mysql_viewdef_to_sqlite_strips_schema_and_transpiles(self) -> None: mysql_select = "SELECT `u`.`id`, `u`.`name` FROM `db`.`users` AS `u` WHERE `u`.`id` > 1" - sql = MySQLtoSQLite._mysql_viewdef_to_sqlite( + # Use an instance to ensure access to _mysql_database for stripping + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + inst = MySQLtoSQLite() # type: ignore[call-arg] + inst._mysql_database = "db" # type: ignore[attr-defined] + sql = inst._mysql_viewdef_to_sqlite( view_select_sql=mysql_select, view_name="v_users", - schema_name="db", ) assert sql.startswith('CREATE VIEW IF NOT EXISTS "v_users" AS') # Ensure schema qualifier was removed @@ -26,29 +30,75 @@ def test_mysql_viewdef_to_sqlite_parse_fallback(self, monkeypatch: pytest.Monkey # Force parse_one to raise so we hit the fallback path from sqlglot.errors import ParseError - def boom(*args, **kwargs): + def boom(*_, **__): raise ParseError("boom") monkeypatch.setattr("mysql_to_sqlite3.transporter.parse_one", boom) sql_in = "SELECT 1" - out = MySQLtoSQLite._mysql_viewdef_to_sqlite( + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + inst = MySQLtoSQLite() # type: ignore[call-arg] + inst._mysql_database = "db" # type: ignore[attr-defined] + out = inst._mysql_viewdef_to_sqlite( view_select_sql=sql_in, view_name="v1", - schema_name="db", ) assert out.startswith('CREATE VIEW IF NOT EXISTS "v1" AS') assert "SELECT 1" in out assert out.strip().endswith(";") - def test_mysql_viewdef_to_sqlite_keep_schema_true_preserves_qualifiers(self) -> None: + def test_mysql_viewdef_to_sqlite_parse_fallback_strips_schema(self, monkeypatch: pytest.MonkeyPatch) -> None: + # Force parse_one to raise so we exercise the fallback path with schema qualifiers + from sqlglot.errors import ParseError + + def boom(*_, **__): + raise ParseError("boom") + + monkeypatch.setattr("mysql_to_sqlite3.transporter.parse_one", boom) + + mysql_select = "SELECT `u`.`id` FROM `db`.`users` AS `u` WHERE `u`.`id` > 1" + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + inst = MySQLtoSQLite() # type: ignore[call-arg] + inst._mysql_database = "db" # type: ignore[attr-defined] + out = inst._mysql_viewdef_to_sqlite( + view_select_sql=mysql_select, + view_name="v_users", + ) + # Should not contain schema qualifier anymore + assert "`db`." not in out and '"db".' not in out and " db." not in out + # Should still reference the table name + assert "FROM `users`" in out or 'FROM "users"' in out or "FROM users" in out + assert out.strip().endswith(";") + + def test_mysql_viewdef_to_sqlite_strips_schema_from_qualified_columns_nested(self) -> None: + # Based on the user-reported example with nested subquery and fully-qualified columns + mysql_sql = ( + "select `p`.`instrument_id` AS `instrument_id`,`p`.`price_date` AS `price_date`,`p`.`close` AS `close` " + "from (`example`.`prices` `p` join (select `example`.`prices`.`instrument_id` AS `instrument_id`," + "max(`example`.`prices`.`price_date`) AS `max_date` from `example`.`prices` group by " + "`example`.`prices`.`instrument_id`) `t` on(((`t`.`instrument_id` = `p`.`instrument_id`) and " + "(`t`.`max_date` = `p`.`price_date`))))" + ) + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + inst = MySQLtoSQLite() # type: ignore[call-arg] + inst._mysql_database = "example" # type: ignore[attr-defined] + out = inst._mysql_viewdef_to_sqlite(view_select_sql=mysql_sql, view_name="v_prices") + # Ensure all schema qualifiers are removed, including on qualified columns inside subqueries + assert '"example".' not in out and "`example`." not in out and " example." not in out + # Still references the base table name + assert 'FROM "prices"' in out or 'FROM ("prices"' in out or "FROM prices" in out + assert out.strip().endswith(";") + + def test_mysql_viewdef_to_sqlite_strips_matching_schema_qualifiers(self) -> None: mysql_select = "SELECT `u`.`id` FROM `db`.`users` AS `u`" - sql = MySQLtoSQLite._mysql_viewdef_to_sqlite( + # Use instance for consistent attribute access + with patch.object(MySQLtoSQLite, "__init__", return_value=None): + inst = MySQLtoSQLite() # type: ignore[call-arg] + inst._mysql_database = "db" # type: ignore[attr-defined] + # Since keep_schema behavior is no longer parameterized, ensure that if schema matches current db, it is stripped + sql = inst._mysql_viewdef_to_sqlite( view_select_sql=mysql_select, view_name="v_users", - schema_name="db", - keep_schema=True, ) - # Should not strip the schema when keep_schema=True - assert "`db`." in sql or '"db".' in sql + assert "`db`." not in sql and '"db".' not in sql assert sql.strip().endswith(";")