# AWS Glue PySpark Example - NYC Property Tax Protest Prediction
This jupyter notebook provides an example of using PySpark running on AWS Glue to conduct feature engineering, descriptive analysis, and finally run a machine learning (random forest) model on a dataset of New York City (NYC) real estate. The size of the dataset is modest, and the three files necessary to run this notebook can be found on my [Google Drive (as parquet files)](https://drive.google.com/drive/folders/1pribtFHJD81t9_h1DM2pUHZt5ud3Y1f0?usp=drive_link). 

These three datasets include two files each with one year of property tax rolls, which include information on each tax lot ("parcel") in the city, as well as a small dataset providing written description of the Building Class variable in the property tax rolls. For analysis, I uploaded these three parquet files to an [S3 Bucket](https://aws.amazon.com/s3/) and read them in as three separate tables to an [AWS Glue Database](https://docs.aws.amazon.com/glue/latest/dg/define-database.html) called "realestate" using an [AWS Glue crawler](https://docs.aws.amazon.com/glue/latest/dg/add-crawler.html). I then ran an interactive [AWS Glue instance](https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html) through a [Jupyter Notebook](https://aws.amazon.com/blogs/big-data/introducing-aws-glue-interactive-sessions-for-jupyter/). The entire analysis was thus run in the cloud on AWS using AWS Glue. As the code below demonstrates, the vast majority of the analysis was conducted used PySpark, with occasional switching to Pandas to examine small extracts of data. PySpark is an API for using Apache Spark in python; Apache Spark is an effective tool for working with big data on a distributed computing architecture. The datasets I am working with in this analysis are modest, so this notebook is more of a proof-of-concept than an actual big data application. In other words, all of the analysis I've done in this notebook could have easily been done with pandas on a laptop, but I am using PySpark to demonstrate its API.

## Research Goals and Background
Besides the primary goal of demonstrating the using of PySpark on AWS Glue, the research goal of this analysis is to evaluate how accurately we can predict (1) property tax protests (appeals) and (2) successful property tax protests, using data available at the time the New York City Department of Finance (DOF) is making its decisions on assessed values (and, therefore, tax bills). The theory is that, if DOF tax assessments are accurate, then it should be fairly difficult to predict _successful_ property tax protests using information available to DOF prior to the protest. This is because, if it _were_ easy for successful protests to be predicted at this point in time, then DOF could use the information it has available to adjust assessed values in order to avoid these successful protests, which is arguably an important goal of assessors.

It turns out that this is exactly what I find: it is fairly easy to predict protest using the available data, but very hard to predict successful protest. This suggests that DOF assessment practices are such that they effectively prevent protest success. Or in other words, most of the protest that do occur (and there are many) have little chance of success, and it is very hard to predict which ones are more likely to be successful with these data.

A small amount of background context is necessary to understand the analysis. In New York City, assessors at DOF use information from the recent past to assess property values throughout the city each year. Each property gets mailed a [Notice of Property Value (NOPV)](https://www.nyc.gov/site/finance/property/notice-of-property-value.page) in January, which describes the value DOF assigns to the property as well as some information on the process that led to that decision. These values are called "tentative roll" values, because the tax roll (i.e., the list of properties being taxed) at this time is not finalized. The property value then leads to a calculated tax bill, with higher assessed property values leading to higher expected tax bills. The property owner then has between January and March to protest (appeal) the value and, if the appeal is successful, have it reduced. In May, the DOF posts an updated [Final Roll](https://www.nyc.gov/site/finance/property/property-tax-bills.page) that contains the property information to be used for bills in the next fiscal year, i.e. from July to June starting in the upcoming summer.

My analysis in this jupyter notebook involves studying the rates of protest and successful protest. Since protest rates are very low for small residential (called Class 1) properties, and utility properties (Class 3) protest follows a different structure, my analysis and data will focus on the two other tax classes. These include apartment buildings, i.e. residential rentals, condos, and coops (class 2), and all commercial and industrial real property (class 4). 

## Setup
After setting up the jupyter notebook to run Glue PySpark through AWS Glue, the following code block sets the roles for an interactive session using the AWS magic commands (iam_role = identity access and management role, and region). The process for getting setup to use these is described on the [AWS website](https://aws.amazon.com/blogs/big-data/introducing-aws-glue-interactive-sessions-for-jupyter/). You will have to replace the roles in these magic commands with those for your account, and the region for your region (this block has been deleted from this public notebook for security).

In [None]:
%iam_role arn:aws:iam::[put your role here]
%region us-east-1

Next I import required libraries and functions.

In [1]:
import numpy as np 
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

Trying to create a Glue session for the kernel.
Session Type: etl
Worker Type: G.1X
Number of Workers: 5
Session ID: [removed]
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
Waiting for session [removed] to get into ready status...
Session [removed] has been created.



I primarily use the PySpark DataFrame API (see [DataCamp post](https://www.datacamp.com/cheat-sheet/pyspark-cheat-sheet-spark-dataframes-in-python)), which provides a way to code in PySpark that looks very similar to the popular data analysis package, pandas. The following few lines are the only SQL code I use in this notebook (although I am well-versed in SQL, since I have extensive SAS coding (PROC SQL) experience).

In [2]:
spark.sql('SHOW TABLES IN realestate').show(truncate=False)

+----------+--------------------------+-----------+
|namespace |tableName                 |isTemporary|
+----------+--------------------------+-----------+
|realestate|bldgclasscrosswalk_parquet|false      |
|realestate|nyc_taxrolls              |false      |
+----------+--------------------------+-----------+


The aws glue table realestate.nyc_taxrolls contains fiscal year (fy) 23 and 24 tax roll data from new york city department of finance. These were loaded from two parquet files, taxrolls_fy23.parquet and taxrolls_fy24.parquet, in an s3 bucket on AWS glue using an AWS glue crawler. (See discussion at the beginning of the notebook.)


In [3]:
spark.sql('SELECT fy, COUNT(*) FROM realestate.nyc_taxrolls GROUP BY fy').show()

+----+--------+
|  fy|count(1)|
+----+--------+
|2024|  445108|
|2023|  438054|
+----+--------+


## Outline of Analysis Plan
The analysis has three parts:

1. Define protest and successful protest, and other needed variables.
    - This will also include a brief exploration of the raw protest variable. 
    - This demonstrates some basic feature engineering in pyspark.
2. Descriptive Analysis of Protest.
    - Examine protest rates by tax class (class 2 apartments, class 4 commercial/industrial)
    - Examine protest rates by top building classes inside tax classes. This demonstrates joining, aggregation, and window functions in pyspark.
3. Predicting Protest with Other Variables.
    - Feature engineering of several variables.
    - Setting up a random forest using Spark ML syntax.
    - Fitting random forests to predict protest and successful protest.
    - Examining confusion matrices and F1 scores to study how good the models are at predicting protests. We expect protests might be easy to predict, but successful protest would be hard to predict, assuming DOF's assessment process is relatively accurate.
    
Each part will be conducted in turn. The parts need to be run in order since some columns and dataframes are defined in earlier parts, and used in later parts.

## 1. Define protest and successful protest, and other needed variables.

In [4]:
tbl = spark.table('realestate.nyc_taxrolls')




In [5]:
print(tbl.columns)

['boro', 'block', 'lot', 'bldg_class', 'fintaxclass', 'zip_code', 'yrbuilt', 'units', 'gross_sqft', 'protest_1', 'tenmkttot', 'tentrntot', 'tentxbextot', 'finmkttot', 'fintrntot', 'fintxbextot', 'fy']


Tabulate the different values for the main protest variable, protest_1, across the two fiscal years.

In [6]:
tmp = spark.sql('SELECT fy, protest_1, COUNT(*) as count FROM realestate.nyc_taxrolls GROUP BY fy, protest_1')
tmp.groupBy('protest_1').pivot('fy').agg(F.first(F.col("count"))).sort('2024',ascending=False).show()

+---------+------+------+
|protest_1|  2023|  2024|
+---------+------+------+
|      9  |195514|200797|
|         |194347|196433|
|      1  | 47922| 47643|
|      6E |    58|    85|
|      6N |    76|    83|
|      6  |   129|    59|
|      8  |     3|     7|
|      1N |  null|     1|
|      4  |     1|  null|
|      9E |     1|  null|
|      0  |     3|  null|
+---------+------+------+


Protest code 9 is the most common (by number of properties protesting), these are condo class 2 or class 4 appeals using the [tc109 form](https://www.nyc.gov/assets/taxcommission/downloads/pdf/tc109.pdf). 

Protest code 1 is the second most common, these are class 2 or class 4 appeals using the [tc101 form](https://www.nyc.gov/assets/taxcommission/downloads/pdf/tc101.pdf). These are all protests _except_ condos.

To confirm understanding, I check that protest code 9 lines up with condos in building class codes. Condo codes are those that start with R (see [NYC DOF building classifications](https://www.nyc.gov/assets/finance/jump/hlpbldgcode.html)).

In [7]:
tbl = tbl.withColumn('bldg_class_1dig',tbl.bldg_class.substr(0,1))
tbl = tbl.withColumn('isCondo',tbl.bldg_class_1dig=='R')
tbl.groupBy('isCondo').count().show()
tbl.groupBy('protest_1','isCondo').count().sort('count',ascending=False).show()

+-------+------+
|isCondo| count|
+-------+------+
|   true|540580|
|  false|342582|
+-------+------+

+---------+-------+------+
|protest_1|isCondo| count|
+---------+-------+------+
|      9  |   true|396306|
|         |  false|246880|
|         |   true|143900|
|      1  |  false| 95405|
|      1  |   true|   160|
|      6  |   true|   125|
|      6E |  false|   107|
|      6N |  false|   107|
|      6  |  false|    63|
|      6N |   true|    52|
|      6E |   true|    36|
|      8  |  false|    10|
|      9  |  false|     5|
|      0  |  false|     3|
|      9E |   true|     1|
|      4  |  false|     1|
|      1N |  false|     1|
+---------+-------+------+


The above confirms that protest code 9 is for condos, and protest code 1 is for non-condos: 396,306 of the '9' protests are condos, 6 are not. 95,405 of the '1' protests are non-condos, 161 are not.

Since I can separate these two almost perfectly through the first digit of building class code, I don't need to keep the protest_1 code separately for 1 vs 9 types of protest. Thus, I define simple indicator variables for protest instead of using these detailed codes.

### Construct a Protest Variable and a Successful Protest Variable

I assume that winning, conditional on protest, means that the tentative assessed value is higher than the final assessed value, indicating that the taxable billable assessed value (TBAV) for the property decreased between the tentative roll and the final roll. While this assumption seems reasonable since the protest process is largely about reducing the assessed value from the tentative roll to the final roll (i.e., arguing that the property is overvalued), it may not be entirely accurate. This is because there are other ways for the tentative and final rolls to differ, even if the protest is deemed "lost." However, this is the best measure available in the data.

Create protest ("any protest") indicator.

In [8]:
tbl = tbl.withColumn('protest',F.trim(tbl.protest_1)!='')
tbl.groupBy('protest').count().show()

+-------+------+
|protest| count|
+-------+------+
|   true|492382|
|  false|390780|
+-------+------+


Create successful protest indicator (require at least $1 reduction from tentative to final).

In [9]:
tbl = tbl.withColumn('successful_protest',(tbl.protest) & (tbl.fintxbextot<tbl.tentxbextot-1))
tbl.groupBy('successful_protest').count().show()

+------------------+------+
|successful_protest| count|
+------------------+------+
|              true| 32348|
|             false|850814|
+------------------+------+


In [10]:
# create integer versions of protest and successful protest columns for use in later analysis
tbl = tbl.withColumn('protestInt', F.when(tbl.protest==True, 1).otherwise(0))
tbl = tbl.withColumn('successful_protestInt', F.when(tbl.successful_protest==True, 1).otherwise(0))




Remove class 3 (utility properties) from the analysis, as they do not have protests in the same way as other properties (see discussion near beginning of the notebook).

In [11]:
tbl = tbl.withColumn('TC',tbl.fintaxclass.substr(1,1))
tbl = tbl.filter('TC != 3') 




# 2. Descriptive Analysis of Protest.

The descriptive analysis will focus on protests in fiscal year 24. I will calculate protest rates by tax class in two distinct ways. The first way involves using the count of properties to determine the rate of protests within each tax class. The second way calculates protest rates based on the full market values, specifically examining the share of the final full market value that undergoes a protest.

The process I took involves doing the aggregation in pyspark using groupBy, then converting the dataframes to pandas dataframes for concatenation. 

In [12]:
tbl24 = tbl.filter(tbl.fy=='2024')

# by counts of properties
column1 = tbl24.groupBy('TC').mean('protestInt').toPandas()
column2 = tbl24.groupBy('TC').mean('successful_protestInt').toPandas()
column3 = tbl24.groupBy('TC').count().toPandas()
column1.set_index('TC',inplace=True)
column2.set_index('TC',inplace=True)
column3.set_index('TC',inplace=True)
res = pd.concat([column1,column2,column3],axis=1)

# by FMV weighting
tbl24 = tbl24.withColumn('FMV',tbl24.finmkttot)
column4 = tbl24.groupBy('TC','protestInt').sum('FMV').toPandas()
column4 = column4.pivot(index='TC',columns='protestInt',values='sum(FMV)')
column4['share'] = column4[1]/(column4[0]+column4[1])
column4['FMV'] = column4[0]+column4[1]
column4 = column4.loc[:,['FMV','share']]
column5 = tbl24.groupBy('TC','successful_protestInt').sum('FMV').toPandas()
column5 = column5.pivot(index='TC',columns='successful_protestInt',values='sum(FMV)')
column5['share'] = column5[1]/(column5[0]+column5[1])
column5 = column5.loc[:,['share']]

# combine all columns
res = pd.concat([column1,column2,column3,column4,column5],axis=1)

# clean up
res['FMV'] = res['FMV']/1e9
res.columns = ['Protest Rate (Count)','Successful Protest (Count)','Count of Properties','FMV (Billions)','Protest Rate (FMV)','Successful Protest (FMV)']
res = res.loc[:,['Protest Rate (Count)','Successful Protest (Count)','Count of Properties','Protest Rate (FMV)','Successful Protest (FMV)','FMV (Billions)']]

# show result (res is a pandas dataframe)
res.T

TC                                      2              4
Protest Rate (Count)             0.634212       0.372318
Successful Protest (Count)       0.056924       0.001675
Count of Properties         323042.000000  117636.000000
Protest Rate (FMV)               0.681682       0.540357
Successful Protest (FMV)         0.127134       0.013857
FMV (Billions)                 401.953336     547.116200


These descriptive statistics show that protests are very common among these properties. 63% of apartment (class 2) properties, and 68% of apartment full market value (FMV), undergoes a protest. Meanwhile, much smaller percents of commercial (class 4) properties, 37%, under go a protest, but a much higher share 54% of commercial property FMV goes under protest.

The rates of successful protest are much lower than the rates of protest. This is especially true for commercial properties: by my measure, less than 0.2% of commercial properties successfully protest (and success is skewed toward more valuable properties).

Within each tax class, I calculate the rates of protest and successful protests by building class, highlighting the top 5 building classes by protest rate followed by a combined group for all other classes. 

I use major building classes (first digit), so for class 2 the top 5 building classes are all the building classes in the data. For class 4, there is a large "other" group.

This analysis will showcase the use of window functions and joins in PySpark, where we'll join building class descriptions to the building class column in the tax rolls.

To obtain building class descriptions, I use the table already loaded into AWS Glue from a parquet file (see discussion at the beginning of the notebook). However, for completeness, here is the script that I used to obtain building class code table locally (this is commented out since pd.read_html() does not work on AWS Glue for Apache Spark due to a missing dependency), and then I uploaded the resulting parquet file.

In [4]:
# import pandas as pd
# bldgclasscrosswalk = pd.read_html('https://www.nyc.gov/assets/finance/jump/hlpbldgcode.html')
# bldgclasscrosswalk = bldgclasscrosswalk[0]
# bldgclasscrosswalk.to_parquet('bldgclasscrosswalk.parquet',index=False)

Add the building class crosswalk to the dataframe.

In [13]:
bldgclasscrosswalk = spark.table('realestate.bldgclasscrosswalk_parquet')




In [14]:
bldgclasscrosswalk = bldgclasscrosswalk.withColumnRenamed('description','bldg_class_description')
bldgclasscrosswalk = bldgclasscrosswalk.withColumnRenamed('building code','bldg_class_1dig')




In [15]:
tbl24 = tbl24.join(bldgclasscrosswalk,on='bldg_class_1dig',how='left')




Now calculate the summaries.

In [16]:
protest_counts = tbl24.groupBy('TC','bldg_class_description','protest').count().sort('count',ascending=False).groupBy('TC','bldg_class_description').pivot('protest').agg(F.first(F.col("count")))
protest_counts = protest_counts.withColumn('share_protest',protest_counts['true']/(protest_counts['true']+protest_counts['false']))
protest_counts = protest_counts.withColumn('share_protest',F.coalesce(protest_counts['share_protest'],F.lit(0)))




Keep top 5 TC-building class combinations by share protest and group the rest into "other." (Note, it turns out there is only an "other" category for commercial Class 4 properties.)

This demonstrates the use of Window functions in pyspark using window.partitionBy(...).orderBy(...). I use Window functions to calculate row_numbers() and keep the top 5 (by share protest) building codes with each tax class. 

In [17]:
windowPartition = Window.partitionBy("TC").orderBy(F.desc("share_protest"))
protest_counts = protest_counts.withColumn('protest_rank',
                          F.row_number().over(windowPartition))




In [18]:
protest_counts_top5 = protest_counts.filter(protest_counts.protest_rank<=5)




In [19]:
protest_counts_other = protest_counts.filter(protest_counts.protest_rank>5)
protest_counts_other = protest_counts_other.groupBy('TC').agg(F.sum('false'),F.sum('true')).withColumn('share_protest',F.col('sum(true)')/(F.col('sum(true)')+F.col('sum(false)')))
protest_counts_other = protest_counts_other.withColumn('Label',F.concat(F.lit('Other Class '), F.col('TC')))
protest_counts_other = protest_counts_other.select('TC','Label','sum(false)','sum(true)','share_protest')
protest_counts_summaries = protest_counts_top5.drop('protest_rank').union(protest_counts_other)
protest_counts_summaries = protest_counts_summaries.withColumn('Number of Properties', protest_counts_summaries['false']+protest_counts_summaries['true'])
protest_counts_summaries = protest_counts_summaries.drop('false','true')
protest_counts_summaries = protest_counts_summaries.withColumnRenamed('share_protest','Share Protest')
protest_counts_summaries.show(truncate=False)

+---+--------------------------+-------------------+--------------------+
|TC |bldg_class_description    |Share Protest      |Number of Properties|
+---+--------------------------+-------------------+--------------------+
|2  |CONDOMINIUMS              |0.7689152311837676 |232326              |
|2  |ELEVATOR APARTMENTS       |0.68978578892372   |15312               |
|2  |WALK UP APARTMENTS        |0.2279880224260958 |62784               |
|2  |PRIMARILY RES. - MIXED USE|0.10792393026941363|12620               |
|4  |HOTELS                    |0.7482332155477032 |1132                |
|4  |THEATRES                  |0.6080402010050251 |199                 |
|4  |CONDOMINIUMS              |0.5350402692631326 |41595               |
|4  |OFFICE BUILDINGS          |0.5115156530133774 |7251                |
|4  |STORE BUILDINGS           |0.43392857142857144|19040               |
|4  |Other Class 4             |0.1776988372333175 |48419               |
+---+--------------------------+------

Now repeated for successful protests.

In [20]:
protest_counts = tbl24.groupBy('TC','bldg_class_description','successful_protest').count().sort('count',ascending=False).groupBy('TC','bldg_class_description').pivot('successful_protest').agg(F.first(F.col("count")))
protest_counts = protest_counts.withColumn('share_protest',protest_counts['true']/(protest_counts['true']+protest_counts['false']))
protest_counts = protest_counts.withColumn('share_protest',F.coalesce(protest_counts['share_protest'],F.lit(0)))
windowPartition = Window.partitionBy("TC").orderBy(F.desc("share_protest"))
protest_counts = protest_counts.withColumn('protest_rank',
                          F.row_number().over(windowPartition))
protest_counts_top5 = protest_counts.filter(protest_counts.protest_rank<=5)

protest_counts_other = protest_counts.filter(protest_counts.protest_rank>5)
protest_counts_other = protest_counts_other.groupBy('TC').agg(F.sum('false'),F.sum('true')).withColumn('share_protest',F.col('sum(true)')/(F.col('sum(true)')+F.col('sum(false)')))
protest_counts_other = protest_counts_other.withColumn('Label',F.concat(F.lit('Other Class '), F.col('TC')))
protest_counts_other = protest_counts_other.select('TC','Label','sum(false)','sum(true)','share_protest')
protest_counts_summaries = protest_counts_top5.drop('protest_rank').union(protest_counts_other)
protest_counts_summaries = protest_counts_summaries.withColumn('Number of Properties', protest_counts_summaries['false']+protest_counts_summaries['true'])
protest_counts_summaries = protest_counts_summaries.drop('false','true')
protest_counts_summaries = protest_counts_summaries.withColumnRenamed('share_protest','Share Successful Protest')
protest_counts_summaries.show(truncate=False)


+---+-------------------------------+------------------------+--------------------+
|TC |bldg_class_description         |Share Successful Protest|Number of Properties|
+---+-------------------------------+------------------------+--------------------+
|2  |ELEVATOR APARTMENTS            |0.1402168234064786      |15312               |
|2  |CONDOMINIUMS                   |0.06690168125823197     |232326              |
|2  |WALK UP APARTMENTS             |0.011101554536187564    |62784               |
|2  |PRIMARILY RES. - MIXED USE     |1.584786053882726E-4    |12620               |
|4  |THEATRES                       |0.005025125628140704    |199                 |
|4  |CONDOMINIUMS                   |0.004014905637696839    |41595               |
|4  |OFFICE BUILDINGS               |0.0016549441456350847   |7251                |
|4  |HOTELS                         |8.833922261484099E-4    |1132                |
|4  |HOSPITALS AND HEALTH FACILITIES|8.326394671107411E-4    |1201          

There are interesting contrasts between the two tables. Residential condos are somewhat (about 12%) more likely to protest than elevator apartments, but elevator apartments are over twice as likely to have a successful protest, meaning that their conditional probabilities of success given protests must have even an higher odds ratio. Similarly, hotels are the most likely to protest among class 4 commercial properties, but they hardly ever win their protests.

To clarify, note that the Tax Class 4 Condominiums are "commercial condos," which is commercial space that is owned in a condo form of ownership, e.g. a retail store might be owned by the retail company as a condo inside a mall, or a corporation might own a floor of an office building as a condo. Importantly, these are _not_ misclassified residential condos.

## 3.  Predicting Protest with Other Variables.

In this section, I build a predictive (random forest) model for protest and successful protest. To inform our predictions, I use a range of explanatory variables drawn from fiscal years (FY) 2023 and 2024. These include the tentative and final values for FY23, the tentative values for FY24, and several property characters as of FY23 and FY24.

I begin with feature engineering: I reshape the data into wide format for the predictive model using PySpark code. First I build a predictors_df, a dataframe containing all predictors. Then I build an outcome_df, a dataframe containing the outcome variables (as integers, not boolean). I then combine them into a final_df.

Note that I use FY23 tentative and final tax roll taxable value (TBAV) information (lagged tax roll values). I also use characteristics for FY23, not FY24, as it is possible that the protest would change these characteristics.

To merge datasets easily, I create a single identifier column out of borough, block, and lot, called BBL. These identify tax lots in the city.

In [21]:
tbl = tbl.withColumn('bbl',F.concat(tbl.boro,F.lit('-'),tbl.block,F.lit('-'),tbl.lot))




Check for duplicate rows. There are a small amount, so I remove them.

In [22]:
tbl.groupBy('bbl','fy').count().groupBy('count').count().show()

+-----+------+
|count| count|
+-----+------+
|    3|    81|
|    4|    23|
|    2|   976|
|    1|871910|
|    5|    10|
|    6|     6|
|    8|     2|
+-----+------+


Remove the small number of duplicate rows.

In [23]:
tbl = tbl.dropDuplicates(['bbl','fy'])




In [24]:
tbl.groupBy('bbl','fy').count().groupBy('count').count().show()

+-----+------+
|count| count|
+-----+------+
|    1|873008|
+-----+------+


In [25]:
predictors_df23 = tbl.filter(tbl.fy=='2023').select('bbl','tentxbextot','fintxbextot','bldg_class_1dig','fintaxclass','zip_code','yrbuilt','units','gross_sqft')
predictors_df23 = predictors_df23.withColumnRenamed('tentxbextot','tentxbextot23')
predictors_df23 = predictors_df23.withColumnRenamed('fintxbextot','fintxbextot23')




I use _only_ tentative tax roll TBAV for fy24, as the DOF doesn't know final tax roll information at the time of the protest (protests occur between tentative and final tax roll information). 

I need to rename these tax roll value columns in order for these tables to be joined together next.

In [26]:
predictors_df24 = tbl.filter(tbl.fy=='2024').select('bbl','tentxbextot')
predictors_df24 = predictors_df24.withColumnRenamed('tentxbextot','tentxbextot24')




Combine the predictors. I use inner joins since I'm only going to try to predict for parcels (BBLs) that were present in both FY23 and FY24. Otherwise I will have almost no predictors, since most of them come from FY23 data.

In [27]:
predictors_df = predictors_df23.join(predictors_df24,on='bbl',how='inner')




Build the outcomes dataframe, and the final dataframe.

In [28]:
outcomes_df = tbl.filter(tbl.fy=='2024').select('bbl','protestInt','successful_protestInt')
final_df = predictors_df.join(outcomes_df,on='bbl',how='inner')




I do a complete case analysis - but it turns out there isn't any missing data. The following block confirms this.

In [29]:
print(final_df.count())
final_df = final_df.dropna()
print(final_df.count())

431625
431625


### Setup Dataset for Random Forest model
The process for running machine learning models, like random forest, using the Spark ML syntax, requires:
1. Converting all string predictor columns into index columns using StringIndexer()s
2. Putting all predictors ("features") into one column called "features" using a VectorAssembler
3. Putting the integer outcome column into a column called "label"

In [30]:
indexer = StringIndexer(inputCol = 'bldg_class_1dig', outputCol = 'bldg_class_1dig_idx')
indexer = indexer.fit(final_df)
final_df = indexer.transform(final_df)

indexer = StringIndexer(inputCol = 'fintaxclass', outputCol = 'fintaxclass_idx')
indexer = indexer.fit(final_df)
final_df = indexer.transform(final_df)




In [31]:
featureCols = ['tentxbextot23', 'fintxbextot23','yrbuilt', 'units', 'gross_sqft', 'tentxbextot24','bldg_class_1dig_idx', 'fintaxclass_idx']
assembler = VectorAssembler(inputCols=featureCols, outputCol='features')
final_df = assembler.transform(final_df)




Split data into an 80% training, 20% testing split

In [32]:
train, test = final_df.select('features','protestInt','successful_protestInt').randomSplit([0.8,0.20],seed=42)
print(train.count())
print(test.count())

345543
86082


### Fit Two Random Forest Models to the Training Dataset
The two models differ base on whether I use protest or success protest as the label (outcome) variable.

In [33]:
# protest
train_protest = train.withColumn('label',train.protestInt)
test_protest = test.withColumn('label',test.protestInt)

randomforest_protest = RandomForestClassifier()
randomforest_protest_model = randomforest_protest.fit(train_protest)

# successful protest
train_successfulprotest = train.withColumn('label',train.successful_protestInt)
test_successfulprotest = test.withColumn('label',test.successful_protestInt)

randomforest_successfulprotest = RandomForestClassifier()
randomforest_successfulprotest_model = randomforest_successfulprotest.fit(train_successfulprotest)




### Create Confusion Matrices on the Test Dataset for Each, and Compare

A confusion matrix puts the actual values along rows, and the predicted values along columns. Cells in the confusion matrix are the number of entries in the test dataset that belong to each of the cells.

In [34]:
prediction_protest = randomforest_protest_model.transform(test_protest)
prediction_successfulprotest = randomforest_successfulprotest_model.transform(test_successfulprotest)




In [35]:
confusion_protest = prediction_protest.groupBy('label','prediction').count().groupBy('label').pivot('prediction').agg(F.first(F.col("count"))).sort('label')
confusion_protest = confusion_protest.toPandas()
confusion_protest = confusion_protest.drop('label',axis=1)
confusion_protest

     0.0    1.0
0  28389   8967
1   7125  41601


For example, this confusion matrix says that using the random forest, I predicted 7838+39925 = 47,763 properties would protest (sum of the column labeled 1), among these, 39,925 actually protested (these are both in the test dataset).

In [36]:
confusion_successfulprotest = prediction_successfulprotest.groupBy('label','prediction').count().groupBy('label').pivot('prediction').agg(F.first(F.col("count"))).sort('label')
confusion_successfulprotest = confusion_successfulprotest.toPandas()
confusion_successfulprotest = confusion_successfulprotest.drop('label',axis=1)
confusion_successfulprotest

     0.0   1.0
0  82066   268
1   2158  1590


Calculate precision (the conditional probability of protest given predicted protest), recall (the conditional probability of predicted protest given protest), and F1 scores (the harmonic mean of precision and recall, used as an omnibus measure of model quality).

In [37]:
def calc_stats(confusion):
    confusion.columns = [0,1]
    precision = confusion[1][1]/(confusion[1][1]+confusion[1][0]) # share of predicted protests that are actual protests
    recall = confusion[1][1]/(confusion[0][1]+confusion[1][1]) # share of actual protests that are predicted
    f1score = 2*(precision*recall)/(precision+recall)
    return(precision,recall,f1score)





In [38]:
print(calc_stats(confusion_protest))
print(calc_stats(confusion_successfulprotest))

(0.8226744186046512, 0.853774165743135, 0.8379358269381835)
(0.8557588805166846, 0.4242262540021345, 0.5672493756689261)


The confusion matrix illustrates that we are quite capable of predicting protest using these variables. The conditional probability of protest given a positive prediction is 0.84 (this is precision), and the conditional probability of a positive prediction given protest is 0.82 (this is recall). The F1 score is 0.83, which is the harmonic mean of precision and recall. This is a good F1 score, so the model is fairly capable of predicting protest.

By contrast, the confusion matrix for successful protest illustrate that we are not very good at predicting successful protest! Note that an F1 score of 0.50 is obtained from a [random guess model](https://stackoverflow.com/a/50226931). We obtain an F1 score just slightly higher than this, of 0.56. This is consistent with DOF effectively assessing properties to limit the room for protest, and the appeals process being primarily driven by "irrational appeals" that have little chance of success.

# Conclusions
This jupyter notebook provides an example of conducting PySpark analysis using AWS Glue. The application was studying property tax protests in New York City. I was able to predict protest, but not successful protest. This is as expected, under the assumption that assessment practices are effective: if I could easily predict successful protest, then the city's assessors (who are trying to avoid successful appeals) would rationally incorporate that information into their assessments _ex ante_, thereby ruining any chance at predicting successful protest.

The analysis illustrated using several PySpark approaches to working with data, including grouping and aggregating, joining, windowing functions, collecting results with show() or toPandas() calls, and finally setting up and running a Random Forest in PySpark. While the data used in my application is modest in size, setting analyses up in PySpark and knowing the API for this tool permits analysis on truly big data, since Apache Spark automatically distributes the computing across nodes in the cluster to optimize performance. Indeed, throughout my running of this Jupyter Notebook, Apache Spark (through AWS Glue) was optimizing performance on AWS in the background. This work demonstrates my preparation for conducting big data analysis on large scale problems in the cloud.