# Exploring Heating Problems in Manhattan

In this notebook we will investigate New York City's 311 Complaints data, which is available as part of NYC Open Data. 

In particular, we are interested in heating complaints within the borough of Manhattan. We will explore the data using Spark's SQL module and the visualization tool Brunel.

During the months of October to May, residents of NYC can call 311 to report that a building doesn't have enough heat or hot water. In the remaining months, June to September, complaints can be made that heating has been left on. 

A number of different factors may contribute to heating complaints. We will select a few of the features available in our data to see whether they suggest any correlation.

***

## Read the Data


In [2]:
from pyspark.sql import SparkSession
import brunel

In [3]:
spark = SparkSession.builder.appName('NewYorkCase').getOrCreate()

In [4]:
nyc311DF=spark.read.csv('../datasets/311_Service_Requests_from_2015_subset.csv',header=True)

In [5]:
nyc311DF.count()

9999

In [9]:
nyc311DF.printSchema()

root
 |-- UniqueKey: string (nullable = true)
 |-- CreatedDate: string (nullable = true)
 |-- ClosedDate: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- AgencyName: string (nullable = true)
 |-- ComplaintType: string (nullable = true)
 |-- Descriptor: string (nullable = true)
 |-- LocationType: string (nullable = true)
 |-- IncidentZip: string (nullable = true)
 |-- IncidentAddress: string (nullable = true)
 |-- StreetName: string (nullable = true)
 |-- CrossStreet1: string (nullable = true)
 |-- CrossStreet2: string (nullable = true)
 |-- IntersectionStreet1: string (nullable = true)
 |-- IntersectionStreet2: string (nullable = true)
 |-- AddressType: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Landmark: string (nullable = true)
 |-- FacilityType: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- DueDate: string (nullable = true)
 |-- ResolutionDescription: string (nullable = true)
 |-- ResolutionActionUpdatedDate: string (n

## Spark SQL Exploration

Spark SQL is a powerful tool that gives users an (often) familiar and (relatively) intuitive way to explore the data. To refer to the data within an SQL query, it needs to be registered as a view. The call below creates a temporary view named *nyc311ct*.

In [6]:
nyc311DF.createOrReplaceTempView("nyc311ct")

We wish to limit our data to Manhattan, so let's see the exact Borough value to use.

In [12]:
spark.sql("select distinct Borough from nyc311ct").show()

+-------------+
|      Borough|
+-------------+
|  Unspecified|
|       QUEENS|
|     BROOKLYN|
|        BRONX|
|    MANHATTAN|
|STATEN ISLAND|
+-------------+



Let's find the complaint type with the most complaints in Manhattan.

Note that we are calling the cache function. This means that when the next action is called ("show", "count", etc.), Spark will store the dataframe *nyc311Agr_df* in memory for much quicker retrieval in the future. This works best when the dataframe is small enough to fit in memory.

In [13]:
nyc311Agr_df = spark.sql("select `Complaint Type` as Complaint_Type, count(`Unique Key`) as Complaint_Count "+
                            "from nyc311ct where Borough = 'MANHATTAN' "+
                            "group by `Complaint Type` order by Complaint_Count desc").cache()

In [10]:
nyc311Agr_df.show()

+--------------------+---------------+
|      Complaint_Type|Complaint_Count|
+--------------------+---------------+
|Noise - Street/Si...|            286|
|  Noise - Commercial|            164|
|     Illegal Parking|            162|
|            Graffiti|            104|
|             Smoking|            103|
|      Taxi Complaint|            102|
|Non-Residential Heat|             85|
|  Consumer Complaint|             73|
|   Broken Muni Meter|             68|
|     Noise - Vehicle|             62|
| Homeless Encampment|             59|
|    Street Condition|             58|
|  Food Establishment|             45|
|      HEAT/HOT WATER|             42|
|Fire Safety Direc...|             38|
|Maintenance or Fa...|             35|
|  Indoor Air Quality|             35|
|UNSANITARY CONDITION|             31|
|             Vending|             30|
|    Blocked Driveway|             29|
+--------------------+---------------+
only showing top 20 rows



## Visualize with Brunel

Convert the Spark dataframe to a Pandas dataframe, as Brunel works with Pandas dataframes directly.

In [14]:
nyc311Agr_df_pd = nyc311Agr_df.toPandas()

Let's get a visual representation of the data within nyc311Agr_df. We are creating a bubble chart, where the size of the bubble represents the number of complaints. The complaint type is assigned a color and, if the bubble is large enough, it is labeled; otherwise the type is displayed when the bubble is hovered over.

In [15]:
%brunel data('nyc311Agr_df_pd') bubble size(Complaint_Count) color(Complaint_Type) label(Complaint_Type) legends(none) tooltip(Complaint_Type)


## Complaints by Zip code
How does the number of complaints vary by Zip code? Let's remove any data points where a zip code hasn't been provided and filter to those that are of type 'HEAT/HOT WATER'.

Note: If you are just exploring the data and do not intend to re-use the resulting dataframe, you can call "show" on the Spark SQL result without assigning it to a variable.

In [16]:
spark.sql("select `Incident Zip` as Zip, count(*) as ZipHeatingCnt " +
          "from nyc311ct " +
          "where `Complaint Type` = 'HEAT/HOT WATER' and `Incident Zip` <> '' group by `Incident Zip`").show()

+-----+-------------+
|  Zip|ZipHeatingCnt|
+-----+-------------+
|11236|            2|
|11106|            2|
|10452|            1|
|11237|            1|
|11379|            1|
|11249|            3|
|10012|            1|
|11427|            1|
|11239|            1|
|11213|            6|
|10457|            1|
|11226|            6|
|11224|            1|
|10011|            1|
|10473|            1|
|11229|            1|
|11230|            1|
|10014|            1|
|10463|            1|
|10462|            1|
+-----+-------------+
only showing top 20 rows
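
The filter and grouping in the query above can be tried on a few toy rows. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL, so it runs without a Spark session; the table name, column names and rows are made up for illustration. It also shows that `<> ''` drops NULL zip codes as well as blank ones, because a comparison with NULL is never true. Whether blank CSV fields reach the notebook's query as empty strings or as nulls depends on the reader's defaults, which this notebook does not show.

```python
import sqlite3

# Toy rows standing in for the 311 data: (complaint type, incident zip).
rows = [
    ("HEAT/HOT WATER", "11213"),
    ("HEAT/HOT WATER", "11213"),
    ("HEAT/HOT WATER", "10012"),
    ("HEAT/HOT WATER", ""),      # zip code left blank
    ("HEAT/HOT WATER", None),    # zip code missing entirely (NULL)
    ("Illegal Parking", "10012"),
]

conn = sqlite3.connect(":memory:")
conn.execute("create table complaints (complaint_type text, incident_zip text)")
conn.executemany("insert into complaints values (?, ?)", rows)

# Same shape as the notebook query: filter on type, drop empty zips, group.
zip_counts = dict(conn.execute(
    "select incident_zip, count(*) from complaints "
    "where complaint_type = 'HEAT/HOT WATER' and incident_zip <> '' "
    "group by incident_zip"
).fetchall())
conn.close()

print(zip_counts)
```

Only the two real zip codes survive the filter; the blank and NULL rows and the non-heating complaint are all excluded.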



Similarly, if you wish to use the result of a query in future queries but do not require the data as a dataframe, you can create a temporary view directly from the query as follows:

In [17]:
spark.sql("select `Incident Zip` as Zip, count(*) as ZipHeatingCnt " + 
          "from nyc311ct " +
          "where `Complaint Type` = 'HEAT/HOT WATER' and `Incident Zip` <> '' group by `Incident Zip`").createOrReplaceTempView("zipHeatingCnt")

Additional SQL functions and clauses are available, such as "max", "min", "cast", "split" and "limit". Some are used below:

In [18]:
spark.sql("select max(ZipHeatingCnt) as max, min(ZipHeatingCnt) as min, avg(ZipHeatingCnt) as avg from zipHeatingCnt").show()

+---+---+------------------+
|max|min|               avg|
+---+---+------------------+
|  7|  1|2.0135135135135136|
+---+---+------------------+



## Complaints by Zip code and date
Let's see which dates and zip codes had the most complaints. The 'Created Date' field includes a time, so we use the "split" function to keep just the date. We are also limiting the data to heat/hot water complaints only.

In [19]:
spark.sql("select split(`Created Date`, ' ')[0] as Incident_Date, `Incident Zip` as Incident_Zip, "+
          "count(`Unique Key`) as HeatingComplaintCount "+
          "from nyc311ct where `Complaint Type` = 'HEAT/HOT WATER' and `Incident Zip` <> '' "+
          "group by split(`Created Date`, ' ')[0], `Incident Zip` order by HeatingComplaintCount desc limit 50").show()

+-------------+------------+---------------------+
|Incident_Date|Incident_Zip|HeatingComplaintCount|
+-------------+------------+---------------------+
|   10/27/2015|       11212|                    2|
|   11/09/2015|       11212|                    2|
|   10/19/2015|       10456|                    2|
|   11/08/2015|       10456|                    2|
|   10/24/2015|       11209|                    2|
|   10/17/2015|       10467|                    1|
|   10/19/2015|       11211|                    1|
|   10/19/2015|       11237|                    1|
|   10/10/2015|       10451|                    1|
|   10/19/2015|       10011|                    1|
|   10/19/2015|       11373|                    1|
|   10/06/2015|       11207|                    1|
|   12/22/2015|       11213|                    1|
|   10/18/2015|       10453|                    1|
|   11/09/2015|       10021|                    1|
|   10/19/2015|       10024|                    1|
|   11/08/2015|       10454|   

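In this subset, no date and zip code pair has more than two heating complaints. The top rows all fall in October and November 2015, for example 10/27/2015 and 11/09/2015 in zip code 11212.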
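
The date extraction and grouping used in the query above can be sketched in plain Python. The records below are hypothetical, and the `MM/DD/YYYY hh:mm:ss AM` timestamp layout is an assumption inferred from the dates in the output; `date_part` is an illustrative helper, not a Spark function.

```python
from collections import Counter

# Hypothetical (created date, incident zip) pairs in the assumed
# "MM/DD/YYYY hh:mm:ss AM" layout.
records = [
    ("10/27/2015 09:15:00 AM", "11212"),
    ("10/27/2015 11:40:12 PM", "11212"),
    ("10/19/2015 07:02:45 AM", "10456"),
    ("11/09/2015 06:30:00 PM", "10021"),
]


def date_part(created: str) -> str:
    """Mirror split(`Created Date`, ' ')[0]: keep the text before the first space."""
    return created.split(" ")[0]


# Count complaints per (date, zip) pair, like the "group by" in the query.
counts = Counter((date_part(created), zip_code) for created, zip_code in records)

# Highest counts first, like "order by HeatingComplaintCount desc limit 2".
top = counts.most_common(2)
print(top)
```

Splitting on the first space only works while the date comes first and contains no spaces; a differently formatted timestamp would need a proper date parser.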

## Complaints by address

Let's explore complaints by address:

In [20]:
addressNycAgr = spark.sql("select `Incident Address` as IncidentAddress, count(`Unique Key`) as IncidentCount "+
                              "from nyc311ct "+
                              "where `Complaint Type` = 'HEAT/HOT WATER' and `Incident Zip` <> '' group by `Incident Address`")

In [21]:
addressNycAgr.createOrReplaceTempView("nycaddHeat")

Count how many addresses have complained about heating / hot water

In [22]:
addressNycAgr.count()

145

Let's investigate the distribution of the data; we tally up the number of addresses for each incident count.

In [23]:
adrHeatDistr = spark.sql("select IncidentCount, count(*) as AddressDistr from nycaddHeat group by IncidentCount order by IncidentCount limit 100")

In [24]:
adrHeatDistr.createOrReplaceTempView("nycaddHeatDistr")

In [25]:
adrHeatDistr.show()

+-------------+------------+
|IncidentCount|AddressDistr|
+-------------+------------+
|            1|         141|
|            2|           4|
+-------------+------------+
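
The two-step tally above (complaints per address, then addresses per complaint count) can be sketched with `collections.Counter`. The addresses are hypothetical placeholders.

```python
from collections import Counter

# Hypothetical addresses, one entry per heating complaint.
complaint_addresses = [
    "1 EXAMPLE STREET", "1 EXAMPLE STREET",
    "2 EXAMPLE AVENUE",
    "3 EXAMPLE PLACE",
    "4 EXAMPLE ROAD", "4 EXAMPLE ROAD",
    "5 EXAMPLE COURT",
]

# Step 1: complaints per address (the role of the nycaddHeat view).
incidents_per_address = Counter(complaint_addresses)

# Step 2: number of addresses sharing each incident count (adrHeatDistr).
address_distr = Counter(incidents_per_address.values())

for incident_count in sorted(address_distr):
    print(incident_count, address_distr[incident_count])
```

The address counts in the second tally always add up to the number of distinct addresses, which matches the notebook's own figures (141 + 4 = 145).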



Let's use Brunel again to get a visualization, in the hope that it gives us a quick understanding of the data.

In [26]:
adrHeatDistr_pd = adrHeatDistr.toPandas()
%brunel data('adrHeatDistr_pd') x(IncidentCount) y(AddressDistr)


The query takes the first 100 incident counts, and the result looks like a long-tail distribution: the bulk of the addresses (141) have only one complaint, and the number of addresses decreases as the incident count increases.

Let's have a quick look at the top number of complaints and how many addresses made that many:

In [27]:
spark.sql("select * from nycaddHeatDistr order by IncidentCount desc limit 5").show()

+-------------+------------+
|IncidentCount|AddressDistr|
+-------------+------------+
|            2|           4|
|            1|         141|
+-------------+------------+



As expected, only a few addresses (4 here) reach the highest complaint count.

## Combining with PLUTO Data

We are now going to join the NYC 311 Complaints data with another data set, *PLUTO*. This data is also publicly available through NYC Open Data. It is available in a CSV format and contains land use and geographic data, including more than seventy fields. If we can join these two datasets, we might be able to get a better understanding of attributes that contribute to complaints being made.

We read it in the same way as before:

In [28]:
plutoDF=spark.read.csv('../datasets/MN.csv',header=True)

Let's confirm it worked by printing the schema

In [29]:
plutoDF.printSchema()

root
 |-- Borough: string (nullable = true)
 |-- Block: string (nullable = true)
 |-- Lot: string (nullable = true)
 |-- CD: string (nullable = true)
 |-- CT2010: string (nullable = true)
 |-- CB2010: string (nullable = true)
 |-- SchoolDist: string (nullable = true)
 |-- Council: string (nullable = true)
 |-- ZipCode: string (nullable = true)
 |-- FireComp: string (nullable = true)
 |-- PolicePrct: string (nullable = true)
 |-- Address: string (nullable = true)
 |-- ZoneDist1: string (nullable = true)
 |-- ZoneDist2: string (nullable = true)
 |-- ZoneDist3: string (nullable = true)
 |-- ZoneDist4: string (nullable = true)
 |-- Overlay1: string (nullable = true)
 |-- Overlay2: string (nullable = true)
 |-- SPDist1: string (nullable = true)
 |-- SPDist2: string (nullable = true)
 |-- LtdHeight: string (nullable = true)
 |-- AllZoning1: string (nullable = true)
 |-- AllZoning2: string (nullable = true)
 |-- SplitZone: string (nullable = true)
 |-- BldgClass: string (nullable = true)
 |--

We create a temporary view to be able to run SQL queries

In [30]:
plutoDF.createOrReplaceTempView("plutot")

## Data Transformation & Feature Engineering

Sometimes a feature needs to be engineered to make it more valuable, whether for visual purposes or modeling purposes. This can be done in many ways, including aggregation, bucketing, and combining multiple features.

Rather than investigate the data at an address level, we are going to aggregate to zip code level. We are also selecting only a subset of the fields to focus our investigation. The features we are extracting are Borough, ZipCode, the building count, and the averages of Building Area, Building Age (obtained by subtracting the build year from 2015), and X & Y coordinates.

In [31]:
plutoSubdf = spark.sql("select Borough, ZipCode, count(*) as BldgCount, "+
                           "round(Avg(BldgArea)) as avgBldgArea, round(Avg(2015 - YearBuilt)) as avgBldgAge, "+
                           "round(Avg(XCoord)) as avgXcrd, round(Avg(YCoord)) as avgYcrd "+
                           "from plutot group by Borough, ZipCode")

In [32]:
plutoSubdf.createOrReplaceTempView("plutosubt")
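
The per-zip aggregation above can be sketched in plain Python on a few hypothetical lots. The reference year 2015 matches the query; the lot values are made up, and Python's `round` may treat exact halves differently from Spark's, so the toy values avoid them.

```python
from collections import defaultdict

REFERENCE_YEAR = 2015  # same reference year as the query above

# Hypothetical lots: (zip code, building area, year built).
lots = [
    ("10012", 20000, 1900),
    ("10012", 30000, 1920),
    ("10003", 15000, 1985),
]

# Group lots by zip code, turning the build year into an age.
by_zip = defaultdict(list)
for zip_code, area, year_built in lots:
    by_zip[zip_code].append((area, REFERENCE_YEAR - year_built))

# One feature row per zip code: count plus rounded averages.
features = {}
for zip_code, values in by_zip.items():
    n = len(values)
    features[zip_code] = {
        "BldgCount": n,
        "avgBldgArea": round(sum(area for area, _ in values) / n),
        "avgBldgAge": round(sum(age for _, age in values) / n),
    }

print(features)
```

Because the age is an average over every lot in the zip code, a single implausible build year can shift it noticeably, so it is worth inspecting the raw `YearBuilt` values before relying on this feature.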

## Merge the Data
We now join the NYC 311 complaints data with the PLUTO data where the two zip codes match.

In [33]:
mergedDf = spark.sql("select * from zipHeatingCnt, plutosubt where Zip = ZipCode").cache()
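
The comma-separated `from` clause with a `where` condition is an implicit inner join: zip codes present on only one side are dropped. The sketch below uses Python's built-in `sqlite3` as a stand-in for Spark SQL, with toy values, to show that the implicit form returns the same rows as an explicit `inner join`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("create table zipHeatingCnt (Zip text, ZipHeatingCnt integer)")
conn.execute("create table plutosubt (ZipCode text, BldgCount integer)")

# Toy values: 11213 has complaints but no matching row on the PLUTO side,
# and 10065 has buildings but no complaints.
conn.executemany("insert into zipHeatingCnt values (?, ?)",
                 [("10012", 1), ("10003", 3), ("11213", 6)])
conn.executemany("insert into plutosubt values (?, ?)",
                 [("10012", 1168), ("10003", 1881), ("10065", 1052)])

# Implicit join, as written in the notebook.
implicit = conn.execute(
    "select Zip, ZipHeatingCnt, BldgCount from zipHeatingCnt, plutosubt "
    "where Zip = ZipCode order by Zip").fetchall()

# Equivalent explicit inner join.
explicit = conn.execute(
    "select Zip, ZipHeatingCnt, BldgCount from zipHeatingCnt "
    "inner join plutosubt on Zip = ZipCode order by Zip").fetchall()
conn.close()

print(implicit)
```

Since the PLUTO file read earlier is `MN.csv`, this join is also what restricts the zip-level complaint counts to zip codes that appear in that file.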

In [36]:
mergedDf.createOrReplaceTempView("modelt")

In [37]:
mergedDf.count()

25

Only 25 Manhattan zip codes have heating complaints in this 2015 subset. Let's have a look at some of the attributes we have calculated.

In [38]:
spark.sql("select * from modelt").show(100)

+-----+-------------+-------+-------+---------+-----------+----------+---------+--------+
|  Zip|ZipHeatingCnt|Borough|ZipCode|BldgCount|avgBldgArea|avgBldgAge|  avgXcrd| avgYcrd|
+-----+-------------+-------+-------+---------+-----------+----------+---------+--------+
|10012|            1|     MN|  10012|     1168|    23483.0|     143.0| 984755.0|203548.0|
|10011|            1|     MN|  10011|     2080|    26692.0|     145.0| 984425.0|209172.0|
|10014|            1|     MN|  10014|     1959|    17391.0|     148.0| 982940.0|206799.0|
|10463|            1|     MN|  10463|      220|    15722.0|     204.0|1008961.0|258548.0|
|10003|            3|     MN|  10003|     1881|    28940.0|     139.0| 987540.0|205715.0|
|10040|            1|     MN|  10040|      429|    40602.0|     188.0|1003705.0|251912.0|
|10022|            3|     MN|  10022|     1101|    82191.0|     189.0| 993309.0|215562.0|
|10065|            1|     MN|  10065|     1052|    42394.0|     127.0| 993913.0|217993.0|
|10035|   

Let's select the desired target (ZipHeatingCnt) and features (Borough, BldgCount, avgBldgAge, avgBldgArea, avgXcrd, avgYcrd) into a dataframe ready to be saved for future use.

In [39]:
mdlDataDF = spark.sql("select Zip, ZipHeatingCnt, Borough, BldgCount, avgBldgAge, avgBldgArea, avgXcrd, avgYCrd from modelt")

## Exploring Complaint Density
Rather than look at complaint value as an absolute value, we are calculating and exploring Complaint Density by dividing the number of complaints by the number of buildings in that zipcode.

We are also bucketing the ages to see how the age of the building relates to the number of complaints.

In [40]:
complaintDensityZipCode = spark.sql("select Borough, Zip, round((ZipHeatingCnt/BldgCount), 1) as ComplaintDensity, "+
                                        "round(avgBldgArea) as AvgBldgArea, round(avgBldgAge) as AvgBldgAge, "+
                                        "round(avgXcrd) as AvgXCoord, round(avgYcrd) as AvgYCoord from modelt").cache()

Note: "show" truncates long values by default; passing `truncate=False` would display them in full. The default is used here because the values are short.

In [41]:
complaintDensityZipCode.show()

+-------+-----+----------------+-----------+----------+---------+---------+
|Borough|  Zip|ComplaintDensity|AvgBldgArea|AvgBldgAge|AvgXCoord|AvgYCoord|
+-------+-----+----------------+-----------+----------+---------+---------+
|     MN|10012|             0.0|    23483.0|     143.0| 984755.0| 203548.0|
|     MN|10011|             0.0|    26692.0|     145.0| 984425.0| 209172.0|
|     MN|10014|             0.0|    17391.0|     148.0| 982940.0| 206799.0|
|     MN|10463|             0.0|    15722.0|     204.0|1008961.0| 258548.0|
|     MN|10003|             0.0|    28940.0|     139.0| 987540.0| 205715.0|
|     MN|10040|             0.0|    40602.0|     188.0|1003705.0| 251912.0|
|     MN|10022|             0.0|    82191.0|     189.0| 993309.0| 215562.0|
|     MN|10065|             0.0|    42394.0|     127.0| 993913.0| 217993.0|
|     MN|10035|             0.0|    13521.0|     443.0|1001455.0| 231364.0|
|     MN|10028|             0.0|    31134.0|     103.0| 997215.0| 222054.0|
|     MN|100

In [42]:
complaintDensityZipCode.createOrReplaceTempView("cdzt")
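
The density calculation can be checked by hand for a few of the rows shown earlier. With a handful of complaints and over a thousand buildings per zip code, rounding to one decimal place gives 0.0. The per-1,000-buildings scale below is a hypothetical alternative for comparison, not something the notebook uses.

```python
# (zip, heating complaints, building count), taken from the merged rows above.
rows = [("10012", 1, 1168), ("10003", 3, 1881), ("10022", 3, 1101)]

results = []
for zip_code, complaints, buildings in rows:
    density = complaints / buildings     # complaints per building
    per_thousand = 1000 * density        # hypothetical alternative scale
    results.append((zip_code, round(density, 1), round(per_thousand, 1)))

for row in results:
    print(row)
```

On these rows, every density rounds to 0.0, while the per-1,000 values remain distinguishable.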

We are once again utilizing Spark SQL's rich functionality and creating case statements to assign each zip code to one of three categories: *\<100* if the average building age is less than 100 years, *100\<age\<200* if it is between 100 and 200, and *\>200* if it is 200 or more.

Similarly, the complaint density has been classified as *Low* if less than 0.1, *High* if between 0.1 and 1, and *Very High* if 1 or greater.
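
The two bucketing rules just described can be sketched as plain Python functions, which makes the boundary behaviour easy to check. The function names are illustrative; the thresholds and labels follow the case statements in the next cell.

```python
def age_category(avg_bldg_age: float) -> str:
    """Bucket an average building age the same way as the first case statement."""
    if avg_bldg_age < 100:
        return "<100"
    if avg_bldg_age < 200:
        return "100<age<200"
    return ">200"  # the "else" branch, so exactly 200 lands here too


def density_level(complaint_density: float) -> str:
    """Bucket a complaint density the same way as the second case statement."""
    if complaint_density < 0.1:
        return "Low"
    if complaint_density < 1:
        return "High"
    return "Very High"  # the "else" branch, so exactly 1 lands here too


for age in (99, 100, 199, 200):
    print(age, age_category(age))
for density in (0.0, 0.1, 1):
    print(density, density_level(density))
```

Each lower bound is inclusive: an average age of exactly 100 falls in *100<age<200*, and a density of exactly 0.1 counts as *High*.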

In [43]:
cdztop = spark.sql("select (case when AvgBldgAge < 100 then '<100' when AvgBldgAge >= 100 and AvgBldgAge < 200 then '100<age<200' else '>200' end) "+
                           "as AvgBldgAgeCat, "+
                       "(case when ComplaintDensity < 0.1 then 'Low' when ComplaintDensity >= 0.1 and ComplaintDensity < 1 then 'High' else 'Very High' end) "+
                           "as ComplaintDensityLevel "+
                       "from cdzt")

Let's use the Chord graphic to visually interpret any connections between age and complaint density

In [44]:
cdztop_pd = cdztop.toPandas()
%brunel  data('cdztop_pd') chord x(AvgBldgAgeCat) y(ComplaintDensityLevel) color(#count) tooltip(#all)


There is only one zip code with an average building age of less than 100 years, so this segmentation could be improved, or more data would be required to get a further understanding. However, as expected, the houses that are more than 200 years old tend to have a complaint density level of either High or Very High.

## Saving the Data

When saving, Spark will partition the data. It is worth understanding how many rows are in your data, as you can explicitly specify how many partitions you would like created.

In [45]:
mdlDataDF.count()

25

Since we only have a small amount of data, we can keep it all together. We do this by calling "coalesce" and specifying the value *1*.

In [46]:
mergedMNcmp = mdlDataDF.coalesce(1)

Similar to how we specified that there was a header in the raw file, we pass the header parameter to the "csv" method after calling "write", together with the path and filename under which the data should be saved in CSV format. The "overwrite" mode replaces any existing output at that path.

In [47]:
mergedMNcmp.write.csv('../datasets/mnModelData.csv',header=True,mode="overwrite")
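
As the directory listing at the end of this notebook suggests, the path passed to "csv" ends up as a directory rather than a single file; Spark places the rows in a part file inside it. Below is a minimal sketch for locating that file, assuming the usual `part-*.csv` naming. The helper name `find_part_file` is hypothetical and not part of Spark.

```python
import glob
import os


def find_part_file(output_dir: str) -> str:
    """Return the single part-*.csv file inside a Spark CSV output directory.

    Assumes the data was coalesced to one partition, so exactly one
    part file is expected; anything else raises an error.
    """
    matches = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
    if len(matches) != 1:
        raise ValueError(f"expected exactly one part file, found {len(matches)}")
    return matches[0]
```

After the write above, `find_part_file('../datasets/mnModelData.csv')` would return the path of the CSV part file, which can then be opened with any CSV reader.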


We repeat for the subset of PLUTO data:

In [48]:
plutoSubdf.count()

56

In [49]:
plutoSubcmp = plutoSubdf.coalesce(1)

In [50]:
plutoSubcmp.write.csv('../datasets/mnAgrData.csv',header=True,mode="overwrite")

In [51]:
!ls -l ../datasets


total 19230
-rwx------. 1 999 root  6710815 Aug  1 18:46 311_Service_Requests_from_2015_subset.csv
drwxrwxrwx. 2 999 999      4096 Aug  1 19:22 mnAgrData.csv
-rwx------. 1 999 root 12972032 Aug  1 19:17 MN.csv
drwxrwxrwx. 2 999 999      4096 Aug  1 19:22 mnModelData.csv
