<h3>Creating a spark session</h3>

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.ml.feature import *

spark = SparkSession.builder.appName("Project").getOrCreate()
sc = spark.sparkContext

In [2]:
sc

<h3>Loading csv into a dataframe</h3>

In [3]:
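# Spark silently ignores unknown reader options. "infer_schema" is not valid (the real
# option is "inferSchema") and "first_row_is_header" is not a CSV option, so both are
# dropped here. Without schema inference every column is read as a string.
df_data = spark.read.format("csv") \
    .option("header", "true") \
    .load(r"C:\Users\myste\Documents\MSCS\Spring 2020\Big Data\nyc311-2010\311_Service_Requests_from_2010_to_Present.csv")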
df_data.createOrReplaceTempView("df_data")
df_data.show()

+----------+--------------------+--------------------+------+--------------------+--------------------+--------------------+-------------------+------------+--------------------+------------------+------------------+------------------+---------------------+---------------------+------------+---------------+--------+-------------+------+--------------------+----------------------+------------------------------+---------------+---------+--------------------------+--------------------------+------------------+------------+-----------+-------------+-------------+-----------+-------------------+--------------+-----------+------------+-----------+----------------+----------------------------+------------+--------------------+---------------------+-------------------+------------------------+---------+----------------------+---------------+---------------+-------------------+------------------+------------------+--------------------+
|Unique Key|        Created Date|         Closed Date|Agenc

<b>Displaying columns in the loaded dataframe</b>

In [4]:
df_data.columns

['Unique Key',
 'Created Date',
 'Closed Date',
 'Agency',
 'Agency Name',
 'Complaint Type',
 'Descriptor',
 'Location Type',
 'Incident Zip',
 'Incident Address',
 'Street Name',
 'Cross Street 1',
 'Cross Street 2',
 'Intersection Street 1',
 'Intersection Street 2',
 'Address Type',
 'City',
 'Landmark',
 'Facility Type',
 'Status',
 'Due Date',
 'Resolution Description',
 'Resolution Action Updated Date',
 'Community Board',
 'Borough',
 'X Coordinate (State Plane)',
 'Y Coordinate (State Plane)',
 'Park Facility Name',
 'Park Borough',
 'School Name',
 'School Number',
 'School Region',
 'School Code',
 'School Phone Number',
 'School Address',
 'School City',
 'School State',
 'School Zip',
 'School Not Found',
 'School or Citywide Complaint',
 'Vehicle Type',
 'Taxi Company Borough',
 'Taxi Pick Up Location',
 'Bridge Highway Name',
 'Bridge Highway Direction',
 'Road Ramp',
 'Bridge Highway Segment',
 'Garage Lot Name',
 'Ferry Direction',
 'Ferry Terminal Name',
 'Latitude',


<h3>Making a new DataFrame with the relevant columns</h3>
<br> Keeping only the columns needed for our problem and dropping the rest.

In [6]:
df_311 = df_data.select([c for c in df_data.columns if c in ['Created Date', 'Closed Date', 'Complaint Type', 'Descriptor', 'Location Type', 
             'Incident Zip', 'Status', 'Resolution Description', 'Resolution Action Updated Date',
             'Latitude', 'Longitude','Borough','City']])
df_311.show()

+--------------------+--------------------+--------------------+--------------------+-------------------+------------+---------------+------+----------------------+------------------------------+---------+------------------+------------------+
|        Created Date|         Closed Date|      Complaint Type|          Descriptor|      Location Type|Incident Zip|           City|Status|Resolution Description|Resolution Action Updated Date|  Borough|          Latitude|         Longitude|
+--------------------+--------------------+--------------------+--------------------+-------------------+------------+---------------+------+----------------------+------------------------------+---------+------------------+------------------+
|12/31/2015 11:59:...|01/01/2016 12:55:...|Noise - Street/Si...|    Loud Music/Party|    Street/Sidewalk|       10034|       NEW YORK|Closed|  The Police Depart...|          01/01/2016 12:55:...|MANHATTAN| 40.86568153633767|-73.92350095571744|
|12/31/2015 11:59:...|01
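The comprehension iterates over df_data.columns. The selected columns therefore keep the file's order, not the order of the list. That is why City appears before Status in the output above. Below is a plain-Python sketch of the same filter. The header is made up, and keep_in_source_order is a hypothetical helper.

```python
# Hypothetical subset of the source header, in file order
all_columns = ['Unique Key', 'Created Date', 'Closed Date', 'Complaint Type',
               'City', 'Status', 'Borough', 'Latitude', 'Longitude']

# Requested columns, listed in a different order than in the file
wanted = ['Created Date', 'Closed Date', 'Complaint Type', 'Status',
          'Latitude', 'Longitude', 'Borough', 'City']

def keep_in_source_order(columns, keep):
    """Return the entries of `columns` that appear in `keep`, in source order."""
    keep_set = set(keep)  # set membership is O(1) per lookup
    return [c for c in columns if c in keep_set]

print(keep_in_source_order(all_columns, wanted))
# ['Created Date', 'Closed Date', 'Complaint Type', 'City', 'Status', 'Borough', 'Latitude', 'Longitude']
```

To get the list's order instead, pass the list straight to select, e.g. df_data.select(wanted).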

<h3>Removing unnecessary columns such as School Name, School Zip, etc.</h3>
<br>These columns are mostly null (as can be seen above) and aren't useful for our analysis.
<br>If we kept them, filling their nulls with the mean or another statistic of the remaining values would not make sense.
<br>Dropping rows with nulls in these columns would instead discard a large share of the rows.
<br>Hence we keep only the relevant columns.
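The columns above were chosen by hand. The same reasoning can be turned into a rule, sketched here in plain Python: compute each column's fraction of missing values and keep only the columns under a threshold. The 0.5 threshold, the sample rows and the helper names are assumptions for illustration, not values from this analysis. A value counts as missing if it is None or a float NaN, the same two cases the null-count cell below checks per column.

```python
import math

def is_missing(value):
    """True for None or a float NaN."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def null_fraction(rows, column):
    """Fraction of rows whose value in `column` is missing."""
    if not rows:
        return 0.0
    return sum(1 for r in rows if is_missing(r.get(column))) / len(rows)

def mostly_complete_columns(rows, columns, max_null_fraction=0.5):
    """Keep columns whose null fraction does not exceed the (hypothetical) threshold."""
    return [c for c in columns if null_fraction(rows, c) <= max_null_fraction]

# Made-up rows: 'School Name' is almost always empty
rows = [
    {'Complaint Type': 'Noise - Vehicle', 'School Name': None},
    {'Complaint Type': 'Illegal Parking', 'School Name': None},
    {'Complaint Type': 'Blocked Driveway', 'School Name': None},
    {'Complaint Type': 'Drinking', 'School Name': 'Some School'},
]
print(mostly_complete_columns(rows, ['Complaint Type', 'School Name']))  # ['Complaint Type']
```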

In [7]:
df_311.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_311.columns]).show()

+------------+-----------+--------------+----------+-------------+------------+----+------+----------------------+------------------------------+-------+--------+---------+
|Created Date|Closed Date|Complaint Type|Descriptor|Location Type|Incident Zip|City|Status|Resolution Description|Resolution Action Updated Date|Borough|Latitude|Longitude|
+------------+-----------+--------------+----------+-------------+------------+----+------+----------------------+------------------------------+-------+--------+---------+
|           0|       2381|             0|         0|          133|        2997|2997|     0|                     0|                          2402|      0|    4030|     4030|
+------------+-----------+--------------+----------+-------------+------------+----+------+----------------------+------------------------------+-------+--------+---------+



<h3>Dropping rows with null values</h3> 
Given the nature of the data (zip codes, complaint types, complaint locations, etc.), any preprocessing technique that replaces null values with a value computed from the rest of the column (e.g., the most frequent value or the mean) would produce incorrect approximations. Unlike measurements such as a precipitation index or humidity level, these columns do not follow a pattern that such a statistic could capture.

In [8]:
df_311 = df_311.dropna(subset=['Created Date', 'Closed Date', 'Latitude', 'Longitude'])
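With the default how='any', dropna(subset=[...]) removes a row when any of the listed columns is missing. Nulls in other columns are left alone, which is why the counts below still show nulls in Location Type, Incident Zip and City. Below is a plain-Python sketch of that rule on made-up rows; drop_rows_with_nulls is a hypothetical helper. It checks only None. Spark's dropna also treats NaN in floating-point columns as missing.

```python
def drop_rows_with_nulls(rows, subset):
    """Mimic dropna(how='any', subset=subset) on a list of dicts.

    A row is dropped if ANY column in `subset` is None; nulls in other
    columns are kept.
    """
    return [r for r in rows if all(r.get(c) is not None for c in subset)]

rows = [
    {'Created Date': '01/01/2016', 'Closed Date': None, 'Latitude': 40.8, 'City': 'BRONX'},
    {'Created Date': '01/01/2016', 'Closed Date': '01/02/2016', 'Latitude': 40.7, 'City': None},
    {'Created Date': '01/01/2016', 'Closed Date': '01/02/2016', 'Latitude': None, 'City': 'QUEENS'},
]
kept = drop_rows_with_nulls(rows, ['Created Date', 'Closed Date', 'Latitude'])
print(len(kept))         # 1
print(kept[0]['City'])   # None: a null outside the subset survives
```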

In [9]:
df_311.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_311.columns]).show()

+------------+-----------+--------------+----------+-------------+------------+----+------+----------------------+------------------------------+-------+--------+---------+
|Created Date|Closed Date|Complaint Type|Descriptor|Location Type|Incident Zip|City|Status|Resolution Description|Resolution Action Updated Date|Borough|Latitude|Longitude|
+------------+-----------+--------------+----------+-------------+------------+----+------+----------------------+------------------------------+-------+--------+---------+
|           0|          0|             0|         0|          122|          41|  41|     0|                     0|                            39|      0|       0|        0|
+------------+-----------+--------------+----------+-------------+------------+----+------+----------------------+------------------------------+-------+--------+---------+



Null values in columns like 'Created Date', 'Closed Date', 'Latitude' and 'Longitude' would not be acceptable for the prediction model we intend to build, so rows with nulls there were dropped. Null values in columns like 'Descriptor', 'Location Type' and 'City', however, would not make a difference to our model, so those rows are kept.

<h3>Clubbing complaint types, resolution types and location types into groups</h3>
<br> We found that most of the values in these columns could be clubbed under a few main categories. Fewer, broader categories should make the analysis easier to interpret.

In [10]:
complaint_type = {
    'Noise': ['Noise - Commercial', 'Noise - House of Worship', 'Noise - Park',
              'Noise - Street/Sidewalk', 'Noise - Vehicle'],
    'Animal Abuse': ['Animal Abuse'],
    'Street Conditions': ['Derelict Vehicle', 'Graffiti', 'Posting Advertisement', 'Sidewalk Condition',
                          'Street Condition', 'Homeless Encampment', 'Panhandling', 'Vending'],
    'Parking Conditions': ['Blocked Driveway', 'Illegal Parking'],
    'Disorderly Behavior': ['Disorderly Youth', 'Drinking', 'Illegal Fireworks', 'Urinating in Public',
                            'Bike/Roller/Skate Chronic', 'Drug Activity', 'Squeegee'],
    'Traffic': ['Traffic'],
}

In [11]:
resolution_type = {'Police Department Acted':['The Police Department issued a summons in response to the complaint.',
                                       'The Police Department made an arrest in response to the complaint.',
                                       'The Police Department responded to the complaint and took action to fix the condition.',
                                        'The Police Department responded to the complaint and a report was prepared.',
                                       'The Police Department reviewed your complaint and provided additional information below.'],
            'Police Department Unable to Act': ['The Police Department responded and upon arrival those responsible for the condition were gone.',
                                       'The Police Department responded to the complaint but officers were unable to gain entry into the premises.',
                                       "This complaint does not fall under the Police Department's jurisdiction.",
                                        'Your request can not be processed at this time because of insufficient contact information. Please create a new Service Request on NYC.gov and provide more detailed contact information.',
                                        "Your complaint has been forwarded to the New York Police Department for a non-emergency response. If the police determine the vehicle is illegally parked, they will ticket the vehicle and then you may either contact a private towing company to remove the vehicle or ask your local precinct to contact 'rotation tow'. Any fees charged for towing will have to be paid by the vehicle owner. 311 will have additional information in 8 hours. Please note your service request number for future reference."],
            'False Alarm': ['The Police Department responded to the complaint and determined that police action was not necessary.',
                           'The Police Department responded to the complaint and with the information available observed no evidence of the violation at that time.']}

In [12]:
location_type = {
    'Residential': ['Residential Building/House', 'Residential Building', 'House and Store'],
    'Commercial': ['Store/Commercial', 'Commercial'],
    'Club/Bar/Restaurant': ['Club/Bar/Restaurant'],
    'Parking Lot': ['Parking Lot', 'Vacant Lot'],
    'Roadway/Highway/Bridge': ['Bridge', 'Highway', 'Roadway Tunnel'],
    'House of Worship': ['House of Worship'],
    'Street/Sidewalk': ['Street/Sidewalk'],
    'Subway Station': ['Subway Station'],
    'Park/Playground': ['Park/Playground'],
}
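The grouping UDFs return the first group, in dict order, whose list contains the value. They return None when no list contains it. Two mistakes in these mappings would therefore go unnoticed:

- A raw value listed under two groups is silently resolved by dict order.
- A raw value missing from every list ends up null.

Below is a small plain-Python sketch of both checks. find_overlaps and unmapped are hypothetical helpers, and 'Beach' is a made-up raw value.

```python
from collections import defaultdict

def find_overlaps(subgroups):
    """Return {member: [groups...]} for members listed under more than one group."""
    owners = defaultdict(list)
    for group, members in subgroups.items():
        for member in members:
            owners[member].append(group)
    return {m: gs for m, gs in owners.items() if len(gs) > 1}

def unmapped(values, subgroups):
    """Raw values that no group lists (these would become null)."""
    known = {m for members in subgroups.values() for m in members}
    return sorted(set(values) - known)

# Excerpt of the location mapping above
location_type = {
    'Parking Lot': ['Parking Lot', 'Vacant Lot'],
    'Street/Sidewalk': ['Street/Sidewalk'],
}
print(find_overlaps(location_type))                         # {}
print(unmapped(['Street/Sidewalk', 'Beach'], location_type))  # ['Beach']

# A hypothetical mistake: 'Vacant Lot' listed under two groups
broken = {'Parking Lot': ['Vacant Lot'], 'Park/Playground': ['Vacant Lot']}
print(find_overlaps(broken))  # {'Vacant Lot': ['Parking Lot', 'Park/Playground']}
```

On the real DataFrame, the raw values to check could be collected with df_311.select("Location Type").distinct().collect().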

In [13]:
# Map each raw complaint type to its group (StringType by default); unlisted values become null.
@udf
def subgroup_type(entry, subgroups=complaint_type):
    for key, comp_list in subgroups.items():
        if entry in comp_list:
            return key
df_311 = df_311.withColumn("Complaint_Type_Groups", subgroup_type(df_311["Complaint Type"]))

In [14]:
# Map each raw resolution description to its group; unlisted values become null.
@udf
def subgroup_type(entry, subgroups=resolution_type):
    for key, comp_list in subgroups.items():
        if entry in comp_list:
            return key
df_311 = df_311.withColumn("Resolution_Type_Groups", subgroup_type(df_311["Resolution Description"]))

In [15]:
# Map each raw location type to its group; unlisted values become null.
@udf
def subgroup_type(entry, subgroups=location_type):
    for key, comp_list in subgroups.items():
        if entry in comp_list:
            return key
df_311 = df_311.withColumn("Location_Type_Groups", subgroup_type(df_311["Location Type"]))
df_311.show()

+--------------------+--------------------+--------------------+--------------------+-------------------+------------+---------------+------+----------------------+------------------------------+---------+------------------+------------------+---------------------+----------------------+--------------------+
|        Created Date|         Closed Date|      Complaint Type|          Descriptor|      Location Type|Incident Zip|           City|Status|Resolution Description|Resolution Action Updated Date|  Borough|          Latitude|         Longitude|Complaint_Type_Groups|Resolution_Type_Groups|Location_Type_Groups|
+--------------------+--------------------+--------------------+--------------------+-------------------+------------+---------------+------+----------------------+------------------------------+---------+------------------+------------------+---------------------+----------------------+--------------------+
|12/31/2015 11:59:...|01/01/2016 12:55:...|Noise - Street/Si...|    Lo
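For every row, subgroup_type scans the member lists one by one until it finds a match. One alternative is to invert each mapping once into a {member: group} dict, which makes each lookup constant time. Below is a plain-Python sketch; invert is a hypothetical helper, and it keeps the first group when a member appears twice, matching the loop above. Inside Spark, such a dict could back a single UDF, e.g. udf(lambda v: lookup.get(v), StringType()).

```python
def invert(subgroups):
    """Turn {group: [members]} into {member: group}.

    If a member is listed under several groups, the first group (in dict
    order) wins, matching the loop in subgroup_type.
    """
    lookup = {}
    for group, members in subgroups.items():
        for member in members:
            lookup.setdefault(member, group)
    return lookup

# Excerpt of the complaint mapping above
complaint_type = {
    'Noise': ['Noise - Commercial', 'Noise - Vehicle'],
    'Parking Conditions': ['Blocked Driveway', 'Illegal Parking'],
}
lookup = invert(complaint_type)
print(lookup.get('Illegal Parking'))  # Parking Conditions
print(lookup.get('Unknown Type'))     # None, like the UDF's fall-through
```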

<b>Displaying columns after processing the data.</b>

In [16]:
df_311.columns

['Created Date',
 'Closed Date',
 'Complaint Type',
 'Descriptor',
 'Location Type',
 'Incident Zip',
 'City',
 'Status',
 'Resolution Description',
 'Resolution Action Updated Date',
 'Borough',
 'Latitude',
 'Longitude',
 'Complaint_Type_Groups',
 'Resolution_Type_Groups',
 'Location_Type_Groups']

In [17]:
df_311.describe

<bound method DataFrame.describe of DataFrame[Created Date: string, Closed Date: string, Complaint Type: string, Descriptor: string, Location Type: string, Incident Zip: string, City: string, Status: string, Resolution Description: string, Resolution Action Updated Date: string, Borough: string, Latitude: string, Longitude: string, Complaint_Type_Groups: string, Resolution_Type_Groups: string, Location_Type_Groups: string]>

<h3>Converting the relevant columns from strings to timestamps</h3>

In [18]:
# Raw values look like 12-hour times with an AM/PM suffix (show() truncates them past 20 chars), hence "hh" and "a".
ts_format = "MM/dd/yyyy hh:mm:ss a"
df_311_format = df_311.withColumn("Closed Date", to_timestamp(col("Closed Date"), ts_format)) \
    .withColumn("Created Date", to_timestamp(col("Created Date"), ts_format)) \
    .withColumn("Resolution Action Updated Date", to_timestamp(col("Resolution Action Updated Date"), ts_format))
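The pattern passed to to_timestamp has to describe the whole raw string. The raw dates are cut off in the show() output above. However, show() truncates only strings longer than 20 characters, and a bare "MM/dd/yyyy HH:mm:ss" value is 19 characters, which suggests a trailing AM/PM marker. This is an assumption worth checking against the raw file. Depending on the Spark version, a 24-hour pattern either ignores such a suffix, so PM times are read as AM, or fails to parse. Python's strptime shows the difference on made-up values; %I and %p correspond to Spark's "hh" and "a".

```python
from datetime import datetime

TWELVE_HOUR = '%m/%d/%Y %I:%M:%S %p'  # hour 01-12 plus AM/PM marker
raw = '12/31/2015 11:59:30 PM'         # made-up value in the assumed format

parsed = datetime.strptime(raw, TWELVE_HOUR)
print(parsed)  # 2015-12-31 23:59:30

# 12 AM is midnight on a 12-hour clock
print(datetime.strptime('01/01/2016 12:55:00 AM', TWELVE_HOUR))  # 2016-01-01 00:55:00

def parses(value, fmt):
    """True if strptime can consume the whole string with this pattern."""
    try:
        datetime.strptime(value, fmt)
        return True
    except ValueError:
        return False

print(parses(raw, '%m/%d/%Y %H:%M:%S'))  # False: " PM" is left unconsumed
```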

In [19]:
df_311_format.describe

<bound method DataFrame.describe of DataFrame[Created Date: timestamp, Closed Date: timestamp, Complaint Type: string, Descriptor: string, Location Type: string, Incident Zip: string, City: string, Status: string, Resolution Description: string, Resolution Action Updated Date: timestamp, Borough: string, Latitude: string, Longitude: string, Complaint_Type_Groups: string, Resolution_Type_Groups: string, Location_Type_Groups: string]>

<h3>Finding the resolution times of the complaints</h3>
<br> Resolution Time = Closed Date - Created Date
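Casting a timestamp to long gives seconds since the epoch. Subtracting the two casts therefore yields the resolution time in seconds, and dividing by 60 or 3600 gives minutes or hours. Below is a plain-Python sketch of the same arithmetic on made-up timestamps. resolution_times and half_up are hypothetical helpers. Spark's round() rounds halves up, whereas Python's built-in round() rounds halves to even, so the sketch rounds with Decimal's ROUND_HALF_UP.

```python
from datetime import datetime
from decimal import Decimal, ROUND_HALF_UP

def half_up(x):
    """Round a Decimal to the nearest integer, halves away from zero."""
    return int(x.quantize(Decimal('1'), rounding=ROUND_HALF_UP))

def resolution_times(created, closed):
    """Return (seconds, minutes, hours) from created to closed."""
    secs = int((closed - created).total_seconds())
    return secs, half_up(Decimal(secs) / 60), half_up(Decimal(secs) / 3600)

# Made-up timestamps
created = datetime(2015, 12, 31, 23, 59, 30)
closed = datetime(2016, 1, 1, 0, 55, 0)
print(resolution_times(created, closed))  # (3330, 56, 1)
```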

In [20]:
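# Timestamps cast to long are seconds since the epoch, so the difference is in seconds.
# round() here is pyspark.sql.functions.round, brought in by the wildcard import.
resolution_time = col("Closed Date").cast("long") - col("Created Date").cast("long")
df_311_time = df_311_format.withColumn("Resolution_Time_Secs", resolution_time) \
    .withColumn('Resolution_Time_Mins', round(col('Resolution_Time_Secs') / 60)) \
    .withColumn('Resolution_Time_Hours', round(col('Resolution_Time_Secs') / 3600))
df_311_time.show(truncate=False)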

+-------------------+-------------------+-----------------------+-----------------------------+-------------------+------------+---------------+------+---------------------------------------------------------------------------------------------------------------------------------------+------------------------------+---------+------------------+------------------+---------------------+-------------------------------+--------------------+--------------------+--------------------+---------------------+
|Created Date       |Closed Date        |Complaint Type         |Descriptor                   |Location Type      |Incident Zip|City           |Status|Resolution Description                                                                                                                 |Resolution Action Updated Date|Borough  |Latitude          |Longitude         |Complaint_Type_Groups|Resolution_Type_Groups         |Location_Type_Groups|Resolution_Time_Secs|Resolution_Time_Mins|Resolution_

<h3>Writing the final preprocessed data to CSV for further analysis</h3>

In [21]:
# Spark writes a directory of part-*.csv files (one per partition), not a single file.
df_311_time.write.format("csv").option("header","true").save(r'C:\Users\myste\Documents\MSCS\Spring 2020\Big Data\nyc311-2010\311_preprocessed')
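The save call produces a folder rather than a single file. Spark writes one part file per partition plus marker files such as _SUCCESS, and with header=true each part file carries its own header row. Below is a plain-Python sketch for reading the folder back. read_spark_csv_dir is a hypothetical helper, and the demo builds a throwaway folder with made-up rows instead of using the real output path.

```python
import csv
import glob
import os
import tempfile

def read_spark_csv_dir(path):
    """Read every part-*.csv file under `path` into one list of dicts."""
    rows = []
    for part in sorted(glob.glob(os.path.join(path, 'part-*.csv'))):
        with open(part, newline='', encoding='utf-8') as f:
            rows.extend(csv.DictReader(f))  # each part file has its own header
    return rows

# Demo: a temporary folder laid out the way Spark writes CSV output
demo_dir = tempfile.mkdtemp()
for i, row in enumerate([('Noise', '56.0'), ('Traffic', '120.0')]):
    name = os.path.join(demo_dir, f'part-0000{i}-demo.csv')
    with open(name, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(['Complaint_Type_Groups', 'Resolution_Time_Mins'])
        writer.writerow(row)
open(os.path.join(demo_dir, '_SUCCESS'), 'w').close()  # marker file, skipped by the glob

rows = read_spark_csv_dir(demo_dir)
print(len(rows), rows[0]['Complaint_Type_Groups'])  # 2 Noise
```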