### **AdventureWorks2022 Project**


### Section 1: Preview and Schema Inspection

In [None]:
# Lakehouse folder that holds the raw CSV extracts
folder_path = "Files/raw/"

# Raw extracts to preview, in the order they are inspected
files = [
    "AdventureWorks Customer Lookup.csv",
    "AdventureWorks Product Lookup.csv",
    "AdventureWorks Calendar Lookup.csv",
    "AdventureWorks Product Categories Lookup.csv",
    "AdventureWorks Product Subcategories Lookup.csv",
    "AdventureWorks Territory Lookup.csv",
    "AdventureWorks Returns Lookup.csv",
    "AdventureWorks Sales Data 2020.csv",
    "AdventureWorks Sales Data 2021.csv",
    "AdventureWorks Sales Data 2022.csv",
]

# For every extract: read it with a header row and schema inference, then
# print the inferred schema followed by the first five rows, untruncated.
for csv_name in files:
    print(f"\n📂 Inspecting file: {csv_name}")
    reader = spark.read.options(header="true", inferSchema="true")
    df = reader.csv(folder_path + csv_name)
    df.printSchema()
    df.show(5, truncate=False)


 ### Section 2: ETL – Load and Transform to Tables

In [None]:
from pyspark.sql.functions import col

# Path to folder containing raw files
folder_path = "Files/raw/"

# Mapping of file names to destination Lakehouse tables
files_and_tables = {
    "AdventureWorks Customer Lookup.csv": "Dim_Customer",
    "AdventureWorks Product Lookup.csv": "Dim_Product",
    "AdventureWorks Calendar Lookup.csv": "Dim_Calendar",
    "AdventureWorks Product Categories Lookup.csv": "Dim_ProductCategory",
    "AdventureWorks Product Subcategories Lookup.csv": "Dim_ProductSubcategory",
    "AdventureWorks Territory Lookup.csv": "Dim_Territory",
    "AdventureWorks Returns Lookup.csv": "Fact_Returns",
    "AdventureWorks Sales Data 2020.csv": "Stg_Sales_2020",
    "AdventureWorks Sales Data 2021.csv": "Stg_Sales_2021",
    "AdventureWorks Sales Data 2022.csv": "Stg_Sales_2022"
}

# Columns to cast to string (by file). Files without an entry keep the types
# produced by schema inference.
# NOTE(review): the sales files have no entry here; if they carry the same key
# columns, those stay at their inferred type — confirm that is intended.
columns_to_string = {
    "AdventureWorks Customer Lookup.csv": ["CustomerKey"],
    "AdventureWorks Product Lookup.csv": ["ProductKey", "ProductSubcategoryKey"],
    "AdventureWorks Returns Lookup.csv": ["TerritoryKey", "ProductKey"],
    "AdventureWorks Product Categories Lookup.csv": ["ProductCategoryKey"],
    "AdventureWorks Product Subcategories Lookup.csv": ["ProductSubcategoryKey", "ProductCategoryKey"],
    "AdventureWorks Territory Lookup.csv": ["SalesTerritoryKey"]
}

# Process each file: read the CSV, cast the configured key columns, and save
# the result as a Lakehouse table.
for file_name, table_name in files_and_tables.items():
    print(f"\n📥 Processing file: {file_name} → saving as table: {table_name}")

    df = (
        spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv(folder_path + file_name)
    )

    # Cast specified columns to string (no-op for files without an entry)
    key_columns = columns_to_string.get(file_name, [])
    for col_name in key_columns:
        df = df.withColumn(col_name, col(col_name).cast("string"))
    if key_columns:
        print(f"🔤 Converted keys to string: {key_columns}")

    # Each run re-reads the complete CSV, so appending would insert the same
    # rows again on every re-run and duplicate the table contents. Overwrite
    # keeps the load idempotent for every table.
    mode = "overwrite"

    # overwriteSchema lets a re-run replace a table whose schema was changed
    # after the initial load (Section 3 rewrites Dim_Customer with FullName).
    (
        df.write
        .mode(mode)
        .option("overwriteSchema", "true")
        .saveAsTable(table_name)
    )
    print(f"✅ Table saved: {table_name} (mode = {mode})")


 ### Section 3: Add FullName to Dim_Customer

In [None]:
from pyspark.sql.functions import concat_ws, col

# Load customer file with the same reader options as the Section 2 load.
# Without inferSchema every column is read as a string, and the overwrite
# below would replace the inferred column types of Dim_Customer with strings.
file_path = "Files/raw/AdventureWorks Customer Lookup.csv"
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(file_path)
)

# Keep CustomerKey as a string, matching the cast applied in Section 2
df = df.withColumn("CustomerKey", col("CustomerKey").cast("string"))

# Filter out rows with null CustomerKey
df_cleaned = df.filter(col("CustomerKey").isNotNull())

# Add FullName column: FirstName and LastName joined by a single space
df_with_fullname = df_cleaned.withColumn(
    "FullName", concat_ws(" ", col("FirstName"), col("LastName"))
)

# Overwrite existing table; overwriteSchema is required because the new
# FullName column changes the table schema.
(
    df_with_fullname.write
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("Dim_Customer")
)
print("✅ Dim_Customer updated with FullName column.")


### ✅ Summary of Actions Performed in the Notebook

| Step | Description                                                              | Tool/Technique Used                            |
| ---- | ------------------------------------------------------------------------ | ---------------------------------------------- |
| 1️⃣  | Inspected the structure and schema of raw CSV files                      | PySpark `DataFrame.printSchema()` and `show()` |
| 2️⃣  | Loaded raw CSV files into Lakehouse                                      | PySpark `read.csv()`                           |
| 3️⃣  | Cast key columns to string data type where needed                        | `withColumn(...cast("string"))`                |
| 4️⃣  | Saved transformed data to Lakehouse tables using appropriate write modes | `write.mode(...).saveAsTable()`                |
| 5️⃣  | Cleaned customer data and added `FullName` column                        | `filter()`, `concat_ws()`                      |
| 6️⃣  | Overwrote the existing `Dim_Customer` table with enriched data           | `write.mode("overwrite").saveAsTable()`        |
