
## New York City 2022 Fiscal Year Salary Prediction

In [0]:
from pyspark.sql.types import  StructField, StructType, StringType, IntegerType,DateType, FloatType, DoubleType

In [0]:
filePath = "/FileStore/tables/Citywide_Payroll_Data__Fiscal_Year_-1.csv"

rawDF = spark.read.csv(filePath, header="true", inferSchema="true", multiLine="true", escape='"')

display(rawDF.head())

Row(Fiscal Year=2020, Payroll Number=17, Agency Name='OFFICE OF EMERGENCY MANAGEMENT', Last Name='BEREZIN', First Name='MIKHAIL', Mid Init=None, Agency Start Date=datetime.date(2015, 8, 10), Work Location Borough='BROOKLYN', Title Description='EMERGENCY PREPAREDNESS MANAGER', Leave Status as of June 30='ACTIVE', Base Salary=86005.0, Pay Basis='per Annum', Regular Hours=1820.0, Regular Gross Paid=84698.21, OT Hours=0.0, Total OT Paid=0.0, Total Other Pay=0.0)

### Data Cleaning


In [0]:
from pyspark.sql.functions import *

In [0]:
rawDF = rawDF.na.drop('any')

In [0]:
yeardf = rawDF.withColumn("Year", year(rawDF["Agency Start Date"]))

In [0]:
diffdf = yeardf.withColumn("Years_Working", col("Fiscal Year") - col("Year"))

In [0]:
hoursdf = diffdf[(diffdf['Regular Hours'] > 0) & (diffdf['OT Hours'] > 0) & (diffdf['Total OT Paid'] > 0) & (diffdf['Total Other Pay'] > 0) ]

In [0]:
presentdf = hoursdf[(hoursdf['Leave Status as of June 30'] == 'ACTIVE')]

In [0]:
display(presentdf.groupBy("Work Location Borough").count().orderBy(desc("count")))

Work Location Borough,count
MANHATTAN,128683
QUEENS,103441
BROOKLYN,97448
BRONX,54504
RICHMOND,18342
WESTCHESTER,869
ULSTER,353
SULLIVAN,226
DELAWARE,190
PUTNAM,59


In [0]:
rows = ["MANHATTAN", "QUEENS", "BROOKLYN", "BRONX", "RICHMOND"]

In [0]:
loc_df = presentdf.filter(col("Work Location Borough").isin(rows))

In [0]:
display(loc_df.groupBy("Agency Name").count().orderBy(desc("count")))

Agency Name,count
POLICE DEPARTMENT,158006
FIRE DEPARTMENT,59648
DEPARTMENT OF SANITATION,32321
DEPARTMENT OF CORRECTION,22034
DEPT OF PARKS & RECREATION,17454
NYC HOUSING AUTHORITY,17044
HRA/DEPT OF SOCIAL SERVICES,14602
ADMIN FOR CHILDREN'S SVCS,13969
DEPARTMENT OF TRANSPORTATION,12787
DEPARTMENT OF EDUCATION ADMIN,9777


In [0]:
agency_rows = ["POLICE DEPARTMENT","FIRE DEPARTMENT","DEPARTMENT OF SANITATION","DEPARTMENT OF CORRECTION","DEPT OF PARKS & RECREATION","NYC HOUSING AUTHORITY"]

In [0]:
agency_df = loc_df.filter(col("Agency Name").isin(agency_rows))

In [0]:
display(agency_df.groupBy("Title Description").count().orderBy(desc("count")))

Title Description,count
POLICE OFFICER,78588
FIREFIGHTER,32578
SANITATION WORKER,22572
CORRECTION OFFICER,16810
SCHOOL SAFETY AGENT,14556
SERGEANT-,13621
P.O. DA DET GR3,12316
LIEUTENANT,11245
EMERGENCY MEDICAL SPECIALIST-EMT,8934
CARETAKER,7317


In [0]:
title_rows = ["POLICE OFFICER", "FIREFIGHTER", "SANITATION WORKER", "CORRECTION OFFICER", "SCHOOL SAFETY AGENT","SERGEANT-","P.O. DA DET GR3","LIEUTENANT","EMERGENCY MEDICAL SPECIALIST-EMT","CARETAKER","CAPTAIN"]

In [0]:
job_df = agency_df.filter(col("Title Description").isin(title_rows))

In [0]:
display(job_df.summary())

summary,Fiscal Year,Payroll Number,Agency Name,Last Name,First Name,Mid Init,Work Location Borough,Title Description,Leave Status as of June 30,Base Salary,Pay Basis,Regular Hours,Regular Gross Paid,OT Hours,Total OT Paid,Total Other Pay,Year,Years_Working
count,223590.0,223590.0,223590,223590,223590,223590,223590,223590,223590,223590.0,223590,223590.0,223590.0,223590.0,223590.0,223590.0,223590.0,223590.0
mean,2019.9617693098976,166.16562458070575,,,,,,,,75378.97500782683,,2043.7847135406887,73918.59835077709,354.599840690548,18073.957837738617,12584.375584550891,2009.2992665146023,10.66250279529496
stddev,1.405692076336649,277.1495713967898,,,,,,,,23740.0365050578,,152.73283285877068,25501.21536604218,263.43931861633865,14975.625476924466,7532.994936903355,7.241558228058092,7.125440580287705
min,2018.0,56.0,DEPARTMENT OF CORRECTION,AADAMS,A,A,BRONX,CAPTAIN,ACTIVE,29636.0,per Annum,125.72,-684.53,0.08,1.89,0.02,1966.0,0.0
25%,2019.0,56.0,,,,,,,,50207.0,,2045.72,49635.86,159.3,6479.04,5903.99,2004.0,4.0
50%,2020.0,56.0,,,,,,,,85292.0,,2080.0,84346.17,305.1,14186.75,13480.84,2010.0,10.0
75%,2021.0,57.0,,,,,,,,85292.0,,2080.0,85058.23,482.48,26481.92,18017.72,2016.0,16.0
max,2022.0,996.0,POLICE DEPARTMENT,ZYTKOWICZ,ZYNAISHA,Z,RICHMOND,SERGEANT-,ACTIVE,176449.0,per Annum,2475.4,186505.23,3340.77,188594.12,99512.69,2022.0,54.0


In [0]:
zero_df = job_df[(job_df['Regular Gross Paid'] > 0)]

In [0]:
dup_df = zero_df.dropDuplicates()

In [0]:
columnsToKeep = ["Work Location Borough","Agency Name","Title Description","Regular Hours","OT Hours","Total OT Paid","Total Other Pay", "Years_Working","Regular Gross Paid"]

df = dup_df.select(columnsToKeep)

In [0]:
df.cache().count()
display(df)

Work Location Borough,Agency Name,Title Description,Regular Hours,OT Hours,Total OT Paid,Total Other Pay,Years_Working,Regular Gross Paid
MANHATTAN,POLICE DEPARTMENT,LIEUTENANT,2080.0,750.75,52691.31,21609.12,28,131514.59
MANHATTAN,POLICE DEPARTMENT,CAPTAIN,2080.0,190.5,15869.74,19861.11,17,168276.88
BROOKLYN,POLICE DEPARTMENT,LIEUTENANT,2080.0,650.33,49690.24,17137.17,18,131514.59
BROOKLYN,POLICE DEPARTMENT,LIEUTENANT,2080.0,739.17,39928.66,23249.97,14,117530.0
MANHATTAN,POLICE DEPARTMENT,LIEUTENANT,2080.0,203.83,19886.48,17125.8,21,131514.59
MANHATTAN,POLICE DEPARTMENT,LIEUTENANT,2080.0,213.83,12113.79,24564.61,24,131514.59
MANHATTAN,POLICE DEPARTMENT,SERGEANT-,2080.0,724.0,40351.2,18894.73,14,108768.97
MANHATTAN,POLICE DEPARTMENT,SERGEANT-,2080.0,594.67,41156.85,17680.79,17,108768.97
MANHATTAN,POLICE DEPARTMENT,SERGEANT-,2080.0,628.9,35599.09,23157.87,16,108768.97
MANHATTAN,POLICE DEPARTMENT,SERGEANT-,2080.0,555.67,41161.92,16860.99,13,105247.17


Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

Databricks visualization. Run in Databricks to view.

### Visualizations 


In [0]:
display(df.groupBy("Years_Working").agg(avg("Regular Gross Paid").alias("Avg_Salary")).orderBy(col("Avg_Salary").desc()))


Years_Working,Avg_Salary
43,130762.79
42,115751.53222222225
40,109435.44341463414
41,109106.24818181815
39,108437.4842857143
37,108027.85376237628
36,107608.2099218751
35,105031.13360465114
38,104646.4848484848
27,102477.1090034762


Databricks visualization. Run in Databricks to view.

In [0]:
display(df.groupBy("Agency Name").agg(avg("Regular Gross Paid").alias("Avg_Salary")).orderBy(col("Avg_Salary").desc()))


Agency Name,Avg_Salary
FIRE DEPARTMENT,77666.7815768527
DEPARTMENT OF CORRECTION,75723.51377640612
POLICE DEPARTMENT,75645.59088981565
DEPARTMENT OF SANITATION,66691.18563707253
NYC HOUSING AUTHORITY,36425.094031706976


Databricks visualization. Run in Databricks to view.

In [0]:
display(df.groupBy("Work Location Borough").agg(avg("Regular Gross Paid").alias("Avg_Salary")).orderBy(col("Avg_Salary").desc()))


Work Location Borough,Avg_Salary
MANHATTAN,78694.98970116806
RICHMOND,76563.48916805787
QUEENS,73068.7869773628
BROOKLYN,71271.58703179717
BRONX,70571.50640989789


Databricks visualization. Run in Databricks to view.

In [0]:
display(df.groupBy("Title Description").agg(avg("Regular Gross Paid").alias("Avg_Salary")).orderBy(col("Avg_Salary").desc()))


Title Description,Avg_Salary
CAPTAIN,117025.41448644374
LIEUTENANT,114329.11427034235
SERGEANT-,105090.68398722564
P.O. DA DET GR3,97554.02021516724
FIREFIGHTER,77614.2160556204
CORRECTION OFFICER,72951.42560440215
POLICE OFFICER,69578.52396770465
SANITATION WORKER,66691.18563707253
EMERGENCY MEDICAL SPECIALIST-EMT,44540.60889411239
SCHOOL SAFETY AGENT,42936.64108683704


Databricks visualization. Run in Databricks to view.

In [0]:
table_name = "FISCAL_SALARY"
#noNullsDF.write.format("parquet").saveAsTable(table_name)

### Preprocessing Data


In [0]:
trainDF, testDF = df.randomSplit([.8, .2], seed=42)

In [0]:
print(testDF.count())
print(trainDF.count())

44616
178973


In [0]:
from pyspark.ml.feature import StringIndexer, VectorAssembler

categoricalCols = [field for (field, dataType) in trainDF.dtypes if dataType == "string"]

indexOutputCols = [x + "Index" for x in categoricalCols]
stringIndexer = StringIndexer(inputCols=categoricalCols, outputCols=indexOutputCols, handleInvalid="skip")

numericCols = [field for (field, dataType) in trainDF.dtypes if ((dataType == "double") & (field != "Gross Regular Paid"))]

assemblerInputs = indexOutputCols + numericCols
vecAssembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")

### Gradient Boosted Trees Regression

In [0]:
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [0]:
gbt = GBTRegressor(labelCol="Regular Gross Paid", maxIter=10, featuresCol="features")

In [0]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[stringIndexer,vecAssembler, gbt])


In [0]:
model = pipeline.fit(trainDF)

In [0]:
pred= model.transform(testDF)

In [0]:
pred.select("features", "Regular Gross Paid", "prediction").show()

+--------------------+------------------+------------------+
|            features|Regular Gross Paid|        prediction|
+--------------------+------------------+------------------+
|[3.0,3.0,3.0,1952...|           79532.7| 78788.80092707623|
|[3.0,3.0,3.0,2080...|           66949.1| 67276.81760499463|
|[3.0,2.0,2.0,1405...|           24998.8| 26736.25681986374|
|[3.0,2.0,2.0,2080...|          75836.92| 74272.00506463072|
|[3.0,2.0,2.0,2085...|          41096.53| 40589.49949711799|
|[3.0,1.0,8.0,705....|          12607.58|15185.352978628509|
|[3.0,1.0,8.0,2239...|          46380.46| 46087.76450023704|
|[3.0,1.0,1.0,2045...|          85058.22| 84910.08515817893|
|[3.0,1.0,1.0,2080...|          54838.72| 53515.18494073205|
|[3.0,1.0,1.0,2080...|          84831.14| 84800.27283982863|
|[3.0,1.0,1.0,2080...|          107815.2|105584.88409309446|
|[3.0,1.0,1.0,2080...|          72201.24| 69985.63745284909|
|[3.0,1.0,1.0,2080...|          85051.84| 84934.91966111492|
|[3.0,1.0,1.0,2080...|  

In [0]:
from pyspark.ml.tuning import ParamGridBuilder
paramGrid = ParamGridBuilder().addGrid(gbt.maxDepth, [2, 5]).addGrid(gbt.maxIter, [10, 100]).build()

evaluator = RegressionEvaluator(labelCol="Regular Gross Paid", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(pred)
r2 = evaluator.setMetricName("r2").evaluate(pred)

print(rmse)
print(r2)

1531.3997480772275
0.9963841700338091


In [0]:
import pyspark.sql.functions as F

residuals = pred.withColumn("residual", (F.col("Regular Gross Paid") - F.col("prediction")))
display(residuals.agg({'residual': 'mean'}))

avg(residual)
-1.5446005075490723


In [0]:
display(residuals.select("prediction", "residual"))

prediction,residual
78788.80092707623,743.8990729237703
67276.81760499463,-327.71760499462835
26736.25681986374,-1737.456819863739
74272.00506463072,1564.9149353692774
40589.49949711799,507.0305028820076
15185.352978628507,-2577.772978628509
46087.76450023704,292.6954997629582
84910.08515817893,148.13484182106913
53515.18494073205,1323.535059267953
84800.27283982863,30.867160171372237


Databricks visualization. Run in Databricks to view.


### Random Forest Regression

In [0]:
from pyspark.ml.regression import RandomForestRegressor
rf = RandomForestRegressor(labelCol="Regular Gross Paid", maxBins=40, seed=42)
pipeline = Pipeline(stages = [stringIndexer, vecAssembler, rf])

In [0]:
model = pipeline.fit(trainDF)

In [0]:
pred= model.transform(testDF)

In [0]:
evaluator = RegressionEvaluator(labelCol="Regular Gross Paid", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(pred)
r2 = evaluator.setMetricName("r2").evaluate(pred)

print(rmse)
print(r2)

3508.2342378381754
0.9810238535975079


In [0]:
pred.select("features", "Regular Gross Paid", "prediction").show()

+--------------------+------------------+------------------+
|            features|Regular Gross Paid|        prediction|
+--------------------+------------------+------------------+
|[3.0,3.0,3.0,1952...|           79532.7| 81063.53292932124|
|[3.0,3.0,3.0,2080...|           66949.1| 73402.04629638793|
|[3.0,2.0,2.0,1405...|           24998.8| 27215.88303701086|
|[3.0,2.0,2.0,2080...|          75836.92| 76607.77316028628|
|[3.0,2.0,2.0,2085...|          41096.53| 44515.82719220669|
|[3.0,1.0,8.0,705....|          12607.58| 17727.80255808838|
|[3.0,1.0,8.0,2239...|          46380.46| 46031.18789976569|
|[3.0,1.0,1.0,2045...|          85058.22| 85626.48860629727|
|[3.0,1.0,1.0,2080...|          54838.72|55372.622225957966|
|[3.0,1.0,1.0,2080...|          84831.14| 85860.06621383007|
|[3.0,1.0,1.0,2080...|          107815.2|102340.70575505587|
|[3.0,1.0,1.0,2080...|          72201.24| 76557.67371224613|
|[3.0,1.0,1.0,2080...|          85051.84| 87548.55901279824|
|[3.0,1.0,1.0,2080...|  

In [0]:
import pyspark.sql.functions as F

residuals = pred.withColumn("residual", (F.col("Regular Gross Paid") - F.col("prediction")))
display(residuals.agg({'residual': 'mean'}))

avg(residual)
-3.7160203827914913


In [0]:
display(residuals.select("prediction", "residual"))

prediction,residual
81063.53292932124,-1530.8329293212446
73402.04629638793,-6452.94629638792
27215.88303701086,-2217.08303701086
76607.77316028628,-770.8531602862786
44515.82719220669,-3419.2971922066936
17727.80255808838,-5120.2225580883805
46031.18789976569,349.2721002343096
85626.48860629727,-568.2686062972643
55372.622225957966,-533.9022259579651
85860.06621383007,-1028.9262138300692


Databricks visualization. Run in Databricks to view.

In [0]:
model.stages[-1].featureImportances

Out[193]: SparseVector(8, {0: 0.0, 1: 0.0146, 2: 0.0944, 3: 0.0189, 4: 0.0012, 5: 0.024, 6: 0.3197, 7: 0.5272})

In [0]:
featureImportance = model.stages[-1].featureImportances.toArray()
featureNames = map(lambda s: s.name, DF.schema.fields)
featureImportanceMap = zip(featureImportance, featureNames)

In [0]:
importancesDf = spark.createDataFrame(sc.parallelize(featureImportanceMap).map(lambda r: [r[1], float(r[0])]))

In [0]:
Feature_DF = importancesDf.withColumnRenamed("_1", "Features").withColumnRenamed("_2", "Importance")

display(Feature_DF.orderBy(desc("Importance")))

Features,Importance
Years_Working,0.5271753989933409
Total Other Pay,0.3197387531659057
Title Description,0.094446004029329
Total OT Paid,0.0239601200164581
Regular Hours,0.0188520097601898
Agency Name,0.0145967635537012
OT Hours,0.0012255581540514
Work Location Borough,5.392327023854675e-06


Databricks visualization. Run in Databricks to view.