In [1]:
import os

os.system("pgloader")

pgloader [ option ... ] command-file ...
pgloader [ option ... ] SOURCE TARGET
  --help -h                       boolean  Show usage and exit.
  --version -V                    boolean  Displays pgloader version and exit.
  --quiet -q                      boolean  Be quiet
  --verbose -v                    boolean  Be verbose
  --debug -d                      boolean  Display debug level information.
  --log-min-messages              string   Filter logs seen in the logfile (default: "notice")
  --summary -S                    string   Filename where to copy the summary
  --root-dir -D                   string   Output root directory. (default: #P"/tmp/pgloader/")
  --upgrade-config -U             boolean  Output the command(s) corresponding to .conf file for
                                           v2.x
  --list-encodings -E             boolean  List pgloader known encodings and exit.
  --logfile -L                    string   Filename where to send the logs.
  --load-lisp-file -l  

512

In [2]:
def detect_sql_dump_type(file_path: str) -> str:
    """Detect if an SQL dump is from MySQL or PostgreSQL."""
    mysql_keywords = {"ENGINE=", "AUTO_INCREMENT", "UNLOCK TABLES", "LOCK TABLES", "CHARSET="}
    postgres_keywords = {"SET search_path", "SERIAL PRIMARY KEY", "RETURNING", "BIGSERIAL", "NOW()"}

    try:
        with open(file_path, "r", encoding="utf-8", errors="ignore") as file:
            for line in file:
                line = line.strip().upper()
                if any(keyword in line for keyword in mysql_keywords):
                    return "MySQL"
                if any(keyword in line for keyword in postgres_keywords):
                    return "PostgreSQL"
        return "Unknown"
    except Exception as e:
        return f"Error reading file: {e}"

In [209]:
world_sql = "./data/world.sql"
recommender_sql = "./data/recommender.sql"

sql_db = detect_sql_dump_type(world_sql)
if sql_db == "MySQL":
    print("MySQL dump detected")
elif sql_db == "PostgreSQL":
    print("PostgreSQL dump detected")

sql_db = detect_sql_dump_type(recommender_sql)
if sql_db == "MySQL":
    print("MySQL dump detected")
elif sql_db == "PostgreSQL":
    print("PostgreSQL dump detected")

MySQL dump detected
PostgreSQL dump detected


In [74]:
!pip install mysql-connector-python psycopg2-binary


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m25.0.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [210]:
database_id = "unique_chat"

"""
https://stackoverflow.com/a/77842747
Due to the errors with pgloader from apt-get, install it manually from the source.
Then move, mv ./build/bin/pgloader to /usr/bin/pgloader to make executeable runable
"""

class PostgresMigration():
    def __init__(self, host, port, user, password, db):
        self.mysql_host = host
        self.mysql_user = user
        self.mysql_password = password
        self.mysql_database = db
        self.mysql_port = port
        self.mysql_db = db

    def migrate_mysql_to_pg(self, pg_host, pg_port, pg_user, pg_password, pg_db, **kwargs):
        try:
            conn = psycopg2.connect(
                host=pg_host,
                port=pg_port,
                user=pg_user,
                password=pg_password,
                dbname="postgres",
                **kwargs
            )
            conn.autocommit = True
            cursor = conn.cursor()

            cursor.execute(f"SELECT 1 FROM pg_database WHERE datname = '{pg_db}';")
            if not cursor.fetchone():
                cursor.execute(f"CREATE DATABASE {pg_db};")
                print(f"Database '{pg_db}' created successfully.")
        except psycopg2.Error as e:
            print(f"PostgreSQL Error: {e}")
        except Exception as e:
            print(f"General Error: {e}")

        try:
            mysql_url = f"mysql://{self.mysql_user}:{self.mysql_password}@{self.mysql_host}:{self.mysql_port}/{self.mysql_db}"
            pg_url = f"pgsql://{pg_user}:{pg_password}@{pg_host}:{pg_port}/{database_id}"
            os.system(f"pgloader {mysql_url} {pg_url}")
        except Exception as e:
            print(f"General Error: {e}")

In [211]:
import mysql.connector

def load_mysql_dump(host, port, user, password, db, dump_file, **kwargs):
    try:
        conn = mysql.connector.connect(
            host=host,
            user=user,
            password=password,
            port=port,
            **kwargs
        )
        cursor = conn.cursor()

        cursor.execute(f"CREATE DATABASE IF NOT EXISTS {db};")
        print(f"Database '{db}' checked/created successfully.")

        conn.database = db
        conn.autocommit = True

        with open(dump_file, "r", encoding="utf-8") as file:
            sql_script = file.read()

        sql_script = re.sub(r"(?i)CREATE DATABASE.*?;", "", sql_script)
        sql_script = re.sub(r"(?i)USE\s+\S+;", "", sql_script)

        for statement in sql_script.split(";"):
            statement = statement.strip()
            if statement:
                cursor.execute(statement)
        print("SQL dump loaded successfully.")
    except mysql.connector.Error as e:
        print(f"MySQL Error: {e}")
    except Exception as e:
        print(f"General Error: {e}")
    finally:
        cursor.close()
        conn.close()

import re
import psycopg2

def load_pgsql_dump(host, port, user, password, db, dump_file, **kwargs):
    try:
        conn = psycopg2.connect(
            host=pg_host,
            port=pg_port,
            user=pg_user,
            password=pg_password,
            dbname="postgres",
            **kwargs
        )
        conn.autocommit = True
        cursor = conn.cursor()

        # Create the database if it doesn't exist
        cursor.execute(f"SELECT 1 FROM pg_database WHERE datname = '{db}';")
        if not cursor.fetchone():
            cursor.execute(f"CREATE DATABASE {db};")
            print(f"Database '{db}' created successfully.")
        cursor.close()
        conn.close()
    except psycopg2.Error as e:
        print(f"PostgreSQL Error: {e}")
    finally:
        os.system(f"PGPASSWORD={password} psql -U {user}  -h {host} -p {port} -d {db} < {dump_file}")

In [212]:
mysql_host = "localhost"
mysql_user = "root"
mysql_password = "password"
mysql_port = 3306

pg_host = "127.0.0.1"
pg_port = 5432
pg_user = "postgres"
pg_password = "password"

def load_dump_to_database(sql_dump_file, db_name="TWICE"):
    db = detect_sql_dump_type(sql_dump_file)
    if db == "MySQL":
        print("MySQL dump detected")
        load_mysql_dump(mysql_host, mysql_port, mysql_user, mysql_password, db_name, sql_dump_file)
        migration = PostgresMigration(mysql_host, mysql_port, mysql_user, mysql_password, db_name,)
        migration.migrate_mysql_to_pg(pg_host, pg_port, pg_user, pg_password, db_name)
    elif db == "PostgreSQL":
        print("PostgreSQL dump detected")
        load_pgsql_dump(pg_host, pg_port, pg_user, pg_password,db_name, sql_dump_file)

In [215]:
"""
Test for MySql/MariaDB Dump for fist:
- saving into a MySql database
- transfering the Dump to a PostgreSQL database with 'pgloader'
"""
import os

world_sql = os.path.join(os.getcwd(), 'data', 'world.sql')
load_dump_to_database(world_sql, db_name="world_test")

MySQL dump detected
Database 'world_test' checked/created successfully.
SQL dump loaded successfully.
2025-03-25T23:18:49.008001+01:00 LOG pgloader version "3.6.70f3557"
2025-03-25T23:18:49.062002+01:00 LOG Migrating from #<MYSQL-CONNECTION mysql://root@localhost:3306/world_test {1005C79613}>
2025-03-25T23:18:49.062002+01:00 LOG Migrating into #<PGSQL-CONNECTION pgsql://postgres@127.0.0.1:5432/unique_chat {1005F6FD83}>
2025-03-25T23:18:49.405012+01:00 LOG report summary reset
                table name     errors       rows      bytes      total time
--------------------------  ---------  ---------  ---------  --------------
           fetch meta data          0         10                     0.044s
            Create Schemas          0          0                     0.000s
          Create SQL Types          0          2                     0.008s
             Create tables          0          6                     0.008s
            Set Table OIDs          0          3               

In [216]:
"""
Test for PostgreSQL Dump for loading dump into Postgres database
It simply uses the psql executable.
!!! postgresql-client is mandatory
"""
import os

recommender_sql_file = 'recommender.sql'
path_to_dump = os.path.join(os.getcwd(), 'data', recommender_sql_file)
load_dump_to_database(path_to_dump, db_name="twice_once_super_test")

PostgreSQL dump detected
SET
SET
SET
SET
SET
 set_config 
------------
 
(1 row)

SET
SET
SET
SET
ALTER SCHEMA
COMMENT
SET
SET


ERROR:  schema "public" already exists
ERROR:  relation "artist" already exists
ERROR:  role "vuminhle" does not exist
ERROR:  relation "audio_features" already exists
ERROR:  role "vuminhle" does not exist
ERROR:  relation "track" already exists
ERROR:  role "vuminhle" does not exist
ERROR:  duplicate key value violates unique constraint "artist_pkey"
DETAIL:  Key (artist_uri)=(spotify:artist:4Kxlr1PRlDKEB0ekOCyHgX) already exists.
CONTEXT:  COPY artist, line 1
ERROR:  duplicate key value violates unique constraint "audio_features_pkey"
DETAIL:  Key (track_uri)=(spotify:track:4LOLvDtzykDC7y9WehFoOi) already exists.
CONTEXT:  COPY audio_features, line 1
ERROR:  duplicate key value violates unique constraint "track_pkey"
DETAIL:  Key (track_uri)=(spotify:track:4LOLvDtzykDC7y9WehFoOi) already exists.
CONTEXT:  COPY track, line 1
ERROR:  multiple primary keys for table "artist" are not allowed
ERROR:  multiple primary keys for table "audio_features" are not allowed
ERROR:  multiple primar