In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import DateType, StringType, FloatType, IntegerType
from pyspark.sql.window import Window
import pyspark.sql.functions as F

In [2]:
# Build (or reuse, via getOrCreate) the single local SparkSession used by
# every later cell in this notebook.
# NOTE(review): with master "local[*]" the executor runs inside the driver
# JVM, so spark.executor.memory is very likely ignored here — confirm and
# drop it if so; spark.driver.memory is the setting that matters locally.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("PySpark City Payroll Analysis")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "2g")
    .getOrCreate()
)

In [3]:
# Load both city payroll extracts, using the first row as column names.
# No schema is supplied or inferred, so every column arrives as a string and
# is cast explicitly in later cells.
nycdf = spark.read.csv("City_Employee_Payroll_New_York_City.csv", header=True)
ladf = spark.read.csv("City_Employee_Payroll_Los_Angeles.csv", header=True)

In [4]:
# Inspect the raw NYC schema: without a schema or inferSchema, every column is a string.
nycdf.printSchema()

root
 |-- Fiscal Year: string (nullable = true)
 |-- Payroll Number: string (nullable = true)
 |-- Agency Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Mid Init: string (nullable = true)
 |-- Agency Start Date: string (nullable = true)
 |-- Work Location Borough: string (nullable = true)
 |-- Title Description: string (nullable = true)
 |-- Leave Status as of June 30: string (nullable = true)
 |-- Base Salary: string (nullable = true)
 |-- Pay Basis: string (nullable = true)
 |-- Regular Hours: string (nullable = true)
 |-- Regular Gross Paid: string (nullable = true)
 |-- OT Hours: string (nullable = true)
 |-- Total OT Paid: string (nullable = true)
 |-- Total Other Pay: string (nullable = true)



In [5]:
# Inspect the raw LA schema: as with NYC, every column is read as a string.
ladf.printSchema()

root
 |-- RECORD_NBR: string (nullable = true)
 |-- PAY_YEAR: string (nullable = true)
 |-- DEPARTMENT_NO: string (nullable = true)
 |-- DEPARTMENT_TITLE: string (nullable = true)
 |-- JOB_CLASS_PGRADE: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- EMPLOYMENT_TYPE: string (nullable = true)
 |-- JOB_STATUS: string (nullable = true)
 |-- MOU: string (nullable = true)
 |-- MOU_TITLE: string (nullable = true)
 |-- REGULAR_PAY: string (nullable = true)
 |-- OVERTIME_PAY: string (nullable = true)
 |-- ALL_OTHER_PAY: string (nullable = true)
 |-- TOTAL_PAY: string (nullable = true)
 |-- CITY_RETIREMENT_CONTRIBUTIONS: string (nullable = true)
 |-- BENEFIT_PAY: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- ETHNICITY: string (nullable = true)



In [6]:
# Peek at a seeded random slice (roughly 0.02% of rows, no replacement) of the
# NYC data, printing at most 10 of the sampled rows.
nycdf.sample(withReplacement=False, fraction=0.0002, seed=999).show(n=10)

+-----------+--------------+-----------------+-----------+----------+--------+-----------------+---------------------+--------------------+--------------------------+-----------+---------+-------------+------------------+--------+-------------+---------------+
|Fiscal Year|Payroll Number|      Agency Name|  Last Name|First Name|Mid Init|Agency Start Date|Work Location Borough|   Title Description|Leave Status as of June 30|Base Salary|Pay Basis|Regular Hours|Regular Gross Paid|OT Hours|Total OT Paid|Total Other Pay|
+-----------+--------------+-----------------+-----------+----------+--------+-----------------+---------------------+--------------------+--------------------------+-----------+---------+-------------+------------------+--------+-------------+---------------+
|       2020|            56|POLICE DEPARTMENT|     WALKER|     JAMAL|       H|       01/10/2005|             BROOKLYN|     P.O. DA DET GR3|                    ACTIVE|   96502.00|per Annum|         2080|          95892

In [12]:
# Cast the NYC string columns to their analytical types.
# 'Agency Start Date' is stored as MM/dd/yyyy (e.g. 01/10/2005). A plain
# .cast(DateType()) only understands yyyy-MM-dd-style strings, so it turns
# every such value into null (or raises under ANSI mode). Parse the column
# with an explicit pattern instead; the resulting column type is still date.
# NOTE(review): FloatType is 32-bit and cannot represent cents exactly at
# salary magnitudes; DoubleType/DecimalType would be safer for pay columns.
# NOTE(review): 'Regular Hours' is cast to int (any fractional part would be
# lost) while 'OT Hours' is float — confirm the source has whole regular hours.
nycdf = (
    nycdf
    .withColumn('Agency Start Date', F.to_date(F.col('Agency Start Date'), 'MM/dd/yyyy'))
    .withColumn('Base Salary', F.col('Base Salary').cast(FloatType()))
    .withColumn('Regular Hours', F.col('Regular Hours').cast(IntegerType()))
    .withColumn('Regular Gross Paid', F.col('Regular Gross Paid').cast(FloatType()))
    .withColumn('OT Hours', F.col('OT Hours').cast(FloatType()))
    .withColumn('Total OT Paid', F.col('Total OT Paid').cast(FloatType()))
    .withColumn('Total Other Pay', F.col('Total Other Pay').cast(FloatType()))
)

nycdf.printSchema()

root
 |-- Fiscal Year: string (nullable = true)
 |-- Payroll Number: string (nullable = true)
 |-- Agency Name: string (nullable = true)
 |-- Last Name: string (nullable = true)
 |-- First Name: string (nullable = true)
 |-- Mid Init: string (nullable = true)
 |-- Agency Start Date: date (nullable = true)
 |-- Work Location Borough: string (nullable = true)
 |-- Title Description: string (nullable = true)
 |-- Leave Status as of June 30: string (nullable = true)
 |-- Base Salary: float (nullable = true)
 |-- Pay Basis: string (nullable = true)
 |-- Regular Hours: integer (nullable = true)
 |-- Regular Gross Paid: float (nullable = true)
 |-- OT Hours: float (nullable = true)
 |-- Total OT Paid: float (nullable = true)
 |-- Total Other Pay: float (nullable = true)



In [13]:
# Peek at a seeded random slice (roughly 0.02% of rows, no replacement) of the
# LA data, printing at most 10 of the sampled rows.
ladf.sample(withReplacement=False, fraction=0.0002, seed=999).show(n=10)

+------------+--------+-------------+----------------+----------------+--------------------+---------------+----------+---+--------------------+-----------+------------+-------------+---------+-----------------------------+-----------+------+---------+
|  RECORD_NBR|PAY_YEAR|DEPARTMENT_NO|DEPARTMENT_TITLE|JOB_CLASS_PGRADE|           JOB_TITLE|EMPLOYMENT_TYPE|JOB_STATUS|MOU|           MOU_TITLE|REGULAR_PAY|OVERTIME_PAY|ALL_OTHER_PAY|TOTAL_PAY|CITY_RETIREMENT_CONTRIBUTIONS|BENEFIT_PAY|GENDER|ETHNICITY|
+------------+--------+-------------+----------------+----------------+--------------------+---------------+----------+---+--------------------+-----------+------------+-------------+---------+-----------------------------+-----------+------+---------+
|303532353731|    2016|           98| WATER AND POWER|          1539-5|            MGT ASST|      FULL_TIME|    ACTIVE|  4|ADMINISTRATIVE RE...|   79284.64|     3478.23|      1452.68| 84215.55|                      5233.00|   22763.98|FEMALE

In [15]:
# All six LA pay/benefit columns receive the same float cast, so apply it in a
# loop; withColumn replaces each column in place, keeping the column order.
# NOTE(review): 32-bit FloatType loses cent precision at these magnitudes;
# DoubleType would be safer if downstream cells allow it.
for la_pay_col in ['REGULAR_PAY', 'OVERTIME_PAY', 'ALL_OTHER_PAY', 'TOTAL_PAY',
                   'CITY_RETIREMENT_CONTRIBUTIONS', 'BENEFIT_PAY']:
    ladf = ladf.withColumn(la_pay_col, F.col(la_pay_col).cast(FloatType()))

ladf.printSchema()

root
 |-- RECORD_NBR: string (nullable = true)
 |-- PAY_YEAR: string (nullable = true)
 |-- DEPARTMENT_NO: string (nullable = true)
 |-- DEPARTMENT_TITLE: string (nullable = true)
 |-- JOB_CLASS_PGRADE: string (nullable = true)
 |-- JOB_TITLE: string (nullable = true)
 |-- EMPLOYMENT_TYPE: string (nullable = true)
 |-- JOB_STATUS: string (nullable = true)
 |-- MOU: string (nullable = true)
 |-- MOU_TITLE: string (nullable = true)
 |-- REGULAR_PAY: float (nullable = true)
 |-- OVERTIME_PAY: float (nullable = true)
 |-- ALL_OTHER_PAY: float (nullable = true)
 |-- TOTAL_PAY: float (nullable = true)
 |-- CITY_RETIREMENT_CONTRIBUTIONS: float (nullable = true)
 |-- BENEFIT_PAY: float (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- ETHNICITY: string (nullable = true)

