# Employee Data Processing with PySpark

In [29]:
from pyspark.sql.functions import col, when, lit, to_date, expr, add_months, array, avg, collect_list, collect_set
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, DoubleType, BooleanType, StructField, StructType

For **58 KB of data**, the processing requirements are minimal: you don't need a large Spark cluster or heavy resource allocation. Here is a lean **starting configuration** for a dataset this small:

---

### **📌 Recommended Spark Config for 58 KB Data**
```bash
spark-submit \
  --conf spark.executor.instances=1 \
  --conf spark.executor.cores=1 \
  --conf spark.executor.memory=512m \
  --conf spark.executor.memoryOverhead=256m \
  --conf spark.driver.memory=1g \
  --conf spark.sql.shuffle.partitions=2 \
  --conf spark.dynamicAllocation.enabled=false \
  <your_script.py>
```

---

### **📖 Explanation of Configurations:**
| **Parameter** | **Value** | **Why?** |
|--------------|----------|----------|
| `spark.executor.instances=1` | 1 Executor | A single executor is enough for 58 KB. Executor settings only apply on a cluster; in local mode the driver JVM runs every task. |
| `spark.executor.cores=1` | 1 Core | Single-core processing is sufficient for 58 KB. |
| `spark.executor.memory=512m` | 512 MB | More than enough heap to process 58 KB. |
| `spark.executor.memoryOverhead=256m` | 256 MB | Extra non-heap memory per executor (JVM overhead, native buffers). |
| `spark.driver.memory=1g` | 1 GB | In local mode the driver does all the work, so it gets the most memory. |
| `spark.sql.shuffle.partitions=2` | 2 Partitions | The default of 200 schedules 200 mostly empty tasks for every shuffle; 2 is enough for 58 KB. |
| `spark.dynamicAllocation.enabled=false` | Disabled | No need for dynamic scaling with data this small. |

---

### **🔥 Additional Optimizations**
- **Run in local mode** (since data is tiny):
  ```bash
  spark-submit --master local[1] <your_script.py>
  ```
  

# Data Ingestion
- Load employee data from CSV/Parquet.
- Define schema using StructType.
- Read the data into a Spark DataFrame.

In [30]:
# Create (or reuse) the Spark session for this application.
# In local mode the driver runs every task, so the executor.* settings only matter on a cluster.
spark = (
    SparkSession.builder.appName("Employeee")
    .master("local[*]")
    .config("spark.executor.instances", "1")
    .config("spark.driver.memory", "512m")
    .config("spark.executor.cores", "1")
    .config("spark.executor.memoryOverhead", "256m")
    .config("spark.sql.shuffle.partitions", "2")
    .config("spark.dynamicAllocation.enabled", "false")
    .getOrCreate()
)


In [31]:
# Define the schema explicitly (column names and types) instead of inferring it
schema = StructType([
    StructField("First_Name", StringType(), True),
    StructField("Gender", StringType(), True),
    StructField("Start_Date", StringType(), True),
    StructField("Last_login_Time", StringType(), True),
    StructField("Salary", IntegerType(), True),
    StructField("Bonus", DoubleType(), True),
    StructField("Senior_Management", BooleanType(), True),
    StructField("Team", StringType(), True),
])
df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load("/mount_folder/alpha/NYC_Taxi_Data_Pipeline_git/Practice/pyspark_functions/Employeee/employees.csv")
)

In [32]:
df.show()


+----------+------+----------+---------------+------+------+-----------------+--------------------+
|First_Name|Gender|Start_Date|Last_login_Time|Salary| Bonus|Senior_Management|                Team|
+----------+------+----------+---------------+------+------+-----------------+--------------------+
|   Douglas|  Male|  8/6/1993|       12:42 PM| 97308| 6.945|             true|           Marketing|
|    Thomas|  Male| 3/31/1996|        6:53 AM| 61933|  4.17|             true|                null|
|     Maria|Female| 4/23/1993|       11:17 AM|130590|11.858|            false|             Finance|
|     Jerry|  Male|  3/4/2005|        1:00 PM|138705|  9.34|             true|             Finance|
|     Larry|  Male| 1/24/1998|        4:47 PM|101004| 1.389|             true|     Client Services|
|    Dennis|  Male| 4/18/1987|        1:35 AM|115163|10.125|            false|               Legal|
|      Ruby|Female| 8/17/1987|        4:20 PM| 65476|10.012|             true|             Product|


25/02/05 04:59:07 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: First Name, Gender, Start Date, Last Login Time, Salary, Bonus %, Senior Management, Team
 Schema: First_Name, Gender, Start_Date, Last_login_Time, Salary, Bonus, Senior_Management, Team
Expected: First_Name but found: First Name
CSV file: file:///mount_folder/alpha/NYC_Taxi_Data_Pipeline_git/Practice/pyspark_functions/Employeee/employees.csv


# Data Cleaning & Transformation
- Convert Start Date to a proper date type (Last Login Time holds only a time of day, such as `12:42 PM`).
- Handle missing values (fillna for Salary, Bonus %, etc.).
- Remove duplicates.

In [33]:
# Parse Start_Date strings such as "8/6/1993" into a DateType column.
# to_date(col, fmt) parses with a Spark datetime pattern; without fmt it behaves
# like col.cast("date"), which expects ISO-style "yyyy-MM-dd" strings.
df = (
    df.withColumn("Formatted_Start_Date", to_date(col("Start_Date"), "M/d/yyyy"))
    .fillna({"Bonus": 0.0, "Salary": 0, "Team": "Unknown", "Senior_Management": False})
)

# Remove rows that are identical in every column
df = df.dropDuplicates()

# Feature Engineering
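- Calculate Total Earnings (Salary plus the bonus, where Bonus is a percentage of Salary).
- Add a Probation End Date (Start Date + 3 months).
- Generate `Full_name` as an array column ([First_Name, Team]; the dataset has no last-name column).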

In [34]:
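# Bonus is a percentage ("Bonus %" in the CSV header): total = Salary * (1 + Bonus / 100).
# SQL-expression equivalent: df.withColumn("Total_Earning", expr("Salary + Salary * Bonus / 100"))
df = df.withColumn("Total_Earning", col("Salary") + (col("Salary") * col("Bonus") / 100))
# Probation ends 3 months after the parsed start date
df = df.withColumn("last_Probation", add_months(col("Formatted_Start_Date"), 3))
# Array column pairing each first name with its team
df = df.withColumn("Full_name", array("First_Name", "Team"))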
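To sanity-check these columns against the output further down (for example the row with Start_Date 7/20/2015, Salary 45906 and Bonus 11.598, shown with Total_Earning 51230.17788 and last_Probation 2015-10-20), here is a plain-Python model of the two formulas. The helper names are hypothetical, and `add_months` is reimplemented under the assumption of Spark 3.x semantics, where the day is clamped to the last day of the target month.

```python
import calendar
from datetime import date, datetime


def total_earning(salary: int, bonus_pct: float) -> float:
    """Salary plus a bonus expressed as a percentage of salary."""
    return salary + salary * bonus_pct / 100


def add_months(start: date, months: int) -> date:
    """Shift by whole months, clamping the day to the end of the target month."""
    month_index = start.month - 1 + months
    year = start.year + month_index // 12
    month = month_index % 12 + 1
    day = min(start.day, calendar.monthrange(year, month)[1])
    return date(year, month, day)


# "%m/%d/%Y" accepts the same unpadded strings as Spark's "M/d/yyyy" here.
start = datetime.strptime("7/20/2015", "%m/%d/%Y").date()
print(total_earning(45906, 11.598))       # close to 51230.17788 (floating point)
print(add_months(start, 3))               # 2015-10-20
print(add_months(date(2015, 11, 30), 3))  # 2016-02-29: day clamped to month end
```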




# Data Aggregation
- Calculate average salary per team.
- Collect team members using collect_list().
- Collect unique team members using collect_set().

In [35]:
df_aggregated = df.groupBy("Team").agg(
    avg("Salary").alias("avg_Salary"),
    collect_list("First_Name").alias("Team_Members"),
    collect_set("First_Name").alias("Unique_Team_Members"),
)

# df.groupBy("Team").agg(avg("Salary").alias("Avg Salary"))

In [36]:
df_final = df.join(df_aggregated, on="Team", how="left")


# Performance Optimization
- Cache DataFrame to avoid recomputation.
- Reduce shuffle partitions for small datasets (spark.sql.shuffle.partitions).

In [37]:
# Mark df_final for caching; it is materialised by the first action that uses it
df_final.cache()

# Reduce shuffle partitions for small data (already set when the session was created)
spark.conf.set("spark.sql.shuffle.partitions", "2")


25/02/05 04:59:07 WARN CacheManager: Asked to cache already cached data.


# Unit Testing for Data Quality
- Check for duplicate records.
- Check for null values in key columns.
- Ensure salary is non-negative.

In [38]:
df_final.columns

['Team',
 'First_Name',
 'Gender',
 'Start_Date',
 'Last_login_Time',
 'Salary',
 'Bonus',
 'Senior_Management',
 'Formatted_Start_Date',
 'Total_Earning',
 'last_Probation',
 'Full_name',
 'avg_Salary',
 'Team_Members',
 'Unique_Team_Members']

In [44]:
# Check for duplicate records
df_duplicates = df_final.groupBy(df_final.columns).count().filter(col("count") > 1)
assert df_duplicates.count() == 0, "Duplicates found in data!"

# Check for null values in key columns
assert df_final.filter(col("First_Name").isNull()).count() == 0, "Null values found in First Name!"
assert df_final.filter(col("Salary").isNull()).count() == 0, "Null values found in Salary!"

# Check that salary is non-negative
assert df_final.filter(col("Salary") < 0).count() == 0, "Negative salaries found!"




AssertionError: Null values found in First Name!

In [46]:
df.filter(col("First_Name").isNull()).show()


+----------+------+----------+---------------+------+------+-----------------+--------------------+--------------------+-------------+--------------+--------------------+
|First_Name|Gender|Start_Date|Last_login_Time|Salary| Bonus|Senior_Management|                Team|Formatted_Start_Date|Total_Earning|last_Probation|           Full_name|
+----------+------+----------+---------------+------+------+-----------------+--------------------+--------------------+-------------+--------------+--------------------+
|      null|Female| 7/20/2015|       10:43 AM| 45906|11.598|            false|             finance|          2015-07-20|  51230.17788|    2015-10-20|     [null, Finance]|
|      null|  Male| 8/21/1998|        2:27 PM|122340| 6.417|            false|             unknown|          1998-08-21|  130190.5578|    1998-11-21|     [null, Unknown]|
|      null|  Male| 1/29/2016|        2:33 AM|122173| 7.797|            false|     client services|          2016-01-29| 131698.82881|    2016-04



In [59]:
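from pyspark.sql.functions import trim, lower

# Keep only rows with a team on both sides of the join
df = df.filter(col("Team").isNotNull())
df_aggregated = df_aggregated.filter(col("Team").isNotNull())

# Normalise team names (lower-case, no surrounding spaces) so the join keys match
df = df.withColumn("Team", trim(lower(col("Team"))))
df_aggregated = df_aggregated.withColumn("Team", trim(lower(col("Team"))))
df_final = df.join(df_aggregated, on="Team", how="left")
df_final.select("Team", "Avg_Salary").show()

# Rows whose team found no match in the aggregate (an empty table means every team matched)
df_final.filter(col("Avg_Salary").isNull()).show()

# dropDuplicates returns a new DataFrame, so assign the result back
df_final = df_final.dropDuplicates()
df_final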



+--------------------+-----------------+
|                Team|       Avg_Salary|
+--------------------+-----------------+
|             unknown|90763.13953488372|
|             finance|92219.48039215687|
|         engineering|94269.19565217392|
|             unknown|90763.13953488372|
|             finance|92219.48039215687|
|             product| 88665.5052631579|
|     human resources|90944.52747252748|
|             product| 88665.5052631579|
|             product| 88665.5052631579|
|           marketing|90435.59183673469|
|     client services|88224.42452830188|
|     client services|88224.42452830188|
|     client services|88224.42452830188|
|               legal|89303.61363636363|
|         engineering|94269.19565217392|
|             product| 88665.5052631579|
|             unknown|90763.13953488372|
|business development|91866.31683168317|
|     client services|88224.42452830188|
|business development|91866.31683168317|
+--------------------+-----------------+
only showing top 20 rows



+----+----------+------+----------+---------------+------+-----+-----------------+--------------------+-------------+--------------+---------+----------+------------+-------------------+
|Team|First_Name|Gender|Start_Date|Last_login_Time|Salary|Bonus|Senior_Management|Formatted_Start_Date|Total_Earning|last_Probation|Full_name|avg_Salary|Team_Members|Unique_Team_Members|
+----+----------+------+----------+---------------+------+-----+-----------------+--------------------+-------------+--------------+---------+----------+------------+-------------------+
+----+----------+------+----------+---------------+------+-----+-----------------+--------------------+-------------+--------------+---------+----------+------------+-------------------+



DataFrame[Team: string, First_Name: string, Gender: string, Start_Date: string, Last_login_Time: string, Salary: int, Bonus: double, Senior_Management: boolean, Formatted_Start_Date: date, Total_Earning: double, last_Probation: date, Full_name: array<string>, avg_Salary: double, Team_Members: array<string>, Unique_Team_Members: array<string>]

In [55]:
df.count()
# df_final.count()



1000

In [51]:
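df_final.printSchema  # without (), this only returns the bound method shown below; df_final.printSchema() prints the schema tree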

<bound method DataFrame.printSchema of DataFrame[Team: string, First_Name: string, Gender: string, Start_Date: string, Last_login_Time: string, Salary: int, Bonus: double, Senior_Management: boolean, Formatted_Start_Date: date, Total_Earning: double, last_Probation: date, Full_name: array<string>, avg_Salary: double, Team_Members: array<string>, Unique_Team_Members: array<string>]>

# Save Processed Data
- Write processed data to Parquet format.

In [3]:
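# Uncomment to save. Requires `df` from the cells above: in a fresh kernel it fails with the NameError below.
# df.repartition(1).write.format("parquet").mode("overwrite").save("/mount_folder/alpha/NYC_Taxi_Data_Pipeline_git/Practice/pyspark_functions/Employeee/employees_processed.parquet")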

NameError: name 'df' is not defined