# PySpark Project - Analyzing HR Data

### Objective:

***The aim of this project is to analyze HR data regarding employee attrition & performance using PySpark (a Python API for Spark) to query and process data***

* **Loading data from CSV into a Spark DataFrame**
* **Analyzing data in a Spark DataFrame using SQL**
* **Saving transformed data back to disk**

_The Dataset used in this project is available from:_

<u>_Note:_</u> The code is commented throughout so that readers with different backgrounds can follow each step of this notebook.

In [118]:
# Import relevant modules
# ! pip install pyspark # Uncomment this line if pyspark module is not installed
import os
from pyspark.sql import SparkSession

from pyspark.sql.functions import collect_set

In [58]:
def load_dataset(spark, path, **options):
    """
    Function to load a CSV file defined in the path with configuration options
    @params:
        - 'spark': the SparkSession object
        - 'path': the path to the dataset
        - 'options': configuration options for how the dataset should be loaded
    @return: DataFrame object
    """
    
    data = spark.read.options(**options).csv(path)
    
    return data
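
Because `load_dataset` forwards its keyword arguments straight to the reader, the options can be prepared as an ordinary dictionary first. The following is a minimal sketch of that idea; `DEFAULT_CSV_OPTIONS` and `build_csv_options` are hypothetical names introduced here for illustration and are not part of the notebook.

```python
# Baseline options mirroring the configuration used in this notebook
DEFAULT_CSV_OPTIONS = {"header": True, "delimiter": ",", "inferSchema": True}


def build_csv_options(**overrides):
    """Hypothetical helper: return the defaults with any caller overrides applied."""
    # Later keys win, so overrides replace the matching defaults
    return {**DEFAULT_CSV_OPTIONS, **overrides}


# Example: a semicolon-separated file that otherwise uses the defaults
options = build_csv_options(delimiter=";")
print(options)
# The result can be forwarded unchanged: load_dataset(spark, path, **options)
```

Building a new dictionary on each call leaves the defaults untouched, so one override does not leak into later loads.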

In [100]:
def get_distinct_values(df):
    """
    Function to count the number of unique values for each column in the Spark DataFrame
    @params:
        - 'df': the DataFrame to use
    @return: A Python Dictionary with the following format: {column name: number of unique values}
    """
    
    col_dist_values = {}
    
    for col in df.columns:
        # Use the 'collect_set' function to get all the unique values for each column
        unique_values = df.select(collect_set(col))
        # print(unique_values)  # DataFrame[collect_set(BusinessTravel): array<string>]
        # print(unique_values.first())  # Row(collect_set(BusinessTravel)=['Travel_Frequently', 'Non-Travel', 'Travel_Rarely'])
        
        # Get the length of the array stored in the Row object and add it to the dictionary with the name of the
        # column as the key
        col_dist_values[col] = len(unique_values.first()[0])
    
    return col_dist_values


In [75]:
def select_columns(df, columns, show=False):
    """
    Function to reduce dimension by selecting a subset of columns in the DataFrame
    @params:
        - 'df': the DataFrame to use
        - 'columns': the columns to be selected
        - 'show': whether or not to show the first 5 rows of the new DataFrame (default: False)
    @return: a new DataFrame object
    """
    
    subset_data = df.select(columns)
    
    if show:
        subset_data.show(5, truncate=False)
    
    return subset_data

In [109]:
def execute_query(spark, query, show=False):
    """
    Function to run the SQL query passed in as an argument using the current SparkSession
    @params:
        - 'spark': the current SparkSession object
        - 'query': the SQL query to be executed
        - 'show': whether or not to show the first 5 rows of the result of the query (default: False)
    @return: the DataFrame returned from the query
    """
    
    query_result = spark.sql(query)
    
    if show:
        query_result.show(5, truncate=False)
    
    return query_result
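
The queries passed to `execute_query` in `main` use ordinal references: `GROUP BY 1` groups by the first item in the select list and `ORDER BY 2 DESC` sorts by the second. The sketch below shows that behaviour with Python's built-in `sqlite3` module rather than Spark, purely so it runs without a cluster; Spark SQL resolves ordinals the same way under its default settings. The three rows are invented for illustration.

```python
import sqlite3

# In-memory SQLite database standing in for the Spark temporary view
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE hr_data (Department TEXT, HourlyRate INTEGER)")

# Made-up rows, for illustration only
conn.executemany(
    "INSERT INTO hr_data VALUES (?, ?)",
    [("Sales", 60), ("Sales", 80), ("Human Resources", 50)],
)

# 1 refers to Department, 2 refers to the averaged HourlyRate
rows = conn.execute(
    """
    SELECT Department, AVG(HourlyRate) AS AvgHourlyRate FROM hr_data
    GROUP BY 1
    ORDER BY 2 DESC
    """
).fetchall()
conn.close()

print(rows)  # [('Sales', 70.0), ('Human Resources', 50.0)]
```

Ordinals keep queries short, but they silently change meaning if the select list is reordered, so naming the columns explicitly may be safer in longer queries.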

In [65]:
def save_dataset(df, path, mode='overwrite', _format='pq'):
    """
    Function to write a DataFrame to disk
    @params:
        - 'df': the DataFrame to be saved
        - 'path': the path where the data should be saved to
        - 'mode': defines what happens if the data already exists (default: overwrite)
        - '_format': the format the data should be saved in; 'csv' writes CSV, any other value writes Parquet
          (default: 'pq')
    """
    
    if _format == 'csv':
        df.write.csv(path, mode)
    else:
        df.write.parquet(path, mode)
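
Note that `save_dataset` writes Parquet for any `_format` other than `'csv'`, so a typo such as `'cvs'` silently produces Parquet files. One way to guard against this is to validate the format name first. The helper below, `resolve_format`, is a hypothetical addition sketched for illustration, not part of the notebook.

```python
def resolve_format(_format):
    """Hypothetical helper: map a user-supplied format name to a canonical one."""
    # Accepted spellings and the writer format each one stands for
    aliases = {"pq": "parquet", "parquet": "parquet", "csv": "csv"}
    try:
        return aliases[_format.lower()]
    except KeyError:
        # Fail loudly instead of falling back to Parquet
        raise ValueError(f"Unsupported format: {_format!r}") from None


print(resolve_format("pq"))   # parquet
print(resolve_format("CSV"))  # csv
```

The canonical name could then be handed to the generic writer, for example `df.write.format(resolve_format(_format)).mode(mode).save(path)`.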
    

In [119]:
def main():
    """
    The main function to execute the code
    """
    
    APP_NAME = 'PySpark Project - HR Data Processing'  # Define the name of the project
    # NOTE: The dataset path might differ depending on where the dataset is saved
    DATASET_PATH = './Assets/Datasets/ibm_hr_analytics_employee_attrition_&_performance.csv'
    # Define the path where transformed data should be saved to
    WRITE_PATH = './Assets/Datasets/hr_data_processing/'

    # 1. - LOADING DATA FROM CSV TO A SPARK DATAFRAME
    
    # Create new SparkSession and assign it to a variable 'spark' (this variable then can be used to reference the
    # SparkSession object later on in the code)
    spark = SparkSession.builder.appName(APP_NAME).getOrCreate()
    
    # Define configuration for loading in the dataset
    df_config = {
        'header': True,  # Treat the first line of the CSV as the column names
        'delimiter': ',',  # Columns are separated by a comma (',') in the CSV
        'inferSchema': True  # Infer column types from the data (costs an extra pass over the file)
    }
    
    # Call the 'load_dataset' function to load in the data
    hr_data = load_dataset(spark, DATASET_PATH, **df_config)
    
    hr_data.printSchema() # Check the DataFrame structure, columns, and data types
    
    # 2. ANALYZING DATA IN A SPARK DATAFRAME USING SQL
    
    # Check the number of distinct values for each column
    num_dist_values = get_distinct_values(hr_data)
    
    # Select all the columns that have the same value for every observation in the DataFrame
    cols_to_drop = [col for col, count in num_dist_values.items() if count == 1]
    
    # print(f'Columns with variance of 0: {cols_to_drop}')
    
    # No extra information can be derived from columns with variance of 0 since it's the same for all observations,
    # hence these columns will be dropped - the 'EmployeeNumber' column will also be dropped since it only contains
    # internal administrative information
    hr_data = hr_data.drop('EmployeeNumber', *cols_to_drop)
    
    # print(hr_data.columns)
    
    # At this point, the DataFrame is ready to be analyzed with SQL, however if one wants to further reduce the
    # number of columns to work on, it can be done using the 'select_columns' function
   
    # columns_to_use = []  # List the name of the columns to use for analysis
    # hr_data = select_columns(hr_data, columns_to_use)
    
    hr_data.createOrReplaceTempView('hr_data')  # Register the DataFrame as a temporary view so that it can be
    # queried using SQL
    
    # Define queries
    query_1 = """
        SELECT Department, AVG(HourlyRate) AS AvgHourlyRate FROM hr_data
        GROUP BY 1
        ORDER BY 2 DESC
    """
    
    query_2 = """
        SELECT Department, JobRole, COUNT(*) AS Total FROM hr_data
        WHERE JobSatisfaction = 4
        GROUP BY 1, 2
        ORDER BY 3 DESC
    """
    
    # Collect queries in a dictionary - (query name: query) format
    queries = {    
        'department_hourly': query_1,
        'department_satisfaction': query_2
    }
    
    # Loop through the query dictionary, execute each query and save its result to disk
    for name, query in queries.items():
        query_result = execute_query(spark, query)
        query_result.show(truncate=False)
        
        # 3. SAVING QUERY RESULTS TO DISK
        
        save_dataset(query_result, os.path.join(WRITE_PATH, name))
    
    # Stop the SparkSession
    spark.stop()
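
The column-pruning step in `main` can be illustrated in isolation. The sketch below uses an invented dictionary in place of the real output of `get_distinct_values`; the counts and the column name `AllNullColumn` are hypothetical. Since `collect_set` skips nulls, a column containing only nulls would report 0 distinct values and would not be caught by the `count == 1` test, so the sketch uses `count <= 1` as one possible variation.

```python
# Hypothetical result of get_distinct_values (numbers invented for illustration)
num_dist_values = {
    "Department": 3,
    "EmployeeCount": 1,   # same value in every row
    "AllNullColumn": 0,   # hypothetical column holding only nulls
    "JobRole": 9,
}

# Filter used in main(): keeps only columns with exactly one distinct value
cols_to_drop = [col for col, count in num_dist_values.items() if count == 1]

# Variation: also treat all-null columns as carrying no information
cols_to_drop_with_nulls = [col for col, count in num_dist_values.items() if count <= 1]

print(cols_to_drop)             # ['EmployeeCount']
print(cols_to_drop_with_nulls)  # ['EmployeeCount', 'AllNullColumn']
```

Whether all-null columns should be dropped depends on the analysis; the stricter `== 1` test keeps them visible in the schema.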

In [120]:
if __name__ == '__main__':
    main()

root
 |-- Age: integer (nullable = true)
 |-- Attrition: string (nullable = true)
 |-- BusinessTravel: string (nullable = true)
 |-- DailyRate: integer (nullable = true)
 |-- Department: string (nullable = true)
 |-- DistanceFromHome: integer (nullable = true)
 |-- Education: integer (nullable = true)
 |-- EducationField: string (nullable = true)
 |-- EmployeeCount: integer (nullable = true)
 |-- EmployeeNumber: integer (nullable = true)
 |-- EnvironmentSatisfaction: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- HourlyRate: integer (nullable = true)
 |-- JobInvolvement: integer (nullable = true)
 |-- JobLevel: integer (nullable = true)
 |-- JobRole: string (nullable = true)
 |-- JobSatisfaction: integer (nullable = true)
 |-- MaritalStatus: string (nullable = true)
 |-- MonthlyIncome: integer (nullable = true)
 |-- MonthlyRate: integer (nullable = true)
 |-- NumCompaniesWorked: integer (nullable = true)
 |-- Over18: string (nullable = true)
 |-- OverTime: string 

                                                                                

+----------------------+-------------------------+-----+
|Department            |JobRole                  |Total|
+----------------------+-------------------------+-----+
|Sales                 |Sales Executive          |112  |
|Research & Development|Research Scientist       |95   |
|Research & Development|Laboratory Technician    |80   |
|Research & Development|Healthcare Representative|43   |
|Research & Development|Manufacturing Director   |38   |
|Sales                 |Sales Representative     |23   |
|Research & Development|Research Director        |22   |
|Research & Development|Manager                  |17   |
|Human Resources       |Human Resources          |13   |
|Sales                 |Manager                  |12   |
|Human Resources       |Manager                  |4    |
+----------------------+-------------------------+-----+

