# Data Analysis using Spark

In this project, we focus on mastering Spark SQL. We create a DataFrame from a CSV file and apply transformations and actions to it using both Spark SQL queries and the DataFrame API.

- Task 1: Generate DataFrame from CSV data.
- Task 2: Define a schema for the data.
- Task 3: Display schema of DataFrame.
- Task 4: Create a temporary view.
- Task 5: Execute an SQL query.
- Task 6: Calculate Average Salary by Department.
- Task 7: Filter and Display IT Department Employees.
- Task 8: Add 10% Bonus to Salaries.
- Task 9: Find Maximum Salary by Age.
- Task 10: Self-Join on Employee Data.
- Task 11: Calculate Average Employee Age.
- Task 12: Calculate Total Salary by Department.
- Task 13: Sort Data by Age and Salary.
- Task 14: Count Employees in Each Department.
- Task 15: Filter Employees with the Letter 'o' in the Name.


### Preliminaries: Installing libraries and downloading data

Install the required libraries

In [1]:
!pip install pyspark
!pip install findspark

Collecting pyspark
  Downloading pyspark-3.4.3.tar.gz (311.4 MB)
  Preparing metadata (setup.py) ... done
Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.4.3-py2.py3-none-any.whl size=311885504 sha256=c317162eae69c6ff7c94e93ce21715677642093a507b4e62b99303c4f978a8c3
  Stored in directory: /home/jupyterlab/.cache/pip/wheels/37/bc/bb/77785f6fcd2c83e663647f73225b76f3a3d5fd00762d7daf6f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.7 pyspark-3.4.3

Download the required data file

In [2]:
# Download the CSV data first into a local `employees.csv` file
import wget
wget.download("https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBM-BD0225EN-SkillsNetwork/data/employees.csv")

'employees.csv'

### Importing Libraries

Importing the required libraries

In [3]:
import findspark
import warnings


def warn(*args, **kwargs):
    pass


# Suppress generated warnings
warnings.warn = warn
warnings.filterwarnings('ignore')

findspark.init()

from pyspark.sql import SparkSession

### Create a Spark session

Any warnings printed by the `SparkSession` command below can be safely ignored.

In [4]:
spark = SparkSession \
    .builder \
    .appName("Data Analysis using Spark") \
    .getOrCreate()

24/05/02 03:51:50 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


### Tasks


#### Task 1: Generate a Spark DataFrame from the CSV data

Our initial task involves reading the downloaded CSV file named `employees.csv` into a Spark DataFrame called `employees_df`.

* The `spark.read.csv` function loads the data into a DataFrame
* `header=True` indicates that the first row of the CSV file is a header row holding the column names
* `inferSchema=True` tells Spark to determine the data types of the columns automatically, which requires an extra pass over the data
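
As a toy illustration of these two options, the pure-Python sketch below treats the first line as column names and then scans the data rows to choose a type for each column. It is a simplification, not Spark's implementation: it only distinguishes integers from strings, and the three sample rows are reconstructed from the output below, assuming the file is plain comma-separated text.

```python
import csv
import io

# Sample rows reconstructed from the Task 1 output (assumed plain comma-separated text)
SAMPLE_CSV = """Emp_No,Emp_Name,Salary,Age,Department
198,Donald,2600,29,IT
199,Douglas,2600,34,Sales
200,Jennifer,4400,36,Marketing
"""


def infer_column_types(text):
    """Toy inference: a column is 'integer' if every value parses as int, else 'string'."""
    reader = csv.reader(io.StringIO(text))
    header = next(reader)  # like header=True: the first line supplies the column names
    rows = list(reader)    # inference has to look at the data rows themselves
    types = {}
    for i, name in enumerate(header):
        try:
            for row in rows:
                int(row[i])
            types[name] = "integer"
        except ValueError:
            types[name] = "string"
    return types


print(infer_column_types(SAMPLE_CSV))
```

Spark's real inference recognizes many more types (doubles, booleans, timestamps, and so on), which is one reason Task 2 declares the schema explicitly instead.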

In [5]:
# Read data from the "employees.csv" file into a DataFrame variable named "employees_df"
employees_df = spark.read.csv("employees.csv", header=True, inferSchema=True)

# Display the dataframe content
employees_df.show()

+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
|   198|   Donald|  2600| 29|        IT|
|   199|  Douglas|  2600| 34|     Sales|
|   200| Jennifer|  4400| 36| Marketing|
|   201|  Michael| 13000| 32|        IT|
|   202|      Pat|  6000| 39|        HR|
|   203|    Susan|  6500| 36| Marketing|
|   204|  Hermann| 10000| 29|   Finance|
|   205|  Shelley| 12008| 33|   Finance|
|   206|  William|  8300| 37|        IT|
|   100|   Steven| 24000| 39|        IT|
|   101|    Neena| 17000| 27|     Sales|
|   102|      Lex| 17000| 37| Marketing|
|   103|Alexander|  9000| 39| Marketing|
|   104|    Bruce|  6000| 38|        IT|
|   105|    David|  4800| 39|        IT|
|   106|    Valli|  4800| 38|     Sales|
|   107|    Diana|  4200| 35|     Sales|
|   108|    Nancy| 12008| 28|     Sales|
|   109|   Daniel|  9000| 35|        HR|
|   110|     John|  8200| 31| Marketing|
+------+---------+------+---+----------+
only showing top 20 rows

#### Task 2: Define a schema for the data
  
Next, we define a schema for the input data and then use the user-defined schema to read the CSV file named `employees.csv` into a Spark DataFrame called `employees_df`.

The approach is as follows:
* Explore the data to understand the kind of values present in each column
* Determine the appropriate data type for each column
* Define the schema using Spark's `StructType` class, with one `StructField` per column specifying the column name, data type, and whether the column is nullable
* Read the input file using the user-defined schema

In [6]:
# Define a Schema for the input data and read the file using the user-defined Schema
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("Emp_No", IntegerType(), False),
    StructField("Emp_Name", StringType(), False),
    StructField("Salary", IntegerType(), False),
    StructField("Age", IntegerType(), False),
    StructField("Department", StringType(), False),
])

# Create a dataframe on top of a CSV file
employees_df = (spark.read
    .format("csv")
    .schema(schema)
    .option("header", "true")
    .load("employees.csv")
)

# Display the dataframe content
employees_df.show()

+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
|   198|   Donald|  2600| 29|        IT|
|   199|  Douglas|  2600| 34|     Sales|
|   200| Jennifer|  4400| 36| Marketing|
|   201|  Michael| 13000| 32|        IT|
|   202|      Pat|  6000| 39|        HR|
|   203|    Susan|  6500| 36| Marketing|
|   204|  Hermann| 10000| 29|   Finance|
|   205|  Shelley| 12008| 33|   Finance|
|   206|  William|  8300| 37|        IT|
|   100|   Steven| 24000| 39|        IT|
|   101|    Neena| 17000| 27|     Sales|
|   102|      Lex| 17000| 37| Marketing|
|   103|Alexander|  9000| 39| Marketing|
|   104|    Bruce|  6000| 38|        IT|
|   105|    David|  4800| 39|        IT|
|   106|    Valli|  4800| 38|     Sales|
|   107|    Diana|  4200| 35|     Sales|
|   108|    Nancy| 12008| 28|     Sales|
|   109|   Daniel|  9000| 35|        HR|
|   110|     John|  8200| 31| Marketing|
+------+---------+------+---+----------+
only showing top 20 rows

#### Task 3: Display schema of DataFrame

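We then display the structure of the `employees_df` DataFrame, including all columns and their data types. Note that every column is reported as `nullable = true` even though the schema in Task 2 declared `nullable=False`: when reading files such as CSV, Spark treats all columns as nullable regardless of the nullability set in the user-defined schema.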


In [7]:
# Display all columns of the DataFrame, along with their respective data types
employees_df.printSchema()

root
 |-- Emp_No: integer (nullable = true)
 |-- Emp_Name: string (nullable = true)
 |-- Salary: integer (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Department: string (nullable = true)



#### Task 4: Create a temporary view

Here, we create a temporary view named `employees` for the `employees_df` DataFrame, enabling Spark SQL queries on the data. 


In [8]:
# Create a temporary view named "employees" for the DataFrame
employees_df.createOrReplaceTempView("employees")

#### Task 5: Execute an SQL query

Next, we compose and execute an SQL query to fetch the records from the `employees` view where the employee's age exceeds 30, and display the result, which shows the filtered records.
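
A temporary view only gives the DataFrame a name that SQL can refer to; the query itself is ordinary SQL. As a rough, Spark-free illustration, the sketch below loads a few rows from the Task 1 output into an in-memory SQLite table named `employees` (a stand-in for the temporary view, not how Spark works internally) and runs the same `WHERE Age > 30` query.

```python
import sqlite3

# A few rows copied from the Task 1 output: (Emp_No, Emp_Name, Salary, Age, Department)
rows = [
    (198, "Donald", 2600, 29, "IT"),
    (199, "Douglas", 2600, 34, "Sales"),
    (200, "Jennifer", 4400, 36, "Marketing"),
    (204, "Hermann", 10000, 29, "Finance"),
    (101, "Neena", 17000, 27, "Sales"),
]

# An in-memory SQLite table plays the role of the "employees" temporary view
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE employees "
    "(Emp_No INTEGER, Emp_Name TEXT, Salary INTEGER, Age INTEGER, Department TEXT)"
)
conn.executemany("INSERT INTO employees VALUES (?, ?, ?, ?, ?)", rows)

# Same query text as the Spark SQL cell; without ORDER BY, row order is not guaranteed
over_30 = conn.execute("SELECT * FROM employees WHERE Age > 30").fetchall()
conn.close()

for row in over_30:
    print(row)
```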


In [9]:
# SQL query to fetch solely the records from the View where the age exceeds 30
spark.sql("SELECT * FROM employees WHERE Age > 30").show()

+------+-----------+------+---+----------+
|Emp_No|   Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
|   199|    Douglas|  2600| 34|     Sales|
|   200|   Jennifer|  4400| 36| Marketing|
|   201|    Michael| 13000| 32|        IT|
|   202|        Pat|  6000| 39|        HR|
|   203|      Susan|  6500| 36| Marketing|
|   205|    Shelley| 12008| 33|   Finance|
|   206|    William|  8300| 37|        IT|
|   100|     Steven| 24000| 39|        IT|
|   102|        Lex| 17000| 37| Marketing|
|   103|  Alexander|  9000| 39| Marketing|
|   104|      Bruce|  6000| 38|        IT|
|   105|      David|  4800| 39|        IT|
|   106|      Valli|  4800| 38|     Sales|
|   107|      Diana|  4200| 35|     Sales|
|   109|     Daniel|  9000| 35|        HR|
|   110|       John|  8200| 31| Marketing|
|   111|     Ismael|  7700| 32|        IT|
|   112|Jose Manuel|  7800| 34|        HR|
|   113|       Luis|  6900| 34|     Sales|
|   116|     Shelli|  2900| 37|   Finance|
+------+-----------+------+---+----------+
only showing top 20 rows

#### Task 6: Calculate Average Salary by Department

Compose an SQL query to retrieve the average salary of employees grouped by department. Then, display the result.
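
Conceptually, `AVG` is accumulated per group as a running sum and a row count, and the average is their ratio. The pure-Python sketch below mimics `GROUP BY Department` with `AVG(Salary)` on the ten IT employees listed in the Task 7 output; it is an illustration of the semantics, not Spark's execution plan, and it reproduces the `IT` value of 7400.0 reported by this query.

```python
from collections import defaultdict

# (Emp_Name, Salary, Department) for the ten IT employees listed in the Task 7 output
rows = [
    ("Donald", 2600, "IT"), ("Michael", 13000, "IT"), ("William", 8300, "IT"),
    ("Steven", 24000, "IT"), ("Bruce", 6000, "IT"), ("David", 4800, "IT"),
    ("Ismael", 7700, "IT"), ("Laura", 3300, "IT"), ("TJ", 2100, "IT"),
    ("Hazel", 2200, "IT"),
]


def avg_salary_by_department(rows):
    """Mimic GROUP BY Department / AVG(Salary) with a running sum and count per key."""
    sums = defaultdict(int)
    counts = defaultdict(int)
    for _name, salary, dept in rows:
        sums[dept] += salary
        counts[dept] += 1
    return {dept: sums[dept] / counts[dept] for dept in sums}


averages = avg_salary_by_department(rows)
print(averages)  # {'IT': 7400.0}
```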

In [10]:
# SQL query to calculate the average salary of employees grouped by department
spark.sql(
    "SELECT Department, AVG(Salary) AS Avg_Salary "
    "FROM employees "
    "GROUP BY Department"
).show()



+----------+-----------------+
|Department|       Avg_Salary|
+----------+-----------------+
|     Sales|5492.923076923077|
|        HR|           5837.5|
|   Finance|           5730.8|
| Marketing|6633.333333333333|
|        IT|           7400.0|
+----------+-----------------+

#### Task 7: Filter and Display IT Department Employees

We apply a filter to the `employees_df` DataFrame to select records where the department is `'IT'` and display the filtered DataFrame.


In [11]:
# Apply a filter to select records where the department is 'IT'
employees_df.filter(employees_df["Department"] == "IT").show()

+------+--------+------+---+----------+
|Emp_No|Emp_Name|Salary|Age|Department|
+------+--------+------+---+----------+
|   198|  Donald|  2600| 29|        IT|
|   201| Michael| 13000| 32|        IT|
|   206| William|  8300| 37|        IT|
|   100|  Steven| 24000| 39|        IT|
|   104|   Bruce|  6000| 38|        IT|
|   105|   David|  4800| 39|        IT|
|   111|  Ismael|  7700| 32|        IT|
|   129|   Laura|  3300| 38|        IT|
|   132|      TJ|  2100| 34|        IT|
|   136|   Hazel|  2200| 29|        IT|
+------+--------+------+---+----------+



#### Task 8: Add 10% Bonus to Salaries

Add a new column `SalaryAfterBonus` to the DataFrame, computed by adding a 10% bonus to each employee's salary (that is, `Salary * 1.1`). Note that `withColumn` returns a new DataFrame; `employees_df` itself is not modified.
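
Because `Salary * 1.1` is evaluated in double-precision floating point and 1.1 has no exact binary representation, several values in the output carry tiny rounding errors (for example 2860.0000000000005 for a salary of 2600). The pure-Python sketch below reproduces this with Python floats, which are also IEEE 754 doubles, and shows two possible workarounds; the sample salaries come from the Task 1 output. In PySpark, the analogous rounding would be something like `F.round(col("Salary") * 1.1, 2)` with `from pyspark.sql import functions as F`, which avoids shadowing Python's built-in `round`.

```python
# Salaries taken from the Task 1 output
salaries = [2600, 4400, 13000]

# Plain float multiplication, as in the Spark cell: some results pick up rounding noise
raw = [s * 1.1 for s in salaries]

# Workaround 1: round the result to two decimal places
rounded = [round(s * 1.1, 2) for s in salaries]

# Workaround 2: stay in integer arithmetic; exact only when s * 11 is divisible by 10,
# otherwise // silently truncates the remainder (e.g. 12008 would give 13208, not 13208.8)
integer_based = [s * 11 // 10 for s in salaries]

print(raw)            # [2860.0000000000005, 4840.0, 14300.000000000002]
print(rounded)        # [2860.0, 4840.0, 14300.0]
print(integer_based)  # [2860, 4840, 14300]
```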

In [12]:
from pyspark.sql.functions import col

# Add a new column "SalaryAfterBonus" with 10% bonus added to the original salary
employees_df.withColumn("SalaryAfterBonus", col("Salary") * 1.1).show()

+------+---------+------+---+----------+------------------+
|Emp_No| Emp_Name|Salary|Age|Department|  SalaryAfterBonus|
+------+---------+------+---+----------+------------------+
|   198|   Donald|  2600| 29|        IT|2860.0000000000005|
|   199|  Douglas|  2600| 34|     Sales|2860.0000000000005|
|   200| Jennifer|  4400| 36| Marketing|            4840.0|
|   201|  Michael| 13000| 32|        IT|14300.000000000002|
|   202|      Pat|  6000| 39|        HR| 6600.000000000001|
|   203|    Susan|  6500| 36| Marketing| 7150.000000000001|
|   204|  Hermann| 10000| 29|   Finance|           11000.0|
|   205|  Shelley| 12008| 33|   Finance|13208.800000000001|
|   206|  William|  8300| 37|        IT|            9130.0|
|   100|   Steven| 24000| 39|        IT|26400.000000000004|
|   101|    Neena| 17000| 27|     Sales|           18700.0|
|   102|      Lex| 17000| 37| Marketing|           18700.0|
|   103|Alexander|  9000| 39| Marketing|            9900.0|
|   104|    Bruce|  6000| 38|        IT|

#### Task 9: Find Maximum Salary by Age

Group the data by age and calculate the maximum salary for each age group. Then, display the result.


In [13]:
from pyspark.sql import functions as F

# Group data by age and calculate the maximum salary for each age group.
# F.max is used instead of importing max directly, which would shadow Python's built-in max.
employees_df.groupBy("Age") \
    .agg(F.max("Salary").alias("Max_Salary")) \
    .sort("Age") \
    .show()



+---+----------+
|Age|Max_Salary|
+---+----------+
| 26|      3600|
| 27|     17000|
| 28|     12008|
| 29|     10000|
| 30|      8000|
| 31|      8200|
| 32|     13000|
| 33|     12008|
| 34|      7800|
| 35|      9000|
| 36|      7900|
| 37|     17000|
| 38|      6000|
| 39|     24000|
+---+----------+

#### Task 10: Self-Join on Employee Data

Next, we join the `employees_df` DataFrame with itself on the `Emp_No` column and display the result.
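
Assuming `Emp_No` is unique (as it appears to be in the data shown), every row matches only itself, so the self-join returns one row per employee with each non-key column duplicated; the output of this cell has two `Emp_Name`, `Salary`, `Age`, and `Department` columns, which makes referring to either copy ambiguous. The pure-Python sketch below is a hash join on the key, one standard way to evaluate an equi-join, and prefixes the non-key columns the way DataFrame aliases would; in PySpark, the aliases could be added with `employees_df.alias("a")` and `employees_df.alias("b")`. The `a_`/`b_` prefixes and the helper name are hypothetical.

```python
# A few rows from the Task 1 output, keyed by Emp_No
employees = [
    {"Emp_No": 198, "Emp_Name": "Donald", "Salary": 2600},
    {"Emp_No": 199, "Emp_Name": "Douglas", "Salary": 2600},
    {"Emp_No": 200, "Emp_Name": "Jennifer", "Salary": 4400},
]


def inner_join_on(left, right, key, left_prefix="a_", right_prefix="b_"):
    """Hash-join sketch: index the right side by key, then probe it with each left row.
    Non-key columns get a prefix so the two copies stay distinguishable."""
    index = {}
    for row in right:
        index.setdefault(row[key], []).append(row)
    joined = []
    for lrow in left:
        for rrow in index.get(lrow[key], []):
            out = {key: lrow[key]}  # the join column appears once, as with join(df, "Emp_No")
            out.update({left_prefix + k: v for k, v in lrow.items() if k != key})
            out.update({right_prefix + k: v for k, v in rrow.items() if k != key})
            joined.append(out)
    return joined


result = inner_join_on(employees, employees, "Emp_No")
print(len(result))  # 3: each unique Emp_No matches only itself
print(result[0])
```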

In [14]:
# Join the DataFrame with itself based on the "Emp_No" column
employees_df.join(employees_df, "Emp_No", "inner").show()

+------+---------+------+---+----------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+---------+------+---+----------+
|   198|   Donald|  2600| 29|        IT|   Donald|  2600| 29|        IT|
|   199|  Douglas|  2600| 34|     Sales|  Douglas|  2600| 34|     Sales|
|   200| Jennifer|  4400| 36| Marketing| Jennifer|  4400| 36| Marketing|
|   201|  Michael| 13000| 32|        IT|  Michael| 13000| 32|        IT|
|   202|      Pat|  6000| 39|        HR|      Pat|  6000| 39|        HR|
|   203|    Susan|  6500| 36| Marketing|    Susan|  6500| 36| Marketing|
|   204|  Hermann| 10000| 29|   Finance|  Hermann| 10000| 29|   Finance|
|   205|  Shelley| 12008| 33|   Finance|  Shelley| 12008| 33|   Finance|
|   206|  William|  8300| 37|        IT|  William|  8300| 37|        IT|
|   100|   Steven| 24000| 39|        IT|   Steven| 24000| 39|        IT|
|   101|    Neena| 17000| 27|     Sales|    Neena| 

#### Task 11: Calculate Average Employee Age

Calculate the average age of employees using the built-in aggregation function and display the result.

In [15]:
# Calculate the average age of employees
from pyspark.sql.functions import avg

employees_df.agg(avg("Age").alias("Avg_Age")).show()

+-------+
|Avg_Age|
+-------+
|  33.56|
+-------+



#### Task 12: Calculate Total Salary by Department

Calculate the total salary for each department using the built-in aggregation function and display the result.

In [16]:
# Calculate the total salary for each department.
# F.sum is used instead of importing sum directly, which would shadow Python's built-in sum.
from pyspark.sql import functions as F

employees_df.groupBy("Department") \
    .agg(F.sum("Salary").alias("Total_Salary")) \
    .show()

+----------+------------+
|Department|Total_Salary|
+----------+------------+
|     Sales|       71408|
|        HR|       46700|
|   Finance|       57308|
| Marketing|       59700|
|        IT|       74000|
+----------+------------+

#### Task 13: Sort Data by Age and Salary

Sort the DataFrame by age in ascending order and then by salary in descending order. Then, display the sorted DataFrame.
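
The `ascending=[True, False]` list pairs one direction with each sort column, in order. In plain Python, the same ordering can be sketched with a tuple key that negates the numeric salary; the shuffled rows below are copied from the Task 13 output. An equivalent PySpark spelling is `employees_df.sort(col("Age").asc(), col("Salary").desc())`.

```python
# Rows for ages 26 to 29 from the Task 13 output, deliberately shuffled: (Emp_Name, Salary, Age)
rows = [
    ("Hazel", 2200, 29), ("Neena", 17000, 27), ("Hermann", 10000, 29),
    ("Renske", 3600, 26), ("Donald", 2600, 29), ("Den", 11000, 27),
]

# Age ascending, then Salary descending (negating the number reverses its order)
ordered = sorted(rows, key=lambda r: (r[2], -r[1]))
print([name for name, _salary, _age in ordered])
# ['Renske', 'Neena', 'Den', 'Hermann', 'Donald', 'Hazel']
```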


In [17]:
# Sort the DataFrame by age in ascending order and then by salary
# in descending order
employees_df.sort(["Age", "Salary"], ascending=[True, False]).show()

+------+---------+------+---+----------+
|Emp_No| Emp_Name|Salary|Age|Department|
+------+---------+------+---+----------+
|   137|   Renske|  3600| 26| Marketing|
|   101|    Neena| 17000| 27|     Sales|
|   114|      Den| 11000| 27|   Finance|
|   108|    Nancy| 12008| 28|     Sales|
|   130|    Mozhe|  2800| 28| Marketing|
|   126|    Irene|  2700| 28|        HR|
|   204|  Hermann| 10000| 29|   Finance|
|   115|Alexander|  3100| 29|   Finance|
|   134|  Michael|  2900| 29|     Sales|
|   198|   Donald|  2600| 29|        IT|
|   140|   Joshua|  2500| 29|   Finance|
|   136|    Hazel|  2200| 29|        IT|
|   120|  Matthew|  8000| 30|        HR|
|   110|     John|  8200| 31| Marketing|
|   127|    James|  2400| 31|        HR|
|   201|  Michael| 13000| 32|        IT|
|   111|   Ismael|  7700| 32|        IT|
|   119|    Karen|  2500| 32|   Finance|
|   205|  Shelley| 12008| 33|   Finance|
|   124|    Kevin|  5800| 33| Marketing|
+------+---------+------+---+----------+
only showing top 20 rows

#### Task 14: Count Employees in Each Department

Calculate the number of employees in each department and display the result.
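
The per-department counts this cell reports, together with the totals from Task 12, give a quick consistency check on Task 6: each department's average salary should equal its total divided by its head count. The sketch below does that arithmetic with the figures reported in this notebook.

```python
# Figures reported in this notebook
totals = {"Sales": 71408, "HR": 46700, "Finance": 57308, "Marketing": 59700, "IT": 74000}  # Task 12
counts = {"Sales": 13, "HR": 8, "Finance": 10, "Marketing": 9, "IT": 10}                 # Task 14

# AVG(Salary) per department should equal total / count (compare with Task 6)
averages = {dept: totals[dept] / counts[dept] for dept in totals}
print(averages)
print(sum(counts.values()))  # total number of employees: 50
```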


In [18]:
from pyspark.sql.functions import count

# Calculate the number of employees in each department
employees_df.groupBy("Department") \
    .agg(count("*").alias("Emp_Count")) \
    .show()

+----------+---------+
|Department|Emp_Count|
+----------+---------+
|     Sales|       13|
|        HR|        8|
|   Finance|       10|
| Marketing|        9|
|        IT|       10|
+----------+---------+

#### Task 15: Filter Employees with the letter 'o' in the Name

Apply a filter to select records where the employee's name contains the letter `'o'` and display the filtered DataFrame.
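
`Column.contains` performs a case-sensitive substring match, so a name whose only "o" is an uppercase "O" would not be selected. The sketch below shows the difference with plain Python strings; the names come from the Task 1 output except "Oscar", a hypothetical name added only for illustration. In PySpark, a case-insensitive version would lower-case the column first, for example `employees_df.filter(lower(col("Emp_Name")).contains("o"))` with `lower` imported from `pyspark.sql.functions`.

```python
# Names from the Task 1 output, plus "Oscar", a hypothetical name added for illustration
names = ["Donald", "Douglas", "Jennifer", "Michael", "Pat", "John", "Oscar"]

# Case-sensitive match, analogous to col("Emp_Name").contains("o")
case_sensitive = [n for n in names if "o" in n]

# Case-insensitive match, analogous to lower(col("Emp_Name")).contains("o")
case_insensitive = [n for n in names if "o" in n.lower()]

print(case_sensitive)    # ['Donald', 'Douglas', 'John']
print(case_insensitive)  # ['Donald', 'Douglas', 'John', 'Oscar']
```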


In [19]:
# Apply a filter to select records where the employee's name
# contains the letter 'o'
employees_df.filter(col("Emp_Name").contains("o")).show()

+------+-----------+------+---+----------+
|Emp_No|   Emp_Name|Salary|Age|Department|
+------+-----------+------+---+----------+
|   198|     Donald|  2600| 29|        IT|
|   199|    Douglas|  2600| 34|     Sales|
|   110|       John|  8200| 31| Marketing|
|   112|Jose Manuel|  7800| 34|        HR|
|   130|      Mozhe|  2800| 28| Marketing|
|   133|      Jason|  3300| 38|     Sales|
|   139|       John|  2700| 36|     Sales|
|   140|     Joshua|  2500| 29|   Finance|
+------+-----------+------+---+----------+



### Stop Spark Session

In [20]:
spark.stop()

## Change Log

|Date (YYYY-MM-DD)|Version|Changed By|Change Description|
|-|-|-|-|
|2023-09-01|0.1|Lavanya T S|Initial version|
|2023-09-11|0.2|Pornima More|QA pass with edits|
|2024-04-29|0.3|Pravin Regismond|Modified to fulfill the project requirements|

Copyright © 2023 IBM Corporation. All rights reserved.