In [1]:
pip install psycopg2

Defaulting to user installation because normal site-packages is not writeableNote: you may need to restart the kernel to use updated packages.




[notice] A new release of pip is available: 24.0 -> 24.1.2
[notice] To update, run: python.exe -m pip install --upgrade pip


In [4]:
#pip install --upgrade pip

In [5]:
import psycopg2
import pandas as pd
import csv
import os
from pathlib import Path

## Extraction

In [45]:
def extract(folder_path):
    """
    Load all CSV files in the given folder into dataFrames.
    
    Args:
        folder_path (str or Path): Path to the folder containing CSV files.

    Returns:
        dict: A dictionary with file names as keys and DataFrames as values.
    """
    data_folder = Path(folder_path)
    csv_files = list(data_folder.glob("*.csv"))
    dfs = {}

    for file_path in csv_files:
        try:
            df = pd.read_csv(file_path)
            key = file_path.stem.replace(" ","_").lower()
            dfs[key] = df
            dataframes[file_path.name] = df
            print(f"Loaded: {file_path.name} dfs['{key}'], {df.shape[0]} rows {df.shape[1]} cols")
        except pd.errors.EmptyDataError:
            print(f"Skipped (empty): {file_path.name}")
        except pd.errors.ParserError:
            print(f"Skipped (parser error): {file_path.name}")
        except Exception as e:
            print(f"Failed to load {file_path.name}: {e}")
    return dfs


In [46]:
df = extract('data')

Loaded: brands.csv dfs['brands'], 9 rows 2 cols
Loaded: categories.csv dfs['categories'], 7 rows 2 cols
Loaded: customers.csv dfs['customers'], 1445 rows 9 cols
Loaded: orders.csv dfs['orders'], 1615 rows 8 cols
Loaded: order_items.csv dfs['order_items'], 4722 rows 6 cols
Loaded: products.csv dfs['products'], 321 rows 6 cols
Loaded: staffs.csv dfs['staffs'], 10 rows 8 cols
Loaded: stocks.csv dfs['stocks'], 939 rows 3 cols
Loaded: stores.csv dfs['stores'], 3 rows 8 cols


## Transform

In [47]:
df['customers'].head()

Unnamed: 0,customer_id,first_name,last_name,phone,email,street,city,state,zip_code
0,1,Debra,Burks,,debra.burks@yahoo.com,9273 Thorne Ave.,Orchard Park,NY,14127
1,2,Kasha,Todd,,kasha.todd@yahoo.com,910 Vine Street,Campbell,CA,95008
2,3,Tameka,Fisher,,tameka.fisher@aol.com,769C Honey Creek St.,Redondo Beach,CA,90278
3,4,Daryl,Spence,,daryl.spence@aol.com,988 Pearl Lane,Uniondale,NY,11553
4,5,Charolette,Rice,(916) 381-6003,charolette.rice@msn.com,107 River Dr.,Sacramento,CA,95820


In [48]:
import hashlib

In [51]:
customers = df["customers"]
customers.head()

Unnamed: 0,customer_id,first_name,last_name,phone,email,street,city,state,zip_code
0,1,Debra,Burks,,debra.burks@yahoo.com,9273 Thorne Ave.,Orchard Park,NY,14127
1,2,Kasha,Todd,,kasha.todd@yahoo.com,910 Vine Street,Campbell,CA,95008
2,3,Tameka,Fisher,,tameka.fisher@aol.com,769C Honey Creek St.,Redondo Beach,CA,90278
3,4,Daryl,Spence,,daryl.spence@aol.com,988 Pearl Lane,Uniondale,NY,11553
4,5,Charolette,Rice,(916) 381-6003,charolette.rice@msn.com,107 River Dr.,Sacramento,CA,95820


In [59]:
# remove PII
#assumptions are that the null phone numbers will be updated at some point
def hash_value(val):
    if pd.isna(val):
        return ""
    return hashlib.sha256(str(val).encode("utf-8")).hexdigest()

df["customers"]["phone"] = df["customers"]["phone"].apply(hash_value)
df["customers"]["email"] = df["customers"]["email"].apply(hash_value)
df["customers"].head()

Unnamed: 0,customer_id,first_name,last_name,phone,email,street,city,state,zip_code
0,1,Debra,Burks,e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b93...,20a5a8de2efdb443bf4db4af9c923da4db0dda97f4813c...,9273 Thorne Ave.,Orchard Park,NY,14127
1,2,Kasha,Todd,e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b93...,8cdd0ae80572fce2ccc844ba7bcfb54b445e31489703ca...,910 Vine Street,Campbell,CA,95008
2,3,Tameka,Fisher,e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b93...,d72a8a538ced48c5f5c15479eb5d04bc9cbf7ece9dca64...,769C Honey Creek St.,Redondo Beach,CA,90278
3,4,Daryl,Spence,e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b93...,c810b26977b5506676c4e5a218b4719bb4780f3e528150...,988 Pearl Lane,Uniondale,NY,11553
4,5,Charolette,Rice,66fc3de81238684d81f6a9a39bb3a7ec7ebafe58ea2ef1...,922be691172d9cd99d323f831eff27db4c7fad915e8e40...,107 River Dr.,Sacramento,CA,95820


In [61]:
df["staffs"].head()

Unnamed: 0,staff_id,first_name,last_name,email,phone,active,store_id,manager_id
0,1,Fabiola,Jackson,fabiola.jackson@bikes.shop,(831) 555-5554,1,1,
1,2,Mireya,Copeland,mireya.copeland@bikes.shop,(831) 555-5555,1,1,1.0
2,3,Genna,Serrano,genna.serrano@bikes.shop,(831) 555-5556,1,1,2.0
3,4,Virgie,Wiggins,virgie.wiggins@bikes.shop,(831) 555-5557,1,1,2.0
4,5,Jannette,David,jannette.david@bikes.shop,(516) 379-4444,1,2,1.0


In [None]:
df["staffs"]["phone"] = df["staffs"]["phone"].apply(hash_value)
df["staffs"]["email"] = df["staffs"]["email"].apply(hash_value)

## Loading

In [64]:
pip install dotenv

Defaulting to user installation because normal site-packages is not writeable
Collecting dotenv
  Downloading dotenv-0.9.9-py2.py3-none-any.whl.metadata (279 bytes)
Collecting python-dotenv (from dotenv)
  Downloading python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Downloading dotenv-0.9.9-py2.py3-none-any.whl (1.9 kB)
Downloading python_dotenv-1.1.1-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv, dotenv

   ---------------------------------------- 0/2 [python-dotenv]
   ---------------------------------------- 0/2 [python-dotenv]
   ---------------------------------------- 0/2 [python-dotenv]
   -------------------- ------------------- 1/2 [dotenv]
   ---------------------------------------- 2/2 [dotenv]

Successfully installed dotenv-0.9.9 python-dotenv-1.1.1
Note: you may need to restart the kernel to use updated packages.


In [72]:
from sqlalchemy import create_engine
from dotenv import load_dotenv
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import text


In [76]:
def create_database():
    # Load .env variables
    load_dotenv()

    # Fetch DB credentials
    db_host = os.getenv("DB_HOST")
    db_port = os.getenv("DB_PORT", "5432")
    db_name = os.getenv("DB_NAME")
    db_user = os.getenv("DB_USER")
    db_password = os.getenv("DB_PASSWORD")
    # Create database connection to default DB
    conn = psycopg2.connect(f"""host={db_host} 
                                dbname={db_name}
                                user={db_user} 
                                password={db_password}
                             """)
    # Autocommit
    conn.set_session(autocommit=True)
    
    # Create cursor object
    cur = conn.cursor()

    # Create bike_store database
    cur.execute("DROP DATABASE IF EXISTS Bike_Store")
    cur.execute("CREATE DATABASE Bike_Store")

    # Close default db connection
    cur.close()
    conn.close()

    # Connect to Bike_Store database
    conn = psycopg2.connect(f"host={db_host} dbname=bike_store user={db_user} password={db_password}")
    cur = conn.cursor()

    return cur, conn

In [77]:
cur, conn = create_database()

In [78]:
cur.execute("SELECT current_database();")
print(cur.fetchone())

('bike_store',)


In [84]:

tables = {
    ("customer_table","""CREATE TABLE IF NOT EXISTS Customers(customer_id INT PRIMARY KEY, 
                  first_name VARCHAR (255) NOT NULL, 
                  last_name VARCHAR (255) NOT NULL, 
                  street VARCHAR (255),
                  city VARCHAR (50),
                  state VARCHAR (25),
                  zip_code VARCHAR (5)
                  );
                  """),
                  
    ("orders","""CREATE TABLE IF NOT EXISTS Orders(
                order_id INT PRIMARY KEY,
	            customer_id INT,
	            order_status INT NOT NULL,
	            -- Order status: 1 = Pending; 2 = Processing; 3 = Rejected; 4 = Completed
	            order_date DATE NOT NULL,
	            required_date DATE NOT NULL,
	            shipped_date DATE,
	            store_id INT NOT NULL,
	            staff_id INT NOT NULL,
	            FOREIGN KEY (customer_id) REFERENCES customers (customer_id) ON DELETE CASCADE ON UPDATE CASCADE,
	            FOREIGN KEY (store_id) REFERENCES stores (store_id) ON DELETE CASCADE ON UPDATE CASCADE,
	            FOREIGN KEY (staff_id) REFERENCES staff (staff_id) ON DELETE NO ACTION ON UPDATE NO ACTION
                );
                """),

    ("order_items","""CREATE TABLE IF NOT EXISTS Order_items(
                    order_id INT,
	                item_id INT,
	                product_id INT NOT NULL,
	                quantity INT NOT NULL,
	                list_price DECIMAL (10, 2) NOT NULL,
	                discount DECIMAL (4, 2) NOT NULL DEFAULT 0,
	                PRIMARY KEY (order_id, item_id),
	                FOREIGN KEY (order_id) REFERENCES orders (order_id) ON DELETE CASCADE ON UPDATE CASCADE,
	                FOREIGN KEY (product_id) REFERENCES products (product_id) ON DELETE CASCADE ON UPDATE CASCADE
                    );
                    """),

    ("stores","""CREATE TABLE IF NOT EXISTS Stores(
              store_id INT PRIMARY KEY,
              store_name VARCHAR (255) NOT NULL,
              street VARCHAR (255),
              city VARCHAR (255),
              state VARCHAR (10),
              zip_code VARCHAR (5)
              );
            """),

    ("staffs","""CREATE TABLE IF NOT EXISTS Staff(
               staff_id INT PRIMARY KEY,
               first_name VARCHAR (50) NOT NULL,
               last_name VARCHAR (50) NOT NULL,
               active INT NOT NULL,
               store_id INT NOT NULL,
               manager_id INT,
               FOREIGN KEY (store_id) REFERENCES stores (store_id) ON DELETE CASCADE ON UPDATE CASCADE,
               FOREIGN KEY (manager_id) REFERENCES staff(staff_id) ON DELETE NO ACTION ON UPDATE NO ACTION
               );"""),

    ("categories","""CREATE TABLE IF NOT EXISTS Categories(category_id INT PRIMARY KEY,
                 category_name VARCHAR (255) NOT NULL
                 );"""),

    ("brands","""CREATE TABLE IF NOT EXISTS Brands(brand_id INT PRIMARY KEY,
                brand_name VARCHAR (255) NOT NULL);"""),

    ("products","""CREATE TABLE IF NOT EXISTS Products(product_id INT PRIMARY KEY,
                 product_name VARCHAR (255) NOT NULL,
                 brand_id INT NOT NULL,
                 category_id INT NOT NULL,
                 model_year SMALLINT NOT NULL,
                 list_price DECIMAL (10, 2) NOT NULL,
                 FOREIGN KEY (category_id) REFERENCES categories(category_id) ON DELETE CASCADE ON UPDATE CASCADE,
                 FOREIGN KEY (brand_id) REFERENCES brands(brand_id) ON DELETE CASCADE ON UPDATE CASCADE
                 );"""),

    ("stocks","""CREATE TABLE IF NOT EXISTS Stocks(store_id INT,
                product_id INT,
                quantity INT,
                PRIMARY KEY (store_id, product_id),
                FOREIGN KEY (store_id) REFERENCES stores (store_id) ON DELETE CASCADE ON UPDATE CASCADE,
                FOREIGN KEY (product_id) REFERENCES products (product_id) ON DELETE CASCADE ON UPDATE CASCADE)""")

                  }

In [85]:
for name, sql in tables:
    try:
        cur.execute(sql)
        conn.commit()
        print(f"Created table: {name}")
    except Exception as e:
        print(f"Failed to create {name}: {e}")
        conn.rollback()

Created table: customer_table
Created table: categories
Created table: staffs
Created table: stocks
Created table: order_items
Created table: orders
Created table: brands
Created table: stores
Created table: products


In [86]:
from sqlalchemy import create_engine

In [90]:
db_host = os.getenv("DB_HOST")
db_port = os.getenv("DB_PORT", "5432")
db_name = os.getenv("DB_NAME")
db_user = os.getenv("DB_USER")
db_password = os.getenv("DB_PASSWORD")

In [91]:
connection_uri = f"postgresql+psycopg2://{db_user}:{db_password}@{db_host}:{db_port}/{db_name}"

In [97]:
engine = create_engine(connection_uri)

In [93]:
df.keys()

dict_keys(['brands', 'categories', 'customers', 'orders', 'order_items', 'products', 'staffs', 'stocks', 'stores'])

In [100]:
for table, dfs in df.items():
    try:
        dfs.to_sql(table, engine, if_exists="replace", index=False)
        print(f" Loaded {len(dfs)} rows into '{table}'")
    except Exception as e:
        print(f" Failed to load '{table}': {e}")


 Loaded 9 rows into 'brands'
 Loaded 7 rows into 'categories'
 Loaded 1445 rows into 'customers'
 Loaded 1615 rows into 'orders'
 Loaded 4722 rows into 'order_items'
 Loaded 321 rows into 'products'
 Loaded 10 rows into 'staffs'
 Loaded 939 rows into 'stocks'
 Loaded 3 rows into 'stores'
