In [0]:
%pyspark

In [1]:
// Load the payroll CSV, taking column names from its first line.
// No schema is supplied or inferred, so every column is read as a string.
// Declared as a val: nothing visible in this notebook reassigns it.
val df = spark.read
  .format("csv")
  .option("header", "true")
  .load("/YourMoney_Agency_Payroll.csv")

In [2]:

// Preview up to 1000 rows, with long cell values truncated for display.
df.show(numRows = 1000, truncate = true)

In [3]:
// Print the column names and types of the loaded DataFrame as a tree.
println(df.schema.treeString)

In [4]:
%spark

In [5]:

// Replace a missing MIDDLE_INITIAL with the placeholder "none".
// Spark's CSV reader loads unquoted empty fields as null by default (its
// `nullValue` option defaults to the empty string), so testing only for ""
// leaves those rows untouched. Null and whitespace-only values are therefore
// treated as missing alongside the empty string.
val middle_name_null = df.withColumn("MIDDLE_INITIAL",
  when(col("MIDDLE_INITIAL").isNull || trim(col("MIDDLE_INITIAL")) === "", lit("none"))
    .otherwise(col("MIDDLE_INITIAL"))
)

In [6]:
// Zero-fill missing pay figures, one column per step, each step building on
// the previous DataFrame. A null is replaced only on rows of the record type
// named in that step:
//   - SALARY_HOURLY_RATE and the MASTER_YTD_* columns on "MASTER" rows
//   - REGULAR_PAY and SUPPLEMENTAL_PAY on "DETAIL" rows
// The final value, supplemental_pay_null, carries every fill.
// NOTE(review): the earlier comment described the SALARY_HOURLY_RATE fill as
// applying to detail records, but the condition checks "MASTER" -- confirm intent.

/** Returns `frame` with nulls in `column` replaced by 0 on rows whose RECORD_TYPE equals `recordType`. */
def zeroFillWhere(
    frame: org.apache.spark.sql.DataFrame,
    column: String,
    recordType: String
): org.apache.spark.sql.DataFrame = {
  val missingOnThatType = col(column).isNull && col("RECORD_TYPE") === recordType
  frame.withColumn(column, when(missingOnThatType, lit(0)).otherwise(col(column)))
}

val salary_hourly_rate_null =
  zeroFillWhere(middle_name_null, "SALARY_HOURLY_RATE", "MASTER")

val master_ytd_regular_pay_null =
  zeroFillWhere(salary_hourly_rate_null, "MASTER_YTD_REGULAR_PAY", "MASTER")

val master_ytd_overtime_payments_null =
  zeroFillWhere(master_ytd_regular_pay_null, "MASTER_YTD_OVERTIME_PAYMENTS", "MASTER")

val master_ytd_all_other_payments_null =
  zeroFillWhere(master_ytd_overtime_payments_null, "MASTER_YTD_ALL_OTHER_PAYMENTS", "MASTER")

val master_ytd_earnings_null =
  zeroFillWhere(master_ytd_all_other_payments_null, "MASTER_YTD_EARNINGS", "MASTER")

val regular_pay_null =
  zeroFillWhere(master_ytd_earnings_null, "REGULAR_PAY", "DETAIL")

val supplemental_pay_null =
  zeroFillWhere(regular_pay_null, "SUPPLEMENTAL_PAY", "DETAIL")





In [7]:
// Persist the null-filled DataFrame as the table "payroll_data_cleaned".
// The save mode is spelled out: "error" is the writer's default and makes the
// write fail when the table already exists.
val cleanedWriter = supplemental_pay_null.write.mode("error")
cleanedWriter.saveAsTable("payroll_data_cleaned")

In [8]:
// Total earnings per row: regular + supplemental + one-time payments.
// ONE_TIME_PAYMENTS is not null-filled by any earlier step in this notebook,
// and in Spark SQL a null operand makes the whole sum null. A missing one-time
// payment is therefore counted as 0 instead of blanking out the row's total.
// The explicit cast keeps both coalesce operands numeric (the CSV was loaded
// without a schema, so the column arrives as a string).
val total_earnings = supplemental_pay_null.withColumn(
  "TOTAL_EARNINGS",
  col("REGULAR_PAY") +
    col("SUPPLEMENTAL_PAY") +
    coalesce(col("ONE_TIME_PAYMENTS").cast("double"), lit(0.0))
)


In [9]:
import org.apache.spark.sql.functions._

// Mean TOTAL_EARNINGS for each CALENDAR_YEAR, exposed as AVG_TOTAL_EARNINGS.
val avg_earnings_by_year = {
  val rowsPerYear = total_earnings.groupBy(col("CALENDAR_YEAR"))
  rowsPerYear.agg(avg(col("TOTAL_EARNINGS")).as("AVG_TOTAL_EARNINGS"))
}


In [10]:
// Keep only records whose salary/hourly rate exceeds the threshold.
// SALARY_HOURLY_RATE comes from a CSV loaded without a schema, so it is a
// string column. Comparing it directly with an integer literal leaves the
// conversion to Spark's implicit coercion.
// NOTE(review): that coercion presumably converts the string to an integer and
// drops any fractional part (e.g. "20.50" would not pass `> 20`) -- the explicit
// double cast below avoids depending on it.
val salaryRateThreshold = 20.0
val filtered_data = supplemental_pay_null.filter(
  col("SALARY_HOURLY_RATE").cast("double") > salaryRateThreshold
)


In [11]:
// Print the first 20 filtered rows (the default page size), truncating long values.
filtered_data.show(numRows = 20, truncate = true)

In [12]:
// Mean OVERTIME_PAYMENTS for each PAYROLL_ID, exposed as AVG_OVERTIME_PAYMENTS.
val average_overtime_payments = supplemental_pay_null
  .groupBy(col("PAYROLL_ID"))
  .agg(avg(col("OVERTIME_PAYMENTS")).as("AVG_OVERTIME_PAYMENTS"))

In [13]:
// Keep the PAYROLL_IDs whose average overtime payment is above 1000.
val high_overtime_employees =
  average_overtime_payments.where(col("AVG_OVERTIME_PAYMENTS").gt(1000))


In [14]:
// Turnover rate per department (MASTER_DEPARTMENT_AGENCY_DESC).
// The previous expression divided countDistinct("PAYROLL_ID") by itself, so
// every department came out as 1.0 regardless of the data.
// NOTE(review): no separation/termination column is referenced anywhere in
// this notebook, so turnover is approximated here as the share of a
// department's distinct PAYROLL_IDs whose most recent CALENDAR_YEAR is earlier
// than the latest CALENDAR_YEAR in the data set -- confirm this definition
// with the data owner before relying on the numbers.
val turnover_rates = {
  // CALENDAR_YEAR is loaded as a string; compare years numerically.
  val yearAsInt = col("CALENDAR_YEAR").cast("int")

  // Last year in which each PAYROLL_ID appears within each department.
  // Null ids are skipped, as countDistinct did in the original expression.
  val lastSeenPerEmployee = supplemental_pay_null
    .filter(col("PAYROLL_ID").isNotNull)
    .groupBy("MASTER_DEPARTMENT_AGENCY_DESC", "PAYROLL_ID")
    .agg(max(yearAsInt).alias("LAST_YEAR_SEEN"))

  // Single-row frame holding the most recent year across the whole data set.
  val latestYear = supplemental_pay_null.agg(max(yearAsInt).alias("LATEST_YEAR"))

  // One row per (department, employee) at this point, so count(1) is the
  // number of distinct employees and the sum counts those no longer present.
  lastSeenPerEmployee
    .crossJoin(latestYear)
    .groupBy("MASTER_DEPARTMENT_AGENCY_DESC")
    .agg(
      (sum(when(col("LAST_YEAR_SEEN") < col("LATEST_YEAR"), 1).otherwise(0)) / count(lit(1)))
        .alias("TURNOVER_RATE")
    )
}

In [15]:
// Expose the earnings DataFrame to SQL as the temp view "payroll_data",
// then total TOTAL_EARNINGS per CALENDAR_YEAR through a SQL query.
val earningsByYearQuery =
  "SELECT CALENDAR_YEAR, SUM(TOTAL_EARNINGS) AS TOTAL_EARNINGS FROM payroll_data GROUP BY CALENDAR_YEAR"

total_earnings.createOrReplaceTempView("payroll_data")
val earnings_by_year = spark.sql(earningsByYearQuery)


In [16]:
%sql
-- Total earnings per calendar year, for charting.
-- GROUP BY alone does not guarantee row order, so the years are sorted
-- explicitly to keep the chart's axis chronological.
SELECT CALENDAR_YEAR, SUM(TOTAL_EARNINGS) AS TOTAL_EARNINGS
FROM payroll_data
GROUP BY CALENDAR_YEAR
ORDER BY CALENDAR_YEAR

In [17]:
%spark
// Read the "payroll_data" temp view back as a DataFrame.
val df_cleaned = spark.read.table("payroll_data")


In [18]:
%spark.pyspark
import pandas as pd

# A val defined in a %spark (Scala) paragraph is not a Python name, so
# `df_cleaned` would be undefined here. Look the data up again through the
# SparkSession, which Zeppelin shares between the Scala and PySpark
# interpreters: "payroll_data" is the temp view registered earlier in this
# notebook.
df_cleaned = spark.table("payroll_data")

# toPandas() collects every row onto the driver; filter or sample first if the
# view is too large to fit in driver memory.
df_pandas = df_cleaned.toPandas()


In [19]:
%spark.pyspark
