In [1]:
# ==========================================================
# ETL Pipeline: Books Dataset → MySQL (books_db)
# Author: Janaki Ram
# Description: End-to-end ETL with structured logging and clean transformations
# ==========================================================

In [2]:
# import libraries
import pandas as pd
from sqlalchemy import create_engine
import os
import re
import csv
import getpass
import urllib.parse
from datetime import datetime

In [3]:
# configuration
# Every setting below keeps its previous value as the default but can be
# overridden through an environment variable, so the notebook can run on another
# machine (or unattended) without editing this cell.
DB_USER = os.environ.get("BOOKS_DB_USER", "root")
# The password is never stored in the notebook: it comes from BOOKS_DB_PASS when
# that variable is set, otherwise from an interactive prompt.
DB_PASS = os.environ.get("BOOKS_DB_PASS") or getpass.getpass("Enter MySQL Workbench password: ")
# URL-escape the password so characters such as '@' or '/' cannot break the connection URL
encoded_pass = urllib.parse.quote_plus(DB_PASS)
DB_HOST = os.environ.get("BOOKS_DB_HOST", "localhost")
DB_NAME = os.environ.get("BOOKS_DB_NAME", "books_db")

# file paths (all derived from one data directory)
DATA_DIR = os.environ.get("BOOKS_DATA_DIR", r"D:\Work\Data Analytics\ETL\books_data")
BOOKS_FILE = os.path.join(DATA_DIR, "csv", "books.csv")
RATINGS_FILE = os.path.join(DATA_DIR, "csv", "ratings.csv")
USERS_FILE = os.path.join(DATA_DIR, "csv", "users.csv")

# create a log file to record (relative path: resolved against the working directory)
LOG_FILE = "etl_log.txt"

# create an excel file to write the data
STAGING_FILE = os.path.join(DATA_DIR, "books_staging.xlsx")

Enter MySQL Workbench password:  ········


In [4]:
# logging setup
def log_message(message):
    """Append a timestamped entry for ``message`` to LOG_FILE and echo it to stdout.

    The file entry has the form ``[YYYY-MM-DD HH:MM:SS] message``; the console
    copy is the bare message without the timestamp.
    """
    stamp = f"{datetime.now():%Y-%m-%d %H:%M:%S}"
    with open(LOG_FILE, mode="a", encoding="utf-8") as handle:
        handle.write(f"[{stamp}] {message}\n")

    print(message)

# mark the beginning of this run in the log file and on the console
log_message("========== ETL JOB STARTED ==========")



In [5]:
# database connection
try:
    engine = create_engine(f"mysql+pymysql://{DB_USER}:{encoded_pass}@{DB_HOST}/{DB_NAME}")
    # create_engine() is lazy: it only builds the engine and does not contact the
    # server. Open one connection (released right away) so that wrong credentials,
    # a missing database or an unreachable host are reported here rather than
    # much later in the load phase.
    with engine.connect():
        pass
    log_message("MySQL connection established successfully")

except Exception as e:
    log_message(f"Database connection failed: {e}")
    # non-zero status so a scripted run sees the failure
    raise SystemExit(1) from e

MySQL connection established successfully


In [6]:
# EXTRACT PHASE

# announce the start of the extract phase (written to the log file and printed)
log_message("EXTRACTING HERE...")

def edit_columns(df):
    """Return a copy of ``df`` whose column names are normalised to snake_case.

    Names are trimmed and lower-cased, spaces and hyphens become underscores,
    and any character other than 0-9, a-z and '_' is removed. The frame passed
    in is left unchanged.
    """
    renamed = df.copy()

    names = renamed.columns.str.strip().str.lower()
    # spaces and hyphens both act as word separators
    names = names.str.replace(r'[ -]', '_', regex=True)
    # drop whatever is still not a digit, lower-case letter or underscore
    names = names.str.replace(r'[^0-9a-z_]', '', regex=True)

    renamed.columns = names
    return renamed

# function to safely read the file contents
def read_csv_safe(filepath):
    """Read one semicolon-separated, latin-1 encoded CSV file into a cleaned DataFrame.

    The file is parsed with quoting disabled, so the double quotes wrapped around
    headers and values arrive as literal characters; they are stripped here,
    together with surrounding whitespace. Missing values stay missing (NaN)
    instead of being turned into the text "nan". Column names are finally
    normalised with ``edit_columns``.

    Parameters
    ----------
    filepath : str
        Path of the CSV file to read.

    Returns
    -------
    pandas.DataFrame
        The cleaned data, or an empty DataFrame when the file is missing, has no
        data, or cannot be read. The reason is reported through ``log_message``,
        so it also ends up in the ETL log file.
    """
    try:
        df = pd.read_csv(
            filepath,
            sep=';',
            quoting=csv.QUOTE_NONE,   # keep quotes as plain characters; stripped below
            encoding='latin-1',
            on_bad_lines='skip',      # NOTE: malformed lines are dropped without a message
            engine='python'
        )
        df.columns = [c.strip().strip('"') for c in df.columns] #removing the extra " that's binding the column

        for col in df.columns:
            column = df[col]
            # text columns are object dtype, or a dedicated string dtype on newer pandas
            if column.dtype == object or pd.api.types.is_string_dtype(column.dtype):
                cleaned = column.astype(str).str.strip().str.strip('"')
                # astype(str) renders missing cells as the text "nan";
                # put the real missing marker back for those cells
                df[col] = cleaned.where(column.notna())

        df = edit_columns(df) #editing the columns with a function

    except FileNotFoundError:
        log_message(f"File not found: {filepath}")
    except pd.errors.EmptyDataError:
        log_message(f"No data found in: {filepath}")
    except Exception as e:
        log_message(f"Error loading {filepath}: {e}")
    else:
        log_message(f"Loaded {os.path.basename(filepath)} - {df.shape[0]} rows, {df.shape[1]} columns")
        return df

    return pd.DataFrame()

# read files (books, ratings, users - in that order)
books_raw, ratings_raw, users_raw = (
    read_csv_safe(source_path)
    for source_path in (BOOKS_FILE, RATINGS_FILE, USERS_FILE)
)

# read_csv_safe hands back an empty frame on failure; stop if any input is empty
if any(frame.empty for frame in (books_raw, ratings_raw, users_raw)):
    log_message("ETL stopped: One or more files failed to load.")
    raise SystemExit()

log_message("========== EXTRACT PHASE FINISHED ==========")

EXTRACTING HERE...
Loaded books.csv - 250012 rows, 8 columns
Loaded ratings.csv - 1149780 rows, 3 columns
Loaded users.csv - 278700 rows, 3 columns


In [7]:
# preview the first rows of the extracted books frame
books_raw.head()

Unnamed: 0,isbn,book_title,book_author,year_of_publication,publisher,image_url_s,image_url_m,image_url_l
0,195153448,Classical Mythology,Mark P. O. Morford,2002,Oxford University Press,http://images.amazon.com/images/P/0195153448.0...,http://images.amazon.com/images/P/0195153448.0...,http://images.amazon.com/images/P/0195153448.0...
1,2005018,Clara Callan,Richard Bruce Wright,2001,HarperFlamingo Canada,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...,http://images.amazon.com/images/P/0002005018.0...
2,60973129,Decision in Normandy,Carlo D'Este,1991,HarperPerennial,http://images.amazon.com/images/P/0060973129.0...,http://images.amazon.com/images/P/0060973129.0...,http://images.amazon.com/images/P/0060973129.0...
3,374157065,Flu: The Story of the Great Influenza Pandemic...,Gina Bari Kolata,1999,Farrar Straus Giroux,http://images.amazon.com/images/P/0374157065.0...,http://images.amazon.com/images/P/0374157065.0...,http://images.amazon.com/images/P/0374157065.0...
4,399135782,The Kitchen God's Wife,Amy Tan,1991,Putnam Pub Group,http://images.amazon.com/images/P/0399135782.0...,http://images.amazon.com/images/P/0399135782.0...,http://images.amazon.com/images/P/0399135782.0...


In [8]:
# preview the first rows of the extracted ratings frame
ratings_raw.head()

Unnamed: 0,user_id,isbn,book_rating
0,276725,034545104X,0
1,276726,0155061224,5
2,276727,0446520802,0
3,276729,052165615X,3
4,276729,0521795028,6


In [9]:
# preview the first rows of the extracted users frame
users_raw.head()

Unnamed: 0,user_id,location,age
0,1,"nyc, new york, usa",
1,2,"stockton, california, usa",18.0
2,3,"moscow, yukon territory, russia",
3,4,"porto, v.n.gaia, portugal",17.0
4,5,"farnborough, hants, united kingdom",


In [10]:
# transform books data

def transform_books(df):
    """Clean the books frame and return the result as a new DataFrame.

    The frame passed in is not modified; all work happens on a copy.

    Steps:
      * ``year_of_publication`` is converted to a number; values that cannot be
        parsed, are below 1500, or lie after the current year become NaN.
      * ``isbn`` is kept as trimmed text; rows whose ISBN is missing or empty
        are dropped.
      * fully duplicated rows are removed.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns ``year_of_publication`` and ``isbn``.

    Returns
    -------
    pandas.DataFrame
        The cleaned books data.
    """
    log_message("Transforming books data...")

    # work on a copy so the caller's raw frame stays raw and the cell can be re-run
    df = df.copy()

    # clean columns
    year = pd.to_numeric(df['year_of_publication'], errors='coerce')
    # where() keeps plausible years and fills everything else (NaN included) with NaN
    df['year_of_publication'] = year.where(year.between(1500, datetime.now().year))

    # ensure ISBN as string; astype(str) would turn a missing ISBN into the text
    # "nan", so the missing marker is restored before filtering
    isbn = df['isbn']
    df['isbn'] = isbn.astype(str).str.strip().where(isbn.notna())

    before = df.shape[0]
    df = df[df['isbn'].notna() & (df['isbn'] != "")]
    after = df.shape[0]

    log_message(f"Books: dropped {before - after} rows with missing ISBN")

    # remove duplicates
    before = df.shape[0]
    df = df.drop_duplicates()
    after = df.shape[0]

    log_message(f"Books: removed {before - after} duplicate rows")

    log_message(f"Books transformed: {df.shape[0]} rows, {df.shape[1]} columns")
    return df

In [11]:
# transform ratings data

def transform_ratings(df):
    """Clean the ratings frame and return the result as a new DataFrame.

    The frame passed in is not modified; all work happens on a copy.

    Steps:
      * ``user_id`` becomes a nullable integer (unparseable values -> missing).
      * ``book_rating`` becomes a nullable integer; an unparseable rating raises,
        as ``pd.to_numeric`` is called without ``errors='coerce'`` here.
      * ``isbn`` is kept as trimmed text; empty ISBNs count as missing.
      * rows without ``user_id`` or ``isbn`` are dropped, then full duplicates.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns ``user_id``, ``isbn`` and ``book_rating``.

    Returns
    -------
    pandas.DataFrame
        The cleaned ratings data.
    """
    log_message("Transforming ratings data...")

    # work on a copy so the caller's raw frame stays raw and the cell can be re-run
    df = df.copy()

    # clean columns
    df['user_id'] = pd.to_numeric(df['user_id'], errors='coerce').astype('Int64')
    df['book_rating'] = pd.to_numeric(df['book_rating']).astype('Int64')

    # Drop rows with no user_id or isbn
    before = df.shape[0]
    isbn = df['isbn']
    stripped = isbn.astype(str).str.strip()
    # astype(str) would turn a missing ISBN into the text "nan" and hide it from
    # dropna(); keep missing (and empty) ISBNs as real missing values instead
    df['isbn'] = stripped.where(isbn.notna() & (stripped != ""))
    df = df.dropna(subset = ['user_id', 'isbn'])
    after = df.shape[0]

    log_message(f"Ratings: dropped {before - after} rows with missing keys")

    # drop duplicates
    before = df.shape[0]
    df = df.drop_duplicates()
    after = df.shape[0]

    log_message(f"Ratings: removed {before - after} duplicate rows")

    log_message(f"Ratings transformed: {df.shape[0]} rows, {df.shape[1]} columns")
    return df

In [12]:
# transform users data

def transform_users(df):
    """Clean the users frame and return the result as a new DataFrame.

    The frame passed in is not modified; all work happens on a copy.

    Steps:
      * ``user_id`` and ``age`` become nullable integers (unparseable -> missing).
      * ages below 5 or above 100 are set to missing.
      * ``location`` is split on its first two commas into the new columns
        ``city``, ``state`` and ``country``; a part that no row provides is
        filled with missing values.
      * fully duplicated rows are removed.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns ``user_id``, ``location`` and ``age``.

    Returns
    -------
    pandas.DataFrame
        The cleaned users data with the three extra location columns.
    """
    log_message("Transforming users data...")

    # work on a copy so the caller's raw frame does not silently gain columns
    df = df.copy()

    # clean columns
    df['user_id'] = pd.to_numeric(df['user_id'], errors='coerce').astype('Int64')
    df['age'] = pd.to_numeric(df['age'], errors='coerce').astype('Int64')
    df.loc[(df['age'] < 5) | (df['age'] > 100), 'age'] = pd.NA

    # split location into city, state, country
    # n=2 limits the split to three parts, so any further commas stay inside the last part
    location_split = df['location'].str.split(',', n=2, expand=True)
    for position, part_name in enumerate(('city', 'state', 'country')):
        # the split result only has as many columns as the longest location needs
        if position in location_split.columns:
            df[part_name] = location_split[position].str.strip()
        else:
            df[part_name] = pd.NA

    # drop duplicates
    before = df.shape[0]
    df = df.drop_duplicates()
    after = df.shape[0]

    log_message(f"Users: removed {before - after} duplicate rows")

    log_message(f"Users transformed: {df.shape[0]} rows, {df.shape[1]} columns")
    return df

In [13]:
# TRANSFORM PHASE

log_message("TRANSFORMING HERE...")

# run each cleaning function on its raw frame; the results are kept under *_clean names
books_clean = transform_books(books_raw)
ratings_clean = transform_ratings(ratings_raw)
users_clean = transform_users(users_raw)

# staging
MAX_EXCEL_ROWS = 1_048_575  # most data rows written to a single worksheet

def write_df_with_row_limit(df, writer, base_sheet_name):
    """Write ``df`` through ``writer``, spreading it over several sheets if needed.

    A frame with at most MAX_EXCEL_ROWS rows goes to one sheet called
    ``base_sheet_name``. A longer frame is cut into consecutive slices of at
    most MAX_EXCEL_ROWS rows, written to ``<base_sheet_name>_1``,
    ``<base_sheet_name>_2`` and so on. The index is never written.
    """
    total_rows = len(df)

    # fits on one worksheet: keep the plain sheet name
    if total_rows <= MAX_EXCEL_ROWS:
        df.to_excel(writer, sheet_name=base_sheet_name, index=False)
        return

    # otherwise walk through the frame one slice at a time, numbering parts from 1
    for part_number, offset in enumerate(range(0, total_rows, MAX_EXCEL_ROWS), start=1):
        stop = min(offset + MAX_EXCEL_ROWS, total_rows)
        chunk = df.iloc[offset:stop]
        chunk.to_excel(writer, sheet_name=f"{base_sheet_name}_{part_number}", index=False)

log_message("Creating Excel staging file...")

# (sheet name, frame) pairs in the order they are written to the workbook
staged_frames = (
    ('Books', books_clean),
    ('Ratings', ratings_clean),
    ('Users', users_clean),
)

# the workbook is saved when the with-block exits
with pd.ExcelWriter(STAGING_FILE, engine='openpyxl') as writer:
    for staged_sheet, staged_frame in staged_frames:
        write_df_with_row_limit(staged_frame, writer, staged_sheet)

log_message(f"Staging Excel created: {STAGING_FILE}")
log_message("========== TRANSFORM PHASE FINISHED ==========")

TRANSFORMING HERE...
Transforming books data...
Books: removed 1 duplicate rows
Books transformed: 250011 rows, 8 columns
Transforming ratings data...
Ratings: dropped 0 rows with missing keys
Ratings: removed 0 duplicate rows
Ratings transformed: 1149780 rows, 3 columns
Transforming users data...
Users: removed 0 duplicate rows
Users transformed: 278700 rows, 6 columns
Creating Excel staging file...
Staging Excel created: D:\Work\Data Analytics\ETL\books_data\books_staging.xlsx


In [14]:
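# NOTE: disabled draft, kept for reference only - nothing in this cell runs.
# It assigns writer.book / writer.sheets and calls writer.save(), which newer
# pandas releases reject (TODO: confirm against the installed version before re-enabling).
# from openpyxl import load_workbook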

# def write_head_summary(excel_file):
#     wb = load_workbook(excel_file)
#     writer = pd.ExcelWriter(excel_file, engine='openpyxl')
#     writer.book = wb
#     writer.sheets = {ws.title: ws for ws in wb.worksheets}

#     # read sheets
#     books = pd.read_excel(excel_file, sheet_name="Books")
#     ratings = pd.read_excel(excel_file, sheet_name="Ratings")
#     users = pd.read_excel(excel_file, sheet_name="Users")

#     start_row = 0

#     # write summary sheet
#     summary_sheet = "Summary"

#     df_list = [
#         ("Books", books.head()),
#         ("Ratings", ratings.head()),
#         ("Users", users.head())
#     ]

#     for name, df in df_list:
#         # Title
#         writer.book.create_sheet(summary_sheet) if summary_sheet not in writer.book.sheetnames else None
#         ws = writer.book[summary_sheet]
#         ws.cell(row=start_row + 1, column=1, value=f"=== {name} (head) ===")

#         # Write df below title
#         df.to_excel(writer, sheet_name=summary_sheet, startrow=start_row + 2, index=False)

#         # Move pointer for next block (+2 for spacing)
#         start_row += len(df) + 4

#     writer.save()
#     writer.close()

# write_head_summary(STAGING_FILE)

In [15]:
# LOAD PHASE

log_message("LOADING HERE...")

def _read_staged_table(xls, base_sheet_name):
    """Read one staged table back from the workbook as a single DataFrame.

    The staging step stores a table either on one sheet called
    ``base_sheet_name`` or, when it exceeded the row limit, on numbered sheets
    ``<base_sheet_name>_1``, ``<base_sheet_name>_2``, ... Both layouts are
    handled here; the parts are concatenated in workbook order.

    Raises ValueError when the workbook holds no sheet for the table.
    """
    matching_sheets = [
        sheet for sheet in xls.sheet_names
        if sheet == base_sheet_name or sheet.startswith(f"{base_sheet_name}_")
    ]
    if not matching_sheets:
        raise ValueError(f"No sheet found for '{base_sheet_name}' in the staging file")

    parts = [pd.read_excel(xls, sheet_name=sheet) for sheet in matching_sheets]
    return pd.concat(parts, ignore_index=True)

# the with-block closes the workbook handle once all three tables are read
with pd.ExcelFile(STAGING_FILE, engine='openpyxl') as xls:
    books_final = _read_staged_table(xls, 'Books')
    users_final = _read_staged_table(xls, 'Users')
    ratings_final = _read_staged_table(xls, 'Ratings')

def load_to_mysql(df, sheet):
    """Write ``df`` to the MySQL table named ``sheet``, replacing any existing table.

    A failure is logged and reported through the return value rather than
    raised, so the remaining tables are still attempted.

    Parameters
    ----------
    df : pandas.DataFrame
        Data to load.
    sheet : str
        Name of the target table.

    Returns
    -------
    bool
        True when the table was written, False when the load failed.
    """
    try:
        df.to_sql(name=sheet, con=engine, if_exists='replace', index=False)
    except Exception as e:
        log_message(f"Failed to load '{sheet}': {e}")
        return False

    log_message(f"Loaded sheet '{sheet}' -> {df.shape[0]} rows")
    return True


# attempt every table, remembering which ones succeeded
load_results = {
    "books": load_to_mysql(books_final, "books"),
    "ratings": load_to_mysql(ratings_final, "ratings"),
    "users": load_to_mysql(users_final, "users"),
}
failed_tables = [table for table, loaded in load_results.items() if not loaded]

# only claim success when every table really made it into the database
if failed_tables:
    log_message(f"Load finished with errors; failed tables: {', '.join(failed_tables)}")
else:
    log_message("All sheets loaded successfully.")

log_message("========== LOAD PHASE FINISHED ==========")
if failed_tables:
    log_message("========== ETL JOB COMPLETED WITH ERRORS ==========\n")
else:
    log_message("========== ETL JOB COMPLETED ==========\n")

LOADING HERE...
Loaded sheet 'books' -> 250011 rows
Loaded sheet 'ratings' -> 1149780 rows
Loaded sheet 'users' -> 278700 rows
All sheets loaded successfully.

