In [None]:
# -----------------------------
# Imports
# -----------------------------

import pandas as pd              # Data manipulation and analysis
import os                        # Operating system utilities
from sqlalchemy import create_engine  # Database connection engine
import logging                   # Logging system for debugging and monitoring
from pathlib import Path         # Path handling
import time                      # Time-related functions


# -----------------------------
# Project paths
# -----------------------------

# Project root: the parent of the current working directory.
# NOTE(review): this assumes the notebook is launched from a sub-folder of the
# project (e.g. a notebooks/ directory) - confirm before running elsewhere.
ROOT = Path.cwd().parent

# Log files live in <ROOT>/logs
LOGS_DIR = ROOT.joinpath("logs")

# Input data files live in <ROOT>/data
DATA_DIR = ROOT.joinpath("data")


# -----------------------------
# Ensure directories exist
# -----------------------------
# parents=True creates missing intermediate folders; exist_ok=True makes the
# calls safe to repeat when the cell is re-run.
LOGS_DIR.mkdir(exist_ok=True, parents=True)
DATA_DIR.mkdir(exist_ok=True, parents=True)


# -----------------------------
# Logging configuration
# -----------------------------

# basicConfig() is silently ignored when the root logger already has handlers,
# which is common inside notebook kernels and whenever this cell is re-run.
# force=True (Python 3.8+) removes and closes any existing root handlers first,
# so the file handler below is always the one in effect.
logging.basicConfig(
    filename=LOGS_DIR / "sales_db.log",   # Log file location
    level=logging.DEBUG,                  # Capture all levels (DEBUG → CRITICAL)
    format="%(asctime)s - %(levelname)s - %(message)s",
    filemode="a",                         # Append to the existing log file
    force=True,                           # Replace any pre-existing root handlers
)

logging.info("Logging system initialized successfully.")


In [2]:
# SQLite engine used by the ingestion cells below. The three-slash URL form is
# a relative path, so "sales.db" is resolved against the current working
# directory rather than ROOT.
engine = create_engine("sqlite:///sales.db")

# Echo the resolved project directories as a quick sanity check
print(f"LOGS DIR: {LOGS_DIR}")
print(f"DATA DIR: {DATA_DIR}")

LOGS DIR: c:\Users\aceve\Documents\GitHub\Supplier-performance-data-analysis\logs
DATA DIR: c:\Users\aceve\Documents\GitHub\Supplier-performance-data-analysis\data


In [3]:
def ingest_db(df, table_name, engine):
    """Write a DataFrame to a database table, replacing any existing table.

    Args:
        df: DataFrame whose rows are written to the table.
        table_name: Name of the target table.
        engine: Database connection passed to ``DataFrame.to_sql`` as ``con``.

    The DataFrame index is not written as a column. If a table with the
    same name already exists it is replaced.
    """
    # name and con are passed positionally; the remaining options stay
    # keyword arguments.
    df.to_sql(table_name, engine, if_exists="replace", index=False)


In [4]:
def load_raw_data():
    """
    Ingest every CSV file found in DATA_DIR into the database.

    Each ``*.csv`` file directly inside DATA_DIR is read into a pandas
    DataFrame and written through ``ingest_db`` to a table named after the
    file stem (the file name without ``.csv``), using the module-level
    ``engine``. Files are processed in sorted path order so the sequence of
    log entries is reproducible between runs.

    A failure on one file is logged together with its traceback and does not
    stop the remaining files. A warning is logged when no CSV files are
    found. The total elapsed time, in minutes rounded to two decimals, is
    logged at the end.

    Returns:
        None
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments the way time.time() can.
    start = time.perf_counter()

    # glob() yields paths in arbitrary order; sort for a deterministic run.
    csv_files = sorted(DATA_DIR.glob("*.csv"))
    if not csv_files:
        logging.warning("No CSV files found in '%s'", DATA_DIR)

    for file in csv_files:
        try:
            # Read CSV file into a DataFrame
            df = pd.read_csv(file)

            # Log which file is being ingested
            logging.info("Ingesting '%s' into database", file.name)

            # Use file name (without .csv) as table name
            ingest_db(df, file.stem, engine)

        except Exception as e:
            # Deliberate best-effort: record the failure (with traceback) and
            # continue with the next file.
            logging.exception("Failed to ingest '%s': %s", file.name, e)

    # Total time in minutes
    total_time = round((time.perf_counter() - start) / 60, 2)

    # Final logs
    logging.info("-------- Ingestion Complete --------")
    logging.info("Total time taken: %s minutes", total_time)


In [None]:
# Run the ingestion: load every CSV in DATA_DIR into the database.
# Progress and errors are written through the logging module, not printed.
load_raw_data()