# Spark HR Data Pipeline Project

## Prerequisites
1. Install the required libraries and prepare the Spark environment

In [1]:
# Installing requuired packages
%pip install pyspark  findspark wget


Defaulting to user installation because normal site-packages is not writeable
Collecting pyspark
  Downloading pyspark-4.0.1.tar.gz (434.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m434.2/434.2 MB[0m [31m37.2 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hCollecting findspark
  Using cached findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Collecting wget
  Downloading wget-3.2.zip (10 kB)
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hCollecting py4j==0.10.9.9 (from pyspark)
  Using cached py4j-0.10.9.9-py2.py3-none-any.whl.metadata (1.3 kB)
Using cached py4j-0.10.9.9-py2.py3-none-any.whl (203 kB)
Using cached findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)


In [4]:
import findspark

# Run findspark before the `pyspark` imports in the next cell.
# NOTE(review): presumably this locates the Spark installation and makes `pyspark`
# importable from this kernel — confirm against the findspark documentation.
findspark.init()

In [None]:
# PySpark is the Spark API for Python. We use PySpark to initialize the SparkContext.

from pyspark import SparkContext, SparkConf

from pyspark.sql import SparkSession

In [None]:
# Create the SparkSession first. The builder starts the underlying SparkContext
# with this configuration, including the application name. If a bare SparkContext
# were created beforehand, the builder would reuse that already-running context,
# and context-level settings such as appName might not take effect.
spark = SparkSession.builder.appName(
    "Python Spark HR Data Pipeline Project"
).getOrCreate()

# Reuse the session's SparkContext rather than creating a separate one.
sc = spark.sparkContext

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/09/10 15:32:52 WARN Utils: Your hostname, maishuji, resolves to a loopback address: 127.0.1.1; using 192.168.0.14 instead (on interface wlp4s0)
25/09/10 15:32:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/10 15:32:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/09/10 15:32:53 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


2. Download the CSV data

In [None]:
# Download the CSV data into a local `employees.csv` file, but only if it is not
# already present. The `wget` package does not overwrite an existing file: it saves
# under a de-duplicated name (e.g. "employees (1).csv"). An unguarded call would
# therefore litter the directory on every re-run.
import os

import wget

EMPLOYEES_CSV_URL = "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv"
EMPLOYEES_CSV_PATH = "employees.csv"

if not os.path.exists(EMPLOYEES_CSV_PATH):
    wget.download(EMPLOYEES_CSV_URL, out=EMPLOYEES_CSV_PATH)

# Display the local path the rest of the notebook reads from.
EMPLOYEES_CSV_PATH

'employees.csv'

## Tasks

1. Generate a Spark DataFrame from the CSV data

In [7]:
# Load employees.csv into `employees_df`. Column names come from the header row,
# and Spark infers each column's type from the data. Then print the first rows.
employees_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("employees.csv")
)
employees_df.show()


+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
|   198|   Donald|  2600| 29|        IT|
|   199|  Douglas|  2600| 34|     Sales|
|   200| Jennifer|  4400| 36| Marketing|
|   201|  Michael| 13000| 32|        IT|
|   202|      Pat|  6000| 39|        HR|
|   203|    Susan|  6500| 36| Marketing|
|   204|  Hermann| 10000| 29|   Finance|
|   205|  Shelley| 12008| 33|   Finance|
|   206|  William|  8300| 37|        IT|
|   100|   Steven| 24000| 39|        IT|
|   101|    Neena| 17000| 27|     Sales|
|   102|      Lex| 17000| 37| Marketing|
|   103|Alexander|  9000| 39| Marketing|
|   104|    Bruce|  6000| 38|        IT|
|   105|    David|  4800| 39|        IT|
|   106|    Valli|  4800| 38|     Sales|
|   107|    Diana|  4200| 35|     Sales|
|   108|    Nancy| 12008| 28|     Sales|
|   109|   Daniel|  9000| 35|        HR|
|   110|     John|  8200| 31| Marketing|
+------+---------+------+---+----------+
only showing top

2. Define a schema for the data

In [None]:
# Let's first print the schema Spark inferred from the CSV (read with inferSchema=True above).
employees_df.printSchema()

root
 |-- Emp_No: integer (nullable = true)
 |-- Emp_Name: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)



In [None]:
from pyspark.sql.types import (
    StructType,
    StructField,
    StringType,
    IntegerType,
    DoubleType,
)

# Explicit schema for employees.csv. Every field name is spelled exactly as in the
# CSV header row (Emp_No, Emp_Name, Salary, Age, Department), and all fields are
# nullable. Unlike the inferred schema above, Salary is declared as a double
# rather than an integer.
employees_schema_improved = (
    StructType()
    .add("Emp_No", IntegerType(), True)
    .add("Emp_Name", StringType(), True)
    .add("Salary", DoubleType(), True)
    .add("Age", IntegerType(), True)
    .add("Department", StringType(), True)
)

In [21]:
# Re-read the CSV using the explicit schema instead of inferred types. The header
# row is still treated as column names, not as a data row.
employees_df = (
    spark.read
    .schema(employees_schema_improved)
    .option("header", True)
    .csv("employees.csv")
)

In [None]:
# Map the CSV's header names onto consistent snake_case column names in one pass.
# Any source column that is absent is simply skipped (no error).
employees_df = employees_df.withColumnsRenamed(
    {
        "Emp_No": "employee_id",
        "Emp_Name": "employee_name",
        "Salary": "salary",
        "Department": "department_name",
        "Age": "age",
    }
)

3. Display the schema of the DataFrame

In [22]:
# Print the schema again to confirm the explicit column types and the renamed (snake_case) columns.
employees_df.printSchema()

root
 |-- employee_id: integer (nullable = true)
 |-- employee_name: string (nullable = true)
 |-- salary: double (nullable = true)
 |-- age: integer (nullable = true)
 |-- department_name: string (nullable = true)



4. Create a temporary view

In [23]:
# Register the DataFrame as a temporary view named "employees" so the SQL queries
# below can refer to it by name. "OrReplace" means re-running this cell replaces
# any existing view of the same name instead of failing.
employees_df.createOrReplaceTempView("employees")

5. Execute an SQL query

In [24]:
# Query the "employees" temp view with Spark SQL, keeping only employees whose age
# is strictly greater than 30. Print the first 20 rows, truncating long cell values.
sql_query = "SELECT * FROM employees WHERE age > 30"
result_df = spark.sql(sql_query)
result_df.show(n=20, truncate=True)

+-----------+-------------+-------+---+---------------+
|employee_id|employee_name| salary|age|department_name|
+-----------+-------------+-------+---+---------------+
|        199|      Douglas| 2600.0| 34|          Sales|
|        200|     Jennifer| 4400.0| 36|      Marketing|
|        201|      Michael|13000.0| 32|             IT|
|        202|          Pat| 6000.0| 39|             HR|
|        203|        Susan| 6500.0| 36|      Marketing|
|        205|      Shelley|12008.0| 33|        Finance|
|        206|      William| 8300.0| 37|             IT|
|        100|       Steven|24000.0| 39|             IT|
|        102|          Lex|17000.0| 37|      Marketing|
|        103|    Alexander| 9000.0| 39|      Marketing|
|        104|        Bruce| 6000.0| 38|             IT|
|        105|        David| 4800.0| 39|             IT|
|        106|        Valli| 4800.0| 38|          Sales|
|        107|        Diana| 4200.0| 35|          Sales|
|        109|       Daniel| 9000.0| 35|         

25/09/10 16:10:01 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Emp_No, Emp_Name, Salary, Age, Department
 Schema: employee_id, employee_name, salary, age, department_name
Expected: employee_id but found: Emp_No
CSV file: file:///home/maishuji/Workplace/ibm-data-engineer/project-spark-processing/employees.csv


6. Calculate Average Salary by Department

In [25]:
# Average salary per department, ordered from the highest average to the lowest.
# The query text is assembled from adjacent string literals; it keeps the leading
# and trailing newline of the original triple-quoted SQL.
sql_query_avg_salary = (
    "\n"
    "SELECT department_name, AVG(salary) AS average_salary\n"
    "FROM employees\n"
    "GROUP BY department_name\n"
    "ORDER BY average_salary DESC\n"
)
avg_salary_df = spark.sql(sql_query_avg_salary)
avg_salary_df.show(n=20, truncate=True)

+---------------+-----------------+
|department_name|   average_salary|
+---------------+-----------------+
|             IT|           7400.0|
|      Marketing|6633.333333333333|
|             HR|           5837.5|
|        Finance|           5730.8|
|          Sales|5492.923076923077|
+---------------+-----------------+



25/09/10 16:10:05 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: Salary, Department
 Schema: salary, department_name
Expected: department_name but found: Department
CSV file: file:///home/maishuji/Workplace/ibm-data-engineer/project-spark-processing/employees.csv
