## PySpark COde Practice

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark.sql.functions import *  # Import the function
spark = SparkSession.builder.getOrCreate()
from pyspark.sql.functions import regexp_replace, col
from google.colab import drive

In [2]:
#Initial Saprk Session
spark = SparkSession.builder.appName("PySparkPractice").getOrCreate()


In [3]:
data = [
    ("1/1/2023", "C1", 20),
    ("1/1/2023", "C2", 20),
    ("1/2/2023", "C2", 50),
    ("1/2/2023", "C3", 12),
    ("1/3/2023", "C4", 20),
    ("1/3/2023", "C5", 100),
    ("1/3/2023", "C1", 123),
]
column = ['Date_fld', 'Custome_name', 'Amount']

df = spark.createDataFrame(data, column)
df.show()

# Convert date in proper format
df = df.withColumn("Date_fld", to_date(col("Date_fld"), "M/d/yyyy"))
df.show()

# Define window partition by customer orderd by date
window_space = Window.partitionBy('Custome_name').orderBy('Date_fld')

#Assign row number within each customer number
df = df.withColumn('Row_Number', row_number().over(window_space))
df.show()

#filter only new customer
df_new_custome = df.filter(col('Row_Number') == 1)
df_new_custome.show()

# Count distinct new customers per date
result_df = df_new_custome.groupBy('Date_fld').agg(countDistinct('Custome_name').alias('Price_Count'))
result_df.show()




+--------+------------+------+
|Date_fld|Custome_name|Amount|
+--------+------------+------+
|1/1/2023|          C1|    20|
|1/1/2023|          C2|    20|
|1/2/2023|          C2|    50|
|1/2/2023|          C3|    12|
|1/3/2023|          C4|    20|
|1/3/2023|          C5|   100|
|1/3/2023|          C1|   123|
+--------+------------+------+

+----------+------------+------+
|  Date_fld|Custome_name|Amount|
+----------+------------+------+
|2023-01-01|          C1|    20|
|2023-01-01|          C2|    20|
|2023-01-02|          C2|    50|
|2023-01-02|          C3|    12|
|2023-01-03|          C4|    20|
|2023-01-03|          C5|   100|
|2023-01-03|          C1|   123|
+----------+------------+------+

+----------+------------+------+----------+
|  Date_fld|Custome_name|Amount|Row_Number|
+----------+------------+------+----------+
|2023-01-01|          C1|    20|         1|
|2023-01-03|          C1|   123|         2|
|2023-01-01|          C2|    20|         1|
|2023-01-02|          C2|    5

## Pyspark code to solve

In [4]:
#Craete Spark Session
spark2 = SparkSession\
.builder\
.config('spark.shuffle.useOldFetchers', 'true')\
.config('spark.ui.port','0')\
.config('spark.sql.warehouse.dir', '/user/itv008042/warehouse')\
.enableHiveSupport()\
.master('yarn')\
.appName('PySparkPractice')\
.getOrCreate()

#Create Schema

schema = StructType([
    StructField("ActorId", IntegerType(), True),
    StructField("DirectorId", IntegerType(), True),
    StructField("TimeStamp", IntegerType(), True)
])

data = [
    (1,1,0),
    (1,1,1),
    (1,1,2),
    (1,2,3),
    (1,2,4),
    (1,1,5),
    (1,1,6)
]

# Create Data frame
df = spark2.createDataFrame(data, schema)
df.show()


+-------+----------+---------+
|ActorId|DirectorId|TimeStamp|
+-------+----------+---------+
|      1|         1|        0|
|      1|         1|        1|
|      1|         1|        2|
|      1|         2|        3|
|      1|         2|        4|
|      1|         1|        5|
|      1|         1|        6|
+-------+----------+---------+



### Create Group By ActorId','DirectorId

In [5]:
df_group = df.groupBy('ActorId', 'DirectorId').count()
df_group.show()

+-------+----------+-----+
|ActorId|DirectorId|count|
+-------+----------+-----+
|      1|         1|    5|
|      1|         2|    2|
+-------+----------+-----+



In [6]:
df_group.filter(df_group['count']> 3).show()


+-------+----------+-----+
|ActorId|DirectorId|count|
+-------+----------+-----+
|      1|         1|    5|
+-------+----------+-----+



02: Write an pyspark code to find the ctr of each Ad.Round ctr to 2
decimal points. Order the result table by ctr in descending order
and by ad_id in ascending order in case of a tie.

In [7]:
schema = StructType([
    StructField("ad_id", IntegerType(), True),
    StructField("user_id", IntegerType(), True),
    StructField("action", StringType(), True)
])
data = [
   (1, 1, 'Clicked'),
  (2, 2, 'Clicked'),
  (3, 3, 'Viewed'),
  (5, 5, 'Ignored'),
  (1, 7, 'Ignored'),
  (2, 7, 'Viewed'),
  (3, 5, 'Clicked'),
  (1, 4, 'Viewed'),
  (2, 11, 'Viewed'),
  (1, 2, 'Clicked')
]

#Create Datframe
df = spark.createDataFrame(data, schema)
df.show()

+-----+-------+-------+
|ad_id|user_id| action|
+-----+-------+-------+
|    1|      1|Clicked|
|    2|      2|Clicked|
|    3|      3| Viewed|
|    5|      5|Ignored|
|    1|      7|Ignored|
|    2|      7| Viewed|
|    3|      5|Clicked|
|    1|      4| Viewed|
|    2|     11| Viewed|
|    1|      2|Clicked|
+-----+-------+-------+



03: Write a Pyspark program to report the first name, last name, city, and state of each person in the
Person dataframe. If the address of a personId is not present in the Address dataframe,
report null instead.

In [8]:
# Define schema for the 'persons' table
persons_schema = StructType([
  StructField("personId", IntegerType(), True),
  StructField("lastName", StringType(), True),
  StructField("firstName", StringType(), True)
])
# Define schema for the 'addresses' table
addresses_schema = StructType([
  StructField("addressId", IntegerType(), True),
  StructField("personId", IntegerType(), True),
  StructField("city", StringType(), True),
  StructField("state", StringType(), True)
])
# Define data for the 'persons' table
persons_data = [
  (1, 'Wang', 'Allen'),
  (2, 'Alice', 'Bob')
]
# Define data for the 'addresses' table
addresses_data = [
  (1, 2, 'New York City', 'New York'),
  (2, 3, 'Leetcode', 'California')
]

#Create Data Frame
person_df = spark.createDataFrame(persons_data, persons_schema)
address_df = spark.createDataFrame(addresses_data, addresses_schema)

person_df.show()
address_df.show()


+--------+--------+---------+
|personId|lastName|firstName|
+--------+--------+---------+
|       1|    Wang|    Allen|
|       2|   Alice|      Bob|
+--------+--------+---------+

+---------+--------+-------------+----------+
|addressId|personId|         city|     state|
+---------+--------+-------------+----------+
|        1|       2|New York City|  New York|
|        2|       3|     Leetcode|California|
+---------+--------+-------------+----------+



In [9]:
df_join = person_df.join(address_df, person_df["personId"] == address_df["personId"], "left")
df_join.select('firstName', 'lastName','city','state').show()

##first name, last name, city, and state

+---------+--------+-------------+--------+
|firstName|lastName|         city|   state|
+---------+--------+-------------+--------+
|    Allen|    Wang|         NULL|    NULL|
|      Bob|   Alice|New York City|New York|
+---------+--------+-------------+--------+



04: Employees Earning More Than Their Managers
Write a Pyspark program to find Employees Earning More Than Their
Managers

In [10]:
# Define the schema for the "employees"
employees_schema = StructType([
  StructField("id", IntegerType(), True),
  StructField("name", StringType(), True),
  StructField("salary", IntegerType(), True),
  StructField("managerId", IntegerType(), True)
])
# Define data for the "employees"
employees_data = [
  (1, 'Joe', 70000, 3),
  (2, 'Henry', 80000, 4),
  (3, 'Sam', 60000, None),
  (4, 'Max', 90000, None)
]

emp_df = spark.createDataFrame(employees_data, employees_schema)
emp_df.show()


+---+-----+------+---------+
| id| name|salary|managerId|
+---+-----+------+---------+
|  1|  Joe| 70000|        3|
|  2|Henry| 80000|        4|
|  3|  Sam| 60000|     NULL|
|  4|  Max| 90000|     NULL|
+---+-----+------+---------+



In [11]:
emp_df_1 = emp_df.alias('e1')
emp_df_2 = emp_df.alias('e2')

self_joined_df = emp_df_1.join(emp_df_2, col("e1.id") == col("e2.managerId"))
self_joined_df.show()


+---+----+------+---------+---+-----+------+---------+
| id|name|salary|managerId| id| name|salary|managerId|
+---+----+------+---------+---+-----+------+---------+
|  3| Sam| 60000|     NULL|  1|  Joe| 70000|        3|
|  4| Max| 90000|     NULL|  2|Henry| 80000|        4|
+---+----+------+---------+---+-----+------+---------+



In [12]:
self_joined_after_df = self_joined_df.select(col("e2.name"), col('e2.salary'), col("e1.salary").alias("manager_salary"))
self_joined_after_df.show()

+-----+------+--------------+
| name|salary|manager_salary|
+-----+------+--------------+
|  Joe| 70000|         60000|
|Henry| 80000|         90000|
+-----+------+--------------+



In [13]:
self_joined_after_df.filter(self_joined_after_df.salary > self_joined_after_df.manager_salary).select('name').show()

+----+
|name|
+----+
| Joe|
+----+



Write a Pyspark program to report all the duplicate emails.
Note that it's guaranteed that the email field is not NULL.

In [14]:
# Define the schema for the "emails" table
emails_schema = StructType([
StructField("id", IntegerType(), True),
StructField("email", StringType(), True)
])
# Define data for the "emails" table
emails_data = [
(1, 'a@b.com'),
(2, 'c@d.com'),
(3, 'a@b.com')
]

# Create a PySpark DataFrame
email_df = spark.createDataFrame(emails_data, emails_schema)
email_df.show()


+---+-------+
| id|  email|
+---+-------+
|  1|a@b.com|
|  2|c@d.com|
|  3|a@b.com|
+---+-------+



In [15]:
df_email_group = email_df.groupBy('email').count()
df_email_group.show()

df_email_group.filter(df_email_group['count']>1).show()

+-------+-----+
|  email|count|
+-------+-----+
|a@b.com|    2|
|c@d.com|    1|
+-------+-----+

+-------+-----+
|  email|count|
+-------+-----+
|a@b.com|    2|
+-------+-----+



Need to start page 21

## Write a program in pyspark last 3 days product sell rolling avarage price?

In [16]:
# Sample data: product_id, date, price
data = [
    ("p1", "2025-04-10", 100),
    ("p1", "2025-04-11", 110),
    ("p1", "2025-04-12", 105),
    ("p1", "2025-04-13", 120),
    ("p1", "2025-04-14", 130),
    ("p2", "2025-04-10", 200),
    ("p2", "2025-04-11", 220),
    ("p2", "2025-04-12", 210),
    ("p2", "2025-04-14", 230),
]

# Create DataFrame
df = spark.createDataFrame(data, ["product_id", "date", "price"])
# Convert date to DateType
df = df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

df.show()

# Define window: partition by product_id, ordered by date, looking back 2 rows (3-day window)
windowSpace = Window.partitionBy("product_id").orderBy("date").rowsBetween(-2, 0)

# Add rolling average column
df_with_rolling_avg_column = df.withColumn("rolling_avg_price", avg("price").over(windowSpace))

df_with_rolling_avg_column.orderBy("product_id", "date").show()




+----------+----------+-----+
|product_id|      date|price|
+----------+----------+-----+
|        p1|2025-04-10|  100|
|        p1|2025-04-11|  110|
|        p1|2025-04-12|  105|
|        p1|2025-04-13|  120|
|        p1|2025-04-14|  130|
|        p2|2025-04-10|  200|
|        p2|2025-04-11|  220|
|        p2|2025-04-12|  210|
|        p2|2025-04-14|  230|
+----------+----------+-----+

+----------+----------+-----+------------------+
|product_id|      date|price| rolling_avg_price|
+----------+----------+-----+------------------+
|        p1|2025-04-10|  100|             100.0|
|        p1|2025-04-11|  110|             105.0|
|        p1|2025-04-12|  105|             105.0|
|        p1|2025-04-13|  120|111.66666666666667|
|        p1|2025-04-14|  130|118.33333333333333|
|        p2|2025-04-10|  200|             200.0|
|        p2|2025-04-11|  220|             210.0|
|        p2|2025-04-12|  210|             210.0|
|        p2|2025-04-14|  230|             220.0|
+----------+---------

You’re given a dataset containing employee names, departments, and their salaries.

 Your task is to rank the employees by salary within each department and classify them as:

"High Growth" if the salary is higher than the previous person in that department

"Low Growth" if the salary is lower

"No Growth" if the salary is the same or it's the first employee in the department

In [17]:
data = [
    ("Alice", "HR", 50000),
    ("Bob", "IT", 60000),
    ("Charlie", "HR", 55000),
    ("David", "IT", 70000),
    ("Eve", "HR", 52000),
    ("Frank", "IT", 65000),
    ("Grace", "HR", 51000),
    ("Hank", "IT", 68000),
    ("Ivy", "HR", 53000),
    ("Jack", "IT", 72000),
    ("Kelly", "HR", 54000),
    ("Luke", "IT", 69000),

]
columns = ['name', 'department', 'salary']

df = spark.createDataFrame(data, columns)
df.show()

# Defin window function and use LAG to compare the salary
windowSpace = Window.partitionBy("department").orderBy("salary")
df_with_lag = df.withColumn("previous_salary", lag("salary").over(windowSpace))
df_with_lag.show()

#Apply Classification Logic
final_df = df_with_lag.withColumn(
    "growth_status",
    when(col("previous_salary").isNull(), "No Growth")\
    .when(col("salary")>col("previous_salary"), "High Growth")\
    .when(col("salary")<col("previous_salary"), "Low Growth")\
    .otherwise("No Growth")
)

#Display the result
final_df.show()



+-------+----------+------+
|   name|department|salary|
+-------+----------+------+
|  Alice|        HR| 50000|
|    Bob|        IT| 60000|
|Charlie|        HR| 55000|
|  David|        IT| 70000|
|    Eve|        HR| 52000|
|  Frank|        IT| 65000|
|  Grace|        HR| 51000|
|   Hank|        IT| 68000|
|    Ivy|        HR| 53000|
|   Jack|        IT| 72000|
|  Kelly|        HR| 54000|
|   Luke|        IT| 69000|
+-------+----------+------+

+-------+----------+------+---------------+
|   name|department|salary|previous_salary|
+-------+----------+------+---------------+
|  Alice|        HR| 50000|           NULL|
|  Grace|        HR| 51000|          50000|
|    Eve|        HR| 52000|          51000|
|    Ivy|        HR| 53000|          52000|
|  Kelly|        HR| 54000|          53000|
|Charlie|        HR| 55000|          54000|
|    Bob|        IT| 60000|           NULL|
|  Frank|        IT| 65000|          60000|
|   Hank|        IT| 68000|          65000|
|   Luke|        IT| 69

Write a solution to find the average selling price for each product. average_price should be rounded to 2 decimal places. If a product does not have any sold units, its average selling price is assumed to be 0.


In [39]:
from pyspark.sql.functions import to_date
#Sample Data
prices_data = [
    (1, 100.0, "2023-01-01", "2023-01-31"),
    (2, 200.0, "2023-01-10", "2023-02-10"),
    (3, 150.0, "2023-01-15", "2023-01-25"),
]

units_sold_data =[
    (1,"2023-01-05",10),
    (2,"2023-01-15",5),
    (2,"2023-02-05",5),
    (4,"2023-01-25",5)
]

# Define Schema
prices_column = ['product_id', 'price', 'start_date', 'end_date']
units_sold_column = ['product_id', 'purchase_date', 'units']

prices_df = spark.createDataFrame(prices_data, prices_column)
units_sold_df = spark.createDataFrame(units_sold_data, units_sold_column)


prices_df = prices_df.withColumn("start_data", to_date("start_date")).withColumn("end_date", to_date("end_date"))
prices_df.show()

units_sold_df = units_sold_df.withColumn("purchase_date", to_date("purchase_date"))
units_sold_df.show()

#Join wih date  filtering condition
join_df = prices_df.join(units_sold_df, (prices_df.product_id == units_sold_df.product_id)&(units_sold_df.purchase_date.between(prices_df.start_date, prices_df.end_date)), "left")
join_df.show()

#Calculate total revenue and unit sold per product
total_revenue_df = join_df.groupBy(prices_df.product_id)\
  .agg(
      sum(col("price") * col("units")).alias("total_revenue"),
      sum("units").alias("total_units")
  )\
  .withColumn("average_price",
      when(col("total_units").isNull(), 0.0)
      .otherwise(round(col("total_revenue") / col("total_units"), 2))
  )\
  .select("product_id", "average_price")

total_revenue_df.show()

+----------+-----+----------+----------+----------+
|product_id|price|start_date|  end_date|start_data|
+----------+-----+----------+----------+----------+
|         1|100.0|2023-01-01|2023-01-31|2023-01-01|
|         2|200.0|2023-01-10|2023-02-10|2023-01-10|
|         3|150.0|2023-01-15|2023-01-25|2023-01-15|
+----------+-----+----------+----------+----------+

+----------+-------------+-----+
|product_id|purchase_date|units|
+----------+-------------+-----+
|         1|   2023-01-05|   10|
|         2|   2023-01-15|    5|
|         2|   2023-02-05|    5|
|         4|   2023-01-25|    5|
+----------+-------------+-----+

+----------+-----+----------+----------+----------+----------+-------------+-----+
|product_id|price|start_date|  end_date|start_data|product_id|purchase_date|units|
+----------+-----+----------+----------+----------+----------+-------------+-----+
|         1|100.0|2023-01-01|2023-01-31|2023-01-01|         1|   2023-01-05|   10|
|         3|150.0|2023-01-15|2023-01-2