# 8

Given employee data with columns:
employee_id, department, salary, join_date

You need to:

Calculate cumulative salary within each department, ordered by join_date.

Assign dense rank within each department based on salary in descending order.

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum
from pyspark.sql.window import Window
from pyspark.sql.functions import dense_rank

# Start Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    (1, "HR", 50000, "2022-01-01"),
    (2, "IT", 70000, "2021-06-15"),
    (3, "HR", 60000, "2023-03-10"),
    (4, "IT", 80000, "2020-12-01")
]
columns = ["employee_id", "department", "salary", "join_date"]

# Create DataFrame
df_8= spark.createDataFrame(data, columns)


In [0]:
df_8.display()

employee_id,department,salary,join_date
1,HR,50000,2022-01-01
2,IT,70000,2021-06-15
3,HR,60000,2023-03-10
4,IT,80000,2020-12-01


In [0]:

df_8.printSchema()

root
 |-- employee_id: long (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- join_date: string (nullable = true)



In [0]:
# join_date is created from Python strings, so cast it to a date before ordering by it
df_8 = df_8.withColumn("join_date", col("join_date").cast("date"))

In [0]:
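# With orderBy and no explicit frame, the window runs from the partition start to the current row (rows with equal join_date share a total)
df_8 = df_8.withColumn("cumulative_salary", _sum("salary").over(Window.partitionBy("department").orderBy(col("join_date"))))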
df_8.display()

employee_id,department,salary,join_date,cumulative_salary
1,HR,50000,2022-01-01,50000
3,HR,60000,2023-03-10,110000
4,IT,80000,2020-12-01,80000
2,IT,70000,2021-06-15,150000


In [0]:
df_8=df_8.withColumn("dense_rank",dense_rank().over(Window.partitionBy("department").orderBy(col("salary").desc())))
df_8.display()

employee_id,department,salary,join_date,cumulative_salary,dense_rank
3,HR,60000,2023-03-10,110000,1
1,HR,50000,2022-01-01,50000,2
4,IT,80000,2020-12-01,80000,1
2,IT,70000,2021-06-15,150000,2
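
As a sanity check on the two window results above, the same numbers can be reproduced without Spark. This is only a plain-Python sketch, not the notebook's method: the helper name `cumulative_and_dense_rank` is made up for illustration, and it assumes no two employees in a department share a join_date (true for the sample data), since Spark's default window frame would give such peers the same running total.

```python
from itertools import groupby
from operator import itemgetter

# Same sample rows as the notebook: (employee_id, department, salary, join_date)
rows = [
    (1, "HR", 50000, "2022-01-01"),
    (2, "IT", 70000, "2021-06-15"),
    (3, "HR", 60000, "2023-03-10"),
    (4, "IT", 80000, "2020-12-01"),
]


def cumulative_and_dense_rank(rows):
    """Return {employee_id: (cumulative_salary, dense_rank)} per department."""
    result = {}
    by_dept = sorted(rows, key=itemgetter(1))
    for _, group in groupby(by_dept, key=itemgetter(1)):
        members = list(group)
        # Running total in join_date order (ISO dates sort correctly as strings).
        running = 0
        cumulative = {}
        for emp_id, _dept, salary, _joined in sorted(members, key=itemgetter(3)):
            running += salary
            cumulative[emp_id] = running
        # Dense rank: distinct salaries, highest first, no gaps after ties.
        distinct = sorted({m[2] for m in members}, reverse=True)
        rank_of = {salary: i + 1 for i, salary in enumerate(distinct)}
        for emp_id, _dept, salary, _joined in members:
            result[emp_id] = (cumulative[emp_id], rank_of[salary])
    return result


check = cumulative_and_dense_rank(rows)
```

Each entry of `check` should agree with the cumulative_salary and dense_rank columns in the final output above.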


# 10


Problem Statement:
For a given dataset containing product categories, products, and their sales amount, find the Top 3 products by sales amount per category. However:
Twist 1: If two products have the same sales amount, prioritize the product with the smaller product name (alphabetical order).
Twist 2: Ensure the solution is scalable for large datasets with millions of records.



data = [ ("A", "p1", 100), ("A", "p2", 200), ("A", "p3", 200), ("B", "p4", 300), ("B", "p5", 150), ("B", "p6", 150), ("C", "p7", 400), ("C", "p8", 300), ("C", "p9", 200), ] 

columns = ["category", "product", "amount"]

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Start Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [ 
    ("A", "p1", 100), ("A", "p2", 200), ("A", "p3", 200),
    ("B", "p4", 300), ("B", "p5", 150), ("B", "p6", 150),
    ("C", "p7", 400), ("C", "p8", 300), ("C", "p9", 200),
]
columns = ["category", "product", "amount"]

df_10 = spark.createDataFrame(data, columns)

In [0]:
df_10.display()

category,product,amount
A,p1,100
A,p2,200
A,p3,200
B,p4,300
B,p5,150
B,p6,150
C,p7,400
C,p8,300
C,p9,200


In [0]:
df_10=df_10.withColumn("rn",row_number().over(Window.partitionBy("category").orderBy(col("amount").desc(),col("product").asc())))\
    .filter(col("rn")<=3)\
    .drop("rn")

In [0]:
df_10.display()

category,product,amount
A,p2,200
A,p3,200
A,p1,100
B,p4,300
B,p5,150
B,p6,150
C,p7,400
C,p8,300
C,p9,200
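
The tie-break in Twist 1 comes down to a two-part sort key: amount descending, then product name ascending. The following plain-Python sketch mirrors that ordering on the sample data so the expected output can be checked by hand. The function name `top_n_per_category` is hypothetical, and this is a cross-check only, not a replacement for the window function.

```python
from collections import defaultdict

# Same sample rows as the notebook: (category, product, amount)
sales = [
    ("A", "p1", 100), ("A", "p2", 200), ("A", "p3", 200),
    ("B", "p4", 300), ("B", "p5", 150), ("B", "p6", 150),
    ("C", "p7", 400), ("C", "p8", 300), ("C", "p9", 200),
]


def top_n_per_category(rows, n=3):
    """Return {category: [product, ...]} with at most n products each."""
    grouped = defaultdict(list)
    for category, product, amount in rows:
        grouped[category].append((product, amount))
    top = {}
    for category, items in grouped.items():
        # Highest amount first; ties broken by product name ascending.
        ranked = sorted(items, key=lambda item: (-item[1], item[0]))
        top[category] = [product for product, _ in ranked[:n]]
    return top


top3 = top_n_per_category(sales)
top2 = top_n_per_category(sales, n=2)
```

With n=2 the tie in category A becomes visible: p2 and p3 both have 200, and p2 comes first because of the name ordering.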


# 11
You are given a dataset containing employee data with their department, year of joining, and monthly salary. Each row contains the department name, the year of joining, and the monthly salary of an employee.
Your task is to:
Calculate the average monthly salary for each department.
Filter out departments with an average salary lower than 3000.
Calculate the total salary paid by each department over all years.
Sort the results by total salary paid in descending order.

Schema and dataset
data = [
 ("HR", "2020", 2500), 
 ("HR", "2021", 3200), 
 ("HR", "2022", 2800), 
 ("Engineering", "2020", 5000), 
 ("Engineering", "2021", 6000), 
 ("Engineering", "2022", 5500), 
 ("Marketing", "2020", 4000), 
 ("Marketing", "2021", 3500), 
 ("Marketing", "2022", 3300)
]


Create DataFrame
df = spark.createDataFrame(data, ["Department", "Year", "Salary"])

In [0]:
data = [
    ("HR", "2020", 2500), ("HR", "2021", 3200), ("HR", "2022", 2800),
    ("Engineering", "2020", 5000), ("Engineering", "2021", 6000), ("Engineering", "2022", 5500),
    ("Marketing", "2020", 4000), ("Marketing", "2021", 3500), ("Marketing", "2022", 3300)
]
columns = ["Department", "Year", "Salary"]

df_11 = spark.createDataFrame(data, columns)
df_11.display()


Department,Year,Salary
HR,2020,2500
HR,2021,3200
HR,2022,2800
Engineering,2020,5000
Engineering,2021,6000
Engineering,2022,5500
Marketing,2020,4000
Marketing,2021,3500
Marketing,2022,3300


In [0]:
from pyspark.sql.functions import avg

In [0]:
df_NEW=df_11.groupBy("Department").agg(avg("Salary").alias("avg_salary"))

In [0]:
df_NEW.display()

Department,avg_salary
HR,2833.333333333333
Engineering,5500.0
Marketing,3600.0


In [0]:
# The task removes only averages lower than 3000, so an average of exactly 3000 is kept
df_filtered = df_NEW.filter(col("avg_salary") >= 3000)
df_filtered.display()

Department,avg_salary
Engineering,5500.0
Marketing,3600.0


In [0]:
df_join=df_11.join(df_filtered,on="Department",how="inner")
df_join.display()

Department,Year,Salary,avg_salary
Engineering,2022,5500,5500.0
Marketing,2022,3300,3600.0
Engineering,2021,6000,5500.0
Marketing,2021,3500,3600.0
Engineering,2020,5000,5500.0
Marketing,2020,4000,3600.0


In [0]:
from pyspark.sql.functions import col, sum as _sum  # alias avoids shadowing Python's built-in sum

# Salary is already a long (inferred from Python ints), so no cast is needed before aggregating
df_final = df_join.groupBy("Department")\
    .agg(_sum("Salary").alias("total_salary"))\
    .orderBy(col("total_salary").desc())  # Sort by total salary in descending order

df_final.display()




Department,total_salary
Engineering,16500
Marketing,10800
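
The average and the total can both be derived from one grouping pass, which is why the join back to the original rows is not strictly required; in PySpark the equivalent would be a single `groupBy("Department").agg(...)` carrying both `avg` and `sum`, followed by a filter. The plain-Python sketch below illustrates that single-pass logic on the sample data. The function name `department_totals` is hypothetical, and it keeps departments whose average is at least 3000, following the task wording.

```python
from collections import defaultdict

# Same sample rows as the notebook: (Department, Year, Salary)
records = [
    ("HR", "2020", 2500), ("HR", "2021", 3200), ("HR", "2022", 2800),
    ("Engineering", "2020", 5000), ("Engineering", "2021", 6000), ("Engineering", "2022", 5500),
    ("Marketing", "2020", 4000), ("Marketing", "2021", 3500), ("Marketing", "2022", 3300),
]


def department_totals(rows, min_avg=3000):
    """Total salary per department, for departments averaging at least min_avg."""
    salaries = defaultdict(list)
    for department, _year, salary in rows:
        salaries[department].append(salary)
    kept = [
        (department, sum(values))
        for department, values in salaries.items()
        if sum(values) / len(values) >= min_avg
    ]
    # Largest total first, matching the requested sort order.
    return sorted(kept, key=lambda pair: pair[1], reverse=True)


totals = department_totals(records)
```

HR drops out because its average is roughly 2833, which leaves Engineering and Marketing in that order.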


# 12
You have a dataset of transactions that contains the following fields:
transaction_id (integer): Unique ID for each transaction.
user_id (integer): ID of the user performing the transaction.
transaction_amount (float): Amount of the transaction.
transaction_date (string): Date of the transaction in yyyy-MM-dd format.
From this dataset, perform the following operations:
Find the top 3 users with the highest total transaction amounts.
Among these top 3 users, for each, identify the most recent transaction date.


data = [ (1, 101, 500.0, "2024-01-01"), (2, 102, 200.0, "2024-01-02"), 
(3, 101, 300.0, "2024-01-03"), (4, 103, 100.0, "2024-01-04"), 
(5, 102, 400.0, "2024-01-05"), (6, 103, 600.0, "2024-01-06"), 
(7, 101, 200.0, "2024-01-07"), ] 

columns = ["transaction_id", "user_id", "transaction_amount", "transaction_date"]

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum as _sum, max as _max

# Initialize Spark
spark = SparkSession.builder.getOrCreate()

# Step 1: Create DataFrame
data = [
    (1, 101, 500.0, "2024-01-01"), 
    (2, 102, 200.0, "2024-01-02"), 
    (3, 101, 300.0, "2024-01-03"), 
    (4, 103, 100.0, "2024-01-04"), 
    (5, 102, 400.0, "2024-01-05"), 
    (6, 103, 600.0, "2024-01-06"), 
    (7, 101, 200.0, "2024-01-07"),
]
columns = ["transaction_id", "user_id", "transaction_amount", "transaction_date"]

df = spark.createDataFrame(data, columns)

# Step 2: Total amount and most recent transaction date per user
# (yyyy-MM-dd strings sort chronologically, so max() on the string column is safe here)
df_totals = df.groupBy("user_id")\
    .agg(_sum("transaction_amount").alias("total_amount"),
         _max("transaction_date").alias("last_transaction_date"))

# Step 3: Keep the top 3 users by total transaction amount
df_result = df_totals.orderBy(col("total_amount").desc()).limit(3)

df_result.display()


user_id,total_amount,last_transaction_date
101,1000.0,2024-01-07
103,700.0,2024-01-06
102,600.0,2024-01-05


# 18

You have two datasets with different keys, and you need to join them based on a specific mapping logic. Here are the datasets: 

Task: Write a PySpark program to join these datasets such that category_id in Dataset 1 matches with cat_id in Dataset 2, and produce the result with the following schema: | product_id | product_name | category_name |


Create Dataset 1 (Products) 

products_data = [ (1, "Laptop", 101), (2, "Smartphone", 102), (3, "Tablet", 101) ] 

products_schema = ["product_id", "product_name", "category_id"] 

products_df = spark.createDataFrame(products_data, schema=products_schema) 

Create Dataset 2 (Categories) 

categories_data = [ (101, "Electronics"), (102, "Mobile"), (103, "Home Appliance") ] 

categories_schema = ["cat_id", "category_name"]

In [0]:


# Dataset 1: Products
products_data = [
    (1, "Laptop", 101),
    (2, "Smartphone", 102),
    (3, "Tablet", 101)
]
products_schema = ["product_id", "product_name", "category_id"]
products_df = spark.createDataFrame(products_data, schema=products_schema)

# Dataset 2: Categories
categories_data = [
    (101, "Electronics"),
    (102, "Mobile"),
    (103, "Home Appliance")
]
categories_schema = ["cat_id", "category_name"]
categories_df = spark.createDataFrame(categories_data, schema=categories_schema)

In [0]:
products_df.display()
categories_df.display()

product_id,product_name,category_id
1,Laptop,101
2,Smartphone,102
3,Tablet,101


cat_id,category_name
101,Electronics
102,Mobile
103,Home Appliance


In [0]:
df_joined=products_df.join(categories_df,on=products_df.category_id==categories_df.cat_id,how="inner")\
  .select("product_id","product_name","category_name")
df_joined.display()

product_id,product_name,category_name
1,Laptop,Electronics
2,Smartphone,Mobile
3,Tablet,Electronics


# 20
You are working as a Data Engineer for a retail company that operates in multiple regions. The sales data is collected daily, and the company wants to analyze the performance of each product across different regions. The data is stored in a PySpark DataFrame with the columns Product, Region, and Sales.
The management has requested a report where each region becomes a column and the values represent the total sales for each product in that region. Your task is to write PySpark code to generate this pivot table.

Task:
1. Load the sample data into a PySpark DataFrame.
2. Use PySpark's pivot functionality to create a table where:
Each region is a column.
The rows represent the products.
The values are the total sales for each product in each region.
3. Provide the output DataFrame in a user-friendly format for the stakeholders.

Schema and dataset
data = [ ("A", "North", 100), ("B", "East", 200), ("A", "East", 150), ("C", "North", 300), ("B", "South", 400), ("C", "East", 250) ]

columns = ["Product", "Region", "Sales"]

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Sample Data
data = [
    ("A", "North", 100),
    ("B", "East", 200),
    ("A", "East", 150),
    ("C", "North", 300),
    ("B", "South", 400),
    ("C", "East", 250)
]
columns = ["Product", "Region", "Sales"]

# Create DataFrame
df_20= spark.createDataFrame(data, columns)
df_20.display()


Product,Region,Sales
A,North,100
B,East,200
A,East,150
C,North,300
B,South,400
C,East,250


In [0]:
df_20.groupBy("Product")\
    .pivot("Region")\
    .agg(sum("Sales"))\
    .fillna(0)\
    .display()

Product,East,North,South
A,150,100,0
C,250,300,0
B,200,0,400
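
The pivot above does three things: it discovers the distinct regions, sums sales per (product, region) pair, and fills missing combinations with 0. The plain-Python sketch below spells out those three steps on the sample data so the output table can be checked by hand. The function name `pivot_sales` is hypothetical. On the Spark side, `pivot` also accepts an explicit list of values as its second argument, which can spare Spark the extra pass it otherwise needs to find the distinct regions.

```python
from collections import defaultdict

# Same sample rows as the notebook: (Product, Region, Sales)
sales = [
    ("A", "North", 100), ("B", "East", 200), ("A", "East", 150),
    ("C", "North", 300), ("B", "South", 400), ("C", "East", 250),
]


def pivot_sales(rows):
    """Return (sorted region names, {product: {region: total_sales}})."""
    # Step 1: discover the pivot columns.
    regions = sorted({region for _, region, _ in rows})
    # Step 2 and 3: every product starts with 0 in every region, then sums accumulate.
    totals = defaultdict(lambda: dict.fromkeys(regions, 0))
    for product, region, amount in rows:
        totals[product][region] += amount
    return regions, {product: totals[product] for product in sorted(totals)}


regions, table = pivot_sales(sales)
```

`regions` gives the column order (East, North, South), and each entry in `table` corresponds to one row of the pivoted output.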


# 21
You are given a dataset of employees with the following columns: name, department, salary, and hire_date. You need to calculate the ROW_NUMBER() partitioned by department and ordered by salary in descending order. Additionally, the employees should be ranked within each department based on their hiring date if their salaries are the same. Add a new column called rank to the DataFrame that contains the calculated row numbers.

Schema and dataset
data = [ ("John", "HR", 5000, "2021-05-01"), 
("Jane", "HR", 6000, "2022-03-15"), 
("Sam", "Engineering", 7000, "2021-06-01"), 
("Anna", "Engineering", 8000, "2020-07-01"), 
("Paul", "HR", 4500, "2021-05-01"), 
("Sara", "Engineering", 7000, "2020-08-01"), 
("Tom", "Engineering", 7500, "2021-07-01") ]

Create DataFrame
columns = ["name", "department", "salary", "hire_date"]

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number  # explicit imports; a wildcard would shadow built-ins such as sum and round
from pyspark.sql.window import Window

# Initialize Spark session
spark = SparkSession.builder.getOrCreate()

# Sample data
data = [
    ("John", "HR", 5000, "2021-05-01"),
    ("Jane", "HR", 6000, "2022-03-15"),
    ("Sam", "Engineering", 7000, "2021-06-01"),
    ("Anna", "Engineering", 8000, "2020-07-01"),
    ("Paul", "HR", 4500, "2021-05-01"),
    ("Sara", "Engineering", 7000, "2020-08-01"),
    ("Tom", "Engineering", 7500, "2021-07-01")
]

columns = ["name", "department", "salary", "hire_date"]
df_21 = spark.createDataFrame(data, columns)
df_21.display()


name,department,salary,hire_date
John,HR,5000,2021-05-01
Jane,HR,6000,2022-03-15
Sam,Engineering,7000,2021-06-01
Anna,Engineering,8000,2020-07-01
Paul,HR,4500,2021-05-01
Sara,Engineering,7000,2020-08-01
Tom,Engineering,7500,2021-07-01


In [0]:
df_21=df_21.withColumn("rank",row_number().over(Window.partitionBy("department").orderBy(col("salary").desc(),col("hire_date").asc())))
df_21.display()

name,department,salary,hire_date,rank
Anna,Engineering,8000,2020-07-01,1
Tom,Engineering,7500,2021-07-01,2
Sara,Engineering,7000,2020-08-01,3
Sam,Engineering,7000,2021-06-01,4
Jane,HR,6000,2022-03-15,1
John,HR,5000,2021-05-01,2
Paul,HR,4500,2021-05-01,3


# 22
You have the following dataset containing sales information for different products and regions. Reshape the data using PySpark's pivot() method to calculate the total sales for each product across regions, and then optimize it further by applying specific transformations.

Task 1: Use pivot() to create a table showing the total sales for each product by region.

Task 2: Add a column calculating the percentage contribution of each region to the total sales for that product.

Task 3: Sort the data in descending order by total sales for each product.




data = [ ("North", "Laptop", 2000, "Q1"), ("South", "Laptop", 3000, "Q1"), ("East", "Laptop", 2500, "Q1"), ("North", "Phone", 1500, "Q1"), 
("South", "Phone", 1000, "Q1"), ("East", "Phone", 2000, "Q1"), 
("North", "Laptop", 3000, "Q2"), ("South", "Laptop", 4000, "Q2"), 
("East", "Laptop", 3500, "Q2"), ("North", "Phone", 2500, "Q2"), 
("South", "Phone", 1500, "Q2"), ("East", "Phone", 3000, "Q2"), ] 

columns = ["Region", "Product", "Sales", "Quarter"] 

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum, col, round

spark = SparkSession.builder.getOrCreate()

data = [
    ("North", "Laptop", 2000, "Q1"),
    ("South", "Laptop", 3000, "Q1"),
    ("East", "Laptop", 2500, "Q1"),
    ("North", "Phone", 1500, "Q1"),
    ("South", "Phone", 1000, "Q1"),
    ("East", "Phone", 2000, "Q1"),
    ("North", "Laptop", 3000, "Q2"),
    ("South", "Laptop", 4000, "Q2"),
    ("East", "Laptop", 3500, "Q2"),
    ("North", "Phone", 2500, "Q2"),
    ("South", "Phone", 1500, "Q2"),
    ("East", "Phone", 3000, "Q2"),
]

columns = ["Region", "Product", "Sales", "Quarter"]

df_22 = spark.createDataFrame(data, columns)
df_22.display()


Region,Product,Sales,Quarter
North,Laptop,2000,Q1
South,Laptop,3000,Q1
East,Laptop,2500,Q1
North,Phone,1500,Q1
South,Phone,1000,Q1
East,Phone,2000,Q1
North,Laptop,3000,Q2
South,Laptop,4000,Q2
East,Laptop,3500,Q2
North,Phone,2500,Q2
South,Phone,1500,Q2
East,Phone,3000,Q2


In [0]:
df_22_group=df_22.groupBy("Region","Product").agg(sum("Sales").alias("Total_sales"))
df_22_group.display()

Region,Product,Total_sales
North,Laptop,5000
South,Laptop,7000
East,Laptop,6000
North,Phone,4000
South,Phone,2500
East,Phone,5000


# Task 1: Pivot → Total Sales by Product & Region

In [0]:
df_22_group=df_22_group.groupby("Product")\
    .pivot("Region")\
    .agg(sum("Total_sales"))

In [0]:
df_22_group.display()

Product,East,North,South
Laptop,6000,5000,7000
Phone,5000,4000,2500


# Task 2: Add % Contribution per Region

In [0]:
df_22_group = df_22_group.withColumn("total", col("East") + col("North") + col("South"))
df_22_group.display()

Product,East,North,South,total
Laptop,6000,5000,7000,18000
Phone,5000,4000,2500,11500


In [0]:
df_final_22=df_22_group.withColumn("East_per",round(col("East")/col("total")*100,2))\
    .withColumn("North_per",round((col("North")/col("total"))*100,2))\
    .withColumn("South_per",round((col("South")/col("total"))*100,2))

In [0]:
df_final_22.display()

Product,East,North,South,total,East_per,North_per,South_per
Laptop,6000,5000,7000,18000,33.33,27.78,38.89
Phone,5000,4000,2500,11500,43.48,34.78,21.74


In [0]:
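# orderBy returns a new DataFrame, so the result must be assigned for the sort to take effect
df_final_22 = df_final_22.orderBy(col("total").desc())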
df_final_22.display()

Product,East,North,South,total,East_per,North_per,South_per
Laptop,6000,5000,7000,18000,33.33,27.78,38.89
Phone,5000,4000,2500,11500,43.48,34.78,21.74


# 23
Reading and Processing CSV Data
Assume you have a CSV file named employees.csv whose content is the sample data listed below.
You are tasked with analyzing this employee data using PySpark. Your tasks are as follows:
Read the CSV file into a PySpark DataFrame.
Find the total number of employees in each department.
Calculate the average salary of employees in each department.
Filter and display employees who joined after 2020-01-01 and belong to the "HR" department.
Save the filtered results to a new CSV file named filtered_employees.csv.

Schema and dataset
data = [ (1, "John Doe", "IT", 80000, "2021-01-15"), 
(2, "Jane Smith", "HR", 70000, "2020-05-22"),
(3, "Robert Brown", "IT", 85000, "2019-11-03"), 
(4, "Emily Davis", "Finance", 90000, "2022-07-19"), 
(5, "Michael Johnson", "HR", 75000, "2023-03-11"), ]

columns = ["EmployeeID", "Name", "Department", "Salary", "JoinDate"]
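
No solution cell is included for this problem, so the following is a minimal plain-Python sketch of the five steps, useful mainly for checking the expected numbers. It is not Spark code: the CSV text is an in-memory stand-in built from the sample rows, and names such as `EMPLOYEES_CSV`, `headcount`, and `hr_recent` are made up for illustration. In PySpark the same flow would typically start from `spark.read.csv(path, header=True, inferSchema=True)` and end with `df.write.csv(...)`, which writes a directory of part files rather than a single file.

```python
import csv
import io
from collections import defaultdict
from datetime import date

# In-memory stand-in for employees.csv, built from the sample rows above.
EMPLOYEES_CSV = """EmployeeID,Name,Department,Salary,JoinDate
1,John Doe,IT,80000,2021-01-15
2,Jane Smith,HR,70000,2020-05-22
3,Robert Brown,IT,85000,2019-11-03
4,Emily Davis,Finance,90000,2022-07-19
5,Michael Johnson,HR,75000,2023-03-11
"""

# Step 1: read the CSV into a list of dicts keyed by column name.
rows = list(csv.DictReader(io.StringIO(EMPLOYEES_CSV)))

# Steps 2 and 3: employee count and average salary per department.
salaries = defaultdict(list)
for row in rows:
    salaries[row["Department"]].append(int(row["Salary"]))
headcount = {dept: len(vals) for dept, vals in salaries.items()}
avg_salary = {dept: sum(vals) / len(vals) for dept, vals in salaries.items()}

# Step 4: HR employees who joined after 2020-01-01.
cutoff = date(2020, 1, 1)
hr_recent = [
    row for row in rows
    if row["Department"] == "HR" and date.fromisoformat(row["JoinDate"]) > cutoff
]

# Step 5: write the filtered rows as CSV. A buffer is used here;
# a real run would open filtered_employees.csv instead.
out = io.StringIO()
writer = csv.DictWriter(out, fieldnames=list(rows[0].keys()))
writer.writeheader()
writer.writerows(hr_recent)
filtered_csv = out.getvalue()
```

Both HR employees pass the date filter, since Jane Smith joined in May 2020 and Michael Johnson in 2023.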

# 24
You are given a nested JSON file named sample_data.json stored in an S3 bucket at s3://your-bucket/sample_data.json. The JSON file contains details about employees, including their names, departments, and address details (nested fields).
Write a PySpark program to:
Load the JSON file into a DataFrame.
Flatten the nested structure to create a tabular format.
Write the resulting DataFrame as a Parquet file to the output path s3://your-bucket/output/.

Schema: JSON data (sample_data.json)
[
 {
 "id": 1,
 "name": "Alice",
 "department": "HR",
 "address": {
 "city": "New York",
 "state": "NY"
 }
 },
 {
 "id": 2,
 "name": "Bob",
 "department": "IT",
 "address": {
 "city": "San Francisco",
 "state": "CA"
 }
 },
 {
 "id": 3,
 "name": "Charlie",
 "department": "Finance",
 "address": {
 "city": "Chicago",
 "state": "IL"
 }
 }
]
Explanation:
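Step 1: The JSON file is loaded into a DataFrame using spark.read.json(). Because the file is a single JSON array spread over several lines, the multiLine option must be enabled; by default Spark expects one JSON object per line.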
Step 2: The select() method is used to extract and rename nested fields (address.city and address.state) to create a flattened structure.
Step 3: The resulting DataFrame is written as a Parquet file using write.parquet().
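
To make Step 2 concrete, the sketch below flattens the sample records in plain Python and shows which columns the tabular form ends up with. It is an illustration of the flattening idea only, not the Spark program the explanation describes: the helper `flatten` and the underscore-joined column names (`address_city`, `address_state`) are assumptions, and the JSON is parsed from an in-memory string rather than read from S3.

```python
import json

# In-memory copy of the sample_data.json content shown above.
SAMPLE_JSON = """
[
  {"id": 1, "name": "Alice", "department": "HR",
   "address": {"city": "New York", "state": "NY"}},
  {"id": 2, "name": "Bob", "department": "IT",
   "address": {"city": "San Francisco", "state": "CA"}},
  {"id": 3, "name": "Charlie", "department": "Finance",
   "address": {"city": "Chicago", "state": "IL"}}
]
"""


def flatten(record, parent="", sep="_"):
    """Flatten nested dicts into one level, joining key names with sep."""
    flat = {}
    for key, value in record.items():
        name = f"{parent}{sep}{key}" if parent else key
        if isinstance(value, dict):
            # Recurse into nested objects such as "address".
            flat.update(flatten(value, name, sep))
        else:
            flat[name] = value
    return flat


flat_rows = [flatten(record) for record in json.loads(SAMPLE_JSON)]
flat_columns = list(flat_rows[0].keys())
```

Every record now has the same five top-level keys, which is the shape a columnar format such as Parquet expects.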





# 25
Given two datasets, products and sales, write a PySpark program to identify products that have never been sold. Assume the schema of products is (product_id, product_name) and the schema of sales is (sale_id, product_id, sale_date).

Schema
Create DataFrame for products 

products = spark.createDataFrame([ (1, "Laptop"), (2, "Tablet"), 
(3, "Smartphone"), (4, "Monitor"), 
(5, "Keyboard") ], ["product_id", "product_name"]) 

Create DataFrame for sales 

sales = spark.createDataFrame([ (101, 1, "2025-01-01"), (102, 3, "2025-01-02"), (103, 5, "2025-01-03") ], ["sale_id", "product_id", "sale_date"]) 
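
Finding products that were never sold is an anti-join: keep each product whose product_id has no match in sales. In PySpark this is commonly written as `products.join(sales, on="product_id", how="left_anti")`. The plain-Python sketch below shows the same logic on the sample data so the expected answer can be verified; the function name `never_sold` is hypothetical, and the tuples stand in for the two DataFrames.

```python
# Stand-ins for the two DataFrames above.
products = [
    (1, "Laptop"), (2, "Tablet"), (3, "Smartphone"),
    (4, "Monitor"), (5, "Keyboard"),
]
sales = [
    (101, 1, "2025-01-01"),
    (102, 3, "2025-01-02"),
    (103, 5, "2025-01-03"),
]


def never_sold(products, sales):
    """Return the (product_id, product_name) pairs with no matching sale."""
    # Collect the ids that appear in at least one sale.
    sold_ids = {product_id for _sale_id, product_id, _sale_date in sales}
    # Anti-join: keep only products whose id is absent from that set.
    return [(pid, name) for pid, name in products if pid not in sold_ids]


unsold = never_sold(products, sales)
```

For the sample data, products 1, 3, and 5 each have a sale, so only Tablet and Monitor remain.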
