Skip to content
11 changes: 11 additions & 0 deletions src/sqlite3_to_mysql/sqlite_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,14 @@ def check_sqlite_table_xinfo_support(version_string: str) -> bool:
"""Check for SQLite table_xinfo support."""
sqlite_version: Version = version.parse(version_string)
return sqlite_version.major > 3 or (sqlite_version.major == 3 and sqlite_version.minor >= 26)


def check_sqlite_jsonb_support(version_string: str) -> bool:
"""Check for SQLite JSONB support."""
sqlite_version: Version = version.parse(version_string)
return sqlite_version.major > 3 or (sqlite_version.major == 3 and sqlite_version.minor >= 45)


def sqlite_jsonb_column_expression(quoted_column_name: str) -> str:
"""Return a SELECT expression that converts JSONB blobs to textual JSON while preserving NULLs."""
return 'CASE WHEN "{name}" IS NULL THEN NULL ELSE json("{name}") END AS "{name}"'.format(name=quoted_column_name)
42 changes: 41 additions & 1 deletion src/sqlite3_to_mysql/transporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,12 @@
from sqlite3_to_mysql.sqlite_utils import (
adapt_decimal,
adapt_timedelta,
check_sqlite_jsonb_support,
check_sqlite_table_xinfo_support,
convert_date,
convert_decimal,
convert_timedelta,
sqlite_jsonb_column_expression,
unicase_compare,
)

Expand Down Expand Up @@ -188,6 +190,7 @@ def __init__(self, **kwargs: Unpack[SQLite3toMySQLParams]):

self._sqlite_version = self._get_sqlite_version()
self._sqlite_table_xinfo_support = check_sqlite_table_xinfo_support(self._sqlite_version)
self._sqlite_jsonb_support = check_sqlite_jsonb_support(self._sqlite_version)

self._mysql_create_tables = bool(kwargs.get("mysql_create_tables", True))
self._mysql_transfer_data = bool(kwargs.get("mysql_transfer_data", True))
Expand Down Expand Up @@ -304,6 +307,13 @@ def _get_table_info(self, table_name: str) -> t.List[t.Dict[str, t.Any]]:
self._sqlite_cur.execute(f'PRAGMA {pragma}("{quoted_table_name}")')
return [dict(row) for row in self._sqlite_cur.fetchall()]

@staticmethod
def _declared_type_is_jsonb(column_type: t.Optional[str]) -> bool:
"""Return True when a SQLite column is declared as JSONB."""
if not column_type:
return False
return column_type.strip().upper().startswith("JSONB")

def _get_table_primary_key_columns(self, table_name: str) -> t.List[str]:
"""Return visible primary key columns ordered by their PK sequence."""
primary_key_rows: t.List[t.Dict[str, t.Any]] = sorted(
Expand Down Expand Up @@ -516,6 +526,8 @@ def _translate_type_from_sqlite_to_mysql_legacy(self, column_type: str) -> str:
return "TINYINT(1)"
if data_type.startswith(("REAL", "DOUBLE", "FLOAT", "DECIMAL", "DEC", "FIXED")):
return full_column_type
if data_type == "JSONB" or data_type.startswith("JSONB"):
return "JSON" if self._mysql_json_support else self._mysql_text_type
if data_type not in MYSQL_COLUMN_TYPES:
return self._mysql_string_type
return full_column_type
Expand Down Expand Up @@ -1323,8 +1335,36 @@ def transfer(self) -> None:
"view" if object_type == "view" else "table",
table_name,
)
table_column_info: t.List[t.Dict[str, t.Any]] = self._get_table_info(table_name)
visible_columns: t.List[t.Dict[str, t.Any]] = [
column for column in table_column_info if column.get("hidden", 0) != 1
]
jsonb_columns: t.Set[str]
if self._sqlite_jsonb_support:
jsonb_columns = {
str(column["name"])
for column in visible_columns
if column.get("name") and self._declared_type_is_jsonb(column.get("type"))
}
else:
jsonb_columns = set()

select_parts: t.List[str] = []
if transfer_rowid:
select_list: str = 'rowid as "rowid", *'
select_parts.append('rowid AS "rowid"')

for column in visible_columns:
column_name: t.Optional[str] = column.get("name")
if not column_name:
continue
quoted_column: str = self._sqlite_quote_ident(column_name)
if column_name in jsonb_columns:
select_parts.append(sqlite_jsonb_column_expression(quoted_column))
else:
select_parts.append(f'"{quoted_column}"')

if select_parts:
select_list = ", ".join(select_parts)
else:
select_list = "*"
self._sqlite_cur.execute(f'SELECT {select_list} FROM "{quoted_table_name}"')
Expand Down
154 changes: 153 additions & 1 deletion tests/unit/sqlite3_to_mysql_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from tests.conftest import MySQLCredentials


SQLITE_SUPPORTS_JSONB: bool = sqlite3.sqlite_version_info >= (3, 45, 0)


def test_cli_sqlite_views_flag_propagates(
cli_runner: CliRunner,
sqlite_database: str,
Expand Down Expand Up @@ -398,9 +401,35 @@ def _make_transfer_stub(mocker: MockFixture) -> SQLite3toMySQL:
instance._translate_sqlite_view_definition = mocker.MagicMock(return_value="CREATE VIEW translated AS SELECT 1")
instance._sqlite_cur.fetchall.return_value = []
instance._sqlite_cur.execute.return_value = None
instance._get_table_info = mocker.MagicMock(
return_value=[
{"name": "c1", "type": "TEXT", "hidden": 0},
]
)
instance._sqlite_jsonb_support = True
return instance


class RecordingMySQLCursor:
def __init__(self) -> None:
self.executed_sql: t.List[str] = []
self.inserted_batches: t.List[t.List[t.Tuple[t.Any, ...]]] = []

def execute(self, sql: str, params: t.Optional[t.Tuple[t.Any, ...]] = None) -> None:
del params
self.executed_sql.append(sql)

def fetchall(self) -> t.List[t.Any]:
return []

def fetchone(self) -> t.Optional[t.Any]:
return None

def executemany(self, sql: str, rows: t.Iterable[t.Tuple[t.Any, ...]]) -> None:
self.executed_sql.append(sql)
self.inserted_batches.append([tuple(row) for row in rows])


def test_transfer_creates_mysql_views(mocker: MockFixture) -> None:
instance = _make_transfer_stub(mocker)

Expand Down Expand Up @@ -472,7 +501,112 @@ def execute_side_effect(sql, *params):

executed_sqls = [call.args[0] for call in instance._sqlite_cur.execute.call_args_list]
assert 'SELECT COUNT(*) AS total_records FROM "tbl""quote"' in executed_sqls
assert 'SELECT * FROM "tbl""quote"' in executed_sqls
assert 'SELECT "c1" FROM "tbl""quote"' in executed_sqls


def test_transfer_selects_jsonb_columns_via_json_function(mocker: MockFixture) -> None:
instance = _make_transfer_stub(mocker)
instance._mysql_transfer_data = True
instance._sqlite_cur.fetchone.return_value = {"total_records": 1}
instance._sqlite_cur.fetchall.return_value = [(1, b"blob")]
instance._get_table_info.return_value = [
{"name": "id", "type": "INTEGER", "hidden": 0},
{"name": "payload", "type": "JSONB", "hidden": 0},
]

def execute_side_effect(sql, *params):
del params
if 'json("payload")' in sql:
instance._sqlite_cur.description = [("id",), ("payload",)]
return None

instance._sqlite_cur.execute.side_effect = execute_side_effect
instance._fetch_sqlite_master_rows = mocker.MagicMock(side_effect=[[{"name": "tbl", "type": "table"}], []])

instance.transfer()

executed_sqls = [call.args[0] for call in instance._sqlite_cur.execute.call_args_list]
json_selects = [sql for sql in executed_sqls if 'json("payload")' in sql]
assert json_selects


def test_transfer_leaves_jsonb_columns_when_sqlite_lacks_support(mocker: MockFixture) -> None:
instance = _make_transfer_stub(mocker)
instance._sqlite_jsonb_support = False
instance._mysql_transfer_data = True
instance._sqlite_cur.fetchone.return_value = {"total_records": 1}
instance._sqlite_cur.fetchall.return_value = [(1, b"blob")]
instance._get_table_info.return_value = [
{"name": "id", "type": "INTEGER", "hidden": 0},
{"name": "payload", "type": "JSONB", "hidden": 0},
]

def execute_side_effect(sql, *params):
del params
if sql.startswith("SELECT ") and "FROM" in sql and "COUNT" not in sql.upper():
instance._sqlite_cur.description = [("id",), ("payload",)]
return None

instance._sqlite_cur.execute.side_effect = execute_side_effect
instance._fetch_sqlite_master_rows = mocker.MagicMock(side_effect=[[{"name": "tbl", "type": "table"}], []])

instance.transfer()

executed_sqls = [call.args[0] for call in instance._sqlite_cur.execute.call_args_list]
assert all('json("payload")' not in sql for sql in executed_sqls)


@pytest.mark.skipif(not SQLITE_SUPPORTS_JSONB, reason="SQLite 3.45+ required for JSONB tests")
def test_transfer_converts_jsonb_values_to_textual_json(mocker: MockFixture) -> None:
sqlite_connection = sqlite3.connect(":memory:", detect_types=sqlite3.PARSE_DECLTYPES)
sqlite_connection.row_factory = sqlite3.Row
sqlite_cursor = sqlite_connection.cursor()
sqlite_cursor.execute("CREATE TABLE data (id INTEGER PRIMARY KEY, payload JSONB)")
sqlite_cursor.execute("INSERT INTO data(payload) VALUES (jsonb(?))", ('{"foo":"bar"}',))
sqlite_cursor.execute("INSERT INTO data(payload) VALUES (NULL)")
sqlite_connection.commit()

instance = SQLite3toMySQL.__new__(SQLite3toMySQL)
instance._sqlite = sqlite_connection
instance._sqlite_cur = sqlite_connection.cursor()
instance._sqlite_tables = tuple()
instance._exclude_sqlite_tables = tuple()
instance._sqlite_views_as_tables = False
instance._sqlite_table_xinfo_support = True
instance._sqlite_jsonb_support = SQLITE_SUPPORTS_JSONB
instance._mysql_create_tables = False
instance._mysql_transfer_data = True
instance._mysql_truncate_tables = False
instance._mysql_insert_method = "IGNORE"
instance._mysql_version = "8.0.32"
instance._without_foreign_keys = True
instance._use_fulltext = False
instance._mysql_fulltext_support = False
instance._with_rowid = False
instance._chunk_size = None
instance._quiet = True
instance._mysql_charset = "utf8mb4"
instance._mysql_collation = "utf8mb4_unicode_ci"
instance._mysql_cur = RecordingMySQLCursor()
instance._mysql = mocker.MagicMock()
instance._mysql.commit = mocker.MagicMock()
instance._logger = mocker.MagicMock()
instance._create_table = mocker.MagicMock()
instance._truncate_table = mocker.MagicMock()
instance._add_indices = mocker.MagicMock()
instance._add_foreign_keys = mocker.MagicMock()
instance._create_mysql_view = mocker.MagicMock()
instance._translate_sqlite_view_definition = mocker.MagicMock()
instance._sqlite_table_has_rowid = lambda _table: False
instance._fetch_sqlite_master_rows = mocker.MagicMock(side_effect=[[{"name": "data", "type": "table"}], []])

instance.transfer()

assert instance._mysql_cur.inserted_batches, "expected captured MySQL inserts"
inserted_rows = instance._mysql_cur.inserted_batches[0]
payload_by_id = {row[0]: row[1] for row in inserted_rows}
assert payload_by_id[1] == '{"foo":"bar"}'
assert payload_by_id[2] is None


def test_translate_sqlite_view_definition_strftime_weekday() -> None:
Expand Down Expand Up @@ -554,6 +688,24 @@ def test_transfer_table_data_with_chunking(mocker: MockFixture) -> None:
instance._mysql.commit.assert_called_once()


@pytest.mark.parametrize(
"json_support,expected",
[
(True, "JSON"),
(False, "TEXT"),
],
)
def test_translate_type_from_sqlite_maps_jsonb_to_json(json_support: bool, expected: str) -> None:
instance = SQLite3toMySQL.__new__(SQLite3toMySQL)
instance._mysql_text_type = "TEXT"
instance._mysql_string_type = "VARCHAR(255)"
instance._mysql_integer_type = "INT"
instance._mysql_json_support = json_support

assert instance._translate_type_from_sqlite_to_mysql("JSONB") == expected
assert instance._translate_type_from_sqlite_to_mysql("jsonb(16)") == expected


@pytest.mark.usefixtures("sqlite_database", "mysql_instance")
class TestSQLite3toMySQL:
@pytest.mark.parametrize("quiet", [False, True])
Expand Down
Loading