# Data Ingestion - Create Delta Tables (Primary Workspace)

**Workspace**: Data Engineering (Primary)  
**Purpose**: Ingest sample data and create Delta tables in Unity Catalog  
**Catalog**: `shared_data`  
**Schema**: `samples`

This notebook demonstrates:
- Creating sample datasets
- Writing data to Delta tables in Unity Catalog
- Setting up data that can be accessed from other workspaces

## 1. Setup and Configuration

In [None]:
# Import required libraries
# (`spark` and `display` are predefined in Databricks notebooks)
from pyspark.sql.types import (
    StructType, StructField, IntegerType, StringType, DoubleType, DateType
)
from datetime import datetime

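# Display the Spark version. Delta Lake is bundled with the Databricks Runtime;
# the config key below may not be set, in which case the fallback text is printed.
print(f"Spark Version: {spark.version}")
print(f"Delta Lake Version: {spark.conf.get('spark.databricks.delta.version', 'Not available')}")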

## 2. Verify Unity Catalog Configuration

In [None]:
# Check current catalog
current_catalog = spark.sql("SELECT current_catalog()").collect()[0][0]
print(f"Current Catalog: {current_catalog}")

# List available catalogs
print("\nAvailable Catalogs:")
display(spark.sql("SHOW CATALOGS"))

In [None]:
# Set the working catalog and schema
spark.sql("USE CATALOG shared_data")
spark.sql("USE SCHEMA samples")

print("Using catalog: shared_data")
print("Using schema: samples")
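`USE CATALOG` and `USE SCHEMA` fail if `shared_data` or `samples` does not exist yet. The following is a minimal sketch that creates both idempotently. It assumes you hold `CREATE CATALOG` on the metastore. Depending on how the metastore is configured, `CREATE CATALOG` may also need a `MANAGED LOCATION` clause. `schema_ddl` is a hypothetical helper. The `globals().get("spark")` guard runs the statements only where the notebook's `spark` session exists and otherwise just prints them.

```python
# Hypothetical helper: idempotent DDL for the catalog and schema used in this notebook.
# Assumes the caller holds CREATE CATALOG (or that the catalog already exists).
def schema_ddl(catalog: str, schema: str) -> list[str]:
    """Return CREATE ... IF NOT EXISTS statements for a catalog and one schema in it."""
    return [
        f"CREATE CATALOG IF NOT EXISTS {catalog}",
        f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}",
    ]

statements = schema_ddl("shared_data", "samples")

# `spark` is predefined in Databricks notebooks; outside one, just print the statements.
spark_session = globals().get("spark")
for stmt in statements:
    if spark_session is not None:
        spark_session.sql(stmt)
    else:
        print(stmt)
```

After the statements succeed, rerun the `USE CATALOG` / `USE SCHEMA` cell.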

## 3. Create Sample Customer Data

In [None]:
# Generate sample customer data
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType

# Define schema
customer_schema = StructType([
    StructField("customer_id", IntegerType(), False),
    StructField("customer_name", StringType(), False),
    StructField("email", StringType(), True),
    StructField("country", StringType(), True),
    StructField("signup_date", DateType(), True),
    StructField("total_purchases", IntegerType(), True),
    StructField("lifetime_value", DoubleType(), True)
])

# Sample data
customer_data = [
    (1, "Alice Johnson", "alice@example.com", "USA", datetime(2023, 1, 15).date(), 45, 12500.50),
    (2, "Bob Smith", "bob@example.com", "Canada", datetime(2023, 2, 20).date(), 32, 8900.75),
    (3, "Charlie Brown", "charlie@example.com", "UK", datetime(2023, 3, 10).date(), 67, 22340.00),
    (4, "Diana Prince", "diana@example.com", "Australia", datetime(2023, 4, 5).date(), 89, 31200.25),
    (5, "Eve Wilson", "eve@example.com", "USA", datetime(2023, 5, 12).date(), 23, 5600.50),
    (6, "Frank Miller", "frank@example.com", "Germany", datetime(2023, 6, 18).date(), 56, 15800.00),
    (7, "Grace Lee", "grace@example.com", "Singapore", datetime(2023, 7, 25).date(), 78, 28900.75),
    (8, "Henry Davis", "henry@example.com", "USA", datetime(2023, 8, 30).date(), 41, 11200.00),
    (9, "Ivy Chen", "ivy@example.com", "China", datetime(2023, 9, 14).date(), 93, 35600.50),
    (10, "Jack Robinson", "jack@example.com", "Canada", datetime(2023, 10, 8).date(), 28, 7100.25)
]

# Create DataFrame
customers_df = spark.createDataFrame(customer_data, schema=customer_schema)

print(f"Created {customers_df.count()} customer records")
display(customers_df)
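The schema marks `customer_id` as non-nullable, but neither the schema nor the Delta write rejects duplicate IDs. Primary-key constraints in Unity Catalog are informational only and are not enforced. Below is a small pre-write check on the Python rows. It uses a hypothetical `duplicate_keys` helper and purely illustrative input.

```python
from collections import Counter

def duplicate_keys(rows, key_index=0):
    """Return, sorted, the key values that appear more than once at position key_index."""
    counts = Counter(row[key_index] for row in rows)
    return sorted(key for key, n in counts.items() if n > 1)

# Illustrative input only; in the notebook, pass `customer_data` instead.
sample_rows = [
    (1, "Alice Johnson"),
    (2, "Bob Smith"),
    (2, "Bob Smith (duplicate)"),
]
print(duplicate_keys(sample_rows))  # [2]
```

For the ten rows in `customer_data`, `duplicate_keys(customer_data)` returns an empty list.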

## 4. Write Customer Data to Delta Table

In [None]:
# Write to Delta table in Unity Catalog
table_name = "shared_data.samples.customers"

customers_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(table_name)

print(f"✅ Successfully created Delta table: {table_name}")
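`mode("overwrite")` commits a new table version instead of deleting the old data files immediately. As a result, earlier versions stay queryable until `VACUUM` removes files older than the retention period. The sketch below inspects the commit log and reads version 0. `history_sql` and `version_sql` are hypothetical helper names. The `globals().get("spark")` guard makes the queries execute only inside a notebook; elsewhere they are printed.

```python
# Hypothetical helpers that build Delta Lake SQL for inspecting a table's versions.
def history_sql(table_name: str, limit: int = 5) -> str:
    """Most recent commits (version, timestamp, operation, ...) of a Delta table."""
    return f"DESCRIBE HISTORY {table_name} LIMIT {limit}"

def version_sql(table_name: str, version: int) -> str:
    """Read the table as of a given commit version (Delta time travel)."""
    return f"SELECT * FROM {table_name} VERSION AS OF {version}"

table = "shared_data.samples.customers"
queries = [history_sql(table), version_sql(table, 0)]

# `spark` and `display` are predefined in Databricks notebooks.
spark_session = globals().get("spark")
for q in queries:
    if spark_session is not None:
        display(spark_session.sql(q))
    else:
        print(q)
```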

## 5. Create Sample Product Data

In [None]:
# Generate sample product data
product_schema = StructType([
    StructField("product_id", IntegerType(), False),
    StructField("product_name", StringType(), False),
    StructField("category", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("stock_quantity", IntegerType(), True),
    StructField("supplier", StringType(), True)
])

product_data = [
    (101, "Laptop Pro 15", "Electronics", 1299.99, 45, "TechCorp"),
    (102, "Wireless Mouse", "Accessories", 29.99, 230, "Peripherals Inc"),
    (103, "USB-C Hub", "Accessories", 49.99, 156, "ConnectTech"),
    (104, "Monitor 27-inch", "Electronics", 399.99, 67, "DisplayMax"),
    (105, "Keyboard Mechanical", "Accessories", 129.99, 89, "KeyMasters"),
    (106, "Webcam HD", "Electronics", 79.99, 134, "VisionTech"),
    (107, "Headphones Wireless", "Audio", 199.99, 78, "SoundWave"),
    (108, "Desk Lamp LED", "Office", 39.99, 201, "LightCo"),
    (109, "External SSD 1TB", "Storage", 149.99, 112, "DataVault"),
    (110, "Laptop Stand", "Accessories", 59.99, 176, "ErgoTech")
]

products_df = spark.createDataFrame(product_data, schema=product_schema)

print(f"Created {products_df.count()} product records")
display(products_df)

In [None]:
# Write to Delta table
table_name = "shared_data.samples.products"

products_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(table_name)

print(f"✅ Successfully created Delta table: {table_name}")

## 6. Create Sample Transaction Data

In [None]:
# Generate sample transaction data
transaction_schema = StructType([
    StructField("transaction_id", IntegerType(), False),
    StructField("customer_id", IntegerType(), False),
    StructField("product_id", IntegerType(), False),
    StructField("quantity", IntegerType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("transaction_date", DateType(), True),
    StructField("payment_method", StringType(), True)
])

transaction_data = [
    (1001, 1, 101, 1, 1299.99, datetime(2024, 1, 15).date(), "Credit Card"),
    (1002, 1, 102, 2, 59.98, datetime(2024, 1, 16).date(), "Credit Card"),
    (1003, 2, 104, 1, 399.99, datetime(2024, 1, 20).date(), "PayPal"),
    (1004, 3, 105, 1, 129.99, datetime(2024, 2, 5).date(), "Credit Card"),
    (1005, 3, 106, 1, 79.99, datetime(2024, 2, 5).date(), "Credit Card"),
    (1006, 4, 107, 2, 399.98, datetime(2024, 2, 12).date(), "Debit Card"),
    (1007, 5, 108, 3, 119.97, datetime(2024, 3, 1).date(), "PayPal"),
    (1008, 6, 109, 1, 149.99, datetime(2024, 3, 8).date(), "Credit Card"),
    (1009, 7, 110, 2, 119.98, datetime(2024, 3, 15).date(), "Credit Card"),
    (1010, 8, 101, 1, 1299.99, datetime(2024, 3, 22).date(), "Debit Card"),
    (1011, 9, 103, 3, 149.97, datetime(2024, 4, 1).date(), "PayPal"),
    (1012, 10, 102, 5, 149.95, datetime(2024, 4, 5).date(), "Credit Card")
]

transactions_df = spark.createDataFrame(transaction_data, schema=transaction_schema)

print(f"Created {transactions_df.count()} transaction records")
display(transactions_df)
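Each `total_amount` should equal `quantity` times the product's `price`. The following is a hypothetical pure-Python check that can run on the row tuples before they are written. It is shown on three rows copied from the cells above.

```python
def mismatched_totals(transactions, unit_prices, tolerance=0.005):
    """Return transaction IDs whose total_amount differs from quantity * unit price.

    transactions: tuples shaped like transaction_data rows
        (transaction_id, customer_id, product_id, quantity, total_amount, ...).
    unit_prices: dict mapping product_id -> price.
    """
    bad = []
    for txn_id, _customer_id, product_id, quantity, total, *_rest in transactions:
        expected = quantity * unit_prices[product_id]
        # Compare with a tolerance because the amounts are binary floats.
        if abs(expected - total) > tolerance:
            bad.append(txn_id)
    return bad

prices = {101: 1299.99, 102: 29.99, 107: 199.99}
sample = [
    (1001, 1, 101, 1, 1299.99),
    (1002, 1, 102, 2, 59.98),
    (1006, 4, 107, 2, 399.98),
]
print(mismatched_totals(sample, prices))  # []
```

In the notebook, `mismatched_totals(transaction_data, {row[0]: row[3] for row in product_data})` checks all twelve rows (index 3 is `price` in the product tuples), and it returns an empty list for this data.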

In [None]:
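# Write to Delta table partitioned by transaction_date (illustrative only: at 12 rows this
# creates one tiny partition per date; Databricks recommends not partitioning tables under ~1 TB)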
table_name = "shared_data.samples.transactions"

transactions_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("transaction_date") \
    .saveAsTable(table_name)

print(f"✅ Successfully created Delta table: {table_name}")
print("   Partitioned by: transaction_date")
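The 12 rows span 11 distinct `transaction_date` values, so this layout produces roughly one small file per partition. You can inspect the resulting layout with `DESCRIBE DETAIL`, which returns a single row that includes `partitionColumns`, `numFiles` and `sizeInBytes`. The following is a sketch. `detail_sql` is a hypothetical helper, and the `globals().get("spark")` guard runs the query only where a notebook `spark` session exists.

```python
def detail_sql(table_name: str) -> str:
    """Build a DESCRIBE DETAIL query for a Delta table."""
    return f"DESCRIBE DETAIL {table_name}"

query = detail_sql("shared_data.samples.transactions")

# `spark` and `display` are predefined in Databricks notebooks.
spark_session = globals().get("spark")
if spark_session is not None:
    layout = spark_session.sql(query).select("partitionColumns", "numFiles", "sizeInBytes")
    display(layout)
else:
    print(query)
```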

## 7. Verify Created Tables

In [None]:
# List all tables in the schema
print("Tables in shared_data.samples schema:")
display(spark.sql("SHOW TABLES IN shared_data.samples"))

In [None]:
# Show table details
tables = ["customers", "products", "transactions"]

for table in tables:
    full_table_name = f"shared_data.samples.{table}"
    df = spark.table(full_table_name)  # look the table up once and reuse it
    print(f"\n📊 Table: {full_table_name}")
    print(f"   Record Count: {df.count()}")
    print("   Schema:")
    df.printSchema()
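The summary at the end of this notebook lists the expected row counts (10, 10 and 12). The sketch below checks them explicitly rather than by eye. `EXPECTED_COUNTS` and `count_mismatches` are hypothetical names. The table lookups run only when a notebook `spark` session exists.

```python
# Expected counts, taken from this notebook's summary section.
EXPECTED_COUNTS = {"customers": 10, "products": 10, "transactions": 12}

def count_mismatches(actual: dict, expected: dict) -> dict:
    """Map table -> (actual, expected) for every table whose count differs or is missing."""
    return {
        name: (actual.get(name), want)
        for name, want in expected.items()
        if actual.get(name) != want
    }

# `spark` is predefined in Databricks notebooks.
spark_session = globals().get("spark")
if spark_session is not None:
    actual_counts = {
        name: spark_session.table(f"shared_data.samples.{name}").count()
        for name in EXPECTED_COUNTS
    }
    print(count_mismatches(actual_counts, EXPECTED_COUNTS) or "All row counts match")
```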

## 8. Test Unity Catalog Lineage

In [None]:
# Create a view that joins tables (demonstrates lineage)
spark.sql("""
CREATE OR REPLACE VIEW shared_data.samples.customer_transactions AS
SELECT 
    c.customer_id,
    c.customer_name,
    c.country,
    t.transaction_id,
    t.product_id,
    p.product_name,
    p.category,
    t.quantity,
    t.total_amount,
    t.transaction_date,
    t.payment_method
FROM shared_data.samples.customers c
JOIN shared_data.samples.transactions t ON c.customer_id = t.customer_id
JOIN shared_data.samples.products p ON t.product_id = p.product_id
""")

print("✅ Created view: shared_data.samples.customer_transactions")
print("   Unity Catalog can show lineage from customers, transactions, and products to this view (Lineage tab in Catalog Explorer)")
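Besides the Lineage tab in Catalog Explorer, lineage events can be queried from the `system.access.table_lineage` system table. The following is a sketch. It assumes system tables are enabled for your account and that you can read `system.access`. Lineage rows can take some time to appear. The named `:source` parameter relies on `spark.sql(..., args=...)`, which requires Spark 3.4 or later. The query runs only when a notebook `spark` session exists.

```python
# Recent lineage events where `customers` is the upstream (source) table.
LINEAGE_SQL = """
SELECT source_table_full_name, target_table_full_name, event_time
FROM system.access.table_lineage
WHERE source_table_full_name = :source
ORDER BY event_time DESC
LIMIT 20
"""
lineage_args = {"source": "shared_data.samples.customers"}

# `spark` and `display` are predefined in Databricks notebooks.
spark_session = globals().get("spark")
if spark_session is not None:
    display(spark_session.sql(LINEAGE_SQL, args=lineage_args))
else:
    print(LINEAGE_SQL.strip(), lineage_args)
```

Binding the table name as a parameter, rather than formatting it into the SQL string, avoids quoting mistakes.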

In [None]:
# Query the view
display(spark.sql("SELECT * FROM shared_data.samples.customer_transactions ORDER BY transaction_date DESC"))

## 9. Summary

### Created Resources:
- ✅ **shared_data.samples.customers** - Customer master data (10 records)
- ✅ **shared_data.samples.products** - Product catalog (10 records)
- ✅ **shared_data.samples.transactions** - Transaction history (12 records, partitioned)
- ✅ **shared_data.samples.customer_transactions** - Joined view

### Key Features Demonstrated:
- Delta Lake table creation in Unity Catalog
- Table partitioning (illustrative here; tables this small generally should not be partitioned)
- Cross-table views with lineage tracking
- Three-level namespace (catalog.schema.table)
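Plain names such as `shared_data.samples.customers` need no quoting. However, a part of a three-level name that contains characters such as hyphens or spaces must be wrapped in backticks in Spark SQL, and any embedded backtick is doubled. The following is a minimal sketch of building such references for SQL text. `quote_identifier` and `full_name` are hypothetical helpers.

```python
def quote_identifier(part: str) -> str:
    """Wrap one name part in backticks, doubling any embedded backtick."""
    return "`" + part.replace("`", "``") + "`"

def full_name(catalog: str, schema: str, table: str) -> str:
    """Build a quoted catalog.schema.table reference for use in SQL text."""
    return ".".join(quote_identifier(p) for p in (catalog, schema, table))

print(full_name("shared_data", "samples", "customers"))
# `shared_data`.`samples`.`customers`
```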

### Next Steps:
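- These tables are now accessible from **any workspace** attached to the same Unity Catalog metastore, provided users hold the required grants (and the catalog is not bound to specific workspaces)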
- Run the **cross-workspace access notebook** in the Analytics workspace to query this data
- Set up fine-grained permissions using Unity Catalog grants
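Read access in Unity Catalog requires `USE CATALOG` on the catalog, `USE SCHEMA` on the schema, and `SELECT` on the data. The following is a sketch that grants all three to a group. `analysts` is a hypothetical group name and `read_only_grants` is a hypothetical helper. Granting `SELECT` on the schema also covers tables and views added to it later, because Unity Catalog privileges are inherited downward. The statements run only when a notebook `spark` session exists.

```python
def read_only_grants(catalog: str, schema: str, principal: str) -> list[str]:
    """GRANT statements giving `principal` read access to all tables in catalog.schema."""
    p = f"`{principal}`"  # principals are quoted with backticks in Databricks SQL
    return [
        f"GRANT USE CATALOG ON CATALOG {catalog} TO {p}",
        f"GRANT USE SCHEMA ON SCHEMA {catalog}.{schema} TO {p}",
        f"GRANT SELECT ON SCHEMA {catalog}.{schema} TO {p}",
    ]

grants = read_only_grants("shared_data", "samples", "analysts")

# `spark` is predefined in Databricks notebooks; outside one, just print the statements.
spark_session = globals().get("spark")
for stmt in grants:
    if spark_session is not None:
        spark_session.sql(stmt)
    else:
        print(stmt)
```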