
Basic Query
1.Write a PySpark code to display the names and salaries of employees in the "IT" department.

Filtering Data
2.Write a PySpark code to filter out employees who joined after "2021-01-01".

Aggregation
3.Write a PySpark code to calculate the average salary of employees in each department.

Sorting
4.Write a PySpark code to sort the DataFrame by salary in descending order.

Column Operations
5.Write a PySpark code to add a new column named salary_increase which is 10% of the current salary.

Date Manipulation
6.Write a PySpark code to extract the year from the join_date column and create a new column join_year.

Group By and Aggregation
7.Write a PySpark code to group employees by department and count the number of employees in each department.

String Operations
8.Write a PySpark code to create a new column full_name that concatenates name with the department, separated by a hyphen.

Duplicate Rows
9.Write a PySpark code to drop duplicate rows based on the emp_id column.

Joining DataFrames
10.Write a PySpark code to join this DataFrame with another DataFrame df2 on the emp_id column.

Handling Null Values
11.Write a PySpark code to fill null values in the salary column with the average salary of the department.

Filtering with Conditions
12.Write a PySpark code to filter employees who either have a salary greater than 65000 or have joined in 2022.

Window Functions
13.Write a PySpark code to add a column rank which ranks employees within each department based on their salary in descending order.

Pivot Tables
14.Write a PySpark code to pivot the DataFrame to show the sum of salaries for each department by year of join.

Complex Aggregation
15.Write a PySpark code to calculate the maximum salary for each department and the percentage difference from the average salary in that department.

In [32]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark= SparkSession.builder.getOrCreate()
from pyspark.sql.functions import *

data = [
    (1, "John", "IT", 60000, "2022-01-15"),
    (2, "Jane", "HR", 55000, "2021-03-22"),
    (3, "Mike", "IT", 70000, "2022-06-18"),
    (4, "Sara", "Finance", 75000, "2020-11-30"),
    (5, "Amy", "HR", 58000, "2021-05-10"),
    (6, "Tom", "IT", 65000, "2023-02-25"),
    (7, "Lisa", "Finance", 72000, "2022-07-14"),
    (8, "Mark", "IT", 68000, "2021-12-01"),
    (9, "Eva", "HR", 59000, "2020-09-10"),
    (10, "John", "IT", 60000, "2022-01-15")]
df = spark.createDataFrame(data, ["emp_id", "name", "department", "salary", "join_date"])

In [23]:
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
df.createOrReplaceTempView("DF")
df.printSchema()
df.orderBy('join_date',ascending=False)


root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- join_date: string (nullable = true)



emp_id,name,department,salary,join_date
6,Tom,IT,65000,2023-02-25
7,Lisa,Finance,72000,2022-07-14
3,Mike,IT,70000,2022-06-18
1,John,IT,60000,2022-01-15
10,John,IT,60000,2022-01-15
8,Mark,IT,68000,2021-12-01
5,Amy,HR,58000,2021-05-10
2,Jane,HR,55000,2021-03-22
4,Sara,Finance,75000,2020-11-30
9,Eva,HR,59000,2020-09-10


# Basic Query
1.Write a PySpark code to display the names and salaries of employees in the "IT" department.

In [15]:
df.filter(df.department=='IT').select("department","name","salary")

department,name,salary
IT,John,60000
IT,Mike,70000
IT,Tom,65000
IT,Mark,68000
IT,John,60000


In [18]:
spark.sql("select department,name,salary from DF where department=='IT'")

department,name,salary
IT,John,60000
IT,Mike,70000
IT,Tom,65000
IT,Mark,68000
IT,John,60000


# Filtering Data
2.Write a PySpark code to filter out employees who joined after "2021-01-01".

In [25]:
df.filter(df.join_date > '2021-01-01')

emp_id,name,department,salary,join_date
1,John,IT,60000,2022-01-15
2,Jane,HR,55000,2021-03-22
3,Mike,IT,70000,2022-06-18
5,Amy,HR,58000,2021-05-10
6,Tom,IT,65000,2023-02-25
7,Lisa,Finance,72000,2022-07-14
8,Mark,IT,68000,2021-12-01
10,John,IT,60000,2022-01-15


In [26]:
# Same filter with a later cutoff (2022-07-14) to narrow the result
df.filter(df.join_date > '2022-07-14')

emp_id,name,department,salary,join_date
6,Tom,IT,65000,2023-02-25


In [27]:
# SQL equivalent of the previous cell (cutoff 2022-07-14, not the 2021-01-01 from the exercise)
spark.sql("select * from DF where join_date > '2022-07-14'")

emp_id,name,department,salary,join_date
6,Tom,IT,65000,2023-02-25
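
The filters above compare join_date as a string, since the schema shows it was never cast to a date. That works here because every value is a zero-padded yyyy-MM-dd string, where alphabetical order and calendar order agree. A minimal plain-Python check of that assumption, using a few (emp_id, join_date) pairs copied from the sample data (the variable names are only for illustration):

```python
from datetime import date

# (emp_id, join_date) pairs copied from the sample data above
rows = [(1, "2022-01-15"), (4, "2020-11-30"), (6, "2023-02-25"), (8, "2021-12-01")]
cutoff = "2021-01-01"

# Compare as plain strings, the way the filter above does
by_string = [emp_id for emp_id, d in rows if d > cutoff]

# Compare as real dates after parsing
by_date = [emp_id for emp_id, d in rows
           if date.fromisoformat(d) > date.fromisoformat(cutoff)]

print(by_string)             # [1, 6, 8]
print(by_string == by_date)  # True
```

If the column could hold other layouts (for example 1/5/2021), the string comparison would no longer be reliable, and converting with to_date before filtering would be the safer choice.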


# Aggregation
3.Write a PySpark code to calculate the average salary of employees in each department.

In [37]:
# alias() must be applied to the aggregated column, not to the DataFrame returned by agg()
df.groupby("department").agg(round(avg("salary"), 2).alias("avg_salary"))

department,avg_salary
IT,64600.0
HR,57333.33
Finance,73500.0


In [42]:
spark.sql("select department,round(avg(salary),2) from DF group by department")

department,"round(avg(salary), 2)"
IT,64600.0
HR,57333.33
Finance,73500.0


# Sorting
4.Write a PySpark code to sort the DataFrame by salary in descending order.

In [43]:
# sort() is ascending by default; the exercise asks for descending order
df.sort('salary', ascending=False)

emp_id,name,department,salary,join_date
4,Sara,Finance,75000,2020-11-30
7,Lisa,Finance,72000,2022-07-14
3,Mike,IT,70000,2022-06-18
8,Mark,IT,68000,2021-12-01
6,Tom,IT,65000,2023-02-25
1,John,IT,60000,2022-01-15
10,John,IT,60000,2022-01-15
9,Eva,HR,59000,2020-09-10
5,Amy,HR,58000,2021-05-10
2,Jane,HR,55000,2021-03-22


In [44]:
# orderBy() is an alias of sort(); desc() on the column gives descending order
df.orderBy(col("salary").desc())

emp_id,name,department,salary,join_date
4,Sara,Finance,75000,2020-11-30
7,Lisa,Finance,72000,2022-07-14
3,Mike,IT,70000,2022-06-18
8,Mark,IT,68000,2021-12-01
6,Tom,IT,65000,2023-02-25
1,John,IT,60000,2022-01-15
10,John,IT,60000,2022-01-15
9,Eva,HR,59000,2020-09-10
5,Amy,HR,58000,2021-05-10
2,Jane,HR,55000,2021-03-22


In [45]:
spark.sql("select * from DF order by salary desc")

emp_id,name,department,salary,join_date
4,Sara,Finance,75000,2020-11-30
7,Lisa,Finance,72000,2022-07-14
3,Mike,IT,70000,2022-06-18
8,Mark,IT,68000,2021-12-01
6,Tom,IT,65000,2023-02-25
1,John,IT,60000,2022-01-15
10,John,IT,60000,2022-01-15
9,Eva,HR,59000,2020-09-10
5,Amy,HR,58000,2021-05-10
2,Jane,HR,55000,2021-03-22


# Column Operations
5.Write a PySpark code to add a new column named salary_increase which is 10% of the current salary.

In [55]:
df.withColumn("salary_increase",col("salary")*0.10)

emp_id,name,department,salary,join_date,salary_increase
1,John,IT,60000,2022-01-15,6000.0
2,Jane,HR,55000,2021-03-22,5500.0
3,Mike,IT,70000,2022-06-18,7000.0
4,Sara,Finance,75000,2020-11-30,7500.0
5,Amy,HR,58000,2021-05-10,5800.0
6,Tom,IT,65000,2023-02-25,6500.0
7,Lisa,Finance,72000,2022-07-14,7200.0
8,Mark,IT,68000,2021-12-01,6800.0
9,Eva,HR,59000,2020-09-10,5900.0
10,John,IT,60000,2022-01-15,6000.0


In [54]:
df.withColumn("salary_increase", expr("salary * 0.10"))

emp_id,name,department,salary,join_date,salary_increase
1,John,IT,60000,2022-01-15,6000.0
2,Jane,HR,55000,2021-03-22,5500.0
3,Mike,IT,70000,2022-06-18,7000.0
4,Sara,Finance,75000,2020-11-30,7500.0
5,Amy,HR,58000,2021-05-10,5800.0
6,Tom,IT,65000,2023-02-25,6500.0
7,Lisa,Finance,72000,2022-07-14,7200.0
8,Mark,IT,68000,2021-12-01,6800.0
9,Eva,HR,59000,2020-09-10,5900.0
10,John,IT,60000,2022-01-15,6000.0


In [58]:
# Create a new_salary column: the current salary plus the 10% increase
df.withColumn("new_salary",expr("(salary*0.10)+salary")).show()

+------+----+----------+------+----------+----------+
|emp_id|name|department|salary| join_date|new_salary|
+------+----+----------+------+----------+----------+
|     1|John|        IT| 60000|2022-01-15|  66000.00|
|     2|Jane|        HR| 55000|2021-03-22|  60500.00|
|     3|Mike|        IT| 70000|2022-06-18|  77000.00|
|     4|Sara|   Finance| 75000|2020-11-30|  82500.00|
|     5| Amy|        HR| 58000|2021-05-10|  63800.00|
|     6| Tom|        IT| 65000|2023-02-25|  71500.00|
|     7|Lisa|   Finance| 72000|2022-07-14|  79200.00|
|     8|Mark|        IT| 68000|2021-12-01|  74800.00|
|     9| Eva|        HR| 59000|2020-09-10|  64900.00|
|    10|John|        IT| 60000|2022-01-15|  66000.00|
+------+----+----------+------+----------+----------+



# Date Manipulation
6.Write a PySpark code to extract the year from the join_date column and create a new column join_year.

In [57]:
# The exercise asks for the new column to be named join_year
df.withColumn("join_year",year("join_date"))

emp_id,name,department,salary,join_date,join_year
1,John,IT,60000,2022-01-15,2022
2,Jane,HR,55000,2021-03-22,2021
3,Mike,IT,70000,2022-06-18,2022
4,Sara,Finance,75000,2020-11-30,2020
5,Amy,HR,58000,2021-05-10,2021
6,Tom,IT,65000,2023-02-25,2023
7,Lisa,Finance,72000,2022-07-14,2022
8,Mark,IT,68000,2021-12-01,2021
9,Eva,HR,59000,2020-09-10,2020
10,John,IT,60000,2022-01-15,2022


In [60]:
df.withColumn("month",month("join_date"))

emp_id,name,department,salary,join_date,month
1,John,IT,60000,2022-01-15,1
2,Jane,HR,55000,2021-03-22,3
3,Mike,IT,70000,2022-06-18,6
4,Sara,Finance,75000,2020-11-30,11
5,Amy,HR,58000,2021-05-10,5
6,Tom,IT,65000,2023-02-25,2
7,Lisa,Finance,72000,2022-07-14,7
8,Mark,IT,68000,2021-12-01,12
9,Eva,HR,59000,2020-09-10,9
10,John,IT,60000,2022-01-15,1


In [63]:
df.withColumn("day",dayofmonth("join_date"))

emp_id,name,department,salary,join_date,day
1,John,IT,60000,2022-01-15,15
2,Jane,HR,55000,2021-03-22,22
3,Mike,IT,70000,2022-06-18,18
4,Sara,Finance,75000,2020-11-30,30
5,Amy,HR,58000,2021-05-10,10
6,Tom,IT,65000,2023-02-25,25
7,Lisa,Finance,72000,2022-07-14,14
8,Mark,IT,68000,2021-12-01,1
9,Eva,HR,59000,2020-09-10,10
10,John,IT,60000,2022-01-15,15


In [68]:
spark.sql("select *,year(join_date) as join_year from DF").show()

+------+----+----------+------+----------+---------+
|emp_id|name|department|salary| join_date|join_year|
+------+----+----------+------+----------+---------+
|     1|John|        IT| 60000|2022-01-15|     2022|
|     2|Jane|        HR| 55000|2021-03-22|     2021|
|     3|Mike|        IT| 70000|2022-06-18|     2022|
|     4|Sara|   Finance| 75000|2020-11-30|     2020|
|     5| Amy|        HR| 58000|2021-05-10|     2021|
|     6| Tom|        IT| 65000|2023-02-25|     2023|
|     7|Lisa|   Finance| 72000|2022-07-14|     2022|
|     8|Mark|        IT| 68000|2021-12-01|     2021|
|     9| Eva|        HR| 59000|2020-09-10|     2020|
|    10|John|        IT| 60000|2022-01-15|     2022|
+------+----+----------+------+----------+---------+



# Group By and Aggregation
7.Write a PySpark code to group employees by department and count the number of employees in each department.

In [73]:
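# alias() goes on the count column; calling it on the DataFrame leaves the column name unchanged
df.groupBy('department').agg(count('name').alias("no_of_employees"))

department,no_of_employees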
IT,5
HR,3
Finance,2


In [75]:
spark.sql("select department,count(name) as no_of_employees from DF group by department")

department,no_of_employees
IT,5
HR,3
Finance,2


# String Operations
8.Write a PySpark code to create a new column full_name that concatenates name with the department, separated by a hyphen.

In [84]:
#lit(" - ") creates a literal value for the hyphen and space between the name and department columns.
df.withColumn("full_name",concat('name',lit(" - "),'department'))

emp_id,name,department,salary,join_date,full_name
1,John,IT,60000,2022-01-15,John - IT
2,Jane,HR,55000,2021-03-22,Jane - HR
3,Mike,IT,70000,2022-06-18,Mike - IT
4,Sara,Finance,75000,2020-11-30,Sara - Finance
5,Amy,HR,58000,2021-05-10,Amy - HR
6,Tom,IT,65000,2023-02-25,Tom - IT
7,Lisa,Finance,72000,2022-07-14,Lisa - Finance
8,Mark,IT,68000,2021-12-01,Mark - IT
9,Eva,HR,59000,2020-09-10,Eva - HR
10,John,IT,60000,2022-01-15,John - IT


In [83]:
spark.sql("select *,concat(name,' - ',department) as full_name \
          from DF")

emp_id,name,department,salary,join_date,full_name
1,John,IT,60000,2022-01-15,John - IT
2,Jane,HR,55000,2021-03-22,Jane - HR
3,Mike,IT,70000,2022-06-18,Mike - IT
4,Sara,Finance,75000,2020-11-30,Sara - Finance
5,Amy,HR,58000,2021-05-10,Amy - HR
6,Tom,IT,65000,2023-02-25,Tom - IT
7,Lisa,Finance,72000,2022-07-14,Lisa - Finance
8,Mark,IT,68000,2021-12-01,Mark - IT
9,Eva,HR,59000,2020-09-10,Eva - HR
10,John,IT,60000,2022-01-15,John - IT


# Duplicate Rows
9.Write a PySpark code to drop duplicate rows based on the emp_id column.

In [88]:
# emp_id has no duplicate values, so all 10 rows are kept
df.drop_duplicates(["emp_id"])

emp_id,name,department,salary,join_date
1,John,IT,60000,2022-01-15
2,Jane,HR,55000,2021-03-22
3,Mike,IT,70000,2022-06-18
4,Sara,Finance,75000,2020-11-30
5,Amy,HR,58000,2021-05-10
6,Tom,IT,65000,2023-02-25
7,Lisa,Finance,72000,2022-07-14
8,Mark,IT,68000,2021-12-01
9,Eva,HR,59000,2020-09-10
10,John,IT,60000,2022-01-15


In [93]:
# "John" appears twice in name, so one of those two rows is dropped and 9 rows remain
df.dropDuplicates(["name"]).orderBy("emp_id")

emp_id,name,department,salary,join_date
1,John,IT,60000,2022-01-15
2,Jane,HR,55000,2021-03-22
3,Mike,IT,70000,2022-06-18
4,Sara,Finance,75000,2020-11-30
5,Amy,HR,58000,2021-05-10
6,Tom,IT,65000,2023-02-25
7,Lisa,Finance,72000,2022-07-14
8,Mark,IT,68000,2021-12-01
9,Eva,HR,59000,2020-09-10


In [109]:
spark.sql(" WITH ranked_employees AS (\
        SELECT *,\
               ROW_NUMBER() OVER (PARTITION BY name ORDER BY emp_id) AS rn\
        FROM DF\
    )\
    SELECT emp_id, name, department, salary, join_date\
    FROM ranked_employees\
    WHERE rn = 1 order by emp_id")

emp_id,name,department,salary,join_date
1,John,IT,60000,2022-01-15
2,Jane,HR,55000,2021-03-22
3,Mike,IT,70000,2022-06-18
4,Sara,Finance,75000,2020-11-30
5,Amy,HR,58000,2021-05-10
6,Tom,IT,65000,2023-02-25
7,Lisa,Finance,72000,2022-07-14
8,Mark,IT,68000,2021-12-01
9,Eva,HR,59000,2020-09-10
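
The two approaches above are not quite equivalent. dropDuplicates(["name"]) is not documented to guarantee which of the duplicate rows survives, while the ROW_NUMBER query always keeps the row with the lowest emp_id for each name. A plain-Python sketch of that "keep the lowest emp_id per name" rule, on a subset of the sample data (variable names are illustrative only):

```python
# (emp_id, name) pairs from the sample data; "John" appears twice
rows = [(10, "John"), (2, "Jane"), (3, "Mike"), (1, "John")]

first_by_name = {}
for emp_id, name in sorted(rows):           # visit rows in ascending emp_id order
    first_by_name.setdefault(name, emp_id)  # keep only the first id seen per name

kept = sorted(first_by_name.values())
print(kept)  # [1, 2, 3]
```

When it matters which duplicate is kept, the window-function version is the one that states the rule explicitly.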


# Joining DataFrames
10.Write a PySpark code to join this DataFrame with another DataFrame df2 on the emp_id column.

In [110]:
data2 = [
    (1, "Manager"),
    (2, "Specialist"),
    (3, "Developer"),
    (4, "Analyst"),
    (5, "Consultant"),
    (6, "Lead"),
    (7, "Associate"),
    (8, "Manager"),
    (9, "Executive"),
    (10, "Manager")
]

# Create DataFrame df2
columns2 = ["emp_id", "position"]
df2 = spark.createDataFrame(data2, columns2)

In [111]:
df2.createOrReplaceTempView("DF2")

In [112]:
df2

emp_id,position
1,Manager
2,Specialist
3,Developer
4,Analyst
5,Consultant
6,Lead
7,Associate
8,Manager
9,Executive
10,Manager


In [117]:
df.join(df2,on='emp_id',how='inner')

emp_id,name,department,salary,join_date,position
1,John,IT,60000,2022-01-15,Manager
2,Jane,HR,55000,2021-03-22,Specialist
3,Mike,IT,70000,2022-06-18,Developer
4,Sara,Finance,75000,2020-11-30,Analyst
5,Amy,HR,58000,2021-05-10,Consultant
6,Tom,IT,65000,2023-02-25,Lead
7,Lisa,Finance,72000,2022-07-14,Associate
8,Mark,IT,68000,2021-12-01,Manager
9,Eva,HR,59000,2020-09-10,Executive
10,John,IT,60000,2022-01-15,Manager


In [127]:
spark.sql("""select DF.emp_id,  
        DF.name, 
        DF.department, 
        DF.salary, 
        DF.join_date,
        DF2.position from DF left join DF2 on DF.emp_id==DF2.emp_id""")

emp_id,name,department,salary,join_date,position
1,John,IT,60000,2022-01-15,Manager
2,Jane,HR,55000,2021-03-22,Specialist
3,Mike,IT,70000,2022-06-18,Developer
5,Amy,HR,58000,2021-05-10,Consultant
4,Sara,Finance,75000,2020-11-30,Analyst
6,Tom,IT,65000,2023-02-25,Lead
7,Lisa,Finance,72000,2022-07-14,Associate
8,Mark,IT,68000,2021-12-01,Manager
9,Eva,HR,59000,2020-09-10,Executive
10,John,IT,60000,2022-01-15,Manager


In [130]:
spark.sql("select * from DF left join DF2 on DF.emp_id==DF2.emp_id")

emp_id,name,department,salary,join_date,emp_id.1,position
1,John,IT,60000,2022-01-15,1,Manager
2,Jane,HR,55000,2021-03-22,2,Specialist
3,Mike,IT,70000,2022-06-18,3,Developer
5,Amy,HR,58000,2021-05-10,5,Consultant
4,Sara,Finance,75000,2020-11-30,4,Analyst
6,Tom,IT,65000,2023-02-25,6,Lead
7,Lisa,Finance,72000,2022-07-14,7,Associate
8,Mark,IT,68000,2021-12-01,8,Manager
9,Eva,HR,59000,2020-09-10,9,Executive
10,John,IT,60000,2022-01-15,10,Manager


# Handling Null Values
11.Write a PySpark code to fill null values in the salary column with the average salary of the department.


In [142]:
avg_salary_by_dept = df.groupBy("department").agg(avg("salary").alias("avg_salary"))

In [147]:
df_with_avg = df.join(avg_salary_by_dept, on="department", how="left")

In [148]:
df_filled = df_with_avg.withColumn(
    "salary",
    coalesce(col("salary"), col("avg_salary"))
).drop("avg_salary")

In [149]:
df_filled

department,emp_id,name,salary,join_date
IT,1,John,60000.0,2022-01-15
HR,2,Jane,55000.0,2021-03-22
IT,3,Mike,70000.0,2022-06-18
HR,5,Amy,58000.0,2021-05-10
Finance,4,Sara,75000.0,2020-11-30
IT,6,Tom,65000.0,2023-02-25
Finance,7,Lisa,72000.0,2022-07-14
IT,8,Mark,68000.0,2021-12-01
HR,9,Eva,59000.0,2020-09-10
IT,10,John,60000.0,2022-01-15
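
The sample data has no null salaries, so the output above only shows the salary column turning into a double. To see the fill rule actually do something, here is a plain-Python sketch of the same logic on a few rows. Tom's missing salary is a hypothetical change to the data, made only for this illustration:

```python
from collections import defaultdict

# Hypothetical rows: Tom's salary is set to None; the others match the sample data
rows = [("John", "IT", 60000), ("Mike", "IT", 70000), ("Tom", "IT", None), ("Jane", "HR", 55000)]

# Department averages skip missing values, as avg() does in Spark
known = defaultdict(list)
for _, dept, salary in rows:
    if salary is not None:
        known[dept].append(salary)
dept_avg = {dept: sum(values) / len(values) for dept, values in known.items()}

# Equivalent of coalesce(salary, avg_salary): keep the salary if present, else use the average
filled = [(name, dept, salary if salary is not None else dept_avg[dept])
          for name, dept, salary in rows]

print(filled[2])  # ('Tom', 'IT', 65000.0)
```

Rows that already have a salary pass through unchanged, which is why the DataFrame result above matches the input values.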


# Filtering with Conditions
12.Write a PySpark code to filter employees who either have a salary greater than 65000 or have joined in 2022.

In [158]:
df.filter((col('salary') > 65000) | (year(col('join_date')) == 2022))

emp_id,name,department,salary,join_date
1,John,IT,60000,2022-01-15
3,Mike,IT,70000,2022-06-18
4,Sara,Finance,75000,2020-11-30
7,Lisa,Finance,72000,2022-07-14
8,Mark,IT,68000,2021-12-01
10,John,IT,60000,2022-01-15


In [159]:
spark.sql("select * from DF where salary > 65000 or year(join_date)=2022")

emp_id,name,department,salary,join_date
1,John,IT,60000,2022-01-15
3,Mike,IT,70000,2022-06-18
4,Sara,Finance,75000,2020-11-30
7,Lisa,Finance,72000,2022-07-14
8,Mark,IT,68000,2021-12-01
10,John,IT,60000,2022-01-15


# Window Functions
13.Write a PySpark code to add a column rank which ranks employees within each department based on their salary in descending order.

In [163]:
from pyspark.sql.window import Window
#Define the window specification
window_spec = Window.partitionBy("department").orderBy(col("salary").desc())

In [167]:
df.withColumn("row_number", row_number().over(window_spec))

emp_id,name,department,salary,join_date,row_number
4,Sara,Finance,75000,2020-11-30,1
7,Lisa,Finance,72000,2022-07-14,2
9,Eva,HR,59000,2020-09-10,1
5,Amy,HR,58000,2021-05-10,2
2,Jane,HR,55000,2021-03-22,3
3,Mike,IT,70000,2022-06-18,1
8,Mark,IT,68000,2021-12-01,2
6,Tom,IT,65000,2023-02-25,3
1,John,IT,60000,2022-01-15,4
10,John,IT,60000,2022-01-15,5


In [166]:
df.withColumn("rank", rank().over(window_spec))

emp_id,name,department,salary,join_date,rank
4,Sara,Finance,75000,2020-11-30,1
7,Lisa,Finance,72000,2022-07-14,2
9,Eva,HR,59000,2020-09-10,1
5,Amy,HR,58000,2021-05-10,2
2,Jane,HR,55000,2021-03-22,3
3,Mike,IT,70000,2022-06-18,1
8,Mark,IT,68000,2021-12-01,2
6,Tom,IT,65000,2023-02-25,3
1,John,IT,60000,2022-01-15,4
10,John,IT,60000,2022-01-15,4


In [170]:
spark.sql("""

select *,
rank() over(partition by department order by salary desc) as rn
from DF

""")

emp_id,name,department,salary,join_date,rn
4,Sara,Finance,75000,2020-11-30,1
7,Lisa,Finance,72000,2022-07-14,2
9,Eva,HR,59000,2020-09-10,1
5,Amy,HR,58000,2021-05-10,2
2,Jane,HR,55000,2021-03-22,3
3,Mike,IT,70000,2022-06-18,1
8,Mark,IT,68000,2021-12-01,2
6,Tom,IT,65000,2023-02-25,3
1,John,IT,60000,2022-01-15,4
10,John,IT,60000,2022-01-15,4
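
The only difference between the row_number and rank outputs above is the last IT row: the two 60000 salaries get 4 and 5 from row_number but share rank 4. A plain-Python sketch of how the tie handling differs, including dense_rank for comparison. The trailing 55000 is a hypothetical extra salary, added only so the gap after the tie becomes visible:

```python
# IT salaries from the sample data in descending order, plus one hypothetical 55000
salaries = [70000, 68000, 65000, 60000, 60000, 55000]

# row_number: consecutive numbers, ties broken by position
row_number = list(range(1, len(salaries) + 1))

# rank: 1 + count of strictly higher salaries, so ties share a rank and leave a gap
rank = [1 + sum(other > s for other in salaries) for s in salaries]

# dense_rank: 1 + count of distinct strictly higher salaries, so there is no gap
dense_rank = [1 + len({other for other in salaries if other > s}) for s in salaries]

print(row_number)  # [1, 2, 3, 4, 5, 6]
print(rank)        # [1, 2, 3, 4, 4, 6]
print(dense_rank)  # [1, 2, 3, 4, 4, 5]
```

Since the exercise asks for a rank, rank() (or dense_rank(), if gaps are unwanted) fits better than row_number(), whose numbering of tied rows is arbitrary.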


# Pivot Tables
14.Write a PySpark code to pivot the DataFrame to show the sum of salaries for each department by year of join.

In [171]:
# Extract the year from the join_date column
df_with_year = df.withColumn("join_year", year(col("join_date")))

# Pivot the DataFrame to show the sum of salaries for each department by year of join
pivot_df = df_with_year.groupBy("department").pivot("join_year").agg(sum("salary"))

In [174]:
df_with_year

emp_id,name,department,salary,join_date,join_year
1,John,IT,60000,2022-01-15,2022
2,Jane,HR,55000,2021-03-22,2021
3,Mike,IT,70000,2022-06-18,2022
4,Sara,Finance,75000,2020-11-30,2020
5,Amy,HR,58000,2021-05-10,2021
6,Tom,IT,65000,2023-02-25,2023
7,Lisa,Finance,72000,2022-07-14,2022
8,Mark,IT,68000,2021-12-01,2021
9,Eva,HR,59000,2020-09-10,2020
10,John,IT,60000,2022-01-15,2022


In [175]:
#The pivot function rearranges data,
#so that the unique values of a specified column become new column headers,
#and the corresponding values in another column are aggregated.
pivot_df

department,2020,2021,2022,2023
HR,59000.0,113000.0,,
Finance,75000.0,,72000.0,
IT,,68000.0,190000.0,65000.0
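
As a cross-check of the pivot above, the same table can be rebuilt in plain Python with a nested dictionary. This is only a sketch of what groupBy + pivot + sum computes, not how Spark implements it; the rows are copied from the sample data:

```python
from collections import defaultdict

# (department, salary, join_date) from the sample data above
rows = [
    ("IT", 60000, "2022-01-15"), ("HR", 55000, "2021-03-22"), ("IT", 70000, "2022-06-18"),
    ("Finance", 75000, "2020-11-30"), ("HR", 58000, "2021-05-10"), ("IT", 65000, "2023-02-25"),
    ("Finance", 72000, "2022-07-14"), ("IT", 68000, "2021-12-01"), ("HR", 59000, "2020-09-10"),
    ("IT", 60000, "2022-01-15"),
]

# totals[department][year] accumulates the salary sum for that cell
totals = defaultdict(lambda: defaultdict(int))
for dept, salary, join_date in rows:
    totals[dept][int(join_date[:4])] += salary

# The distinct years become the column headers
years = sorted({y for cells in totals.values() for y in cells})
print(years)  # [2020, 2021, 2022, 2023]

for dept in sorted(totals):
    # Combinations with no rows show as None, like the empty cells in the pivot
    print(dept, [totals[dept].get(y) for y in years])
# Finance [75000, None, 72000, None]
# HR [59000, 113000, None, None]
# IT [None, 68000, 190000, 65000]
```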


# Complex Aggregation
15.Write a PySpark code to calculate the maximum salary for each department and the percentage difference from the average salary in that department.

In [190]:
from pyspark.sql.types import *
# Alias each aggregate. Without an alias the columns are named `CAST(avg(salary) AS DOUBLE)` and
# `CAST(max(salary) AS DOUBLE)`, and col("max_salary") fails with
# AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION]
avg_salary=df.groupBy('department').agg(avg('salary').cast(DoubleType()).alias('avg_salary'))

In [192]:
max_salary=df.groupBy('department').agg(max('salary').cast(DoubleType()).alias('max_salary'))

In [195]:
joined_df=avg_salary.join(max_salary,on='department')

In [194]:
# Percentage by which the department maximum exceeds the department average
joined_df.withColumn(
    "percentage_difference",
    ((col("max_salary") - col("avg_salary")) / col("avg_salary")) * 100
)
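
For reference, the figures this exercise asks for can be worked out in plain Python from the sample data. This is a sketch for checking values by hand, with the results rounded to two decimals (an assumption made here for readability, not something the exercise requires):

```python
# Salaries per department, copied from the sample data above
salaries = {
    "IT": [60000, 70000, 65000, 68000, 60000],
    "HR": [55000, 58000, 59000],
    "Finance": [75000, 72000],
}

results = {}
for dept, values in salaries.items():
    avg_salary = sum(values) / len(values)
    max_salary = max(values)
    # Percentage by which the maximum exceeds the department average
    pct_diff = (max_salary - avg_salary) / avg_salary * 100
    results[dept] = (max_salary, round(avg_salary, 2), round(pct_diff, 2))

for dept, row in results.items():
    print(dept, row)
# IT (70000, 64600.0, 8.36)
# HR (59000, 57333.33, 2.91)
# Finance (75000, 73500.0, 2.04)
```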


In [196]:
spark.stop()