In [1]:
import pandas as pd
from sqlalchemy import create_engine, text
import os
from datetime import datetime
import numpy as np

In [2]:
# Database connection configuration
DB_CONFIG = {
    'host': 'localhost',
    'user': 'root',
    'password': 'Keklol360**',
    'database': 'diggerz'
}

In [None]:
def create_mysql_engine():
    """Create SQLAlchemy engine for MySQL connection"""
    connection_string = f"mysql+pymysql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}/{DB_CONFIG['database']}"
    return create_engine(connection_string)

def clean_dataframe(df):
    """Enhanced dataframe cleaning function"""
    # Handle string columns
    for col in df.select_dtypes(include=['object']).columns:
        df[col] = df[col].fillna('Unknown')
        if col in ['release_title', 'label_name', 'artist_name', 'track_title']:
            df[col] = df[col].apply(lambda x: str(x).replace("'", "''"))
    
    # Handle numeric columns
    for col in df.select_dtypes(include=['int64', 'float64']).columns:
        df[col] = df[col].fillna(0)
    
    # Handle datetime columns
    date_columns = ['updated_on', 'release_date']
    for col in date_columns:
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors='coerce')
            df[col] = df[col].fillna(pd.Timestamp('2020-01-01'))
    
    return df

def create_database_schema(engine):
    """Create database tables"""
    try:
        print("Creating database schema...")
        with engine.connect() as conn:
            # Create ARTIST table
            conn.execute(text("""
            CREATE TABLE IF NOT EXISTS artist (
                artist_id VARCHAR(255) PRIMARY KEY,
                artist_name VARCHAR(255) NOT NULL,
                updated_on DATETIME NOT NULL
            )
            """))

            # Create RELEASE table
            conn.execute(text("""
            CREATE TABLE IF NOT EXISTS `release` (
                release_id VARCHAR(255) PRIMARY KEY,
                release_title VARCHAR(255) NOT NULL,
                release_date DATE,
                upc VARCHAR(255),
                popularity INT,
                total_tracks INT,
                album_type VARCHAR(50),
                release_img VARCHAR(255),
                label_name VARCHAR(255),
                updated_on DATETIME NOT NULL
            )
            """))

            # Create TRACK table
            conn.execute(text("""
            CREATE TABLE IF NOT EXISTS track (
                track_id VARCHAR(255) PRIMARY KEY,
                track_title VARCHAR(255) NOT NULL,
                duration_ms INT,
                isrc VARCHAR(255) UNIQUE,
                track_number INT,
                release_id VARCHAR(255),
                explicit BOOLEAN,
                disc_number INT,
                preview_url TEXT,
                updated_on DATETIME NOT NULL,
                FOREIGN KEY (release_id) REFERENCES `release`(release_id)
            )
            """))

            # Create AUDIO_FEATURES table
            conn.execute(text("""
            CREATE TABLE IF NOT EXISTS audio_features (
                isrc VARCHAR(255) PRIMARY KEY,
                acousticness FLOAT,
                danceability FLOAT,
                energy FLOAT,
                instrumentalness FLOAT,
                `key` INT,
                liveness FLOAT,
                loudness FLOAT,
                mode INT,
                speechiness FLOAT,
                tempo FLOAT,
                time_signature INT,
                valence FLOAT,
                updated_on DATETIME NOT NULL,
                FOREIGN KEY (isrc) REFERENCES track(isrc)
            )
            """))

            # Create ARTIST_TRACK table
            conn.execute(text("""
            CREATE TABLE IF NOT EXISTS artist_track (
                track_id VARCHAR(255),
                artist_id VARCHAR(255),
                updated_on DATETIME NOT NULL,
                PRIMARY KEY (track_id, artist_id),
                FOREIGN KEY (track_id) REFERENCES track(track_id),
                FOREIGN KEY (artist_id) REFERENCES artist(artist_id)
            )
            """))

            # Create ARTIST_RELEASE table
            conn.execute(text("""
            CREATE TABLE IF NOT EXISTS artist_release (
                release_id VARCHAR(255),
                artist_id VARCHAR(255),
                updated_on DATETIME NOT NULL,
                PRIMARY KEY (release_id, artist_id),
                FOREIGN KEY (release_id) REFERENCES `release`(release_id),
                FOREIGN KEY (artist_id) REFERENCES artist(artist_id)
            )
            """))
            
            print("Database schema created successfully!")
    except Exception as e:
        print(f"Error creating database schema: {str(e)}")
        raise

In [None]:
"""# At the start of your script
def optimize_mysql_connection(engine):
    with engine.connect() as conn:
        # Disable various checks temporarily
        conn.execute(text("SET UNIQUE_CHECKS=0"))
        conn.execute(text("SET FOREIGN_KEY_CHECKS=0"))
        conn.execute(text("SET SQL_MODE='NO_AUTO_VALUE_ON_ZERO'"))
        
        # Set bulk insert optimizations
        conn.execute(text("SET autocommit=0"))

# At the end of your imports
def restore_mysql_settings(engine):
    with engine.connect() as conn:
        # Re-enable checks
        conn.execute(text("SET UNIQUE_CHECKS=1"))
        conn.execute(text("SET FOREIGN_KEY_CHECKS=1"))
        conn.execute(text("SET SQL_MODE='NO_ENGINE_SUBSTITUTION'"))
        conn.execute(text("SET autocommit=1"))"""

In [23]:
def import_artists(engine):
    """Import artist data with optimized bulk loading and proper parameter binding"""
    try:
        print("Importing artists...")
        df = pd.read_csv('sp_artist.csv')
        df = clean_dataframe(df)
        df['updated_on'] = df['updated_on'].dt.strftime('%Y-%m-%d %H:%M:%S')
        
        chunk_size = 50000
        total_rows = len(df)
        
        with engine.connect() as conn:
            trans = conn.begin()
            try:
                for i in range(0, total_rows, chunk_size):
                    chunk = df.iloc[i:i + chunk_size]
                    
                    # Use parameterized query instead of string formatting
                    insert_stmt = text("""
                        INSERT IGNORE INTO artist (artist_id, artist_name, updated_on)
                        VALUES (:artist_id, :artist_name, :updated_on)
                    """)
                    
                    # Convert chunk to list of dictionaries for parameter binding
                    records = chunk.to_dict('records')
                    conn.execute(insert_stmt, records)
                    
                    print(f"Processed {min(i + chunk_size, total_rows)} out of {total_rows} artists")
                
                trans.commit()
                print("Artists imported successfully")
                
            except Exception as e:
                trans.rollback()
                raise e
                
    except Exception as e:
        print(f"Error importing artists: {str(e)}")
        raise

In [24]:
def main():
    """Main function to create schema and import data"""
    try:
        engine = create_mysql_engine()
        
        # Import data table by table
        import_artists(engine)
        
        print("All data imported successfully!")
    except Exception as e:
        print(f"Error in main execution: {str(e)}")
        raise

if __name__ == "__main__":
    main()

Importing artists...
Processed 50000 out of 676911 artists
Processed 100000 out of 676911 artists
Processed 150000 out of 676911 artists
Processed 200000 out of 676911 artists
Processed 250000 out of 676911 artists
Processed 300000 out of 676911 artists
Processed 350000 out of 676911 artists
Processed 400000 out of 676911 artists
Processed 450000 out of 676911 artists
Processed 500000 out of 676911 artists
Processed 550000 out of 676911 artists
Processed 600000 out of 676911 artists
Processed 650000 out of 676911 artists
Processed 676911 out of 676911 artists
Artists imported successfully
All data imported successfully!


In [6]:
def import_releases(engine):
    """Import release data with optimized bulk loading and proper parameter binding"""
    try:
        print("Importing releases...")
        df = pd.read_csv('sp_release.csv')
        df = clean_dataframe(df)
        df['updated_on'] = pd.to_datetime(df['updated_on']).dt.strftime('%Y-%m-%d %H:%M:%S')
        
        # Ensure release_date is in correct format
        df['release_date'] = pd.to_datetime(df['release_date']).dt.strftime('%Y-%m-%d')
        
        # Handle potential NaN values in numeric columns
        df['popularity'] = df['popularity'].fillna(0).astype(int)
        df['total_tracks'] = df['total_tracks'].fillna(0).astype(int)
        
        chunk_size = 50000
        total_rows = len(df)
        
        with engine.connect() as conn:
            trans = conn.begin()
            try:
                for i in range(0, total_rows, chunk_size):
                    chunk = df.iloc[i:i + chunk_size]
                    
                    # Convert chunk to list of dictionaries and clean data
                    records = []
                    for _, row in chunk.iterrows():
                        record = {
                            'release_id': str(row['release_id']),
                            'release_title': str(row['release_title']).replace("'", "''"),
                            'release_date': row['release_date'],
                            'upc': str(row['upc']) if pd.notna(row['upc']) else None,
                            'popularity': int(row['popularity']),
                            'total_tracks': int(row['total_tracks']),
                            'album_type': str(row['album_type']),
                            'release_img': str(row['release_img']),
                            'label_name': str(row['label_name']).replace("'", "''"),
                            'updated_on': row['updated_on']
                        }
                        records.append(record)
                    
                    # Use parameterized query
                    insert_stmt = text("""
                        INSERT IGNORE INTO `release` 
                        (release_id, release_title, release_date, upc, popularity,
                         total_tracks, album_type, release_img, label_name, updated_on)
                        VALUES 
                        (:release_id, :release_title, :release_date, :upc, :popularity,
                         :total_tracks, :album_type, :release_img, :label_name, :updated_on)
                    """)
                    
                    # Execute with parameter binding
                    conn.execute(insert_stmt, records)
                    
                    print(f"Processed {min(i + chunk_size, total_rows)} out of {total_rows} releases")
                
                trans.commit()
                print("Releases imported successfully")
                
            except Exception as e:
                trans.rollback()
                print(f"Transaction error: {str(e)}")
                raise e
                
    except Exception as e:
        print(f"Error importing releases: {str(e)}")
        print(f"Error details: {str(e.__cause__)}")
        raise

In [7]:
def main():
    """Main function to create schema and import data"""
    try:
        engine = create_mysql_engine()
        
        # Import data table by table

        import_releases(engine)

        print("All data imported successfully!")
    except Exception as e:
        print(f"Error in main execution: {str(e)}")
        raise

if __name__ == "__main__":
    main()

Importing releases...
Processed 50000 out of 713563 releases
Processed 100000 out of 713563 releases
Processed 150000 out of 713563 releases
Processed 200000 out of 713563 releases
Processed 250000 out of 713563 releases
Processed 300000 out of 713563 releases
Processed 350000 out of 713563 releases
Processed 400000 out of 713563 releases
Processed 450000 out of 713563 releases
Processed 500000 out of 713563 releases
Processed 550000 out of 713563 releases
Processed 600000 out of 713563 releases
Processed 650000 out of 713563 releases
Processed 700000 out of 713563 releases
Processed 713563 out of 713563 releases
Releases imported successfully
All data imported successfully!


In [4]:
def import_tracks(engine):
    """Import track data with parameter binding"""
    try:
        print("Importing tracks...")
        df = pd.read_csv('sp_track.csv')
        df = clean_dataframe(df)
        df['updated_on'] = pd.to_datetime(df['updated_on']).dt.strftime('%Y-%m-%d %H:%M:%S')
        df['explicit'] = df['explicit'].map({'f': False, 't': True})
        
        chunk_size = 10000
        total_rows = len(df)
        
        with engine.connect() as conn:
            trans = conn.begin()
            try:
                for i in range(0, total_rows, chunk_size):
                    chunk = df.iloc[i:i + chunk_size]
                    
                    records = [{
                        'track_id': str(row['track_id']),
                        'track_title': str(row['track_title']).replace("'", "''"),
                        'duration_ms': int(row['duration_ms']),
                        'isrc': str(row['isrc']),
                        'track_number': int(row['track_number']),
                        'release_id': str(row['release_id']),
                        'explicit': bool(row['explicit']),
                        'disc_number': int(row['disc_number']),
                        'preview_url': str(row['preview_url']),
                        'updated_on': row['updated_on']
                    } for _, row in chunk.iterrows()]
                    
                    insert_stmt = text("""
                        INSERT IGNORE INTO track 
                        (track_id, track_title, duration_ms, isrc, track_number,
                         release_id, explicit, disc_number, preview_url, updated_on)
                        VALUES 
                        (:track_id, :track_title, :duration_ms, :isrc, :track_number,
                         :release_id, :explicit, :disc_number, :preview_url, :updated_on)
                    """)
                    
                    conn.execute(insert_stmt, records)
                    print(f"Processed {min(i + chunk_size, total_rows)} out of {total_rows} tracks")
                
                trans.commit()
                print("Tracks imported successfully")
                
            except Exception as e:
                trans.rollback()
                raise e
                
    except Exception as e:
        print(f"Error importing tracks: {str(e)}")
        raise


In [5]:
def main():
    """Main function to create schema and import data"""
    try:
        engine = create_mysql_engine()
        
        # Import data table by table
        import_tracks(engine)

        print("All data imported successfully!")
    except Exception as e:
        print(f"Error in main execution: {str(e)}")
        raise

if __name__ == "__main__":
    main()

Importing tracks...
Processed 10000 out of 5777707 tracks
Processed 20000 out of 5777707 tracks
Processed 30000 out of 5777707 tracks
Processed 40000 out of 5777707 tracks
Processed 50000 out of 5777707 tracks
Processed 60000 out of 5777707 tracks
Processed 70000 out of 5777707 tracks
Processed 80000 out of 5777707 tracks
Processed 90000 out of 5777707 tracks
Processed 100000 out of 5777707 tracks
Processed 110000 out of 5777707 tracks
Processed 120000 out of 5777707 tracks
Processed 130000 out of 5777707 tracks
Processed 140000 out of 5777707 tracks
Processed 150000 out of 5777707 tracks
Processed 160000 out of 5777707 tracks
Processed 170000 out of 5777707 tracks
Processed 180000 out of 5777707 tracks
Processed 190000 out of 5777707 tracks
Processed 200000 out of 5777707 tracks
Processed 210000 out of 5777707 tracks
Processed 220000 out of 5777707 tracks
Processed 230000 out of 5777707 tracks
Processed 240000 out of 5777707 tracks
Processed 250000 out of 5777707 tracks
Processed 2600

In [6]:
def import_audio_features(engine):
    """Import audio features data with parameter binding"""
    try:
        print("Importing audio features...")
        df = pd.read_csv('audio_features.csv')
        df = clean_dataframe(df)
        df['updated_on'] = pd.to_datetime(df['updated_on']).dt.strftime('%Y-%m-%d %H:%M:%S')
        
        chunk_size = 1000
        total_rows = len(df)
        
        with engine.connect() as conn:
            trans = conn.begin()
            try:
                for i in range(0, total_rows, chunk_size):
                    chunk = df.iloc[i:i + chunk_size]
                    
                    records = [{
                        'isrc': str(row['isrc']),
                        'acousticness': float(row['acousticness']),
                        'danceability': float(row['danceability']),
                        'energy': float(row['energy']),
                        'instrumentalness': float(row['instrumentalness']),
                        'key': int(row['key']),
                        'liveness': float(row['liveness']),
                        'loudness': float(row['loudness']),
                        'mode': int(row['mode']),
                        'speechiness': float(row['speechiness']),
                        'tempo': float(row['tempo']),
                        'time_signature': int(row['time_signature']),
                        'valence': float(row['valence']),
                        'updated_on': row['updated_on']
                    } for _, row in chunk.iterrows()]
                    
                    insert_stmt = text("""
                        INSERT IGNORE INTO audio_features 
                        (isrc, acousticness, danceability, energy, instrumentalness,
                         `key`, liveness, loudness, mode, speechiness, tempo,
                         time_signature, valence, updated_on)
                        VALUES 
                        (:isrc, :acousticness, :danceability, :energy, :instrumentalness,
                         :key, :liveness, :loudness, :mode, :speechiness, :tempo,
                         :time_signature, :valence, :updated_on)
                    """)
                    
                    conn.execute(insert_stmt, records)
                    print(f"Processed {min(i + chunk_size, total_rows)} out of {total_rows} audio features")
                
                trans.commit()
                print("Audio features imported successfully")
                
            except Exception as e:
                trans.rollback()
                raise e
                
    except Exception as e:
        print(f"Error importing audio features: {str(e)}")
        raise

In [7]:
def main():
    """Main function to create schema and import data"""
    try:
        engine = create_mysql_engine()
        
        # Import data table by table
        import_audio_features(engine)

        print("All data imported successfully!")
    except Exception as e:
        print(f"Error in main execution: {str(e)}")
        raise

if __name__ == "__main__":
    main()

Importing audio features...
Processed 1000 out of 4687104 audio features
Processed 2000 out of 4687104 audio features
Processed 3000 out of 4687104 audio features
Processed 4000 out of 4687104 audio features
Processed 5000 out of 4687104 audio features
Processed 6000 out of 4687104 audio features
Processed 7000 out of 4687104 audio features
Processed 8000 out of 4687104 audio features
Processed 9000 out of 4687104 audio features
Processed 10000 out of 4687104 audio features
Processed 11000 out of 4687104 audio features
Processed 12000 out of 4687104 audio features
Processed 13000 out of 4687104 audio features
Processed 14000 out of 4687104 audio features
Processed 15000 out of 4687104 audio features
Processed 16000 out of 4687104 audio features
Processed 17000 out of 4687104 audio features
Processed 18000 out of 4687104 audio features
Processed 19000 out of 4687104 audio features
Processed 20000 out of 4687104 audio features
Processed 21000 out of 4687104 audio features
Processed 22000

In [12]:
def import_artist_tracks(engine):
    """Import artist-track relationships with parameter binding"""
    try:
        print("Importing artist-track relationships...")
        df = pd.read_csv('sp_artist_track.csv')
        df = clean_dataframe(df)
        df['updated_on'] = pd.to_datetime(df['updated_on']).dt.strftime('%Y-%m-%d %H:%M:%S')
        
        chunk_size = 1000
        total_rows = len(df)
        
        # First connection to disable foreign key checks
        with engine.connect() as conn1:
            conn1.execute(text("SET FOREIGN_KEY_CHECKS=0"))
        
        # Second connection for data import
        with engine.connect() as conn2:
            with conn2.begin():
                for i in range(0, total_rows, chunk_size):
                    chunk = df.iloc[i:i + chunk_size]
                    
                    records = [{
                        'track_id': str(row['track_id']),
                        'artist_id': str(row['artist_id']),
                        'updated_on': row['updated_on']
                    } for _, row in chunk.iterrows()]
                    
                    insert_stmt = text("""
                        INSERT IGNORE INTO artist_track (track_id, artist_id, updated_on)
                        VALUES (:track_id, :artist_id, :updated_on)
                    """)
                    
                    conn2.execute(insert_stmt, records)
                    print(f"Processed {min(i + chunk_size, total_rows)} out of {total_rows} artist-track relationships")
        
        # Third connection to re-enable foreign key checks
        with engine.connect() as conn3:
            conn3.execute(text("SET FOREIGN_KEY_CHECKS=1"))
            
        print("Artist-track relationships imported successfully")
                
    except Exception as e:
        print(f"Error importing artist-track relationships: {str(e)}")
        raise

In [13]:
def main():
    """Main function to create schema and import data"""
    try:
        engine = create_mysql_engine()
        
        # Import data table by table
        import_artist_tracks(engine)

        
        print("All data imported successfully!")
    except Exception as e:
        print(f"Error in main execution: {str(e)}")
        raise

if __name__ == "__main__":
    main()

Importing artist-track relationships...
Processed 1000 out of 7716591 artist-track relationships
Processed 2000 out of 7716591 artist-track relationships
Processed 3000 out of 7716591 artist-track relationships
Processed 4000 out of 7716591 artist-track relationships
Processed 5000 out of 7716591 artist-track relationships
Processed 6000 out of 7716591 artist-track relationships
Processed 7000 out of 7716591 artist-track relationships
Processed 8000 out of 7716591 artist-track relationships
Processed 9000 out of 7716591 artist-track relationships
Processed 10000 out of 7716591 artist-track relationships
Processed 11000 out of 7716591 artist-track relationships
Processed 12000 out of 7716591 artist-track relationships
Processed 13000 out of 7716591 artist-track relationships
Processed 14000 out of 7716591 artist-track relationships
Processed 15000 out of 7716591 artist-track relationships
Processed 16000 out of 7716591 artist-track relationships
Processed 17000 out of 7716591 artist-tra

In [17]:
def import_artist_releases(engine):
    """Import artist-release relationships with parameter binding"""
    try:
        print("Importing artist-release relationships...")
        df = pd.read_csv('sp_artist_release.csv')
        df = clean_dataframe(df)
        df['updated_on'] = pd.to_datetime(df['updated_on']).dt.strftime('%Y-%m-%d %H:%M:%S')
        
        chunk_size = 10000
        total_rows = len(df)
        
        # First connection to disable foreign key checks
        with engine.connect() as conn1:
            conn1.execute(text("SET FOREIGN_KEY_CHECKS=0"))
        
        # Second connection for data import
        with engine.connect() as conn2:
            with conn2.begin():
                for i in range(0, total_rows, chunk_size):
                    chunk = df.iloc[i:i + chunk_size]
                    
                    records = [{
                        'release_id': str(row['release_id']),
                        'artist_id': str(row['artist_id']),
                        'updated_on': row['updated_on']
                    } for _, row in chunk.iterrows()]
                    
                    insert_stmt = text("""
                        INSERT IGNORE INTO artist_release (release_id, artist_id, updated_on)
                        VALUES (:release_id, :artist_id, :updated_on)
                    """)
                    
                    conn2.execute(insert_stmt, records)
                    print(f"Processed {min(i + chunk_size, total_rows)} out of {total_rows} artist-release relationships")
        
        # Third connection to re-enable foreign key checks
        with engine.connect() as conn3:
            conn3.execute(text("SET FOREIGN_KEY_CHECKS=1"))
            
        print("Artist-release relationships imported successfully")
                
    except Exception as e:
        print(f"Error importing artist-release relationships: {str(e)}")
        raise

In [18]:
def main():
    """Main function to create schema and import data"""
    try:
        engine = create_mysql_engine()
        
        # Import data table by table
        import_artist_releases(engine)
        
        print("All data imported successfully!")
    except Exception as e:
        print(f"Error in main execution: {str(e)}")
        raise

if __name__ == "__main__":
    main()

Importing artist-release relationships...
Processed 10000 out of 3969489 artist-release relationships
Processed 20000 out of 3969489 artist-release relationships
Processed 30000 out of 3969489 artist-release relationships
Processed 40000 out of 3969489 artist-release relationships
Processed 50000 out of 3969489 artist-release relationships
Processed 60000 out of 3969489 artist-release relationships
Processed 70000 out of 3969489 artist-release relationships
Processed 80000 out of 3969489 artist-release relationships
Processed 90000 out of 3969489 artist-release relationships
Processed 100000 out of 3969489 artist-release relationships
Processed 110000 out of 3969489 artist-release relationships
Processed 120000 out of 3969489 artist-release relationships
Processed 130000 out of 3969489 artist-release relationships
Processed 140000 out of 3969489 artist-release relationships
Processed 150000 out of 3969489 artist-release relationships
Processed 160000 out of 3969489 artist-release relat

In [None]:
"""def main():
    "Main function to create schema and import data"
    try:
        engine = create_mysql_engine()
        
        # Import data table by table
        import_artists(engine)
        import_releases(engine)
        import_tracks(engine)
        import_audio_features(engine)
        import_artist_tracks(engine)
        import_artist_releases(engine)
        
        print("All data imported successfully!")
    except Exception as e:
        print(f"Error in main execution: {str(e)}")
        raise

if __name__ == "__main__":
    main()"""

Importing artists...


KeyboardInterrupt: 