From 0654ed21839f1c8421e322085708371793acc0f9 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Thu, 23 Oct 2025 20:44:36 +0100 Subject: [PATCH 1/9] :sparkles: enhance SQLite to MySQL transfer by supporting expression defaults --- src/sqlite3_to_mysql/transporter.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/sqlite3_to_mysql/transporter.py b/src/sqlite3_to_mysql/transporter.py index bd847a4..bc2f934 100644 --- a/src/sqlite3_to_mysql/transporter.py +++ b/src/sqlite3_to_mysql/transporter.py @@ -534,6 +534,19 @@ def _translate_default_for_mysql(self, column_type: str, default: str) -> str: return s # Fallback: return stripped expression (MySQL 8.0.13+ allows expression defaults) + if self._allow_expr_defaults: + try: + expr = sqlglot.parse_one(s, read="sqlite") + except sqlglot_errors.ParseError: + return s + + expr = expr.transform(self._rewrite_sqlite_view_functions) + + try: + return expr.sql(dialect="mysql") + except sqlglot_errors.SqlglotError: + return s + return s @classmethod From 30ce4bbe8f5a60e270e7741660f92d95035e85cd Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Thu, 23 Oct 2025 20:44:52 +0100 Subject: [PATCH 2/9] :white_check_mark: add tests for translating default values in MySQL with sqlglot strftime modifier --- tests/unit/sqlite3_to_mysql_test.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/unit/sqlite3_to_mysql_test.py b/tests/unit/sqlite3_to_mysql_test.py index 766b428..6726724 100644 --- a/tests/unit/sqlite3_to_mysql_test.py +++ b/tests/unit/sqlite3_to_mysql_test.py @@ -1197,6 +1197,16 @@ def test_time_mapping_from_sqlite_now_respects_fsp(self): == "CURRENT_TIME" ) + def test_translate_default_for_mysql_sqlglot_strftime_modifier(self): + instance = self._mk(expr=True, ts_dt=True, fsp=True) + result = instance._translate_default_for_mysql("VARCHAR(32)", "strftime('%Y-%m-%d', 'now', 'utc')") + assert result == "DATE_FORMAT(UTC_TIMESTAMP(), '%Y-%m-%d')" + + def test_translate_default_for_mysql_sqlglot_requires_expr_support(self): + instance = self._mk(expr=False, ts_dt=True, fsp=True) + result = instance._translate_default_for_mysql("VARCHAR(32)", "strftime('%Y-%m-%d', 'now', 'utc')") + assert result == "strftime('%Y-%m-%d', 'now', 'utc')" + def test_translate_sqlite_view_definition_current_timestamp(self): instance = SQLite3toMySQL.__new__(SQLite3toMySQL) result = instance._translate_sqlite_view_definition( From fac6c43f2b0fba5fe2285f2ef647aede09d537e1 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Thu, 23 Oct 2025 21:20:26 +0100 Subject: [PATCH 3/9] :sparkles: add normalization for SQLite column types during MySQL translation --- src/sqlite3_to_mysql/transporter.py | 43 +++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/src/sqlite3_to_mysql/transporter.py b/src/sqlite3_to_mysql/transporter.py index bc2f934..3cb1b6f 100644 --- a/src/sqlite3_to_mysql/transporter.py +++ b/src/sqlite3_to_mysql/transporter.py @@ -330,6 +330,49 @@ def _valid_column_type(cls, column_type: str) -> t.Optional[t.Match[str]]: return cls.COLUMN_PATTERN.match(column_type.strip()) def _translate_type_from_sqlite_to_mysql(self, column_type: str) -> str: + normalized: t.Optional[str] = self._normalize_sqlite_column_type(column_type) + if normalized and normalized.upper() != column_type.upper(): + try: + return self._translate_type_from_sqlite_to_mysql_legacy(normalized) + except ValueError: + pass + return self._translate_type_from_sqlite_to_mysql_legacy(column_type) + + def _normalize_sqlite_column_type(self, column_type: str) -> t.Optional[str]: + clean_type: str = column_type.strip() + if not clean_type: + return None + + normalized_for_parse: str = clean_type.upper().replace("UNSIGNED BIG INT", "BIGINT UNSIGNED") + try: + expression = sqlglot.parse_one(f"SELECT CAST(NULL AS {normalized_for_parse})", read="sqlite") + except sqlglot_errors.ParseError: + return None + + cast: t.Optional[exp.Cast] = expression.find(exp.Cast) + if not cast or not isinstance(cast.to, exp.DataType): + return None + + params: t.List[str] = [] + for expr_param in cast.to.expressions or []: + value_expr = expr_param.this if isinstance(expr_param, exp.DataTypeParam) else expr_param + if value_expr is None: + continue + params.append(value_expr.sql(dialect="mysql")) + + base_match: t.Optional[t.Match[str]] = self._valid_column_type(clean_type) + base = base_match.group(0).upper().strip() if base_match else clean_type.upper() + + normalized = base + if params: + normalized += "(" + ",".join(param.strip("\"'") for param in params) + ")" + + if "UNSIGNED" in clean_type.upper() and "UNSIGNED" not in normalized.upper().split(): + normalized = f"{normalized} UNSIGNED" + + return normalized + + def _translate_type_from_sqlite_to_mysql_legacy(self, column_type: str) -> str: """This could be optimized even further, however is seems adequate.""" full_column_type: str = column_type.upper() unsigned: bool = self.COLUMN_UNSIGNED_PATTERN.search(full_column_type) is not None From b0588436a0b7cb855956a2ad8f98b866feedc108 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Thu, 23 Oct 2025 21:20:36 +0100 Subject: [PATCH 4/9] :white_check_mark: add type normalization tests for SQLite to MySQL translation --- tests/unit/sqlite3_to_mysql_test.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/unit/sqlite3_to_mysql_test.py b/tests/unit/sqlite3_to_mysql_test.py index 6726724..22d6bf3 100644 --- a/tests/unit/sqlite3_to_mysql_test.py +++ b/tests/unit/sqlite3_to_mysql_test.py @@ -1146,6 +1146,18 @@ def _mk(*, expr: bool, ts_dt: bool, fsp: bool) -> SQLite3toMySQL: instance._allow_fsp = fsp # MySQL >= 5.6.4 return instance + @staticmethod + def _type_instance( + mysql_integer_type: str = "INT(11)", + mysql_string_type: str = "VARCHAR(255)", + mysql_text_type: str = "TEXT", + ) -> SQLite3toMySQL: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._mysql_integer_type = mysql_integer_type + instance._mysql_string_type = mysql_string_type + instance._mysql_text_type = mysql_text_type + return instance + @pytest.mark.parametrize( "col, default, flags, expected", [ @@ -1207,6 +1219,16 @@ def test_translate_default_for_mysql_sqlglot_requires_expr_support(self): result = instance._translate_default_for_mysql("VARCHAR(32)", "strftime('%Y-%m-%d', 'now', 'utc')") assert result == "strftime('%Y-%m-%d', 'now', 'utc')" + def test_translate_type_from_sqlite_to_mysql_sqlglot_normalizes_spacing(self): + proc = self._type_instance() + assert proc._translate_type_from_sqlite_to_mysql("NUMERIC ( 10 , 5 )") == "DECIMAL(10,5)" + assert proc._translate_type_from_sqlite_to_mysql("varchar ( 12 )") == "VARCHAR(12)" + assert proc._translate_type_from_sqlite_to_mysql("CHAR ( 7 )") == "CHAR(7)" + + def test_translate_type_from_sqlite_to_mysql_sqlglot_preserves_unsigned(self): + proc = self._type_instance() + assert proc._translate_type_from_sqlite_to_mysql("numeric(8, 3) unsigned") == "DECIMAL(8,3) UNSIGNED" + def test_translate_sqlite_view_definition_current_timestamp(self): instance = SQLite3toMySQL.__new__(SQLite3toMySQL) result = instance._translate_sqlite_view_definition( From 94b1b9d2a29596337f5d22c9ca0eb1a51e968300 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 26 Oct 2025 13:59:14 +0000 Subject: [PATCH 5/9] :white_check_mark: normalize MySQL type strings to uppercase in SQLite3toMySQL instance creation --- tests/unit/sqlite3_to_mysql_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/unit/sqlite3_to_mysql_test.py b/tests/unit/sqlite3_to_mysql_test.py index 22d6bf3..740e2e5 100644 --- a/tests/unit/sqlite3_to_mysql_test.py +++ b/tests/unit/sqlite3_to_mysql_test.py @@ -1153,9 +1153,9 @@ def _type_instance( mysql_text_type: str = "TEXT", ) -> SQLite3toMySQL: instance = SQLite3toMySQL.__new__(SQLite3toMySQL) - instance._mysql_integer_type = mysql_integer_type - instance._mysql_string_type = mysql_string_type - instance._mysql_text_type = mysql_text_type + instance._mysql_integer_type = mysql_integer_type.upper() + instance._mysql_string_type = mysql_string_type.upper() + instance._mysql_text_type = mysql_text_type.upper() return instance @pytest.mark.parametrize( From 5ae15f5cbe023d0d372816c5e2aaee0634f77a25 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 26 Oct 2025 14:42:39 +0000 Subject: [PATCH 6/9] :recycle: log normalized SQLite column types and improve expression default handling --- src/sqlite3_to_mysql/transporter.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/src/sqlite3_to_mysql/transporter.py b/src/sqlite3_to_mysql/transporter.py index 3cb1b6f..64a22f5 100644 --- a/src/sqlite3_to_mysql/transporter.py +++ b/src/sqlite3_to_mysql/transporter.py @@ -332,6 +332,7 @@ def _valid_column_type(cls, column_type: str) -> t.Optional[t.Match[str]]: def _translate_type_from_sqlite_to_mysql(self, column_type: str) -> str: normalized: t.Optional[str] = self._normalize_sqlite_column_type(column_type) if normalized and normalized.upper() != column_type.upper(): + self._logger.info("Normalised SQLite column type %r -> %r", column_type, normalized) try: return self._translate_type_from_sqlite_to_mysql_legacy(normalized) except ValueError: @@ -347,7 +348,12 @@ def _normalize_sqlite_column_type(self, column_type: str) -> t.Optional[str]: try: expression = sqlglot.parse_one(f"SELECT CAST(NULL AS {normalized_for_parse})", read="sqlite") except sqlglot_errors.ParseError: - return None + # Retry: strip UNSIGNED to aid parsing; we'll re-attach it below if present. + try: + no_unsigned = re.sub(r"\bUNSIGNED\b", "", normalized_for_parse).strip() + expression = sqlglot.parse_one(f"SELECT CAST(NULL AS {no_unsigned})", read="sqlite") + except sqlglot_errors.ParseError: + return None cast: t.Optional[exp.Cast] = expression.find(exp.Cast) if not cast or not isinstance(cast.to, exp.DataType): @@ -535,7 +541,8 @@ def _translate_default_for_mysql(self, column_type: str, default: str) -> str: ) and self._allow_expr_defaults ): - # Too old for expression defaults on DATE → fall back + if not self._allow_expr_defaults: + return "" return "CURRENT_DATE" # TIME @@ -549,7 +556,8 @@ def _translate_default_for_mysql(self, column_type: str, default: str) -> str: ) and self._allow_expr_defaults ): - # Too old for expression defaults on TIME → fall back + if not self._allow_expr_defaults: + return "" len_match = self.COLUMN_LENGTH_PATTERN.search(column_type) fsp = "" if self._allow_fsp and len_match: From 9b63359edd1a4dd048940584089f15bb2390704f Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 26 Oct 2025 14:42:46 +0000 Subject: [PATCH 7/9] :recycle: refactor test setup for SQLite to MySQL translation and remove unused type_instance method --- tests/unit/sqlite3_to_mysql_test.py | 47 ++++++++++++++++++----------- 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/tests/unit/sqlite3_to_mysql_test.py b/tests/unit/sqlite3_to_mysql_test.py index 740e2e5..498e77d 100644 --- a/tests/unit/sqlite3_to_mysql_test.py +++ b/tests/unit/sqlite3_to_mysql_test.py @@ -1146,18 +1146,6 @@ def _mk(*, expr: bool, ts_dt: bool, fsp: bool) -> SQLite3toMySQL: instance._allow_fsp = fsp # MySQL >= 5.6.4 return instance - @staticmethod - def _type_instance( - mysql_integer_type: str = "INT(11)", - mysql_string_type: str = "VARCHAR(255)", - mysql_text_type: str = "TEXT", - ) -> SQLite3toMySQL: - instance = SQLite3toMySQL.__new__(SQLite3toMySQL) - instance._mysql_integer_type = mysql_integer_type.upper() - instance._mysql_string_type = mysql_string_type.upper() - instance._mysql_text_type = mysql_text_type.upper() - return instance - @pytest.mark.parametrize( "col, default, flags, expected", [ @@ -1219,14 +1207,40 @@ def test_translate_default_for_mysql_sqlglot_requires_expr_support(self): result = instance._translate_default_for_mysql("VARCHAR(32)", "strftime('%Y-%m-%d', 'now', 'utc')") assert result == "strftime('%Y-%m-%d', 'now', 'utc')" - def test_translate_type_from_sqlite_to_mysql_sqlglot_normalizes_spacing(self): - proc = self._type_instance() + def test_translate_type_from_sqlite_to_mysql_sqlglot_normalizes_spacing( + self, + sqlite_database: str, + mysql_database: Engine, + mysql_credentials: MySQLCredentials, + ) -> None: + proc: SQLite3toMySQL = SQLite3toMySQL( + sqlite_file=sqlite_database, + mysql_user=mysql_credentials.user, + mysql_password=mysql_credentials.password, + mysql_host=mysql_credentials.host, + mysql_port=mysql_credentials.port, + mysql_database=mysql_credentials.database, + quiet=True, + ) assert proc._translate_type_from_sqlite_to_mysql("NUMERIC ( 10 , 5 )") == "DECIMAL(10,5)" assert proc._translate_type_from_sqlite_to_mysql("varchar ( 12 )") == "VARCHAR(12)" assert proc._translate_type_from_sqlite_to_mysql("CHAR ( 7 )") == "CHAR(7)" - def test_translate_type_from_sqlite_to_mysql_sqlglot_preserves_unsigned(self): - proc = self._type_instance() + def test_translate_type_from_sqlite_to_mysql_sqlglot_preserves_unsigned( + self, + sqlite_database: str, + mysql_database: Engine, + mysql_credentials: MySQLCredentials, + ): + proc: SQLite3toMySQL = SQLite3toMySQL( + sqlite_file=sqlite_database, + mysql_user=mysql_credentials.user, + mysql_password=mysql_credentials.password, + mysql_host=mysql_credentials.host, + mysql_port=mysql_credentials.port, + mysql_database=mysql_credentials.database, + quiet=True, + ) assert proc._translate_type_from_sqlite_to_mysql("numeric(8, 3) unsigned") == "DECIMAL(8,3) UNSIGNED" def test_translate_sqlite_view_definition_current_timestamp(self): @@ -1289,7 +1303,6 @@ def test_init_chunk_parameter_conversion( self, sqlite_database: str, mysql_credentials: MySQLCredentials, - mocker: MockFixture, ) -> None: """Verify chunk parameter is correctly converted to integer _chunk_size.""" # Chunk=2 should yield _chunk_size == 2 From 9bdc947cdb6e33e387c7dc719aed29e052694453 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 26 Oct 2025 14:52:12 +0000 Subject: [PATCH 8/9] :rewind: revert expression default handling for DATE and TIME to fallback for older versions --- src/sqlite3_to_mysql/transporter.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/sqlite3_to_mysql/transporter.py b/src/sqlite3_to_mysql/transporter.py index 64a22f5..905008f 100644 --- a/src/sqlite3_to_mysql/transporter.py +++ b/src/sqlite3_to_mysql/transporter.py @@ -541,8 +541,7 @@ def _translate_default_for_mysql(self, column_type: str, default: str) -> str: ) and self._allow_expr_defaults ): - if not self._allow_expr_defaults: - return "" + # Too old for expression defaults on DATE → fall back return "CURRENT_DATE" # TIME @@ -556,8 +555,7 @@ def _translate_default_for_mysql(self, column_type: str, default: str) -> str: ) and self._allow_expr_defaults ): - if not self._allow_expr_defaults: - return "" + # Too old for expression defaults on TIME → fall back len_match = self.COLUMN_LENGTH_PATTERN.search(column_type) fsp = "" if self._allow_fsp and len_match: From cfe1e06340ed39e637da1ca2841a90fc2dcde273 Mon Sep 17 00:00:00 2001 From: Klemen Tusar Date: Sun, 26 Oct 2025 19:13:59 +0000 Subject: [PATCH 9/9] :white_check_mark: add tests for rewriting SQLite view functions with date and time defaults --- tests/unit/sqlite3_to_mysql_test.py | 483 ++++++++++++++++++++++++++++ 1 file changed, 483 insertions(+) diff --git a/tests/unit/sqlite3_to_mysql_test.py b/tests/unit/sqlite3_to_mysql_test.py index 498e77d..5cb67bf 100644 --- a/tests/unit/sqlite3_to_mysql_test.py +++ b/tests/unit/sqlite3_to_mysql_test.py @@ -1,6 +1,7 @@ import importlib import logging import re +import sqlite3 import sys import types import typing as t @@ -273,6 +274,15 @@ def test_rewrite_sqlite_view_functions_datetime_now_localtime() -> None: assert transformed.sql(dialect="mysql") == "CURRENT_TIMESTAMP()" +def test_rewrite_sqlite_view_functions_date_now_defaults() -> None: + node = exp.Anonymous( + this=exp.Identifier(this="DATE"), + expressions=[exp.Literal.string("now")], + ) + transformed = SQLite3toMySQL._rewrite_sqlite_view_functions(node) + assert isinstance(transformed, exp.CurrentDate) + + def test_rewrite_sqlite_view_functions_strftime_now() -> None: node = exp.Anonymous( this=exp.Identifier(this="STRFTIME"), @@ -324,6 +334,39 @@ def test_rewrite_sqlite_view_functions_time_to_str() -> None: assert transformed.args["format"].this == "%H:%i" +def test_rewrite_sqlite_view_functions_date_now_utc() -> None: + node = exp.Anonymous( + this=exp.Identifier(this="DATE"), + expressions=[exp.Literal.string("now"), exp.Literal.string("utc")], + ) + transformed = SQLite3toMySQL._rewrite_sqlite_view_functions(node) + assert isinstance(transformed, exp.Anonymous) + assert transformed.sql(dialect="mysql") == "UTC_DATE()" + + +def test_rewrite_sqlite_view_functions_time_now_utc() -> None: + node = exp.Anonymous( + this=exp.Identifier(this="TIME"), + expressions=[exp.Literal.string("now"), exp.Literal.string("utc")], + ) + transformed = SQLite3toMySQL._rewrite_sqlite_view_functions(node) + assert isinstance(transformed, exp.UtcTime) + assert transformed.sql(dialect="mysql") == "UTC_TIME()" + + +def test_rewrite_sqlite_view_functions_time_to_str_with_modifier_list() -> None: + node = exp.TimeToStr( + this=exp.TsOrDsToTimestamp( + this=exp.Literal.string("now"), + expressions=[exp.Literal.string("utc")], + ), + format=exp.Literal.string("%H"), + ) + transformed = SQLite3toMySQL._rewrite_sqlite_view_functions(node) + assert isinstance(transformed, exp.TimeToStr) + assert isinstance(transformed.this, exp.UtcTimestamp) + + def _make_transfer_stub(mocker: MockFixture) -> SQLite3toMySQL: instance = SQLite3toMySQL.__new__(SQLite3toMySQL) instance._sqlite_tables = tuple() @@ -464,6 +507,22 @@ def test_transfer_skips_views_without_sql(mocker: MockFixture) -> None: assert instance._logger.warning.called +def test_transfer_truncates_tables_when_requested(mocker: MockFixture) -> None: + instance = _make_transfer_stub(mocker) + instance._mysql_truncate_tables = True + instance._fetch_sqlite_master_rows = mocker.MagicMock( + side_effect=[ + [{"name": "tbl", "type": "table"}], + [], + ] + ) + instance._sqlite_cur.fetchone.return_value = {"total_records": 0} + + instance.transfer() + + instance._truncate_table.assert_called_once_with("tbl") + + def test_transfer_table_data_without_chunk(mocker: MockFixture) -> None: instance = SQLite3toMySQL.__new__(SQLite3toMySQL) instance._chunk_size = None @@ -774,6 +833,56 @@ def execute(self, statement): sqlite_engine.dispose() + def test_init_mysql_socket_missing_raises(self, sqlite_database: str) -> None: + with pytest.raises(FileNotFoundError): + SQLite3toMySQL( # type: ignore[call-arg] + sqlite_file=sqlite_database, + mysql_user="user", + mysql_password="pass", + mysql_socket="/tmp/definitely_missing.sock", + ) + + def test_init_conflicting_table_filters_raises(self, sqlite_database: str) -> None: + with pytest.raises(ValueError): + SQLite3toMySQL( # type: ignore[call-arg] + sqlite_file=sqlite_database, + mysql_user="user", + mysql_password="pass", + sqlite_tables=("include",), + exclude_sqlite_tables=("exclude",), + ) + + def test_init_normalizes_insert_method_text_type_and_collation( + self, + sqlite_database: str, + mocker: MockFixture, + ) -> None: + fake_cursor = mocker.MagicMock() + fake_cursor.fetchone.return_value = ("version", "8.0.30") + fake_connection = mocker.MagicMock(spec=mysql.connector.MySQLConnection) + fake_connection.cursor.return_value = fake_cursor + fake_connection.is_connected.return_value = True + + mocker.patch("sqlite3_to_mysql.transporter.mysql.connector.connect", return_value=fake_connection) + mocker.patch( + "sqlite3_to_mysql.transporter.CharacterSet.get_default_collation", + return_value=["utf8mb4_0900_ai_ci"], + ) + + proc = SQLite3toMySQL( # type: ignore[call-arg] + sqlite_file=sqlite_database, + mysql_user="user", + mysql_password="pass", + mysql_database="demo", + mysql_insert_method="unsupported", + mysql_text_type="customtext", + mysql_charset="utf8mb4", + ) + + assert proc._mysql_insert_method == "IGNORE" + assert proc._mysql_text_type == "TEXT" + assert proc._mysql_collation == "utf8mb4_unicode_ci" + @pytest.mark.parametrize("quiet", [False, True]) def test_process_cursor_error( self, @@ -853,6 +962,313 @@ def execute(self, statement): sqlite_engine.dispose() + def test_create_table_skips_hidden_columns(self, mocker: MockerFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._sqlite_table_xinfo_support = True + instance._sqlite_quote_ident = lambda name: name.replace('"', '""') + instance._mysql_charset = "utf8mb4" + instance._mysql_collation = "utf8mb4_unicode_ci" + instance._logger = mocker.MagicMock() + + rows = [ + {"name": "id", "type": "INTEGER", "notnull": 1, "dflt_value": None, "pk": 1, "hidden": 0}, + {"name": "secret", "type": "TEXT", "notnull": 0, "dflt_value": "0", "pk": 0, "hidden": 1}, + ] + + sqlite_cursor = mocker.MagicMock() + sqlite_cursor.fetchall.return_value = rows + instance._sqlite_cur = sqlite_cursor + + instance._translate_type_from_sqlite_to_mysql = mocker.MagicMock(return_value="INT(11)") + instance._translate_default_for_mysql = mocker.MagicMock(return_value="") + + mysql_cursor = mocker.MagicMock() + instance._mysql_cur = mysql_cursor + instance._mysql = mocker.MagicMock() + + instance._create_table("demo") + + executed_sql = mysql_cursor.execute.call_args[0][0] + assert "`secret`" not in executed_sql + assert "`id` INT(11)" in executed_sql + + def test_create_table_invalid_default_retries_without_defaults(self, mocker: MockerFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._sqlite_table_xinfo_support = False + instance._sqlite_quote_ident = lambda name: name.replace('"', '""') + instance._mysql_charset = "utf8mb4" + instance._mysql_collation = "utf8mb4_unicode_ci" + instance._logger = mocker.MagicMock() + + rows = [ + {"name": "created_at", "type": "DATETIME", "notnull": 0, "dflt_value": "CURRENT_TIMESTAMP", "pk": 0}, + ] + + sqlite_cursor = mocker.MagicMock() + sqlite_cursor.fetchall.side_effect = [rows, rows] + instance._sqlite_cur = sqlite_cursor + + instance._translate_type_from_sqlite_to_mysql = mocker.MagicMock(return_value="DATETIME") + instance._translate_default_for_mysql = mocker.MagicMock(return_value="CURRENT_TIMESTAMP") + + mysql_cursor = mocker.MagicMock() + mysql_cursor.execute.side_effect = [ + mysql.connector.Error(msg="bad default", errno=errorcode.ER_INVALID_DEFAULT), + None, + ] + instance._mysql_cur = mysql_cursor + instance._mysql = mocker.MagicMock() + + instance._create_table("events") + + assert mysql_cursor.execute.call_count == 2 + retry_sql = mysql_cursor.execute.call_args_list[1][0][0] + assert "DEFAULT CURRENT_TIMESTAMP" not in retry_sql + instance._logger.warning.assert_called_once() + + def test_truncate_table_executes_when_table_exists(self, mocker: MockerFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + cursor = mocker.MagicMock() + cursor.fetchall.return_value = [("demo",)] + instance._mysql_cur = cursor + instance._mysql_database = "test_db" + instance._logger = mocker.MagicMock() + + instance._truncate_table("demo") + + assert cursor.execute.call_count == 2 + assert cursor.execute.call_args_list[1][0][0].startswith("TRUNCATE TABLE") + instance._logger.info.assert_called_once() + + def test_add_indices_uses_fulltext_when_supported(self, mocker: MockFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._sqlite_quote_ident = lambda name: name.replace('"', '""') + instance._use_fulltext = True + instance._mysql_fulltext_support = True + instance._logger = mocker.MagicMock() + + sqlite_cursor = mocker.MagicMock() + sqlite_cursor.fetchall.side_effect = [ + [{"name": "textcol", "type": "TEXT"}], + [{"name": "idx_text", "unique": 0, "origin": "c"}], + [{"name": "textcol"}], + ] + instance._sqlite_cur = sqlite_cursor + + add_index = mocker.patch.object(instance, "_add_index") + + instance._add_indices("demo") + + assert add_index.call_count == 1 + kwargs = add_index.call_args.kwargs + assert kwargs["index_type"] == "FULLTEXT" + assert "`textcol`" in kwargs["index_columns"] + + def test_add_indices_handles_missing_column_metadata(self, mocker: MockFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._sqlite_quote_ident = lambda name: name.replace('"', '""') + instance._use_fulltext = False + instance._mysql_fulltext_support = False + instance._logger = mocker.MagicMock() + + sqlite_cursor = mocker.MagicMock() + sqlite_cursor.fetchall.side_effect = [ + [], + [{"name": "idx_missing", "unique": 0, "origin": "c"}], + [{"name": "missing"}], + ] + instance._sqlite_cur = sqlite_cursor + + add_index = mocker.patch.object(instance, "_add_index") + + instance._add_indices("demo") + + add_index.assert_not_called() + instance._logger.warning.assert_called_once() + + def test_add_indices_retries_without_fulltext_on_bad_column(self, mocker: MockFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._sqlite_quote_ident = lambda name: name.replace('"', '""') + instance._use_fulltext = True + instance._mysql_fulltext_support = True + instance._logger = mocker.MagicMock() + + sqlite_cursor = mocker.MagicMock() + sqlite_cursor.fetchall.side_effect = [ + [{"name": "textcol", "type": "TEXT"}], + [{"name": "idx_text", "unique": 0, "origin": "c"}], + [{"name": "textcol"}], + ] + instance._sqlite_cur = sqlite_cursor + + add_index = mocker.patch.object( + instance, + "_add_index", + side_effect=[ + mysql.connector.Error(msg="bad ft column", errno=errorcode.ER_BAD_FT_COLUMN), + None, + ], + ) + + instance._add_indices("demo") + + assert add_index.call_count == 2 + assert add_index.call_args_list[0].kwargs["index_type"] == "FULLTEXT" + assert add_index.call_args_list[1].kwargs["index_type"] == "INDEX" + + def test_get_mysql_version_missing_row_raises(self, mocker: MockFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + cursor = mocker.MagicMock() + cursor.fetchone.return_value = None + instance._mysql_cur = cursor + instance._logger = mocker.MagicMock() + + with pytest.raises(mysql.connector.Error): + instance._get_mysql_version() + + instance._logger.error.assert_called() + + def test_get_sqlite_version_error_raises(self, mocker: MockFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + cursor = mocker.MagicMock() + cursor.execute.side_effect = sqlite3.Error("boom") + instance._sqlite_cur = cursor + instance._logger = mocker.MagicMock() + + with pytest.raises(sqlite3.Error): + instance._get_sqlite_version() + + instance._logger.error.assert_called() + + def test_sqlite_table_has_rowid_handles_operational_error(self, mocker: MockFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + cursor = mocker.MagicMock() + cursor.execute.side_effect = sqlite3.OperationalError("no rowid") + instance._sqlite_cur = cursor + + assert instance._sqlite_table_has_rowid("problematic") is False + + def test_translate_type_recovers_from_normalized_failure(self, mocker: MockFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._logger = mocker.MagicMock() + mocker.patch.object(instance, "_normalize_sqlite_column_type", return_value="NUMERIC(5)") + mocker.patch.object( + SQLite3toMySQL, + "_translate_type_from_sqlite_to_mysql_legacy", + side_effect=[ValueError("bad type"), "VARCHAR(255)"], + ) + + result = instance._translate_type_from_sqlite_to_mysql("numeric") + + assert result == "VARCHAR(255)" + + def test_add_index_duplicate_keyname_retries_with_suffix(self, mocker: MockFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._mysql_cur = mocker.MagicMock() + instance._mysql_cur.execute.side_effect = [ + mysql.connector.Error(msg="dup key", errno=errorcode.ER_DUP_KEYNAME), + None, + ] + instance._mysql = mocker.MagicMock() + instance._logger = mocker.MagicMock() + instance._ignore_duplicate_keys = False + + index = {"name": "idx_demo", "unique": 0} + index_infos = ({"name": "col"},) + + SQLite3toMySQL._add_index( + instance, + table_name="demo", + index_type="INDEX", + index=index, + index_columns="`col`", + index_infos=index_infos, + ) + + assert instance._mysql_cur.execute.call_count == 2 + instance._logger.warning.assert_called_with( + 'Duplicate key "%s" in table %s detected! Trying to create new key "%s_%s" ...', + "idx_demo", + "demo", + "idx_demo", + 1, + ) + + def test_add_index_duplicate_keyname_ignored_when_configured(self, mocker: MockFixture) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._mysql_cur = mocker.MagicMock() + instance._mysql_cur.execute.side_effect = mysql.connector.Error( + msg="dup key", + errno=errorcode.ER_DUP_KEYNAME, + ) + instance._mysql = mocker.MagicMock() + instance._logger = mocker.MagicMock() + instance._ignore_duplicate_keys = True + + index = {"name": "idx_demo", "unique": 0} + index_infos = ({"name": "col"},) + + SQLite3toMySQL._add_index( + instance, + table_name="demo", + index_type="INDEX", + index=index, + index_columns="`col`", + index_infos=index_infos, + ) + + instance._logger.warning.assert_called_with( + 'Ignoring duplicate key "%s" in table %s!', + "idx_demo", + "demo", + ) + + @pytest.mark.parametrize( + "errno, log_method, message_fragment, expect_raise", + [ + (errorcode.ER_DUP_ENTRY, "warning", "duplicate entry", False), + (errorcode.ER_DUP_FIELDNAME, "warning", "Duplicate field name", False), + (errorcode.ER_TOO_MANY_KEYS, "warning", "Too many keys", False), + (errorcode.ER_TOO_LONG_KEY, "warning", "Key length too long", False), + (errorcode.ER_BAD_FT_COLUMN, "warning", "Retrying without FULLTEXT", True), + ], + ) + def test_add_index_error_handling( + self, + mocker: MockFixture, + errno: int, + log_method: str, + message_fragment: str, + expect_raise: bool, + ) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._mysql_cur = mocker.MagicMock() + instance._mysql_cur.execute.side_effect = mysql.connector.Error(msg="fail", errno=errno) + instance._mysql = mocker.MagicMock() + instance._logger = mocker.MagicMock() + instance._ignore_duplicate_keys = False + + index = {"name": "idx_demo", "unique": 0} + index_infos = ({"name": "col"},) + + call = lambda: SQLite3toMySQL._add_index( # noqa: E731 + instance, + table_name="demo", + index_type="FULLTEXT" if errno == errorcode.ER_BAD_FT_COLUMN else "INDEX", + index=index, + index_columns="`col`", + index_infos=index_infos, + ) + + if expect_raise: + with pytest.raises(mysql.connector.Error): + call() + else: + call() + + log_mock = getattr(instance._logger, log_method) + assert any(message_fragment in args[0] for args, _ in log_mock.call_args_list) + @pytest.mark.parametrize("quiet", [False, True]) def test_add_foreign_keys_error( self, @@ -1207,6 +1623,73 @@ def test_translate_default_for_mysql_sqlglot_requires_expr_support(self): result = instance._translate_default_for_mysql("VARCHAR(32)", "strftime('%Y-%m-%d', 'now', 'utc')") assert result == "strftime('%Y-%m-%d', 'now', 'utc')" + def test_translate_default_for_mysql_preserves_empty_string(self) -> None: + instance = self._mk(expr=True, ts_dt=True, fsp=True) + assert instance._translate_default_for_mysql("VARCHAR(10)", " ") == "" + + def test_translate_default_for_mysql_preserves_null_literal(self) -> None: + instance = self._mk(expr=True, ts_dt=True, fsp=True) + assert instance._translate_default_for_mysql("INTEGER", "NULL") == "NULL" + + def test_translate_default_for_mysql_timestamp_handles_invalid_precision_and_utc(self) -> None: + instance = self._mk(expr=False, ts_dt=True, fsp=True) + + class FakeMatch: + def group(self, *_: t.Any) -> str: + return "()" + + instance.COLUMN_LENGTH_PATTERN = types.SimpleNamespace(search=lambda _: FakeMatch()) # type: ignore[attr-defined] + result = instance._translate_default_for_mysql("TIMESTAMP(foo)", "datetime('now','utc')") + assert result == "UTC_TIMESTAMP" + + def test_translate_default_for_mysql_datetime_without_support_returns_empty(self) -> None: + instance = self._mk(expr=False, ts_dt=False, fsp=True) + assert instance._translate_default_for_mysql("DATETIME(6)", "CURRENT_TIMESTAMP") == "" + + def test_translate_default_for_mysql_datetime_utc_handles_invalid_precision(self) -> None: + instance = self._mk(expr=False, ts_dt=True, fsp=True) + + class FakeMatch: + def group(self, *_: t.Any) -> str: + return "()" + + instance.COLUMN_LENGTH_PATTERN = types.SimpleNamespace(search=lambda _: FakeMatch()) # type: ignore[attr-defined] + result = instance._translate_default_for_mysql("DATETIME(foo)", "datetime('now','utc')") + assert result == "UTC_TIMESTAMP" + + def test_translate_default_for_mysql_time_utc_handles_invalid_precision(self) -> None: + instance = self._mk(expr=True, ts_dt=False, fsp=True) + + class FakeMatch: + def group(self, *_: t.Any) -> str: + return "()" + + instance.COLUMN_LENGTH_PATTERN = types.SimpleNamespace(search=lambda _: FakeMatch()) # type: ignore[attr-defined] + result = instance._translate_default_for_mysql("TIME(foo)", "time('now','utc')") + assert result == "UTC_TIME" + + def test_translate_default_for_mysql_sqlglot_parse_error_returns_original(self, mocker: MockerFixture) -> None: + instance = self._mk(expr=True, ts_dt=True, fsp=True) + mocker.patch( + "sqlite3_to_mysql.transporter.sqlglot.parse_one", + side_effect=sqlglot_errors.ParseError("boom", "expr"), + ) + + original = "json_extract(payload, '$.foo')" + assert instance._translate_default_for_mysql("VARCHAR(255)", original) == original + + def test_translate_default_for_mysql_sqlglot_render_error_returns_original(self, mocker: MockerFixture) -> None: + instance = self._mk(expr=True, ts_dt=True, fsp=True) + + fake_expression = mocker.MagicMock() + fake_expression.transform.side_effect = lambda fn: fake_expression + fake_expression.sql.side_effect = sqlglot_errors.SqlglotError("render fail") + + mocker.patch("sqlite3_to_mysql.transporter.sqlglot.parse_one", return_value=fake_expression) + + original = "(SELECT 1)" + assert instance._translate_default_for_mysql("VARCHAR(255)", original) == "SELECT 1" + def test_translate_type_from_sqlite_to_mysql_sqlglot_normalizes_spacing( self, sqlite_database: str,