In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
import random

# Raw (bronze) schema for insurance policies. Every field is nullable, and the
# three date columns (start_date, end_date, last_claim_date) are plain strings
# at this layer.
schema = (
    StructType()
    .add("policy_id", IntegerType(), nullable=True)
    .add("customer_id", IntegerType(), nullable=True)
    .add("customer_name", StringType(), nullable=True)
    .add("policy_type", StringType(), nullable=True)
    .add("premium_amount", DoubleType(), nullable=True)
    .add("start_date", StringType(), nullable=True)
    .add("end_date", StringType(), nullable=True)
    .add("claim_count", IntegerType(), nullable=True)
    .add("last_claim_date", StringType(), nullable=True)
    .add("agent_id", IntegerType(), nullable=True)
    .add("region", StringType(), nullable=True)
)

from datetime import date, timedelta

# Sample insurance data: ten hand-written policies.
# Conventions visible in these rows: customer_id = policy_id - 500, dates are
# "yyyy-MM-dd" strings, and last_claim_date (when present) falls between
# start_date and end_date.
data = [
    (1001, 501, "Alice Smith", "Health", 1200.50, "2023-01-01", "2024-01-01", 2, "2023-11-15", 301, "East"),
    (1002, 502, "Bob Johnson", "Auto", 800.00, "2022-06-15", "2023-06-15", 1, "2023-05-20", 302, "West"),
    (1003, 503, "Carol Lee", "Home", 950.75, "2023-03-10", "2024-03-10", 0, None, 303, "North"),
    (1004, 504, "David Kim", "Life", 1500.00, "2021-09-01", "2026-09-01", 3, "2024-02-10", 304, "South"),
    (1005, 505, "Eva Brown", "Health", 1100.25, "2022-12-01", "2023-12-01", 1, "2023-08-05", 305, "East"),
    (1006, 506, "Frank White", "Auto", 700.00, "2023-05-20", "2024-05-20", 0, None, 306, "West"),
    (1007, 507, "Grace Green", "Home", 980.00, "2022-07-15", "2023-07-15", 2, "2023-06-30", 307, "North"),
    (1008, 508, "Henry Black", "Life", 1600.00, "2020-10-01", "2025-10-01", 4, "2024-03-12", 308, "South"),
    (1009, 509, "Ivy King", "Health", 1250.00, "2023-02-01", "2024-02-01", 1, "2023-09-18", 309, "East"),
    (1010, 510, "Jack Young", "Auto", 850.00, "2022-11-10", "2023-11-10", 2, "2023-10-25", 310, "West")
]

# Add 100 more synthetic rows (policy_id 1011..1110) following the same conventions.
RANDOM_SEED = 42  # fixed seed so Restart & Run All rebuilds an identical bronze table
rng = random.Random(RANDOM_SEED)  # private generator; leaves the global `random` state untouched

policy_types = ["Health", "Auto", "Home", "Life"]
regions = ["East", "West", "North", "South"]
names = ["Alex", "Jordan", "Taylor", "Morgan", "Casey", "Riley", "Jamie", "Drew", "Robin", "Skyler"]
surnames = ["Smith", "Johnson", "Lee", "Kim", "Brown", "White", "Green", "Black", "King", "Young"]

for i in range(1011, 1111):
    # Mirror the hand-written rows (policy 1001 -> customer 501); `500 + i` produced 1511+.
    customer_id = i - 500
    customer_name = f"{rng.choice(names)} {rng.choice(surnames)}"
    policy_type = rng.choice(policy_types)
    premium_amount = round(rng.uniform(700, 1700), 2)
    # Day is capped at 28 so every (month, day) pair is a valid date.
    start = date(2023, rng.randint(1, 12), rng.randint(1, 28))
    end = date(2024, rng.randint(1, 12), rng.randint(1, 28))  # different year => always after start
    claim_count = rng.randint(0, 4)
    # A claim can only be filed while the policy is active, so draw it inside [start, end].
    if claim_count == 0:
        last_claim_date = None
    else:
        last_claim_date = (start + timedelta(days=rng.randint(0, (end - start).days))).isoformat()
    agent_id = 300 + rng.randint(1, 20)
    region = rng.choice(regions)
    data.append((i, customer_id, customer_name, policy_type, premium_amount,
                 start.isoformat(), end.isoformat(), claim_count, last_claim_date, agent_id, region))

# Build the bronze DataFrame from the in-memory rows using the explicit schema,
# then persist it as the `insurance_bronze` table (overwriting any earlier run).
insurance_bronze_df = spark.createDataFrame(data, schema=schema)
(
    insurance_bronze_df.write
    .mode("overwrite")
    .saveAsTable("insurance_bronze")
)

print("Bronze layer 'insurance_bronze' table created successfully")

# Read the table back so the displayed rows are what was actually stored.
display(spark.table("insurance_bronze"))

In [0]:
from pyspark.sql.functions import col, trim

# Read from bronze table
bronze_df = spark.table("insurance_bronze")

# last_claim_date is legitimately NULL for policies with no claims (claim_count == 0),
# so a blanket dropna(how="any") would silently discard every claim-free policy.
# Only require values in the remaining columns.
required_columns = [c for c in bronze_df.columns if c != "last_claim_date"]
clean_df = bronze_df.dropna(how="any", subset=required_columns)

# Trim whitespace in free-text columns BEFORE de-duplicating, so rows that differ
# only by leading/trailing spaces collapse into a single row.
columns_to_trim = ["customer_name", "policy_type", "region"]
for c in columns_to_trim:
    clean_df = clean_df.withColumn(c, trim(col(c)))

# Remove exact duplicate rows (compared across all columns).
clean_df = clean_df.dropDuplicates()

# The date columns are already StringType in the bronze schema; the explicit cast
# pins them to string in the silver table.
# TODO(review): consider converting to DateType with to_date(...) for a typed silver layer.
date_columns = ["start_date", "end_date", "last_claim_date"]
for d in date_columns:
    clean_df = clean_df.withColumn(d, col(d).cast("string"))

# Save as silver table
clean_df.write.mode("overwrite").saveAsTable("insurance_silver")

print("Silver layer 'insurance_silver' table created successfully")

display(spark.table("insurance_silver"))

In [0]:
# Import the functions module under an alias rather than `from ... import sum, min, max`,
# which would shadow Python's built-in sum/min/max in every later cell of this kernel.
from pyspark.sql import functions as F

# Read from silver table
silver_df = spark.table("insurance_silver")

# Aggregate: number of distinct customers holding each policy_type
policy_type_df = silver_df.groupBy("policy_type").agg(
    F.countDistinct("customer_id").alias("customer_count")
)

# Aggregate per customer: total premium, earliest start_date and latest end_date.
# Dates are "yyyy-MM-dd" strings, so lexical min/max matches chronological order.
customer_df = silver_df.groupBy("customer_id").agg(
    F.sum("premium_amount").alias("total_premium"),
    F.min("start_date").alias("min_start_date"),
    F.max("end_date").alias("max_end_date")
)

# Save as gold tables
policy_type_df.write.mode("overwrite").saveAsTable("insurance_gold_policy_type")
customer_df.write.mode("overwrite").saveAsTable("insurance_gold_customer")

print("Gold layer 'insurance_gold_policy_type' and 'insurance_gold_customer' tables created successfully")

display(spark.table("insurance_gold_policy_type"))
display(spark.table("insurance_gold_customer"))

In [0]:
%sql
-- Sample Employee table. OR REPLACE makes the cell re-runnable: a plain CREATE TABLE
-- fails once the table exists, and the INSERT below would otherwise duplicate rows.
CREATE OR REPLACE TABLE Employee (
    EmpID INT PRIMARY KEY,
    Name VARCHAR(50) NOT NULL,
    Department VARCHAR(50) NOT NULL,
    Salary DECIMAL(10,2) NOT NULL,
    JoinDate DATE NOT NULL
);

-- Insert sample data (explicit DATE literals instead of relying on string-to-date coercion)
INSERT INTO Employee (EmpID, Name, Department, Salary, JoinDate) VALUES
(101, 'Alice',   'HR',      50000, DATE '2020-01-15'),
(102, 'Bob',     'HR',      60000, DATE '2019-03-10'),
(103, 'Charlie', 'IT',      70000, DATE '2021-07-01'),
(104, 'David',   'IT',      80000, DATE '2018-11-20'),
(105, 'Emma',    'Finance', 75000, DATE '2022-02-05'),
(106, 'Frank',   'Finance', 65000, DATE '2020-06-12'),
(107, 'Grace',   'IT',      90000, DATE '2017-09-25');


In [0]:
%sql
-- Highest salary in each department. The aggregate is aliased so the column has a
-- usable name, and rows are ordered for a deterministic display.
SELECT Department, MAX(Salary) AS MaxSalary
FROM Employee
GROUP BY Department
ORDER BY Department

In [0]:
%sql
-- Highest salary per department and calendar month of joining.
-- NOTE: MONTH() drops the year, so joins in the same month of different years share a group.
SELECT MAX(Salary) AS MaxSalary, Department, MONTH(JoinDate) AS JoinMonth
FROM Employee
GROUP BY Department, MONTH(JoinDate)
ORDER BY Department, JoinMonth

In [0]:
%sql
-- Employees whose salary equals their department's maximum (ties all returned).
WITH dept_max AS (
  SELECT department, max(salary) AS m_sal
  FROM employee
  GROUP BY department
)
SELECT e.empid, e.name, e.department, e.Salary
FROM employee AS e
INNER JOIN dept_max AS top
  ON e.Department = top.department
 AND e.salary = top.m_sal

In [0]:
%sql
-- Rank employees by salary within their department (1 = highest; tied salaries
-- share a rank and the following rank is skipped, per RANK semantics).
SELECT
  EmpID,
  Name,
  Department,
  Salary,
  RANK() OVER dept_by_salary AS SalaryRank
FROM Employee
WINDOW dept_by_salary AS (PARTITION BY Department ORDER BY Salary DESC);