diff --git a/src/sqlite3_to_mysql/sqlite_utils.py b/src/sqlite3_to_mysql/sqlite_utils.py index 0a2a960..a79b977 100644 --- a/src/sqlite3_to_mysql/sqlite_utils.py +++ b/src/sqlite3_to_mysql/sqlite_utils.py @@ -53,3 +53,14 @@ def check_sqlite_table_xinfo_support(version_string: str) -> bool: """Check for SQLite table_xinfo support.""" sqlite_version: Version = version.parse(version_string) return sqlite_version.major > 3 or (sqlite_version.major == 3 and sqlite_version.minor >= 26) + + +def check_sqlite_jsonb_support(version_string: str) -> bool: + """Check for SQLite JSONB support.""" + sqlite_version: Version = version.parse(version_string) + return sqlite_version.major > 3 or (sqlite_version.major == 3 and sqlite_version.minor >= 45) + + +def sqlite_jsonb_column_expression(quoted_column_name: str) -> str: + """Return a SELECT expression that converts JSONB blobs to textual JSON while preserving NULLs.""" + return 'CASE WHEN "{name}" IS NULL THEN NULL ELSE json("{name}") END AS "{name}"'.format(name=quoted_column_name) diff --git a/src/sqlite3_to_mysql/transporter.py b/src/sqlite3_to_mysql/transporter.py index 3c80029..7635669 100644 --- a/src/sqlite3_to_mysql/transporter.py +++ b/src/sqlite3_to_mysql/transporter.py @@ -36,10 +36,12 @@ from sqlite3_to_mysql.sqlite_utils import ( adapt_decimal, adapt_timedelta, + check_sqlite_jsonb_support, check_sqlite_table_xinfo_support, convert_date, convert_decimal, convert_timedelta, + sqlite_jsonb_column_expression, unicase_compare, ) @@ -188,6 +190,7 @@ def __init__(self, **kwargs: Unpack[SQLite3toMySQLParams]): self._sqlite_version = self._get_sqlite_version() self._sqlite_table_xinfo_support = check_sqlite_table_xinfo_support(self._sqlite_version) + self._sqlite_jsonb_support = check_sqlite_jsonb_support(self._sqlite_version) self._mysql_create_tables = bool(kwargs.get("mysql_create_tables", True)) self._mysql_transfer_data = bool(kwargs.get("mysql_transfer_data", True)) @@ -304,6 +307,13 @@ def _get_table_info(self, table_name: str) -> t.List[t.Dict[str, t.Any]]: self._sqlite_cur.execute(f'PRAGMA {pragma}("{quoted_table_name}")') return [dict(row) for row in self._sqlite_cur.fetchall()] + @staticmethod + def _declared_type_is_jsonb(column_type: t.Optional[str]) -> bool: + """Return True when a SQLite column is declared as JSONB.""" + if not column_type: + return False + return column_type.strip().upper().startswith("JSONB") + def _get_table_primary_key_columns(self, table_name: str) -> t.List[str]: """Return visible primary key columns ordered by their PK sequence.""" primary_key_rows: t.List[t.Dict[str, t.Any]] = sorted( @@ -516,6 +526,8 @@ def _translate_type_from_sqlite_to_mysql_legacy(self, column_type: str) -> str: return "TINYINT(1)" if data_type.startswith(("REAL", "DOUBLE", "FLOAT", "DECIMAL", "DEC", "FIXED")): return full_column_type + if data_type == "JSONB" or data_type.startswith("JSONB"): + return "JSON" if self._mysql_json_support else self._mysql_text_type if data_type not in MYSQL_COLUMN_TYPES: return self._mysql_string_type return full_column_type @@ -1323,8 +1335,36 @@ def transfer(self) -> None: "view" if object_type == "view" else "table", table_name, ) + table_column_info: t.List[t.Dict[str, t.Any]] = self._get_table_info(table_name) + visible_columns: t.List[t.Dict[str, t.Any]] = [ + column for column in table_column_info if column.get("hidden", 0) != 1 + ] + jsonb_columns: t.Set[str] + if self._sqlite_jsonb_support: + jsonb_columns = { + str(column["name"]) + for column in visible_columns + if column.get("name") and self._declared_type_is_jsonb(column.get("type")) + } + else: + jsonb_columns = set() + + select_parts: t.List[str] = [] if transfer_rowid: - select_list: str = 'rowid as "rowid", *' + select_parts.append('rowid AS "rowid"') + + for column in visible_columns: + column_name: t.Optional[str] = column.get("name") + if not column_name: + continue + quoted_column: str = self._sqlite_quote_ident(column_name) + if column_name in jsonb_columns: + select_parts.append(sqlite_jsonb_column_expression(quoted_column)) + else: + select_parts.append(f'"{quoted_column}"') + + if select_parts: + select_list = ", ".join(select_parts) else: select_list = "*" self._sqlite_cur.execute(f'SELECT {select_list} FROM "{quoted_table_name}"') diff --git a/tests/unit/sqlite3_to_mysql_test.py b/tests/unit/sqlite3_to_mysql_test.py index 0d1af2e..d712a99 100644 --- a/tests/unit/sqlite3_to_mysql_test.py +++ b/tests/unit/sqlite3_to_mysql_test.py @@ -24,6 +24,9 @@ from tests.conftest import MySQLCredentials +SQLITE_SUPPORTS_JSONB: bool = sqlite3.sqlite_version_info >= (3, 45, 0) + + def test_cli_sqlite_views_flag_propagates( cli_runner: CliRunner, sqlite_database: str, @@ -398,9 +401,35 @@ def _make_transfer_stub(mocker: MockFixture) -> SQLite3toMySQL: instance._translate_sqlite_view_definition = mocker.MagicMock(return_value="CREATE VIEW translated AS SELECT 1") instance._sqlite_cur.fetchall.return_value = [] instance._sqlite_cur.execute.return_value = None + instance._get_table_info = mocker.MagicMock( + return_value=[ + {"name": "c1", "type": "TEXT", "hidden": 0}, + ] + ) + instance._sqlite_jsonb_support = True return instance +class RecordingMySQLCursor: + def __init__(self) -> None: + self.executed_sql: t.List[str] = [] + self.inserted_batches: t.List[t.List[t.Tuple[t.Any, ...]]] = [] + + def execute(self, sql: str, params: t.Optional[t.Tuple[t.Any, ...]] = None) -> None: + del params + self.executed_sql.append(sql) + + def fetchall(self) -> t.List[t.Any]: + return [] + + def fetchone(self) -> t.Optional[t.Any]: + return None + + def executemany(self, sql: str, rows: t.Iterable[t.Tuple[t.Any, ...]]) -> None: + self.executed_sql.append(sql) + self.inserted_batches.append([tuple(row) for row in rows]) + + def test_transfer_creates_mysql_views(mocker: MockFixture) -> None: instance = _make_transfer_stub(mocker) @@ -472,7 +501,112 @@ def execute_side_effect(sql, *params): executed_sqls = [call.args[0] for call in instance._sqlite_cur.execute.call_args_list] assert 'SELECT COUNT(*) AS total_records FROM "tbl""quote"' in executed_sqls - assert 'SELECT * FROM "tbl""quote"' in executed_sqls + assert 'SELECT "c1" FROM "tbl""quote"' in executed_sqls + + +def test_transfer_selects_jsonb_columns_via_json_function(mocker: MockFixture) -> None: + instance = _make_transfer_stub(mocker) + instance._mysql_transfer_data = True + instance._sqlite_cur.fetchone.return_value = {"total_records": 1} + instance._sqlite_cur.fetchall.return_value = [(1, b"blob")] + instance._get_table_info.return_value = [ + {"name": "id", "type": "INTEGER", "hidden": 0}, + {"name": "payload", "type": "JSONB", "hidden": 0}, + ] + + def execute_side_effect(sql, *params): + del params + if 'json("payload")' in sql: + instance._sqlite_cur.description = [("id",), ("payload",)] + return None + + instance._sqlite_cur.execute.side_effect = execute_side_effect + instance._fetch_sqlite_master_rows = mocker.MagicMock(side_effect=[[{"name": "tbl", "type": "table"}], []]) + + instance.transfer() + + executed_sqls = [call.args[0] for call in instance._sqlite_cur.execute.call_args_list] + json_selects = [sql for sql in executed_sqls if 'json("payload")' in sql] + assert json_selects + + +def test_transfer_leaves_jsonb_columns_when_sqlite_lacks_support(mocker: MockFixture) -> None: + instance = _make_transfer_stub(mocker) + instance._sqlite_jsonb_support = False + instance._mysql_transfer_data = True + instance._sqlite_cur.fetchone.return_value = {"total_records": 1} + instance._sqlite_cur.fetchall.return_value = [(1, b"blob")] + instance._get_table_info.return_value = [ + {"name": "id", "type": "INTEGER", "hidden": 0}, + {"name": "payload", "type": "JSONB", "hidden": 0}, + ] + + def execute_side_effect(sql, *params): + del params + if sql.startswith("SELECT ") and "FROM" in sql and "COUNT" not in sql.upper(): + instance._sqlite_cur.description = [("id",), ("payload",)] + return None + + instance._sqlite_cur.execute.side_effect = execute_side_effect + instance._fetch_sqlite_master_rows = mocker.MagicMock(side_effect=[[{"name": "tbl", "type": "table"}], []]) + + instance.transfer() + + executed_sqls = [call.args[0] for call in instance._sqlite_cur.execute.call_args_list] + assert all('json("payload")' not in sql for sql in executed_sqls) + + +@pytest.mark.skipif(not SQLITE_SUPPORTS_JSONB, reason="SQLite 3.45+ required for JSONB tests") +def test_transfer_converts_jsonb_values_to_textual_json(mocker: MockFixture) -> None: + sqlite_connection = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES) + sqlite_connection.row_factory = sqlite3.Row + sqlite_cursor = sqlite_connection.cursor() + sqlite_cursor.execute("CREATE TABLE data (id INTEGER PRIMARY KEY, payload JSONB)") + sqlite_cursor.execute("INSERT INTO data(payload) VALUES (jsonb(?))", ('{"foo":"bar"}',)) + sqlite_cursor.execute("INSERT INTO data(payload) VALUES (NULL)") + sqlite_connection.commit() + + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._sqlite = sqlite_connection + instance._sqlite_cur = sqlite_connection.cursor() + instance._sqlite_tables = tuple() + instance._exclude_sqlite_tables = tuple() + instance._sqlite_views_as_tables = False + instance._sqlite_table_xinfo_support = True + instance._sqlite_jsonb_support = SQLITE_SUPPORTS_JSONB + instance._mysql_create_tables = False + instance._mysql_transfer_data = True + instance._mysql_truncate_tables = False + instance._mysql_insert_method = "IGNORE" + instance._mysql_version = "8.0.32" + instance._without_foreign_keys = True + instance._use_fulltext = False + instance._mysql_fulltext_support = False + instance._with_rowid = False + instance._chunk_size = None + instance._quiet = True + instance._mysql_charset = "utf8mb4" + instance._mysql_collation = "utf8mb4_unicode_ci" + instance._mysql_cur = RecordingMySQLCursor() + instance._mysql = mocker.MagicMock() + instance._mysql.commit = mocker.MagicMock() + instance._logger = mocker.MagicMock() + instance._create_table = mocker.MagicMock() + instance._truncate_table = mocker.MagicMock() + instance._add_indices = mocker.MagicMock() + instance._add_foreign_keys = mocker.MagicMock() + instance._create_mysql_view = mocker.MagicMock() + instance._translate_sqlite_view_definition = mocker.MagicMock() + instance._sqlite_table_has_rowid = lambda _table: False + instance._fetch_sqlite_master_rows = mocker.MagicMock(side_effect=[[{"name": "data", "type": "table"}], []]) + + instance.transfer() + + assert instance._mysql_cur.inserted_batches, "expected captured MySQL inserts" + inserted_rows = instance._mysql_cur.inserted_batches[0] + payload_by_id = {row[0]: row[1] for row in inserted_rows} + assert payload_by_id[1] == '{"foo":"bar"}' + assert payload_by_id[2] is None def test_translate_sqlite_view_definition_strftime_weekday() -> None: @@ -554,6 +688,24 @@ def test_transfer_table_data_with_chunking(mocker: MockFixture) -> None: instance._mysql.commit.assert_called_once() +@pytest.mark.parametrize( + "json_support,expected", + [ + (True, "JSON"), + (False, "TEXT"), + ], +) +def test_translate_type_from_sqlite_maps_jsonb_to_json(json_support: bool, expected: str) -> None: + instance = SQLite3toMySQL.__new__(SQLite3toMySQL) + instance._mysql_text_type = "TEXT" + instance._mysql_string_type = "VARCHAR(255)" + instance._mysql_integer_type = "INT" + instance._mysql_json_support = json_support + + assert instance._translate_type_from_sqlite_to_mysql("JSONB") == expected + assert instance._translate_type_from_sqlite_to_mysql("jsonb(16)") == expected + + @pytest.mark.usefixtures("sqlite_database", "mysql_instance") class TestSQLite3toMySQL: @pytest.mark.parametrize("quiet", [False, True])