# Data Warehouse

## Imports

In [153]:
import mysql.connector
import pandas as pd
import warnings
from datetime import datetime

warnings.filterwarnings(
    "ignore",
    message="pandas only supports SQLAlchemy connectable",
    category=UserWarning,
)

## Connection

### Staging Area

In [154]:
staging_area_conn = mysql.connector.connect(host="host.docker.internal", user="deds", passwd="deds", database="deds")
staging_area_cursor = staging_area_conn.cursor()

### Data Warehouse

In [155]:
warehouse_conn = mysql.connector.connect(
    host="host.docker.internal", user="deds", passwd="deds", database="warehouse", port=3307
)
warehouse_cursor = warehouse_conn.cursor()

## Reset database

In [156]:
warehouse_cursor.execute("DELETE FROM DimProduct")
warehouse_cursor.execute("DELETE FROM DimVendor")
warehouse_cursor.execute("DELETE FROM DimEmployee")

warehouse_conn.commit()

## Tables

### Product

In [157]:
def create_product_row(
    id,
    name,
    color,
    size,
    size_unit_measure_code,
    weight,
    weight_unit_measure_code,
    discontinued,
    standard_cost,
    price,
    safety_stock_level,
    reorder_point,
    product_category_name,
):
    def handle_nan(value):
        return None if pd.isnull(value) else value

    id = handle_nan(id)
    name = handle_nan(name)
    color = handle_nan(color)
    size = handle_nan(size)
    size_unit_measure_code = handle_nan(size_unit_measure_code)
    weight = handle_nan(weight)
    weight_unit_measure_code = handle_nan(weight_unit_measure_code)
    discontinued = handle_nan(discontinued)
    standard_cost = handle_nan(standard_cost)
    price = handle_nan(price)
    safety_stock_level = handle_nan(safety_stock_level)
    reorder_point = handle_nan(reorder_point)
    product_category_name = handle_nan(product_category_name)

    query = """
    INSERT INTO DimProduct (
        ProductID, Name, Color, Size, SizeUnitMeasureCode, Weight, WeightUnitMeasureCode,
        Discontinued, StandardCost, Price, SafetyStockLevel, ReorderPoint, ProductCategoryName
    ) VALUES (
        %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
    )
    """

    values = (
        id,
        name,
        color,
        size,
        size_unit_measure_code,
        weight,
        weight_unit_measure_code,
        discontinued,
        standard_cost,
        price,
        safety_stock_level,
        reorder_point,
        product_category_name,
    )

    try:
        warehouse_cursor.execute(query, values)
        print(f"Successfully loaded product {id} into warehouse")
    except mysql.connector.Error as err:
        print(values)
        print(f"Failed to load product into warehouse: {err}")

    warehouse_conn.commit()


staging_area_product_data = pd.read_sql_query("SELECT * FROM Product", staging_area_conn)


for row in staging_area_product_data.itertuples():
    id = row.ProductID
    name = row.ProductName
    color = row.Color
    size = row.Size
    size_unit_measure_code = row.SizeUnitMeasureCode
    weight = row.Weight
    weight_unit_measure_code = row.WeightUnitMeasureCode

    discontinued = False

    staging_area_product_data["DiscontinuedDate"] = pd.to_datetime(staging_area_product_data["DiscontinuedDate"])
    current_date = datetime.now()

    if row.Discontinued == 1:
        discontinued = True
    # check if the discontinued date is already happend then set it to true
    elif row.DiscontinuedDate and row.DiscontinuedDate < current_date:
        discontinued = True

    standard_cost = row.StandardCost

    # price: we combine multiple prices into one price
    price = row.ListPrice

    if pd.isnull(price):
        price = row.UnitPrice

    safety_stock_level = row.SafetyStockLevel

    reorder_point = row.ReorderPoint

    if pd.isnull(reorder_point):
        reorder_point = row.ReorderLevel

    product_category_name = None

    if pd.notnull(row.ProductCategoryID):
        result = pd.read_sql_query(
            f"SELECT Name FROM ProductCategory WHERE ProductCategoryID = {row.ProductCategoryID}", staging_area_conn
        )
        if not result.empty:
            product_category_name = result.iloc[0].Name
    elif pd.notnull(row.category):
        product_category_name = row.category

    create_product_row(
        id,
        name,
        color,
        size,
        size_unit_measure_code,
        weight,
        weight_unit_measure_code,
        discontinued,
        standard_cost,
        price,
        safety_stock_level,
        reorder_point,
        product_category_name,
    )

Successfully loaded product 1 into warehouse
Successfully loaded product 2 into warehouse
Successfully loaded product 3 into warehouse
Successfully loaded product 4 into warehouse
Successfully loaded product 5 into warehouse
Successfully loaded product 6 into warehouse
Successfully loaded product 7 into warehouse
Successfully loaded product 8 into warehouse
Successfully loaded product 9 into warehouse
Successfully loaded product 10 into warehouse
Successfully loaded product 11 into warehouse
Successfully loaded product 12 into warehouse
Successfully loaded product 13 into warehouse
Successfully loaded product 14 into warehouse
Successfully loaded product 15 into warehouse
Successfully loaded product 16 into warehouse
Successfully loaded product 17 into warehouse
Successfully loaded product 18 into warehouse
Successfully loaded product 19 into warehouse
Successfully loaded product 20 into warehouse
Successfully loaded product 21 into warehouse
Successfully loaded product 22 into warehou

### Vendor

In [158]:
def create_vendor_row(id, name):
    def handle_nan(value):
        return None if pd.isnull(value) else value

    id = handle_nan(id)
    name = handle_nan(name)

    query = """
    INSERT INTO DimVendor (
        VendorID, Name
    ) VALUES (
        %s, %s
    )
    """

    values = (id, name)

    try:
        warehouse_cursor.execute(query, values)
        print(f"Successfully loaded vendor {id} into warehouse")
    except mysql.connector.Error as err:
        print(values)
        print(f"Failed to load product into warehouse: {err}")

    warehouse_conn.commit()


staging_area_vendor_data = pd.read_sql_query("SELECT * FROM Vendor", staging_area_conn)

for row in staging_area_vendor_data.itertuples():
    create_vendor_row(row.VendorID, row.Name)

Successfully loaded vendor 1492 into warehouse
Successfully loaded vendor 1494 into warehouse
Successfully loaded vendor 1496 into warehouse
Successfully loaded vendor 1498 into warehouse
Successfully loaded vendor 1500 into warehouse
Successfully loaded vendor 1502 into warehouse
Successfully loaded vendor 1504 into warehouse
Successfully loaded vendor 1506 into warehouse
Successfully loaded vendor 1508 into warehouse
Successfully loaded vendor 1510 into warehouse
Successfully loaded vendor 1512 into warehouse
Successfully loaded vendor 1514 into warehouse
Successfully loaded vendor 1516 into warehouse
Successfully loaded vendor 1518 into warehouse
Successfully loaded vendor 1520 into warehouse
Successfully loaded vendor 1522 into warehouse
Successfully loaded vendor 1524 into warehouse
Successfully loaded vendor 1526 into warehouse
Successfully loaded vendor 1528 into warehouse
Successfully loaded vendor 1530 into warehouse
Successfully loaded vendor 1532 into warehouse
Successfully 

### Employee

In [159]:
def create_employee_row(id, first_name, last_name, department_name, department_group_name):
    def handle_nan(value):
        return None if pd.isnull(value) else value

    id = handle_nan(id)
    first_name = handle_nan(first_name)
    last_name = handle_nan(last_name)
    department_name = handle_nan(department_name)
    department_group_name = handle_nan(department_group_name)

    query = """
    INSERT INTO DimEmployee (
        EmployeeID, FirstName, LastName,
        DepartmentName, DepartmentGroupName
    ) VALUES (
        %s, %s, %s, %s, %s
    )
    """

    values = (id, first_name, last_name, department_name, department_group_name)

    try:
        warehouse_cursor.execute(query, values)
        print(f"Successfully loaded employee {id} into warehouse")
    except mysql.connector.Error as err:
        print(values)
        print(f"Failed to load employee into warehouse: {err}")

    warehouse_conn.commit()


staging_area_employee_data = pd.read_sql_query("SELECT * FROM Employee", staging_area_conn)

for row in staging_area_employee_data.itertuples():
    id = row.EmployeeID

    first_name = row.FirstName
    if pd.isnull(first_name):
        # get from Person table
        result = pd.read_sql_query(f"SELECT FirstName FROM Person WHERE PersonID = {row.EmployeeID}", staging_area_conn)
        if not result.empty:
            first_name = result.iloc[0].FirstName

    last_name = row.LastName
    if pd.isnull(last_name):
        # get from Person table
        result = pd.read_sql_query(f"SELECT LastName FROM Person WHERE PersonID = {row.EmployeeID}", staging_area_conn)
        if not result.empty:
            last_name = result.iloc[0].LastName

    department_name = None
    department_group_name = None

    # department
    if not pd.isnull(row.DepartmentID):
        result = pd.read_sql_query(
            f"SELECT Name, GroupName FROM Department WHERE DepartmentID = {row.DepartmentID}", staging_area_conn
        )
        if not result.empty:
            department_name = result.iloc[0].Name
            department_group_name = result.iloc[0].GroupName

    create_employee_row(id, first_name, last_name, department_name, department_group_name)

Successfully loaded employee 1 into warehouse
Successfully loaded employee 2 into warehouse
Successfully loaded employee 3 into warehouse
Successfully loaded employee 4 into warehouse
Successfully loaded employee 5 into warehouse
Successfully loaded employee 6 into warehouse
Successfully loaded employee 7 into warehouse
Successfully loaded employee 8 into warehouse
Successfully loaded employee 9 into warehouse
Successfully loaded employee 10 into warehouse


Successfully loaded employee 11 into warehouse
Successfully loaded employee 12 into warehouse
Successfully loaded employee 13 into warehouse
Successfully loaded employee 14 into warehouse
Successfully loaded employee 15 into warehouse
Successfully loaded employee 16 into warehouse
Successfully loaded employee 17 into warehouse
Successfully loaded employee 18 into warehouse
Successfully loaded employee 19 into warehouse
Successfully loaded employee 20 into warehouse
Successfully loaded employee 21 into warehouse
Successfully loaded employee 22 into warehouse
Successfully loaded employee 23 into warehouse
Successfully loaded employee 24 into warehouse
Successfully loaded employee 25 into warehouse
Successfully loaded employee 26 into warehouse
Successfully loaded employee 27 into warehouse
Successfully loaded employee 28 into warehouse
Successfully loaded employee 29 into warehouse
Successfully loaded employee 30 into warehouse
Successfully loaded employee 31 into warehouse
Successfully 

### Customer

In [160]:
def create_customer_row(id, address, city, zip, state, country, phone, first_name, last_name):
    def handle_nan(value):
        return None if pd.isnull(value) else value

    id = handle_nan(id)
    address = handle_nan(address)
    city = handle_nan(city)
    zip = handle_nan(zip)
    state = handle_nan(state)
    country = handle_nan(country)
    phone = handle_nan(phone)
    first_name = handle_nan(first_name)
    last_name = handle_nan(last_name)

    query = """
    INSERT INTO DimEmployee (
        CustomerID, Address, City, Zip, State,
        Country, Phone, FirstName, LastName
    ) VALUES (
        %s, %s, %s, %s, %s, %s, %s, %s, %s 
    )
    """

    values = (id, address, city, zip, state, country, phone, first_name, last_name)

    try:
        warehouse_cursor.execute(query, values)
        print(f"Successfully loaded customer {id} into warehouse")
    except mysql.connector.Error as err:
        print(values)
        print(f"Failed to load customer into warehouse: {err}")

    warehouse_conn.commit()


staging_area_customer_data = pd.read_sql_query("SELECT * FROM Customer", staging_area_conn)

# the first time we will map the old customer ids to new ones
for row in staging_area_customer_data.itertuples():
    

# the second loop will be performed to put data into the tables
# we will use data from the first loop to give us information
# on what happend to the old ids so the relations will still be intact
for row in staging_area_customer_data.itertuples():
    id = row.CustomerID

    # all different sources use other methods for adresses so this will get messy
    # we leave address, city, zip, state and country for now

    phone = row.Phone

    first_name = row.fname
    last_name = row.lname

    # when first_name is None last_name is also None
    if pd.isnull(first_name) is None and not pd.isnull(row.PersonID):
        # we will get the person and get the first and last name from there
        person = pd.read_sql_query(
            f"SELECT FirstName, LastName FROM Person WHERE PersonID = {row.CustomerID}", staging_area_conn
        )
        if not person.empty:
            first_name = result.iloc[0].FirstName
            last_name = result.iloc[0].LastName

    create_customer_row(id, None, None, None, None, None, phone, first_name, last_name)

('1', None, None, None, None, None, None, None, None)
Failed to load customer into warehouse: 1054 (42S22): Unknown column 'CustomerID' in 'field list'
('10', None, None, None, None, None, None, None, None)
Failed to load customer into warehouse: 1054 (42S22): Unknown column 'CustomerID' in 'field list'
('100', None, None, None, None, None, None, None, None)
Failed to load customer into warehouse: 1054 (42S22): Unknown column 'CustomerID' in 'field list'
('101', None, None, None, None, None, '2015558966', 'Michaels', 'Devlin')
Failed to load customer into warehouse: 1054 (42S22): Unknown column 'CustomerID' in 'field list'
('102', None, None, None, None, None, '2125558725', 'Beth', 'Reiser')
Failed to load customer into warehouse: 1054 (42S22): Unknown column 'CustomerID' in 'field list'
('103', None, None, None, None, None, '2155556513', 'Erin', 'Niedringhaus')
Failed to load customer into warehouse: 1054 (42S22): Unknown column 'CustomerID' in 'field list'
('104', None, None, None, N