In [None]:
import pyodbc
import pandas as pd
import psycopg2
import numpy as np

# Helper function to clean DataFrame for PostgreSQL
def clean_dataframe(df):
    """Replace NaN/NaT with None for proper NULL handling in PostgreSQL"""
    return df.where(pd.notna(df), None)

# -------------------------------
# Connect to SQL Server
# -------------------------------
server = 'DESKTOP-RNHBP1T'
database = 'Northwind_OLTP'
cnxn = pyodbc.connect(
    f'DRIVER={{ODBC Driver 17 for SQL Server}};SERVER={server};DATABASE={database};Trusted_Connection=yes;'
)

# -------------------------------
# Connect to PostgreSQL
# -------------------------------
connpg = psycopg2.connect(
    database="Northwind-OLAP",
    user="majed",
    password="majed123",
    host="localhost",
    port="5432"
)
cursor = connpg.cursor()

# Create schema if not exists
try:
    cursor.execute("CREATE SCHEMA IF NOT EXISTS bronze;")
    connpg.commit()
except Exception as e:
    print(f"Error creating schema: {e}")
    connpg.rollback()

print("Starting data migration...\n")

# -------------------------------
# 1. Categories
# -------------------------------
print("Migrating Categories...")
try:
    df_categories = pd.read_sql_query("SELECT * FROM dbo.Categories", cnxn)
    
    create_categories = """
    CREATE TABLE IF NOT EXISTS bronze.Categories (
        CategoryID INT PRIMARY KEY,
        CategoryName VARCHAR(100),
        Description TEXT,
        Picture BYTEA
    );
    """
    cursor.execute(create_categories)
    connpg.commit()
    
    if not df_categories.empty:
        df_clean = clean_dataframe(df_categories)
        sql = "INSERT INTO bronze.Categories(CategoryID, CategoryName, Description, Picture) VALUES (%s, %s, %s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ Categories: {len(df_categories)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating Categories: {e}\n")
    connpg.rollback()

# -------------------------------
# 2. Customers
# -------------------------------
print("Migrating Customers...")
try:
    df_customers = pd.read_sql_query("SELECT * FROM dbo.Customers", cnxn)
    
    create_customers = """
    CREATE TABLE IF NOT EXISTS bronze.Customers (
        CustomerID VARCHAR(10) PRIMARY KEY,
        CompanyName VARCHAR(100),
        ContactName VARCHAR(100),
        ContactTitle VARCHAR(100),
        Address VARCHAR(100),
        City VARCHAR(50),
        Region VARCHAR(50),
        PostalCode VARCHAR(20),
        Country VARCHAR(50),
        Phone VARCHAR(30),
        Fax VARCHAR(30)
    );
    """
    cursor.execute(create_customers)
    connpg.commit()
    
    if not df_customers.empty:
        df_clean = clean_dataframe(df_customers)
        sql = "INSERT INTO bronze.Customers(CustomerID, CompanyName, ContactName, ContactTitle, Address, City, Region, PostalCode, Country, Phone, Fax) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ Customers: {len(df_customers)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating Customers: {e}\n")
    connpg.rollback()

# -------------------------------
# 3. CustomerDemographics
# -------------------------------
print("Migrating CustomerDemographics...")
try:
    df_customer_demographics = pd.read_sql_query("SELECT * FROM dbo.CustomerDemographics", cnxn)
    
    create_customer_demographics = """
    CREATE TABLE IF NOT EXISTS bronze.CustomerDemographics (
        CustomerTypeID VARCHAR(20) PRIMARY KEY,
        CustomerDesc TEXT
    );
    """
    cursor.execute(create_customer_demographics)
    connpg.commit()
    
    if not df_customer_demographics.empty:
        df_clean = clean_dataframe(df_customer_demographics)
        sql = "INSERT INTO bronze.CustomerDemographics(CustomerTypeID, CustomerDesc) VALUES (%s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ CustomerDemographics: {len(df_customer_demographics)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating CustomerDemographics: {e}\n")
    connpg.rollback()

# -------------------------------
# 4. CustomerCustomerDemo
# -------------------------------
print("Migrating CustomerCustomerDemo...")
try:
    df_customer_customer_demo = pd.read_sql_query("SELECT * FROM dbo.CustomerCustomerDemo", cnxn)
    
    create_customer_customer_demo = """
    CREATE TABLE IF NOT EXISTS bronze.CustomerCustomerDemo (
        CustomerID VARCHAR(10),
        CustomerTypeID VARCHAR(20),
        PRIMARY KEY (CustomerID, CustomerTypeID)
    );
    """
    cursor.execute(create_customer_customer_demo)
    connpg.commit()
    
    if not df_customer_customer_demo.empty:
        df_clean = clean_dataframe(df_customer_customer_demo)
        sql = "INSERT INTO bronze.CustomerCustomerDemo(CustomerID, CustomerTypeID) VALUES (%s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ CustomerCustomerDemo: {len(df_customer_customer_demo)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating CustomerCustomerDemo: {e}\n")
    connpg.rollback()

# -------------------------------
# 5. Region
# -------------------------------
print("Migrating Region...")
try:
    df_region = pd.read_sql_query("SELECT * FROM dbo.Region", cnxn)
    
    create_region = """
    CREATE TABLE IF NOT EXISTS bronze.Region (
        RegionID INT PRIMARY KEY,
        RegionDescription VARCHAR(100)
    );
    """
    cursor.execute(create_region)
    connpg.commit()
    
    if not df_region.empty:
        df_clean = clean_dataframe(df_region)
        sql = "INSERT INTO bronze.Region(RegionID, RegionDescription) VALUES (%s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ Region: {len(df_region)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating Region: {e}\n")
    connpg.rollback()

# -------------------------------
# 6. Territories
# -------------------------------
print("Migrating Territories...")
try:
    df_territories = pd.read_sql_query("SELECT * FROM dbo.Territories", cnxn)
    
    create_territories = """
    CREATE TABLE IF NOT EXISTS bronze.Territories (
        TerritoryID VARCHAR(20) PRIMARY KEY,
        TerritoryDescription VARCHAR(100),
        RegionID INT
    );
    """
    cursor.execute(create_territories)
    connpg.commit()
    
    if not df_territories.empty:
        df_clean = clean_dataframe(df_territories)
        sql = "INSERT INTO bronze.Territories(TerritoryID, TerritoryDescription, RegionID) VALUES (%s, %s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ Territories: {len(df_territories)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating Territories: {e}\n")
    connpg.rollback()

# -------------------------------
# 7. Employees
# -------------------------------
print("Migrating Employees...")
try:
    df_employees = pd.read_sql_query("SELECT * FROM dbo.Employees", cnxn)
    
    create_employees = """
    CREATE TABLE IF NOT EXISTS bronze.Employees (
        EmployeeID INT PRIMARY KEY,
        LastName VARCHAR(50),
        FirstName VARCHAR(50),
        Title VARCHAR(100),
        TitleOfCourtesy VARCHAR(25),
        BirthDate DATE,
        HireDate DATE,
        Address VARCHAR(100),
        City VARCHAR(50),
        Region VARCHAR(50),
        PostalCode VARCHAR(20),
        Country VARCHAR(50),
        HomePhone VARCHAR(30),
        Extension VARCHAR(10),
        Photo BYTEA,
        Notes TEXT,
        ReportsTo INT,
        PhotoPath VARCHAR(255)
    );
    """
    cursor.execute(create_employees)
    connpg.commit()
    
    if not df_employees.empty:
        df_clean = clean_dataframe(df_employees)
        sql = "INSERT INTO bronze.Employees(EmployeeID, LastName, FirstName, Title, TitleOfCourtesy, BirthDate, HireDate, Address, City, Region, PostalCode, Country, HomePhone, Extension, Photo, Notes, ReportsTo, PhotoPath) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ Employees: {len(df_employees)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating Employees: {e}\n")
    connpg.rollback()

# -------------------------------
# 8. EmployeeTerritories
# -------------------------------
print("Migrating EmployeeTerritories...")
try:
    df_employee_territories = pd.read_sql_query("SELECT * FROM dbo.EmployeeTerritories", cnxn)
    
    create_employee_territories = """
    CREATE TABLE IF NOT EXISTS bronze.EmployeeTerritories (
        EmployeeID INT,
        TerritoryID VARCHAR(20),
        PRIMARY KEY (EmployeeID, TerritoryID)
    );
    """
    cursor.execute(create_employee_territories)
    connpg.commit()
    
    if not df_employee_territories.empty:
        df_clean = clean_dataframe(df_employee_territories)
        sql = "INSERT INTO bronze.EmployeeTerritories(EmployeeID, TerritoryID) VALUES (%s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ EmployeeTerritories: {len(df_employee_territories)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating EmployeeTerritories: {e}\n")
    connpg.rollback()

# -------------------------------
# 9. Shippers
# -------------------------------
print("Migrating Shippers...")
try:
    df_shippers = pd.read_sql_query("SELECT * FROM dbo.Shippers", cnxn)
    
    create_shippers = """
    CREATE TABLE IF NOT EXISTS bronze.Shippers (
        ShipperID INT PRIMARY KEY,
        CompanyName VARCHAR(100),
        Phone VARCHAR(30)
    );
    """
    cursor.execute(create_shippers)
    connpg.commit()
    
    if not df_shippers.empty:
        df_clean = clean_dataframe(df_shippers)
        sql = "INSERT INTO bronze.Shippers(ShipperID, CompanyName, Phone) VALUES (%s, %s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ Shippers: {len(df_shippers)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating Shippers: {e}\n")
    connpg.rollback()

# -------------------------------
# 10. Suppliers
# -------------------------------
print("Migrating Suppliers...")
try:
    df_suppliers = pd.read_sql_query("SELECT * FROM dbo.Suppliers", cnxn)
    
    create_suppliers = """
    CREATE TABLE IF NOT EXISTS bronze.Suppliers (
        SupplierID INT PRIMARY KEY,
        CompanyName VARCHAR(100),
        ContactName VARCHAR(100),
        ContactTitle VARCHAR(100),
        Address VARCHAR(100),
        City VARCHAR(50),
        Region VARCHAR(50),
        PostalCode VARCHAR(20),
        Country VARCHAR(50),
        Phone VARCHAR(30),
        Fax VARCHAR(30),
        HomePage TEXT
    );
    """
    cursor.execute(create_suppliers)
    connpg.commit()
    
    if not df_suppliers.empty:
        df_clean = clean_dataframe(df_suppliers)
        sql = "INSERT INTO bronze.Suppliers(SupplierID, CompanyName, ContactName, ContactTitle, Address, City, Region, PostalCode, Country, Phone, Fax, HomePage) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ Suppliers: {len(df_suppliers)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating Suppliers: {e}\n")
    connpg.rollback()

# -------------------------------
# 11. Products
# -------------------------------
print("Migrating Products...")
try:
    df_products = pd.read_sql_query("SELECT * FROM dbo.Products", cnxn)
    
    create_products = """
    CREATE TABLE IF NOT EXISTS bronze.Products (
        ProductID INT PRIMARY KEY,
        ProductName VARCHAR(100),
        SupplierID INT,
        CategoryID INT,
        QuantityPerUnit VARCHAR(50),
        UnitPrice DECIMAL(10, 2),
        UnitsInStock INT,
        UnitsOnOrder INT,
        ReorderLevel INT,
        Discontinued BOOLEAN
    );
    """
    cursor.execute(create_products)
    connpg.commit()
    
    if not df_products.empty:
        df_clean = clean_dataframe(df_products)
        sql = "INSERT INTO bronze.Products(ProductID, ProductName, SupplierID, CategoryID, QuantityPerUnit, UnitPrice, UnitsInStock, UnitsOnOrder, ReorderLevel, Discontinued) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ Products: {len(df_products)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating Products: {e}\n")
    connpg.rollback()

# -------------------------------
# 12. Orders
# -------------------------------
print("Migrating Orders...")
try:
    df_orders = pd.read_sql_query("SELECT * FROM dbo.Orders", cnxn)
    
    create_orders = """
    CREATE TABLE IF NOT EXISTS bronze.Orders (
        OrderID INT PRIMARY KEY,
        CustomerID VARCHAR(10),
        EmployeeID INT,
        OrderDate DATE,
        RequiredDate DATE,
        ShippedDate DATE,
        ShipVia INT,
        Freight DECIMAL(10, 2),
        ShipName VARCHAR(100),
        ShipAddress VARCHAR(100),
        ShipCity VARCHAR(50),
        ShipRegion VARCHAR(50),
        ShipPostalCode VARCHAR(20),
        ShipCountry VARCHAR(50)
    );
    """
    cursor.execute(create_orders)
    connpg.commit()
    
    if not df_orders.empty:
        df_clean = clean_dataframe(df_orders)
        sql = "INSERT INTO bronze.Orders(OrderID, CustomerID, EmployeeID, OrderDate, RequiredDate, ShippedDate, ShipVia, Freight, ShipName, ShipAddress, ShipCity, ShipRegion, ShipPostalCode, ShipCountry) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ Orders: {len(df_orders)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating Orders: {e}\n")
    connpg.rollback()

# -------------------------------
# 13. Order Details
# -------------------------------
print("Migrating Order Details...")
try:
    df_order_details = pd.read_sql_query("SELECT * FROM dbo.[Order Details]", cnxn)
    
    create_order_details = """
    CREATE TABLE IF NOT EXISTS bronze.OrderDetails (
        OrderID INT,
        ProductID INT,
        UnitPrice DECIMAL(10, 2),
        Quantity INT,
        Discount REAL,
        PRIMARY KEY (OrderID, ProductID)
    );
    """
    cursor.execute(create_order_details)
    connpg.commit()
    
    if not df_order_details.empty:
        df_clean = clean_dataframe(df_order_details)
        sql = "INSERT INTO bronze.OrderDetails(OrderID, ProductID, UnitPrice, Quantity, Discount) VALUES (%s, %s, %s, %s, %s)"
        cursor.executemany(sql, df_clean.values.tolist())
        connpg.commit()
    print(f"✓ Order Details: {len(df_order_details)} rows inserted\n")
except Exception as e:
    print(f"✗ Error migrating Order Details: {e}\n")
    connpg.rollback()

# -------------------------------
# Close connections
# -------------------------------
cnxn.close()
connpg.close()

print("=" * 50)
print("✓ Migration completed!")
print("=" * 50)