The SQL used to create the database tables is in `src/utils/create_db.sql`.

This notebook runs the ETL into the Cloud SQL (PostgreSQL) database. Its data source is `top_songs_curated.csv` in the `data` directory.

In [6]:
import csv
import psycopg2
from psycopg2 import sql
from datetime import datetime
import logging
import os
from dotenv import load_dotenv

In [7]:
# Populate os.environ from a local .env file, if one can be found.
load_dotenv()

# --- Database Connection Parameters ---
# Only the database name and port have fallbacks; user, password and host
# are None unless provided by the environment / .env file.
DB_NAME = os.environ.get("DB_NAME", "database-instance")
DB_USER = os.environ.get("DB_USER")
DB_PASSWORD = os.environ.get("DB_PASSWORD")
DB_HOST = os.environ.get("DB_HOST")
DB_PORT = os.environ.get("DB_PORT", "5432")

# --- CSV File Path ---
# Relative path: resolved against the kernel's current working directory.
CSV_FILE_PATH = "../data/top_songs_curated.csv"

In [8]:
# Root logger: emit INFO and above as "timestamp - LEVEL - message".
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

In [9]:
def parse_date(date_str, allow_partial=False):
    """
    Parse an ISO ``YYYY-MM-DD`` date string into a ``datetime.date``.

    Surrounding whitespace is ignored. Empty, ``None`` or unparseable values
    yield ``None`` so the caller can store SQL NULL; unparseable values of
    an accepted length are additionally logged as a warning.

    Args:
        date_str (str | None): Raw value read from the CSV.
        allow_partial (bool): When True, also accept reduced-precision dates
            ``YYYY`` and ``YYYY-MM``, mapped to the first day of that
            year/month. Defaults to False, in which case such values return
            ``None`` (the historical behaviour).

    Returns:
        datetime.date | None
    """
    if not date_str:
        return None
    date_str = date_str.strip()

    # Pick the strptime format from the string length; anything else is NULL.
    formats_by_length = {10: "%Y-%m-%d"}
    if allow_partial:
        formats_by_length.update({4: "%Y", 7: "%Y-%m"})
    fmt = formats_by_length.get(len(date_str))
    if fmt is None:
        return None

    try:
        return datetime.strptime(date_str, fmt).date()
    except ValueError:
        # Use the notebook's logging setup instead of print().
        logging.warning(f"Could not parse date string: {date_str}. Storing as NULL.")
        return None

In [10]:
def get_db_connection():
    """
    Open a new psycopg2 connection using the module-level DB_* settings.

    Returns:
        A psycopg2 connection object.

    Raises:
        psycopg2.OperationalError: if the connection attempt fails; the
            error is logged before being re-raised.
    """
    logging.info(
        f"Attempting to connect to database '{DB_NAME}' on {DB_HOST}:{DB_PORT}..."
    )
    connect_kwargs = dict(
        dbname=DB_NAME,
        user=DB_USER,
        password=DB_PASSWORD,
        host=DB_HOST,
        port=DB_PORT,
    )
    try:
        connection = psycopg2.connect(**connect_kwargs)
    except psycopg2.OperationalError as exc:
        logging.error(f"Database connection failed: {exc}")
        raise
    logging.info("Successfully connected to the database.")
    return connection

In [11]:
# Columns that must be present in the CSV header before any row is loaded.
_REQUIRED_COLUMNS = (
    "Track URI",
    "Track Name",
    "Artist Name(s)",
    "Album Name",
    "Album Release Date",
    "Album Image URL",
    "Track Duration (ms)",
    "Explicit",
    "Popularity",
    "Artist Genres",
    "youtube_title",
    "youtube_url",
)

# Upsert keyed on original_track_uri. `(xmax = 0)` is a widely used PostgreSQL
# idiom to distinguish a fresh INSERT from an ON CONFLICT UPDATE; it relies on
# an implementation detail of the xmax system column, not a documented API.
_TRACK_UPSERT_SQL = """
    INSERT INTO tracks (
        original_track_uri, track_name, artist_names, album_name,
        album_release_date, album_image_url, track_duration_ms,
        explicit, popularity, youtube_title, youtube_url
    ) VALUES (
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
    )
    ON CONFLICT (original_track_uri) DO UPDATE SET
        track_name = EXCLUDED.track_name,
        artist_names = EXCLUDED.artist_names,
        album_name = EXCLUDED.album_name,
        album_release_date = EXCLUDED.album_release_date,
        album_image_url = EXCLUDED.album_image_url,
        track_duration_ms = EXCLUDED.track_duration_ms,
        explicit = EXCLUDED.explicit,
        popularity = EXCLUDED.popularity,
        youtube_title = EXCLUDED.youtube_title,
        youtube_url = EXCLUDED.youtube_url,
        updated_at = CURRENT_TIMESTAMP
    RETURNING track_id, (xmax = 0) AS inserted;
"""

# Returns a row only when the genre was newly inserted.
_GENRE_INSERT_SQL = (
    "INSERT INTO genres (genre_name) VALUES (%s) "
    "ON CONFLICT (genre_name) DO NOTHING RETURNING genre_id;"
)
_GENRE_SELECT_SQL = "SELECT genre_id FROM genres WHERE genre_name = %s;"
_TRACK_GENRE_LINK_SQL = (
    "INSERT INTO track_genres (track_id, genre_id) VALUES (%s, %s) "
    "ON CONFLICT (track_id, genre_id) DO NOTHING;"
)


def _parse_non_negative_int(value):
    """Return `value` as an int if it is a (whitespace-padded) decimal string, else None."""
    if value is None:
        return None
    value = value.strip()
    # isdecimal() (unlike isdigit()) only accepts characters int() can parse.
    return int(value) if value.isdecimal() else None


def _parse_explicit(value):
    """Map the CSV 'Explicit' cell to True/False, or None when blank/missing."""
    normalized = (value or "").strip().lower()
    if not normalized:
        return None
    return normalized == "true"


def _split_genres(genres_str):
    """Split a comma-separated genre list into unique, stripped, non-empty names (order kept)."""
    if not genres_str:
        return []
    return list(dict.fromkeys(g.strip() for g in genres_str.split(",") if g.strip()))


def _track_params(row, track_uri, track_name):
    """Build the parameter tuple for _TRACK_UPSERT_SQL from one CSV row."""
    return (
        track_uri,
        track_name,
        row.get("Artist Name(s)"),
        row.get("Album Name"),
        parse_date(row.get("Album Release Date")),
        row.get("Album Image URL"),
        _parse_non_negative_int(row.get("Track Duration (ms)")),
        _parse_explicit(row.get("Explicit")),
        _parse_non_negative_int(row.get("Popularity")),
        row.get("youtube_title"),
        row.get("youtube_url"),
    )


def _get_or_create_genre(cursor, genre_name):
    """Return (genre_id, created) for `genre_name`, inserting it if missing; (None, False) if not found."""
    cursor.execute(_GENRE_INSERT_SQL, (genre_name,))
    result = cursor.fetchone()
    if result:
        return result[0], True
    # ON CONFLICT DO NOTHING returned no row: the genre already exists.
    cursor.execute(_GENRE_SELECT_SQL, (genre_name,))
    result = cursor.fetchone()
    if result:
        return result[0], False
    return None, False


def _load_row(cursor, row, track_uri, track_name):
    """
    Upsert one validated CSV row: the track, its genres and track-genre links.

    Returns:
        tuple | None: (track_inserted, genres_inserted, links_created), or
        None when the track upsert returned no row.

    Raises:
        psycopg2.Error: on any failed statement; the caller rolls the row back.
    """
    cursor.execute(_TRACK_UPSERT_SQL, _track_params(row, track_uri, track_name))
    track_id_result = cursor.fetchone()
    if not track_id_result:
        return None
    track_id, track_inserted = track_id_result

    genres_inserted = 0
    links_created = 0
    for genre_name in _split_genres(row.get("Artist Genres")):
        genre_id, genre_created = _get_or_create_genre(cursor, genre_name)
        if genre_id is None:
            logging.error(
                f"Could not find or insert genre: '{genre_name}' for track ID {track_id}. Skipping this genre link."
            )
            continue
        if genre_created:
            genres_inserted += 1

        # No local try/except here: after a failed statement PostgreSQL aborts
        # the transaction, so "log and continue" would only fail on the next
        # statement. The caller rolls the whole row back to its savepoint.
        cursor.execute(_TRACK_GENRE_LINK_SQL, (track_id, genre_id))
        # rowcount is 1 if the link was inserted, 0 if ON CONFLICT skipped it.
        if cursor.rowcount > 0:
            links_created += 1

    return track_inserted, genres_inserted, links_created


def process_csv_and_load_data(conn, csv_filepath, commit_every=100):
    """
    Reads the CSV file, validates its header, and upserts every row into the
    ``tracks``, ``genres`` and ``track_genres`` tables.

    Each row runs inside its own SAVEPOINT, so a failing row is rolled back on
    its own while earlier rows of the same (not yet committed) batch are kept.
    Work is committed every ``commit_every`` successfully processed rows and
    once more at the end. Errors are generally logged rather than raised.

    Args:
        conn: A psycopg2 database connection object. Assumes autocommit is off
            (the psycopg2 default), because SAVEPOINT needs an open transaction.
        csv_filepath (str): The path to the CSV file.
        commit_every (int): Commit and log progress after this many
            successfully processed rows. Values < 1 disable the periodic
            commit; a final commit is always issued. Defaults to 100.
    """
    logging.info(f"Starting ETL process for CSV: {csv_filepath}")
    processed_rows = 0
    skipped_rows = 0
    failed_rows = 0
    inserted_tracks = 0
    updated_tracks = 0
    inserted_genres = 0
    linked_track_genres = 0

    try:
        # newline="" is required by the csv module so quoted fields containing
        # line breaks parse correctly; utf-8-sig drops a leading BOM that would
        # otherwise corrupt the first header name ("Track URI").
        with open(csv_filepath, mode="r", encoding="utf-8-sig", newline="") as csvfile:
            csv_reader = csv.DictReader(csvfile)

            header = csv_reader.fieldnames
            if not header:
                logging.error("CSV file is empty or has no header.")
                return

            missing_cols = [col for col in _REQUIRED_COLUMNS if col not in header]
            if missing_cols:
                logging.error(
                    f"Missing required columns in CSV header: {', '.join(missing_cols)}"
                )
                logging.info(f"Available columns: {header}")
                return

            with conn.cursor() as cursor:
                for row_num, row in enumerate(csv_reader, 1):
                    track_uri = row.get("Track URI")
                    track_name = row.get("Track Name")
                    if not track_uri:
                        logging.warning(
                            f"Skipping row {row_num} due to missing 'Track URI'."
                        )
                        skipped_rows += 1
                        continue
                    if not track_name:
                        logging.warning(
                            f"Skipping row {row_num} (URI: {track_uri}) due to missing 'Track Name'."
                        )
                        skipped_rows += 1
                        continue

                    cursor.execute("SAVEPOINT etl_row")
                    row_result = None
                    try:
                        row_result = _load_row(cursor, row, track_uri, track_name)
                        if row_result is None:
                            logging.error(
                                f"Failed to insert or update track (URI: {track_uri}). Skipping genre processing for this track."
                            )
                    except psycopg2.Error as db_err:
                        logging.error(
                            f"Database error processing row {row_num} (Track URI: {track_uri}): {db_err}"
                        )
                    except Exception as e:
                        logging.error(
                            f"General error processing row {row_num} (Track URI: {track_uri}): {e}"
                        )

                    if row_result is None:
                        # Undo only this row, then drop the savepoint so they
                        # don't pile up inside the open transaction.
                        cursor.execute("ROLLBACK TO SAVEPOINT etl_row")
                        cursor.execute("RELEASE SAVEPOINT etl_row")
                        failed_rows += 1
                        continue
                    cursor.execute("RELEASE SAVEPOINT etl_row")

                    # Counters are updated only once the row is known to be kept.
                    track_inserted, genres_added, links_added = row_result
                    if track_inserted:
                        inserted_tracks += 1
                    else:
                        updated_tracks += 1
                    inserted_genres += genres_added
                    linked_track_genres += links_added

                    processed_rows += 1
                    if commit_every > 0 and processed_rows % commit_every == 0:
                        logging.info(f"Processed {processed_rows} rows...")
                        conn.commit()  # Commit periodically for large files

                conn.commit()  # Final commit for any remaining operations
                logging.info("ETL process completed.")
                logging.info(f"Summary: Processed {processed_rows} rows from CSV.")
                logging.info(
                    f"Rows skipped (missing URI/name): {skipped_rows}; rows failed and rolled back: {failed_rows}."
                )
                logging.info(
                    f"Tracks: {inserted_tracks} inserted, {updated_tracks} updated."
                )
                logging.info(
                    f"Genres: {inserted_genres} newly inserted (others may have existed)."
                )
                logging.info(f"Track-Genre Links: {linked_track_genres} created.")

    except FileNotFoundError:
        logging.error(f"The file '{csv_filepath}' was not found.")
    except psycopg2.Error as e:
        # Cursor creation, savepoint handling or a commit failed.
        logging.error(f"A database error occurred during the ETL process: {e}")
        if conn:
            conn.rollback()
    except Exception as e:
        logging.error(f"An unexpected error occurred during the ETL process: {e}")
        if conn:
            conn.rollback()

In [12]:
logging.info("Starting Python ETL script...")

# --- Validate essential configuration before touching the network ---
# Placeholder values usually mean an example .env was copied but not filled in.
if DB_NAME == "your_db_name" or DB_USER == "your_db_user":
    logging.error(
        "CRITICAL: Default database credentials are still in use. Please update DB_NAME, DB_USER, and DB_PASSWORD."
    )
    # SystemExit sets the exit code when run as a script and just stops the
    # cell in Jupyter; the builtin exit() is a site-module helper intended for
    # the interactive prompt, not for programs.
    raise SystemExit(1)

unset_settings = [
    name
    for name, value in (
        ("DB_USER", DB_USER),
        ("DB_PASSWORD", DB_PASSWORD),
        ("DB_HOST", DB_HOST),
    )
    if not value
]
if unset_settings:
    logging.warning(
        f"Environment variable(s) not set: {', '.join(unset_settings)}. "
        "The connection may fall back to PostgreSQL client defaults and fail."
    )

if not os.path.exists(CSV_FILE_PATH):
    logging.warning(
        f"CSV_FILE_PATH '{CSV_FILE_PATH}' does not exist (cwd: {os.getcwd()}). Make sure this file exists or update the path."
    )
    # Proceed anyway: process_csv_and_load_data logs the missing file itself.

connection = None
try:
    connection = get_db_connection()
    if connection:
        process_csv_and_load_data(connection, CSV_FILE_PATH)
except Exception as e:
    # Catches connection errors from get_db_connection and anything that
    # escapes process_csv_and_load_data's own error handling.
    logging.critical(f"ETL process failed critically: {e}")
finally:
    if connection:
        connection.close()
        logging.info("Database connection closed.")
logging.info("Python ETL script finished.")

2025-05-19 17:59:00,844 - INFO - Starting Python ETL script...
2025-05-19 17:59:00,846 - INFO - Attempting to connect to database 'database-instance' on 34.140.62.43:5432...
2025-05-19 17:59:01,539 - INFO - Successfully connected to the database.
2025-05-19 17:59:01,540 - INFO - Starting ETL process for CSV: ../data/top_songs_curated.csv
2025-05-19 17:59:36,632 - INFO - Processed 100 rows...
2025-05-19 18:00:12,814 - INFO - Processed 200 rows...
2025-05-19 18:00:46,738 - INFO - Processed 300 rows...
2025-05-19 18:01:34,232 - INFO - Processed 400 rows...
2025-05-19 18:02:01,890 - INFO - Processed 500 rows...
2025-05-19 18:02:32,500 - INFO - Processed 600 rows...
2025-05-19 18:02:57,848 - INFO - Processed 700 rows...
2025-05-19 18:03:32,180 - INFO - Processed 800 rows...
2025-05-19 18:04:10,520 - INFO - Processed 900 rows...
2025-05-19 18:05:00,598 - INFO - Processed 1000 rows...
2025-05-19 18:05:26,615 - INFO - ETL process completed.
2025-05-19 18:05:26,618 - INFO - Summary: Processed 1