# Product Category Analysis - Snowpark Connect

This notebook demonstrates DataFrame transformations using **Snowpark Connect**.

## What is Snowpark Connect?

Snowpark Connect allows you to:
- Use **familiar PySpark APIs** (DataFrames, SQL functions, Window functions)
- Execute processing **inside Snowflake** (no data movement)
- **Migrate existing Spark workloads** with minimal code changes

## Key Features Demonstrated:
- Snowpark Connect session initialization
- Reading from Iceberg tables as Spark DataFrames
- DataFrame joins, aggregations, and window functions
- Writing results back to Snowflake

## Running Interactively in a Snowsight Workspace

Snowflake Notebooks in Workspaces run on **Snowpark Container Services**. When you connect a notebook, 
Snowflake creates a managed notebook service that hosts the kernel and executes your code.

### Understanding Compute Resources
- **Compute Pool**: Powers the notebook kernel and Python processes (credits accrue while service is RUNNING)
- **Query Warehouse**: Used for SQL queries and Snowpark pushdown compute (credits accrue only during queries)

For more details, see [Compute setup for Notebooks in Workspaces](https://docs.snowflake.com/en/user-guide/ui-snowsight/notebooks-in-workspaces/notebooks-in-workspaces-compute-setup).

### Prerequisites (created in `sql/04_notebook_deployment.sql`)

The External Access Integration for PyPI is already created:
```sql
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION TLV_BUILD_HOL_PYPI_EAI
    ALLOWED_NETWORK_RULES = (SNOWFLAKE.EXTERNAL_ACCESS.PYPI_RULE)
    ENABLED = TRUE;
```

### Configure the Notebook Service

When you connect (click the **Connect** button), configure:
1. **External access integrations**: Select `TLV_BUILD_HOL_PYPI_EAI`
2. **Python version**: Select `Python 3.11`
3. **Compute pool**: Select `SYSTEM_COMPUTE_POOL_CPU` (or your pool)

### Interactive Setup Steps

**Step 1:** Uncomment and run the pip install cell below

**Step 2:** Restart the kernel: from the **Connect** button, select **Restart kernel**

**Step 3:** Run the remaining cells

---
**Note:** When running as a scheduled Task (`EXECUTE NOTEBOOK PROJECT`), dependencies are installed automatically from `requirements.txt`.

In [None]:
# Uncomment to install in interactive mode (then restart the kernel).
# The quotes keep the shell from treating [jdk] as a glob pattern.
# %pip install "snowpark-connect[jdk]"

## 1. Initialize Snowpark Connect Session

**Note:** Session initialization uses `snowflake.snowpark_connect.server.init_spark_session()` instead of `SparkSession.builder`.

In [None]:
# Initialize Snowpark Connect session
import snowflake.snowpark_connect
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Start the Snowpark Connect server and get Spark session
spark = snowflake.snowpark_connect.server.init_spark_session()

print("Snowpark Connect session initialized successfully!")
print(f"Spark version: {spark.version}")
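
For code that has to run both in this notebook and on a regular Spark cluster, one option is a small wrapper that prefers Snowpark Connect and falls back to a standard `SparkSession`. This is only a sketch under assumptions: the `get_spark` helper and the application name are hypothetical, and the fallback assumes plain PySpark is installed wherever `snowpark-connect` is not.

```python
def get_spark():
    """Return a Spark session, preferring Snowpark Connect when it is installed."""
    try:
        # Same initialization call as the notebook cell.
        import snowflake.snowpark_connect
        return snowflake.snowpark_connect.server.init_spark_session()
    except ImportError:
        # Assumption: plain PySpark is available where Snowpark Connect is not.
        from pyspark.sql import SparkSession
        return (
            SparkSession.builder
            .appName("product-category-analysis")  # hypothetical app name
            .getOrCreate()
        )
```

Calling `spark = get_spark()` in place of the direct initialization would leave the rest of the notebook unchanged, since every later cell only uses the `spark` object.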

## 2. Load Iceberg Tables as DataFrames

Snowpark Connect for Spark (SCOS) can read directly from Snowflake tables (including Iceberg tables) using `spark.table()`.

In [None]:
# Load Iceberg tables - same syntax as standard Spark
orders_df = spark.table("TLV_BUILD_HOL.EXTERNAL_ICEBERG.EXT_ORDERS")
products_df = spark.table("TLV_BUILD_HOL.EXTERNAL_ICEBERG.EXT_PRODUCTS")
customers_df = spark.table("TLV_BUILD_HOL.EXTERNAL_ICEBERG.EXT_CUSTOMERS")

print(f"Orders count: {orders_df.count()}")
print(f"Products count: {products_df.count()}")
print(f"Customers count: {customers_df.count()}")

In [None]:
# Preview schemas
print("Orders Schema:")
orders_df.printSchema()

print("\nProducts Schema:")
products_df.printSchema()

In [None]:
# Preview data
orders_df.show(5, truncate=False)
products_df.show(5, truncate=False)

## 3. Product Category Performance Analysis

Join orders with products and aggregate by category/subcategory.

In [None]:
# Filter completed/shipped orders and join with products
category_performance = (
    orders_df
    .filter(F.col("status").isin(["COMPLETED", "SHIPPED"]))
    .join(products_df, "product_id")
    .groupBy("category", "subcategory")
    .agg(
        F.count("order_id").alias("order_count"),
        F.countDistinct("customer_id").alias("unique_customers"),
        F.sum("quantity").alias("units_sold"),
        F.sum(F.col("quantity") * F.col("unit_price")).alias("gross_revenue"),
        F.sum(
            F.col("quantity") * F.col("unit_price") * (1 - F.col("discount_pct") / 100)
        ).alias("net_revenue"),
        F.sum(F.col("quantity") * F.col("cost_price")).alias("total_cost"),
        F.avg("quantity").alias("avg_quantity_per_order")
    )
)

print("Category Performance:")
category_performance.orderBy(F.desc("net_revenue")).show(10)
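
As a sanity check on the aggregation formulas, the sketch below repeats the same arithmetic in plain Python on three made-up rows. The row values and the `rows` name are illustrative assumptions, not data from the `EXT_ORDERS` or `EXT_PRODUCTS` tables. The column names follow the cell above, and `discount_pct` is assumed to be stored as a percentage between 0 and 100, as the `/ 100` in the cell implies.

```python
# Hypothetical joined order/product rows (illustrative values only).
rows = [
    {"quantity": 2, "unit_price": 10.0, "discount_pct": 0, "cost_price": 6.0},
    {"quantity": 1, "unit_price": 20.0, "discount_pct": 25, "cost_price": 12.0},
    {"quantity": 4, "unit_price": 5.0, "discount_pct": 50, "cost_price": 2.0},
]

# Same expressions as the F.sum(...) aggregates, evaluated row by row.
gross_revenue = sum(r["quantity"] * r["unit_price"] for r in rows)
net_revenue = sum(
    r["quantity"] * r["unit_price"] * (1 - r["discount_pct"] / 100) for r in rows
)
total_cost = sum(r["quantity"] * r["cost_price"] for r in rows)

print(gross_revenue, net_revenue, total_cost)  # 60.0 45.0 32.0
```

Each row contributes 20.0 to gross revenue; the discounts reduce the second and third rows to 15.0 and 10.0, which gives the net figure of 45.0.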

## 4. Calculate Profit Margins and Rankings

Add derived columns and use **Window functions** for ranking.

In [None]:
# Define windows for ranking
window_revenue = Window.orderBy(F.desc("net_revenue"))
window_category = Window.partitionBy("category").orderBy(F.desc("net_revenue"))

# Add profit calculations and rankings
product_analysis = (
    category_performance
    .withColumn("gross_profit", F.col("net_revenue") - F.col("total_cost"))
    .withColumn(
        "profit_margin_pct",
        F.round(F.col("gross_profit") / F.col("net_revenue") * 100, 2)
    )
    .withColumn(
        "revenue_per_customer",
        F.round(F.col("net_revenue") / F.col("unique_customers"), 2)
    )
    .withColumn("overall_revenue_rank", F.dense_rank().over(window_revenue))
    .withColumn("category_revenue_rank", F.dense_rank().over(window_category))
)

print("Product Analysis with Rankings:")
product_analysis.orderBy(F.desc("net_revenue")).show(15)
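
To illustrate what the two window specifications compute, here is a minimal plain-Python sketch of dense ranking, first over all rows and then within each category. The `dense_rank` helper and the sample rows are hypothetical and only mimic the semantics of `F.dense_rank()`: tied values share a rank, and the next distinct value gets the next consecutive rank.

```python
from collections import defaultdict


def dense_rank(values):
    """Map each value to its dense rank in descending order (ties share a rank)."""
    distinct_desc = sorted(set(values), reverse=True)
    return {value: position for position, value in enumerate(distinct_desc, start=1)}


# Hypothetical (category, subcategory, net_revenue) rows.
rows = [
    ("Electronics", "Phones", 900.0),
    ("Electronics", "Audio", 400.0),
    ("Home", "Kitchen", 900.0),
    ("Home", "Decor", 150.0),
]

# Like Window.orderBy(F.desc("net_revenue")): one ranking over all rows.
overall = dense_rank([revenue for _, _, revenue in rows])

# Like Window.partitionBy("category"): a separate ranking per category.
revenues_by_category = defaultdict(list)
for category, _, revenue in rows:
    revenues_by_category[category].append(revenue)
per_category = {cat: dense_rank(revs) for cat, revs in revenues_by_category.items()}

ranked = [(cat, sub, overall[rev], per_category[cat][rev]) for cat, sub, rev in rows]
for row in ranked:
    print(row)
```

In this sample, `Phones` and `Kitchen` tie for overall rank 1, `Audio` is rank 2, and `Decor` is rank 3 overall but rank 2 within `Home`.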

## 5. Category Summary Statistics

In [None]:
# Aggregate to category level
category_summary = (
    product_analysis
    .groupBy("category")
    .agg(
        F.count("subcategory").alias("subcategory_count"),
        F.sum("order_count").alias("total_orders"),
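        # Sum of per-subcategory distinct counts: a customer active in several
        # subcategories is counted once per subcategory.
        F.sum("unique_customers").alias("total_customers"),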
        F.sum("units_sold").alias("total_units"),
        F.round(F.sum("net_revenue"), 2).alias("total_revenue"),
        F.round(F.sum("gross_profit"), 2).alias("total_profit"),
        # Revenue-weighted margin for the whole category, not a mean of subcategory margins
        F.round(
            F.sum("gross_profit") / F.sum("net_revenue") * 100, 2
        ).alias("avg_profit_margin")
    )
    .orderBy(F.desc("total_revenue"))
)

print("Category Summary:")
category_summary.show()
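
Two of the rolled-up columns are easy to misread. `avg_profit_margin` is total profit over total revenue, so it weights each subcategory by its revenue, and `total_customers` adds up per-subcategory distinct counts. The plain-Python sketch below shows both effects on made-up numbers; all values and variable names are hypothetical.

```python
# Hypothetical subcategory rows for one category: (net_revenue, gross_profit).
subcategories = [(1000.0, 100.0), (100.0, 50.0)]

total_revenue = sum(revenue for revenue, _ in subcategories)
total_profit = sum(profit for _, profit in subcategories)

# What the cell computes: overall margin, weighted by revenue.
weighted_margin = round(total_profit / total_revenue * 100, 2)

# Simple mean of the per-subcategory margins, for comparison.
margins = [profit / revenue * 100 for revenue, profit in subcategories]
mean_margin = round(sum(margins) / len(margins), 2)

# Distinct customers per subcategory; customer "c2" bought in both.
customers_by_subcategory = [{"c1", "c2"}, {"c2", "c3"}]
summed_customers = sum(len(ids) for ids in customers_by_subcategory)
distinct_customers = len(set().union(*customers_by_subcategory))

print(weighted_margin, mean_margin)          # 13.64 30.0
print(summed_customers, distinct_customers)  # 4 3
```

If a true distinct customer count per category is needed, it would have to be computed from the order-level data rather than from `product_analysis`, which has already been aggregated to subcategory level.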

## 6. Top Performing Products

In [None]:
# Product-level analysis
top_products = (
    orders_df
    .filter(F.col("status").isin(["COMPLETED", "SHIPPED"]))
    .join(products_df, "product_id")
    .groupBy("product_id", "product_name", "category", "subcategory")
    .agg(
        F.count("order_id").alias("times_ordered"),
        F.sum("quantity").alias("units_sold"),
        F.round(F.sum(F.col("quantity") * F.col("unit_price")), 2).alias("total_revenue"),
        F.round(F.avg("quantity"), 2).alias("avg_quantity_per_order")
    )
    .withColumn("rank", F.dense_rank().over(Window.orderBy(F.desc("total_revenue"))))
    .filter(F.col("rank") <= 10)
    .orderBy("rank")
)

print("Top 10 Products by Revenue:")
top_products.show(truncate=False)
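
Because the cell ranks with `dense_rank()`, the `rank <= 10` filter keeps every product tied at a qualifying rank, so the result can contain more than ten rows. The plain-Python sketch below shows the effect with a top-2 cut on made-up revenues; the product names and the `top_n` variable are hypothetical.

```python
# Hypothetical product revenues; "B" and "C" tie for second place.
revenues = {"A": 500.0, "B": 300.0, "C": 300.0, "D": 100.0}
top_n = 2

# Dense rank: position of each revenue among the distinct values, descending.
distinct_desc = sorted(set(revenues.values()), reverse=True)
ranks = {name: distinct_desc.index(value) + 1 for name, value in revenues.items()}

# Like .filter(F.col("rank") <= top_n).orderBy("rank")
top = sorted((rank, name) for name, rank in ranks.items() if rank <= top_n)
print(top)  # [(1, 'A'), (2, 'B'), (2, 'C')]
```

If exactly N rows are required, `F.row_number()` over the same window returns one row per position, although ties are then broken arbitrarily unless a tiebreaker column is added to the ordering.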

## 7. Save Results to Snowflake Table

Write the analysis results back to Snowflake using `saveAsTable()`.

In [None]:
# Prepare final output
final_analysis = (
    product_analysis
    .select(
        "category",
        "subcategory",
        "order_count",
        "unique_customers",
        "units_sold",
        F.round("gross_revenue", 2).alias("gross_revenue"),
        F.round("net_revenue", 2).alias("net_revenue"),
        F.round("total_cost", 2).alias("total_cost"),
        F.round("gross_profit", 2).alias("gross_profit"),
        "profit_margin_pct",
        "revenue_per_customer",
        "overall_revenue_rank",
        "category_revenue_rank",
        F.current_timestamp().alias("analysis_timestamp")
    )
)

# Write to Snowflake table
final_analysis.write.mode("overwrite").saveAsTable(
    "TLV_BUILD_HOL.DATA_ENG_DEMO.PRODUCT_CATEGORY_ANALYSIS"
)

print("Results saved to PRODUCT_CATEGORY_ANALYSIS table!")

## 8. Verify Results

In [None]:
# Verify saved data
result_df = spark.table("TLV_BUILD_HOL.DATA_ENG_DEMO.PRODUCT_CATEGORY_ANALYSIS")
print(f"Rows written: {result_df.count()}")
result_df.orderBy("overall_revenue_rank").show(10)

## Summary

### What We Demonstrated:

| Feature | Standard Spark | Snowpark Connect |
|---------|---------------|-------------------------|
| Session Init | `SparkSession.builder...` | `snowflake.snowpark_connect.server.init_spark_session()` |
| Read Tables | `spark.read.format("iceberg")...` | `spark.table("DB.SCHEMA.TABLE")` |
| DataFrame Ops | Same | Same |
| Window Functions | Same | Same |
| Write Tables | `df.write.format("iceberg")...` | `df.write.saveAsTable(...)` |

### Key Benefits of Snowpark Connect:

1. **No Data Movement** - Processing happens inside Snowflake
2. **Familiar APIs** - Standard PySpark syntax works
3. **Unified Governance** - Same security/access controls as SQL
4. **Easy Migration** - Existing Spark code works with minimal changes

### Output:
- `PRODUCT_CATEGORY_ANALYSIS` table with category/subcategory metrics
- Ready for dashboards, reporting, or downstream pipelines