In [1]:
import numpy as np 
import matplotlib.pyplot as plt
import pandas as pd

In [2]:
from pyspark import SparkContext, SparkConf
cf = SparkConf()
cf.set("spark.submit.deployMode","client")
sc = SparkContext.getOrCreate(cf)
from pyspark.sql import SparkSession
spark = SparkSession \
	    .builder \
	    .appName("Python Spark SQL basic example") \
	    .config("spark.some.config.option", "some-value") \
	    .getOrCreate()
                          

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/04/27 21:55:51 INFO org.apache.spark.SparkEnv: Registering MapOutputTracker
23/04/27 21:55:51 INFO org.apache.spark.SparkEnv: Registering BlockManagerMaster
23/04/27 21:55:51 INFO org.apache.spark.SparkEnv: Registering BlockManagerMasterHeartbeat
23/04/27 21:55:51 INFO org.apache.spark.SparkEnv: Registering OutputCommitCoordinator


In [3]:
filepath = 'COVID-19_Relief_Paycheck_Protection_Program.csv'
relief = spark.read.csv(path = filepath, header=True)

23/04/27 21:56:19 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
23/04/27 21:56:34 WARN org.apache.spark.scheduler.cluster.YarnScheduler: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
                                                                                

In [4]:
relief.printSchema()

root
 |-- LoanNumber: string (nullable = true)
 |-- DateApproved_Year: string (nullable = true)
 |-- DateApproved_Month: string (nullable = true)
 |-- DateApproved_Day: string (nullable = true)
 |-- SBAOfficeCode: string (nullable = true)
 |-- ProcessingMethod: string (nullable = true)
 |-- BorrowerName: string (nullable = true)
 |-- BorrowerAddress: string (nullable = true)
 |-- BorrowerCity: string (nullable = true)
 |-- BorrowerState: string (nullable = true)
 |-- BorrowerZip: string (nullable = true)
 |-- LoanStatusDate_Year: string (nullable = true)
 |-- LoanStatusDate_Month: string (nullable = true)
 |-- LoanStatusDate_Day: string (nullable = true)
 |-- LoanStatus: string (nullable = true)
 |-- Term: string (nullable = true)
 |-- SBAGuarantyPercentage: string (nullable = true)
 |-- InitialApprovalAmount: string (nullable = true)
 |-- CurrentApprovalAmount: string (nullable = true)
 |-- UndisbursedAmount: string (nullable = true)
 |-- FranchiseName: string (nullable = true)
 |-- S

In [5]:
relief = relief.select("LoanNumber",
                       "DateApproved_Year",
                       "DateApproved_Month",
                       "LoanStatusDate_Year",
                       "LoanStatusDate_Month",
                       "LoanStatus",
                       "InitialApprovalAmount",
                       "CurrentApprovalAmount",
                       "UndisbursedAmount",
                       "FranchiseName",
                       "BusinessAgeDescription",
                       "ProjectCity",
                       "ProjectCountyName",
                       "ProjectState",
                       "ProjectZip",
                       "CD",
                       "JobsReported",
                       "UTILITIES_PROCEED",
                       "PAYROLL_PROCEED",
                       "MORTGAGE_INTEREST_PROCEED",
                       "RENT_PROCEED",
                       "REFINANCE_EIDL_PROCEED",
                       "HEALTH_CARE_PROCEED",
                       "DEBT_INTEREST_PROCEED",
                       "BusinessType",
                       "ForgivenessAmount",
                       "ForgivenessDate_Year",
                       "ForgivenessDate_Month")

In [6]:
#for querying
relief.createOrReplaceTempView("relief")

23/04/27 21:56:44 WARN org.apache.spark.sql.catalyst.util.package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [7]:
relief.count()

                                                                                

734833

In [8]:
#use master data to clean 
spark.sql("Select ProjectZip, Count(*) as count From Relief Group by ProjectZip ORDER BY count ASC").show()



+----------+-----+
|ProjectZip|count|
+----------+-----+
|13021-9412|    1|
|10468-8109|    1|
|10462-6525|    1|
|10035-4047|    1|
|10460-5677|    1|
|10453-4645|    1|
|10468-2717|    1|
|11372-6853|    1|
|11520-2539|    1|
|11232-4096|    1|
|10456-5607|    1|
|10035-4206|    1|
|12095-3763|    1|
|11550-6616|    1|
|11223-1379|    1|
|11237-3828|    1|
|11520-5138|    1|
|11224-3134|    1|
|12986-1637|    1|
|11706-2309|    1|
+----------+-----+
only showing top 20 rows



[Stage 6:>                                                          (0 + 9) / 9]                                                                                

In [9]:
#shorten zip to identifying first 5 #s
relief = relief.withColumn('ProjectZip', relief.ProjectZip.substr(1,5))

In [10]:
relief.select("ProjectZip").distinct().count()

                                                                                

2017

In [11]:
#aggregate dictionary of zip codes mapped to NYC boroughs
zips = {}
with open("zipcodes.txt") as f:
    for line in f:
        line = line.strip().split()
        for i in range(0,len(line),2):
            zips[line[i]] = line[i+1]
            
zips_lst = list(zips.keys())

#fetch all zip codes associated with borough
def borough_zips(zips, borough):
    lst = [key for key,value in zips.items() if value == borough]
    return lst

#zip codes by borough
manhattan_zips = borough_zips(zips, "Manhattan")
brooklyn_zips= borough_zips(zips, "Brooklyn")
queen_zips = borough_zips(zips, "Queens")
bronx_zips = borough_zips(zips, "Bronx")
statenisland_zips = borough_zips(zips, "Staten")



In [12]:
#filter out for only NYC cases
relief = relief.filter(relief["ProjectZip"].isin(zips_lst))

relief.count()

                                                                                

384579

In [13]:
from pyspark.sql.functions import when

relief = relief.withColumn("Borough", when(relief.ProjectZip.isin(manhattan_zips), "Manhattan")
                                     .when(relief.ProjectZip.isin(brooklyn_zips), "Brooklyn")
                                     .when(relief.ProjectZip.isin(queen_zips), "Queens")
                                     .when(relief.ProjectZip.isin(bronx_zips), "Bronx")
                                     .when(relief.ProjectZip.isin(statenisland_zips), "Staten Island"))

In [14]:
from pyspark.sql.types import IntegerType

#all columns associated with costs
costs = ["UTILITIES_PROCEED",
         "PAYROLL_PROCEED",
         "MORTGAGE_INTEREST_PROCEED",
         "RENT_PROCEED",
         "REFINANCE_EIDL_PROCEED",
         "HEALTH_CARE_PROCEED",
         "DEBT_INTEREST_PROCEED"]

#cast columns to Integer Type
for cost in costs:
    relief = relief.withColumn(cost, relief[cost].cast(IntegerType()))

In [15]:
#before
relief.filter(relief["UTILITIES_PROCEED"].isNull()).count()

                                                                                

251761

In [16]:
#fill all null values in cost columns with 0
relief = relief.fillna(value = 0, subset = costs)

In [17]:
#after
relief.filter(relief["UTILITIES_PROCEED"].isNull()).count()

0

In [18]:
from operator import add
from functools import reduce
relief = relief.withColumn("Total", reduce(add, [relief[cost] for cost in costs]))

In [19]:
small_businesses = relief.filter(relief["JobsReported"] <= 100)

In [21]:
small_businesses.write.option("header", "true").csv("cleaned_PPP.csv")

                                                                                