1. Load the CSV Data

In [3]:
from pyspark.sql import SparkSession

# Get the running SparkSession, or start one registered under the
# application name "PySparkProject" if none exists yet.
spark = (
    SparkSession.builder
    .appName("PySparkProject")
    .getOrCreate()
)

# Read data.csv: the first row supplies the column names and Spark infers
# each column's type from the values instead of reading everything as strings.
df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("data.csv")
)
df.show()

+---+------------+----+----------+------+-------------------+
| id|        name| age|department|salary|          join_date|
+---+------------+----+----------+------+-------------------+
|  1|    John Doe|  29|        IT| 70000|2020-01-15 00:00:00|
|  2|  Jane Smith|  34|        HR| 80000|2019-03-22 00:00:00|
|  3| Bob Johnson|null|   Finance| 60000|2018-06-12 00:00:00|
|  4| Alice White|  45|        IT|  null|2017-08-07 00:00:00|
|  5| Chris Evans|  23|        HR| 50000|2021-05-19 00:00:00|
|  6|        null|  31|   Finance| 75000|2020-11-01 00:00:00|
|  7|  Mary Brown|  27|        IT| 72000|2018-09-30 00:00:00|
|  8|James Wilson|  30|        HR| 85000|               null|
+---+------------+----+----------+------+-------------------+



2. Data Cleaning

Handle Missing Values: Replace missing values with appropriate defaults or drop rows.

In [4]:
# Replace missing age with the average age
from pyspark.sql.functions import mean

# mean() skips nulls, so it yields None when no row has an age at all.
avg_age = df.select(mean(df['age'])).collect()[0][0]
if avg_age is not None:
    # Only fill when an average exists: na.fill does not accept None as a
    # replacement value. For an integer age column the fractional mean ends
    # up stored as a whole number.
    df = df.na.fill({'age': avg_age})

# Replace missing salary with 0
# NOTE: the 0 placeholder stays in the column, so any later salary
# aggregate (e.g. a per-department average) will include it.
df = df.na.fill({'salary': 0})

# Drop rows where name or join_date is missing
df = df.na.drop(subset=["name", "join_date"])
df.show()


+---+-----------+---+----------+------+-------------------+
| id|       name|age|department|salary|          join_date|
+---+-----------+---+----------+------+-------------------+
|  1|   John Doe| 29|        IT| 70000|2020-01-15 00:00:00|
|  2| Jane Smith| 34|        HR| 80000|2019-03-22 00:00:00|
|  3|Bob Johnson| 31|   Finance| 60000|2018-06-12 00:00:00|
|  4|Alice White| 45|        IT|     0|2017-08-07 00:00:00|
|  5|Chris Evans| 23|        HR| 50000|2021-05-19 00:00:00|
|  7| Mary Brown| 27|        IT| 72000|2018-09-30 00:00:00|
+---+-----------+---+----------+------+-------------------+



3. Data Validation

Ensure Data Types: Check and enforce the correct data types.

In [5]:
# Schema before enforcing the salary type.
df.printSchema()

from pyspark.sql.functions import col

# Force salary to the integer type; this leaves the column unchanged when
# it is already an integer.
df = df.withColumn(
    "salary",
    col("salary").astype("integer"),
)

# Schema after the cast, for comparison with the one printed above.
df.printSchema()


root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = false)
 |-- join_date: timestamp (nullable = true)

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- department: string (nullable = true)
 |-- salary: integer (nullable = false)
 |-- join_date: timestamp (nullable = true)



Check for Duplicate Records: Remove duplicates if any.

In [6]:
# Keep one copy of each fully identical row (all columns are compared).
df = df.drop_duplicates()
df.show()


+---+-----------+---+----------+------+-------------------+
| id|       name|age|department|salary|          join_date|
+---+-----------+---+----------+------+-------------------+
|  3|Bob Johnson| 31|   Finance| 60000|2018-06-12 00:00:00|
|  2| Jane Smith| 34|        HR| 80000|2019-03-22 00:00:00|
|  4|Alice White| 45|        IT|     0|2017-08-07 00:00:00|
|  1|   John Doe| 29|        IT| 70000|2020-01-15 00:00:00|
|  7| Mary Brown| 27|        IT| 72000|2018-09-30 00:00:00|
|  5|Chris Evans| 23|        HR| 50000|2021-05-19 00:00:00|
+---+-----------+---+----------+------+-------------------+



4. Data Transformation

Create New Columns: Add a column for the year of joining.

In [7]:
from pyspark.sql.functions import year

# Derive join_year by extracting the calendar year from join_date.
df = df.withColumn(
    "join_year",
    year(df.join_date),
)
df.show()


+---+-----------+---+----------+------+-------------------+---------+
| id|       name|age|department|salary|          join_date|join_year|
+---+-----------+---+----------+------+-------------------+---------+
|  3|Bob Johnson| 31|   Finance| 60000|2018-06-12 00:00:00|     2018|
|  2| Jane Smith| 34|        HR| 80000|2019-03-22 00:00:00|     2019|
|  4|Alice White| 45|        IT|     0|2017-08-07 00:00:00|     2017|
|  1|   John Doe| 29|        IT| 70000|2020-01-15 00:00:00|     2020|
|  7| Mary Brown| 27|        IT| 72000|2018-09-30 00:00:00|     2018|
|  5|Chris Evans| 23|        HR| 50000|2021-05-19 00:00:00|     2021|
+---+-----------+---+----------+------+-------------------+---------+



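Group and Aggregate Data: Calculate the average salary by department. Note that the missing salary replaced with 0 during cleaning is included in the mean, which is why the IT average is pulled down to about 47,333.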

In [8]:
# One row per department with the mean of its salary values; the result
# column is named avg(salary).
avg_salary_df = (
    df.groupBy("department")
    .agg({"salary": "avg"})
)
avg_salary_df.show()


+----------+------------------+
|department|       avg(salary)|
+----------+------------------+
|        HR|           65000.0|
|   Finance|           60000.0|
|        IT|47333.333333333336|
+----------+------------------+



Filter Data: Find employees who joined after 2019.

In [9]:
# Keep only employees whose join_year is strictly later than 2019
# (i.e. 2020 onwards).
recent_joins_df = df.where(df.join_year > 2019)
recent_joins_df.show()


+---+-----------+---+----------+------+-------------------+---------+
| id|       name|age|department|salary|          join_date|join_year|
+---+-----------+---+----------+------+-------------------+---------+
|  1|   John Doe| 29|        IT| 70000|2020-01-15 00:00:00|     2020|
|  5|Chris Evans| 23|        HR| 50000|2021-05-19 00:00:00|     2021|
+---+-----------+---+----------+------+-------------------+---------+

