## This PySpark application programmatically aims to do the following: 

#### 1) Analyze NYC 311 Service Requests from 2010 to Present. 

#### 2) Apply machine learning and build a predictive model to try and predict the amount of time that it will take for an issue to go from “Open” to “Closed” among “Closed” tickets.  

#### 3) Determine which features are useful for prediction.

In [1]:
## Importing necessary libraries

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import functions as F
import pandas as pd

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
2,application_1574426985144_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
## Creating Spark session

mySpark = SparkSession.builder.getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [3]:
## Input file

inputFile = "s3://pgarias-bucket-cloud/311_Service_Requests_from_2015_to_Present.csv"

#inputFile = "s3://pgarias-bucket-cloud/311_Service_Requests_from_2015_to_Present_head_1000.csv"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
## Reading input CSV file and creating Spark data frame

dataFrame = mySpark.read.csv(inputFile,header=True)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
## Printing the schema 

dataFrame.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Unique Key: string (nullable = true)
 |-- Created Date: string (nullable = true)
 |-- Closed Date: string (nullable = true)
 |-- Agency: string (nullable = true)
 |-- Agency Name: string (nullable = true)
 |-- Complaint Type: string (nullable = true)
 |-- Descriptor: string (nullable = true)
 |-- Location Type: string (nullable = true)
 |-- Incident Zip: string (nullable = true)
 |-- Incident Address: string (nullable = true)
 |-- Street Name: string (nullable = true)
 |-- Cross Street 1: string (nullable = true)
 |-- Cross Street 2: string (nullable = true)
 |-- Intersection Street 1: string (nullable = true)
 |-- Intersection Street 2: string (nullable = true)
 |-- Address Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Landmark: string (nullable = true)
 |-- Facility Type: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Due Date: string (nullable = true)
 |-- Resolution Description: string (nullable = true)
 |-- Resolution Action

In [6]:
## Counting total number of records in the dataframe

dataFrame.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

10420594

In [7]:
## Dropping irrelevant columns and columns containing null values

dataFrame = dataFrame.drop('Vehicle Type', 'Taxi Company Borough', 'Taxi Pick Up Location', 'Bridge Highway Name', 
                           'Bridge Highway Direction', 'Road Ramp', 'Bridge Highway Segment', 'Landmark', 'Latitude', 
                           'Longitude', 'Location', 'Incident Address', 'Resolution Description', 'Street Name', 'Cross Street 1',
                           'Cross Street 2', 'Intersection Street 1', 'Intersection Street 2', 'Landmark', 'Agency', 'Descriptor',
                           'City','Facility Type', 'Resolution Description', 'Community Board', 'X Coordinate (State Plane)', 
                           'Y Coordinate (State Plane)', 'Park Facility Name', 'Park Borough', 'Unique Key', 'Resolution Action Updated Date',
                           'Due Date', 'BBL', 'Unique Key')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
## Selecting only 'Closed' status rows in the input dataframe

dataFrame = dataFrame.filter(dataFrame.Status == 'Closed')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
## Analyzing refined dataframe by using suitable methods 

#dataFrame.show(1)

#dataFrame.limit(5).toPandas()

#dataFrame.select('Incident Zip').distinct().show()

#dataFrame.toPandas()['Address Type'].unique()

#dataFrame.select(F.countDistinct("Incident Zip")).show()

#dataFrame.groupBy("Borough").count().orderBy().show()

dataFrame.select(F.countDistinct("Complaint Type")).show()

#dataFrame.select(F.countDistinct("Agency Name")).show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------------+
|count(DISTINCT Complaint Type)|
+------------------------------+
|                           218|
+------------------------------+

In [10]:
## Finding out time taken to close a ticket in hours and creating a new output label
## Output label column


timeFmt = "MM/dd/yyyy hh:mm:ss aa"

timeDiff = (F.unix_timestamp('Closed Date', format = timeFmt)
            - F.unix_timestamp('Created Date', format = timeFmt))

dataFrame = dataFrame.withColumn("Resolution_Time_Hours", F.round(timeDiff/3600, 2))

#timeDiff = (F.unix_timestamp('Closed Date', format = timeFmt)
 #           - F.unix_timestamp('Created Date', format = timeFmt))

#dataFrame = dataFrame.withColumn("Resolution_Time_Hours", F.round(timeDiff/3600, 2))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
## Creating new columns for ticket creation month, day, day of week and hour

dataFrame = dataFrame.withColumn("Created_Date_Timestamp", F.unix_timestamp('Created Date', format = timeFmt).cast('timestamp'))

dataFrame = dataFrame.withColumn("Complaint_Created_Day", F.dayofmonth('Created_Date_Timestamp'))

dataFrame = dataFrame.withColumn("Complaint_Created_Month", F.month('Created_Date_Timestamp'))

dataFrame = dataFrame.withColumn("Complaint_Created_DayOfWeek", F.dayofweek('Created_Date_Timestamp'))

dataFrame = dataFrame.withColumn("Complaint_Created_Hour", F.hour('Created_Date_Timestamp'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
## Summary statistics of column Resolution_Time_Hours

dataFrame.describe('Resolution_Time_Hours').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------------------+
|summary|Resolution_Time_Hours|
+-------+---------------------+
|  count|              9870837|
|   mean|    287.6464866778801|
| stddev|    4514.653800656453|
|    min|          -1028338.01|
|    max|           8855068.03|
+-------+---------------------+

In [13]:
## Dropping irrelevant columns further from the dataframe

dataFrame = dataFrame.drop('Created Date', 'Closed Date', 'Created_Date_Timestamp', 'Status', 
                           'Location Type', 'Resolution_Time')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
## Dropping all null values from the refined dataframe

dataFrame = dataFrame.dropna()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
## Finding distinct values for column Resolution_Time_Hours

for i in dataFrame.columns:
    if i == 'Resolution_Time_Hours':
        resolutionTimeDf = dataFrame.groupBy(i).count().orderBy('count', ascending=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
## Finding distinct values for column Resolution_Time_Hours using cube function

dataFrame.cube("Resolution_Time_Hours").count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------+-----+
|Resolution_Time_Hours|count|
+---------------------+-----+
|               951.85|    2|
|                 2.89| 4031|
|                 1.74| 5477|
|                 0.85| 8581|
|                 4.14| 2707|
|                 0.31| 6079|
|              1443.08|    3|
|                54.02|  203|
|                 8.25| 1126|
|               172.05|  125|
|                 69.5|  262|
|                 7.77| 1172|
|               635.56|    4|
|               161.37|  102|
|               431.29|   33|
|                29.42|  427|
|               431.37|   34|
|                14.42|  518|
|               1598.4|    2|
|              1004.31|    8|
+---------------------+-----+
only showing top 20 rows

In [17]:
## Finding percentage of each Resolution_Time_Hours in the dataframe

resolutionTimeDf = resolutionTimeDf.withColumn('Percentage',(resolutionTimeDf['count']/dataFrame.count()) * 100)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
## Rounding value of percentage column 

resolutionTimeDf = resolutionTimeDf.withColumn("Percentage", F.round(resolutionTimeDf["Percentage"]))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
## Sorting and identifying the percentage of each Resolution_Time_Hours

from pyspark.sql.functions import desc

resolutionTimeDf.sort(desc('Percentage')).show(50)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------+------+----------+
|Resolution_Time_Hours| count|Percentage|
+---------------------+------+----------+
|                  0.0|253340|       3.0|
|                 24.0| 50848|       1.0|
|                71.64|   165|       0.0|
|                88.53|   170|       0.0|
|              1936.58|     2|       0.0|
|                 1.05|  8170|       0.0|
|               137.38|    96|       0.0|
|                 5.62|  2039|       0.0|
|                 0.72|  8457|       0.0|
|                 45.4|   405|       0.0|
|                 2.27|  5889|       0.0|
|                44.55|   396|       0.0|
|                 6.73|  1564|       0.0|
|                 5.66|  1803|       0.0|
|               105.85|   121|       0.0|
|                91.99|    88|       0.0|
|               113.73|   168|       0.0|
|              1993.29|     2|       0.0|
|               233.78|    41|       0.0|
|                24.75|   800|       0.0|
|                73.79|   185|    

In [20]:
## Removing extreme outlier values from the dataframe for column Resolution_Time_Hours

dataFrame = dataFrame.filter(dataFrame.Resolution_Time_Hours > 0)
dataFrame = dataFrame.filter(dataFrame.Resolution_Time_Hours < 5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
## Grouping Complaint Type column under common issues 'Noise' and 'Street'

from pyspark.sql.functions import lit, when, col

dataFrame = dataFrame.withColumn('Complaint Type', when(
    (col("Complaint Type") == "Noise - Street/Sidewalk") 
    | (col("Complaint Type") == "Noise - Commercial") 
    | (col("Complaint Type") == "Noise - Vehicle") 
    | (col("Complaint Type") == "Noise - Residential")
    | (col("Complaint Type") == "Noise - Helicopter"), "Noise"
).otherwise(col('Complaint Type')))

dataFrame = dataFrame.withColumn('Complaint Type', when(
    (col("Complaint Type") == "Street Condition") 
    | (col("Complaint Type") == "Street Light Condition") 
    | (col("Complaint Type") == "Street Sign - Missing"), "Street Issue"
).otherwise(col('Complaint Type')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
## Summary statistics of column Resolution_Time_Hours

dataFrame.describe('Resolution_Time_Hours').show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+---------------------+
|summary|Resolution_Time_Hours|
+-------+---------------------+
|  count|              2574584|
|   mean|   1.9813040669871351|
| stddev|   1.3404318176633905|
|    min|                 0.01|
|    max|                 4.99|
+-------+---------------------+

In [23]:
## Printing the revised schema 

dataFrame.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Agency Name: string (nullable = true)
 |-- Complaint Type: string (nullable = true)
 |-- Incident Zip: string (nullable = true)
 |-- Address Type: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Open Data Channel Type: string (nullable = true)
 |-- Resolution_Time_Hours: double (nullable = true)
 |-- Complaint_Created_Day: integer (nullable = true)
 |-- Complaint_Created_Month: integer (nullable = true)
 |-- Complaint_Created_DayOfWeek: integer (nullable = true)
 |-- Complaint_Created_Hour: integer (nullable = true)

In [24]:
## Checking existence of all null values in columns before building model

dataFrame.filter(dataFrame["Complaint Type"].isNull()).count()
#dataFrame.filter(dataFrame["Incident Zip"].isNull()).count()
#dataFrame.filter(dataFrame["Borough"].isNull()).count()
#dataFrame.filter(dataFrame["Open Data Channel Type"].isNull()).count()
#dataFrame.filter(dataFrame["Agency Name"].isNull()).count()
#dataFrame.filter(dataFrame["Address Type"].isNull()).count()
#dataFrame.filter(dataFrame["Resolution_Time_Hours"].isNull()).count()
#dataFrame.filter(dataFrame["Complaint_Created_Day"].isNull()).count()
#dataFrame.filter(dataFrame["Complaint_Created_Month"].isNull()).count()
#dataFrame.filter(dataFrame["Complaint_Created_DayOfWeek"].isNull()).count()
#dataFrame.filter(dataFrame["Complaint_Created_Hour"].isNull()).count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

0

In [25]:
## Removing rows containing null values for key columns

dataFrame = dataFrame.where(dataFrame["Incident Zip"].isNotNull())
#dataFrame = dataFrame.where(dataFrame["Address Type"].isNotNull())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
## Finding distinct values for column Complaint Type

for i in dataFrame.columns:
    if i == 'Complaint Type':
        complaintDf = dataFrame.groupBy(i).count().orderBy('count', ascending=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
## Finding distinct values for column Complaint Type using cube function

dataFrame.cube("Complaint Type").count().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+
|      Complaint Type|count|
+--------------------+-----+
|            Graffiti|  278|
|       Drug Activity| 3339|
|Emergency Respons...|  146|
| Hazardous Materials|10451|
|            ELECTRIC|  168|
|DOF Property - RP...|  362|
|    Advocate - Other|  250|
|    Alzheimer's Care|    3|
|       Water Quality|  107|
|Recycling Enforce...|   43|
|      Adopt-A-Basket|    4|
|             GENERAL|  302|
|Overflowing Litte...|  519|
|Special Projects ...|   65|
|DOF Property - Ci...|   10|
| Cranes and Derricks|   35|
|DOF Property - Ow...| 2854|
|            Plumbing|   99|
|    Animal in a Park|  809|
|Advocate-Personal...|  721|
+--------------------+-----+
only showing top 20 rows

In [28]:
## Counting total number of records in the dataframe post refinement

dataFrame.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2574584

In [29]:
## Finding percentage of each complaint type in the dataframe

complaintDf = complaintDf.withColumn('Percentage',(complaintDf['count']/dataFrame.count()) * 100)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [30]:
## Rounding value of percentage column 

complaintDf = complaintDf.withColumn("Percentage", F.round(complaintDf["Percentage"], 3))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
## Sorting and identifying the percentage of each complaint type

from pyspark.sql.functions import desc

complaintDf.sort(desc('Percentage')).show(20)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+----------+
|      Complaint Type|  count|Percentage|
+--------------------+-------+----------+
|               Noise|1154894|    44.857|
|     Illegal Parking| 389730|    15.138|
|    Blocked Driveway| 355023|     13.79|
|        Water System|  89019|     3.458|
|    Derelict Vehicle|  75486|     2.932|
|Homeless Person A...|  73076|     2.838|
|Traffic Signal Co...|  63187|     2.454|
|               Sewer|  62638|     2.433|
|        Animal Abuse|  26834|     1.042|
|Non-Emergency Pol...|  26366|     1.024|
|        Street Issue|  23526|     0.914|
|      HEAT/HOT WATER|  20706|     0.804|
|             Traffic|  16675|     0.648|
|Sanitation Condition|  15439|       0.6|
|             Vending|  14359|     0.558|
| Homeless Encampment|  14342|     0.557|
|        Noise - Park|  13452|     0.522|
|Missed Collection...|  11837|      0.46|
|  Water Conservation|  11520|     0.447|
| Hazardous Materials|  10451|     0.406|
+--------------------+-------+----

In [32]:
## Checking complaint types for a certain percentage of records  

complaintDf.filter(complaintDf.Percentage > 5).show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+-------+----------+
|  Complaint Type|  count|Percentage|
+----------------+-------+----------+
|           Noise|1154894|    44.857|
| Illegal Parking| 389730|    15.138|
|Blocked Driveway| 355023|     13.79|
+----------------+-------+----------+

In [33]:
## Renaming column

dataFrame = dataFrame.withColumnRenamed("Complaint Type", "Complaint_Type")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [34]:
## Dropping all rows containing less than 5% records for individual complaint type from the original dataframe

#dataFrame = dataFrame.filter(dataFrame.Complaint_Type != 'Noise')
#dataFrame = dataFrame.filter(dataFrame.Complaint_Type != 'Illegal Parking')
#dataFrame = dataFrame.filter(dataFrame.Complaint_Type != 'Blocked Driveway')

dataFrame = dataFrame.filter((dataFrame.Complaint_Type == 'Noise') | (dataFrame.Complaint_Type == 'Illegal Parking') | (dataFrame.Complaint_Type == 'Blocked Driveway'))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [35]:
## Checking complaint type column contents post filtering

dataFrame.select('Complaint_Type').distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+
|  Complaint_Type|
+----------------+
| Illegal Parking|
|Blocked Driveway|
|           Noise|
+----------------+

In [36]:
## Checking contents of column Borough

dataFrame.select('Borough').distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+
|      Borough|
+-------------+
|    01 QUEENS|
|  Unspecified|
|       QUEENS|
|     BROOKLYN|
|        BRONX|
|    13 QUEENS|
|    MANHATTAN|
|STATEN ISLAND|
+-------------+

In [37]:
## Grouping boroughs together

dataFrame = dataFrame.withColumn('Borough', when(
    (col("Borough") == "01 QUEENS") 
    | (col("Borough") == "QUEENS") 
    | (col("Borough") == "13 QUEENS"), "QUEENS"
).otherwise(col('Borough')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [38]:
## Filtering out 'Unspecified' borough type from the dataframe

dataFrame = dataFrame.filter(dataFrame.Borough != 'Unspecified')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
## Checking contents of column Borough

dataFrame.select('Borough').distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------+
|      Borough|
+-------------+
|       QUEENS|
|     BROOKLYN|
|        BRONX|
|    MANHATTAN|
|STATEN ISLAND|
+-------------+

In [40]:
## Checking contents of column Open Data Channel Type

dataFrame.select('Open Data Channel Type').distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------------+
|Open Data Channel Type|
+----------------------+
|                MOBILE|
|               1055343|
|               1005837|
|               UNKNOWN|
|                 PHONE|
|                ONLINE|
+----------------------+

In [41]:
## Renaming the column header 

dataFrame = dataFrame.withColumnRenamed("Open Data Channel Type", "Open_Data_Channel_Type")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
## Refining the dataframe

dataFrame = dataFrame.filter((dataFrame.Open_Data_Channel_Type == 'MOBILE') | (dataFrame.Open_Data_Channel_Type == 'PHONE') | (dataFrame.Open_Data_Channel_Type == 'ONLINE'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
dataFrame.select('Open_Data_Channel_Type').distinct().show()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------------+
|Open_Data_Channel_Type|
+----------------------+
|                MOBILE|
|                 PHONE|
|                ONLINE|
+----------------------+

In [44]:
## Checking final count of records in the dataFrame 

dataFrame.count()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1896599

### The code below indexes each categorical column using the StringIndexer, and then converts the indexed categories into one-hot encoded variables. The resulting output has the binary vectors appended to the end of each row. StringIndexer is used again to encode labels to label indices.

In [45]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler

#categoricalColumns = ["Agency Name", "Complaint Type", "Incident Zip", "Address Type", "Borough", "Open Data Channel Type"]

categoricalColumns = ["Complaint_Type", "Borough", "Open_Data_Channel_Type"]


stages = [] # stages in our Pipeline

for categoricalCol in categoricalColumns:
    # Category Indexing with StringIndexer
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + "Index")
    # Use OneHotEncoder to convert categorical variables into binary SparseVectors
    # encoder = OneHotEncoderEstimator(inputCol=categoricalCol + "Index", outputCol=categoricalCol + "classVec")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    # Add stages.  These are not run here, but will run all at once later on.
    stages += [stringIndexer, encoder]



VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [46]:
# Converting label into label indices using the StringIndexer

label_stringIdx = StringIndexer(inputCol="Resolution_Time_Hours", outputCol="label")
stages += [label_stringIdx]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Use a VectorAssembler to combine all the feature columns into a single vector column. This includes both the numeric columns and the one-hot encoded binary vector columns in our dataset.

In [47]:
# Transform all features into a vector using VectorAssembler

#numericCols = ["Complaint_Created_Day", "Complaint_Created_Month", "Complaint_Created_DayOfWeek", "Complaint_Created_Hour"]

numericCols = ["Complaint_Created_Day"]



assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Run the stages as a Pipeline. This puts the data through all of the feature transformations we described in a single call.

In [48]:
partialPipeline = Pipeline().setStages(stages)
pipelineModel = partialPipeline.fit(dataFrame)
preppedDataDF = pipelineModel.transform(dataFrame)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [49]:
# Keeping relevant columns

selectedcols = ["label", "features"]
dataFrame = preppedDataDF.select(selectedcols)
#dataFrame.limit(5).toPandas()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [50]:
### Randomly split data into training and test sets. set seed for reproducibility

(trainingData, testData) = dataFrame.randomSplit([0.7, 0.3], seed=100)
#print(trainingData.count())
#print(testData.count())

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## RandomForest Regressor

In [51]:
from pyspark.ml.regression import RandomForestRegressor

# Create an initial RandomForest model.
rf = RandomForestRegressor(labelCol="label", featuresCol="features")

# Train model with Training Data
rfModel = rf.fit(trainingData)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [52]:
#rfModel.featureImportances

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [53]:
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return(varlist.sort_values('score', ascending = False))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [54]:
ExtractFeatureImp(rfModel.featureImportances, preppedDataDF, "features").head(15)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

   idx                                    name     score
5    4               BoroughclassVec_MANHATTAN  0.376852
6    5                   BoroughclassVec_BRONX  0.264449
1    0            Complaint_TypeclassVec_Noise  0.199831
3    2                BoroughclassVec_BROOKLYN  0.049985
7    6    Open_Data_Channel_TypeclassVec_PHONE  0.037339
4    3                  BoroughclassVec_QUEENS  0.035270
2    1  Complaint_TypeclassVec_Illegal Parking  0.025610
8    7   Open_Data_Channel_TypeclassVec_ONLINE  0.007465
0    8                   Complaint_Created_Day  0.003199

In [55]:
# Make predictions on test data using the transform() method.

predictions = rfModel.transform(testData)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [56]:
# Select example rows to display.

#predictions.select("prediction", "label", "features").limit(5).toPandas()

predictions.select("prediction", "label", "features").show(10)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------+-----+--------------------+
|        prediction|label|            features|
+------------------+-----+--------------------+
| 192.4606066101204|  0.0|(9,[0,2,6,8],[1.0...|
|192.33652446118847|  0.0|(9,[0,2,6,8],[1.0...|
|187.64689838083058|  0.0|(9,[0,2,7,8],[1.0...|
|187.81114287180682|  0.0|(9,[0,2,7,8],[1.0...|
|187.21141688461086|  0.0|(9,[0,2,8],[1.0,1...|
| 187.8628917797629|  0.0|(9,[0,2,8],[1.0,1...|
| 187.8628917797629|  0.0|(9,[0,2,8],[1.0,1...|
| 197.9197281340874|  0.0|(9,[0,3,6,8],[1.0...|
| 190.6034838844941|  0.0|(9,[0,3,7,8],[1.0...|
| 190.6034838844941|  0.0|(9,[0,3,7,8],[1.0...|
+------------------+-----+--------------------+
only showing top 10 rows

In [57]:
# Select (prediction, true label) and compute test error

from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")

rmse = evaluator.evaluate(predictions)

#print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [58]:
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Root Mean Squared Error (RMSE) on test data = 133.712

# GBT Regressor

In [59]:
from pyspark.ml.regression import GBTRegressor

gbt = GBTRegressor(featuresCol = 'features', labelCol = 'label', maxIter=10)
gbt_model = gbt.fit(trainingData)
gbt_predictions = gbt_model.transform(testData)

#gbt_predictions.select('prediction', 'label', 'features').show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [60]:
gbt_evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = gbt_evaluator.evaluate(gbt_predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Root Mean Squared Error (RMSE) on test data = 133.697

## From the above analysis, it can be seen that the derived metrics are significantly off the actual labels. Hence the results for modelling are inconclusive in this study.