## Installs

In [None]:
!pip install pyspark==3.1.2 -q
!pip install findspark -q

## Imports

In [271]:
import kagglehub
import findspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col, sum, when, udf
from pyspark.sql.types import IntegerType
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, to_date, datediff, current_date
from pyspark.sql.functions import explode
from datetime import datetime
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import warnings

## Utilities

In [272]:
def warn(*args, **kwargs):
    """No-op stand-in for ``warnings.warn``: accepts any arguments, does nothing."""
    return None


# findspark is initialised before the warning suppression below is installed.
findspark.init()

# Silence warnings twice over: replace warnings.warn with the no-op defined
# above, and register a blanket "ignore" filter.
warnings.warn = warn
warnings.filterwarnings(action="ignore")

## Session Initialization

In [273]:
# Session handle used by every query cell in this notebook.
spark = (
    SparkSession.builder
    .appName("PySpark_SQL_App")
    .getOrCreate()
)

## Download the data file and Load the dataset


In [274]:
# Download the HR dataset via kagglehub. `path` is the local location that the
# load cell passes to spark.read.csv; it is printed so the reader can see where
# the files were placed.
path = kagglehub.dataset_download("rhuebner/human-resources-data-set")
print("Path to dataset files:", path)

Path to dataset files: /root/.cache/kagglehub/datasets/rhuebner/human-resources-data-set/versions/4


In [275]:
# Load the downloaded CSV data: the header row supplies the column names and
# Spark infers each column's type from its values.
hr_data = spark.read.options(header=True, inferSchema=True).csv(path)

# Other file formats go through the same reader, for example:
#   df = spark.read.parquet("path_to_file.parquet")
#   df = spark.read.json("path_to_file.json")

## Create a temporary view

In [None]:
# Register the DataFrame under the name "hr" so the spark.sql(...) cells can
# refer to it in their FROM clauses. This must run before any of those cells.
hr_data.createOrReplaceTempView("hr")

## Select Queries

In [276]:
# Preview the table: every column, first three rows only.
preview_rows = spark.sql("SELECT * FROM hr")
preview_rows.show(n=3)

+--------------------+-----+---------+---------------+--------+-----------+------+-----------+----------------------+------+-----+----------+--------------------+-----+----+--------+---+-----------+-----------+--------------+--------+----------+-----------------+-----------------+--------------------+-----------------+--------------+---------+-----------------+----------------+----------------+---------------+--------------------+--------------------------+--------------+--------+
|       Employee_Name|EmpID|MarriedID|MaritalStatusID|GenderID|EmpStatusID|DeptID|PerfScoreID|FromDiversityJobFairID|Salary|Termd|PositionID|            Position|State| Zip|     DOB|Sex|MaritalDesc|CitizenDesc|HispanicLatino|RaceDesc|DateofHire|DateofTermination|       TermReason|    EmploymentStatus|       Department|   ManagerName|ManagerID|RecruitmentSource|PerformanceScore|EngagementSurvey|EmpSatisfaction|SpecialProjectsCount|LastPerformanceReview_Date|DaysLateLast30|Absences|
+--------------------+-----+

In [277]:
# Same preview, limited to rows whose GenderID equals 1.
gender_one_rows = spark.sql("SELECT * FROM hr where GenderID = 1")
gender_one_rows.show(n=3)

+--------------------+-----+---------+---------------+--------+-----------+------+-----------+----------------------+------+-----+----------+--------------------+-----+----+--------+---+-----------+-----------+--------------+--------+----------+-----------------+-----------------+--------------------+-----------------+--------------+---------+-----------------+----------------+----------------+---------------+--------------------+--------------------------+--------------+--------+
|       Employee_Name|EmpID|MarriedID|MaritalStatusID|GenderID|EmpStatusID|DeptID|PerfScoreID|FromDiversityJobFairID|Salary|Termd|PositionID|            Position|State| Zip|     DOB|Sex|MaritalDesc|CitizenDesc|HispanicLatino|RaceDesc|DateofHire|DateofTermination|       TermReason|    EmploymentStatus|       Department|   ManagerName|ManagerID|RecruitmentSource|PerformanceScore|EngagementSurvey|EmpSatisfaction|SpecialProjectsCount|LastPerformanceReview_Date|DaysLateLast30|Absences|
+--------------------+-----+

In [278]:
# List each PerformanceScore value that occurs in the data, once.
performance_levels = spark.sql("SELECT distinct PerformanceScore FROM hr")
performance_levels.show()

+-----------------+
| PerformanceScore|
+-----------------+
|Needs Improvement|
|          Exceeds|
|      Fully Meets|
|              PIP|
+-----------------+



In [279]:
# Average, maximum and minimum Salary over the rows with GenderID = 0.
salary_summary = spark.sql("SELECT avg(Salary) as AvgSal, max(Salary) as MaxSal, min(Salary) as MinSal FROM hr where GenderID =0 ")
salary_summary.show()

+-----------------+------+------+
|           AvgSal|MaxSal|MinSal|
+-----------------+------+------+
|67786.72727272728|250000| 45046|
+-----------------+------+------+



In [280]:
# Number of rows (counted via GenderID) in each PerformanceScore group.
counts_by_performance = spark.sql("SELECT count(GenderID), PerformanceScore FROM hr GROUP BY PerformanceScore ")
counts_by_performance.show()

+---------------+-----------------+
|count(GenderID)| PerformanceScore|
+---------------+-----------------+
|             18|Needs Improvement|
|             37|          Exceeds|
|            243|      Fully Meets|
|             13|              PIP|
+---------------+-----------------+



In [292]:
# Rows whose salary falls in the 47000-50000 band.
mid_band_rows = spark.sql("SELECT * FROM hr WHERE SALARY BETWEEN 47000 AND 50000")
mid_band_rows.show()

+--------------------+-----+---------+---------------+--------+-----------+------+-----------+----------------------+------+-----+----------+--------------------+-----+----+--------+---+-----------+-----------+--------------+--------------------+----------+-----------------+-----------------+--------------------+-----------------+------------------+---------+------------------+-----------------+----------------+---------------+--------------------+--------------------------+--------------+--------+
|       Employee_Name|EmpID|MarriedID|MaritalStatusID|GenderID|EmpStatusID|DeptID|PerfScoreID|FromDiversityJobFairID|Salary|Termd|PositionID|            Position|State| Zip|     DOB|Sex|MaritalDesc|CitizenDesc|HispanicLatino|            RaceDesc|DateofHire|DateofTermination|       TermReason|    EmploymentStatus|       Department|       ManagerName|ManagerID| RecruitmentSource| PerformanceScore|EngagementSurvey|EmpSatisfaction|SpecialProjectsCount|LastPerformanceReview_Date|DaysLateLast30|Ab

## Keep the streams running

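When the source is streaming data (rows that keep arriving), a query can be left running so that new rows are processed as they come in. Here the `hr` view is loaded from a static CSV file, so the query below runs once as an ordinary batch query.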

In [305]:
# Select the rows with a salary above 150000 and display them immediately.
# Despite the "_stream" name, this is the result of a one-off query over the
# "hr" view, which was registered from a DataFrame read with spark.read.csv.
high_salary_stream = spark.sql("SELECT * FROM hr WHERE SALARY > 150000")
high_salary_stream.show()

+----------------+-----+---------+---------------+--------+-----------+------+-----------+----------------------+------+-----+----------+--------------------+-----+----+--------+---+-----------+-------------------+--------------+--------------------+----------+-----------------+-----------------+----------------+-----------------+------------------+---------+------------------+-----------------+----------------+---------------+--------------------+--------------------------+--------------+--------+
|   Employee_Name|EmpID|MarriedID|MaritalStatusID|GenderID|EmpStatusID|DeptID|PerfScoreID|FromDiversityJobFairID|Salary|Termd|PositionID|            Position|State| Zip|     DOB|Sex|MaritalDesc|        CitizenDesc|HispanicLatino|            RaceDesc|DateofHire|DateofTermination|       TermReason|EmploymentStatus|       Department|       ManagerName|ManagerID| RecruitmentSource| PerformanceScore|EngagementSurvey|EmpSatisfaction|SpecialProjectsCount|LastPerformanceReview_Date|DaysLateLast30|Ab

In [None]:
# Streaming sink, kept for reference only (disabled). It is written as real
# comments rather than a bare triple-quoted string, because a string literal
# is an expression that the cell would evaluate and echo as output.
#
# NOTE(review): `writeStream` is only available on a streaming DataFrame (one
# created through spark.readStream). `high_salary_stream` is built from a batch
# query, so confirm the source is streaming before enabling this.
#
# high_salary_stream_query = (
#     high_salary_stream.writeStream
#     .outputMode("append")
#     .format("console")
#     .queryName("High Salary")
#     .start()
# )

In [None]:
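# Disabled: only meaningful once the streaming query `high_salary_stream_query`
# has actually been started.
# print("******** High Salary Employees ********")
# high_salary_stream_query.awaitTermination()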

## Stop the session

In [None]:
# Final cell: stop the Spark session created in the "Session Initialization"
# section now that all queries have been run.
spark.stop()