# Wrangle

This exercises uses the case.csv, dept.csv, and source.csv files from the san antonio 311 call dataset.

### Part 1

__Read the case, department, and source data into their own spark dataframes.__

In [1]:
import numpy as np
import pandas as pd
import pyspark

In [2]:
#Start the spark cluster
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [3]:
#Create the case spark df
case_df = spark.read.csv("case.csv", inferSchema = True, header = True)

In [4]:
case_df.show(2, truncate = False, vertical = True)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 SLA_due_date         | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616000001                   
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
-RECORD 1----------------------------------------------------
 case_id

In [5]:
#Create the dept spark df
dept_df = spark.read.csv('dept.csv', header = True, inferSchema = True)

In [6]:
dept_df.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



In [7]:
#Create the source spark df
source_df = spark.read.csv('source.csv', header = True, inferSchema = True)

In [8]:
source_df.show(5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



__Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json__

In [9]:
#Save as csv
source_df.write.csv('sources_csv', mode = 'overwrite')

In [10]:
#Save as json
source_df.write.json('sources_json', mode = 'overwrite')

__Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.__

In [11]:
#Check case_df
case_df.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



Things to change:
* I think case_id should be a string, not an integer
* case_opened_date should be a datetime object
* case_closed_date should be a datetime object
* SLA_due_date should be a datetime object
* case_late should be a boolean
* num_days_late should be an int
* case_closed should be a boolean
* SLA_days should be an int
* Although there's nothing inherently wrong with request_address, I would like to go ahead and create new columns for city and zip code.
* council_district should be a string.

In [12]:
#Rename SLA_due_date to case_due_date to match the other column names
case_df = case_df.withColumnRenamed('SLA_due_date', 'case_due_date')

In [13]:
from pyspark.sql.functions import col

#Cast case_id, and council_district as strings
case_df = case_df.withColumn('case_id', col('case_id').cast('string')).withColumn('council_district', col('council_district').cast('string'))

In [14]:
from pyspark.sql.functions import to_timestamp

#Convert the dates to datetime objects
#First, determine the format 
fmt = "M/d/yy H:mm"
case_df = (
    case_df.withColumn('case_opened_date', to_timestamp('case_opened_date', fmt))
    .withColumn('case_closed_date', to_timestamp('case_closed_date', fmt))
    .withColumn('case_due_date', to_timestamp('case_due_date', fmt))
)

In [15]:
from pyspark.sql.functions import expr

#Convert case_closed and case_late to booleans
case_df = (
    case_df.withColumn('case_closed', expr("case_closed == 'Yes'"))
    .withColumn('case_late', expr("case_late == 'Yes'"))
)

In [16]:
#Convert num_days_late and SLA_days to ints
case_df = (
    case_df.withColumn('num_days_late', col('num_days_late').cast('integer'))
    .withColumn('SLA_days', col('SLA_days').cast('integer'))
)

In [17]:
from pyspark.sql.functions import regexp_extract, trim, lower

#Strip all leading and trailing whitespace from the request_address and convert to lowercase
case_df = case_df.withColumn('request_address', trim(lower(case_df.request_address)))

#Now create new columns for city and zip code
case_df = (
    case_df.withColumn('zip_code', regexp_extract('request_address', r'\w{5}$', 0))
    .withColumn('city', regexp_extract('request_address', r', (.*),', 1))
)

In [18]:
from pyspark.sql.functions import format_string

#Format the council district string so that there are leading 0s
case_df = case_df.withColumn('council_district', format_string('%03d', col('council_district').cast('integer')))

In [19]:
#Now check the schema for the dept_df
dept_df.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



Things to Change:
* dept_subject_to_SLA should be a boolean

In [20]:
#Convert dept_subject_to_SLA to a boolean
dept_df = dept_df.withColumn('dept_subject_to_SLA', expr("dept_subject_to_SLA == 'Yes'"))

In [21]:
#Now check the schema for source_df
source_df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



Things to Change:
* Nothing. These datatypes seem fine.

### Part 2

__1) How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?__

In [30]:
from pyspark.sql.functions import datediff, current_timestamp

(
    case_df.select(datediff(current_timestamp(), 'case_due_date')
    .alias('days_past_due'))
    .where(case_df.case_closed == False)
    .sort(col('days_past_due').desc())
    .show(5)
)

+-------------+
|days_past_due|
+-------------+
|         1773|
|         1773|
|         1773|
|         1773|
|         1773|
+-------------+
only showing top 5 rows



__2) How many Stray Animal cases are there?__

In [31]:
case_df.filter(case_df.service_request_type == 'Stray Animal').count()

26760

__3) How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?__

In [33]:
(
    case_df.filter(case_df.dept_division == 'Field Operations')
    .filter(case_df.service_request_type != 'Officer Standby')
    .count()
)

113902

__4) Convert the council_district column to a string column.__

This was already done in part 1.

__5) Extract the year from the case_closed_date column.__

In [35]:
from pyspark.sql.functions import month, year, quarter

case_df.select('case_closed_date', year('case_closed_date')).show(5)

+-------------------+----------------------+
|   case_closed_date|year(case_closed_date)|
+-------------------+----------------------+
|2018-01-01 12:29:00|                  2018|
|2018-01-03 08:11:00|                  2018|
|2018-01-02 07:57:00|                  2018|
|2018-01-02 08:13:00|                  2018|
|2018-01-01 13:29:00|                  2018|
+-------------------+----------------------+
only showing top 5 rows



__6) Convert num_days_late from days to hours in new columns num_hours_late.__

In [37]:
(
    case_df.withColumn('num_hours_late', case_df.num_days_late * 24)
    .select('num_days_late', 'num_hours_late')
    .show(10)
)

+-------------+--------------+
|num_days_late|num_hours_late|
+-------------+--------------+
|         -998|        -23952|
|           -2|           -48|
|           -3|           -72|
|          -15|          -360|
|            0|             0|
|          -29|          -696|
|          -14|          -336|
|          -14|          -336|
|          -14|          -336|
|          -14|          -336|
+-------------+--------------+
only showing top 10 rows



__7) Join the case data with the source and department data.__

In [39]:
case_df = case_df.join(dept_df, 'dept_division', 'left').join(source_df, 'source_id', 'left')

__8) Are there any cases that do not have a request source?__

In [41]:
from pyspark.sql.functions import sum

(
    case_df.select(case_df.source_id.isNull().cast('int').alias('is_null'))
    .agg(sum('is_null'))
    .show()
)

+------------+
|sum(is_null)|
+------------+
|           0|
+------------+



__9) What are the top 10 service request types in terms of number of requests?__

In [42]:
(
    case_df.groupby('service_request_type')
    .count()
    .sort(col('count').desc())
    .show(10, truncate=False)
)

+--------------------------------+-----+
|service_request_type            |count|
+--------------------------------+-----+
|No Pickup                       |89210|
|Overgrown Yard/Trash            |66403|
|Bandit Signs                    |32968|
|Damaged Cart                    |31163|
|Front Or Side Yard Parking      |28920|
|Stray Animal                    |27361|
|Aggressive Animal(Non-Critical) |25492|
|Cart Exchange Request           |22608|
|Junk Vehicle On Private Property|21649|
|Pot Hole Repair                 |20827|
+--------------------------------+-----+
only showing top 10 rows



__10) What are the top 10 service request types in terms of average days late?__

In [53]:
from pyspark.sql.functions import mean, count, desc

(
    case_df.where('case_late') # just the rows where case_late == true
    .groupBy('service_request_type')
    .agg(mean('num_days_late').alias('n_days_late'), count('*').alias('n_cases'))
    .sort(desc('n_days_late'))
    .show(10, truncate=False)
)

+--------------------+-----------+-------+
|service_request_type|n_days_late|n_cases|
+--------------------+-----------+-------+
+--------------------+-----------+-------+



__11) Does number of days late depend on department?__

In [50]:
from pyspark.sql.functions import round

(
    case_df.filter('case_late')
    .groupby('dept_name')
    .agg(mean('num_days_late').alias('days_late'), count('num_days_late').alias('n_cases_late'))
    .sort('days_late')
    .withColumn('days_late', round(col('days_late'), 1))
    .show(truncate=False)
)

+---------+---------+------------+
|dept_name|days_late|n_cases_late|
+---------+---------+------------+
+---------+---------+------------+



In [51]:
case_df.groupby('dept_name').count().show(truncate=False)

+-------------------------+------+
|dept_name                |count |
+-------------------------+------+
|Animal Care Services     |119362|
|Solid Waste Management   |286287|
|Development Services     |1397  |
|Trans & Cap Improvements |97841 |
|Customer Service         |2889  |
|Metro Health             |5313  |
|Parks and Recreation     |19964 |
|Code Enforcement Services|321984|
|City Council             |34    |
|null                     |198   |
+-------------------------+------+



__12) How do number of days late depend on department and request type?__

In [55]:
from pyspark.sql.functions import avg

(
    case_df.filter("case_closed")
#     .filter("case_late")
    .groupby("standardized_dept_name", "service_request_type")
    .agg(avg("num_days_late").alias("days_late"), count("*").alias("n_cases"))
    .withColumn("days_late", round(col("days_late"), 1))
    .where(col('days_late') > 0)
    .sort(desc("days_late"))
    .show(40, truncate=False)
)

+----------------------+--------------------+---------+-------+
|standardized_dept_name|service_request_type|days_late|n_cases|
+----------------------+--------------------+---------+-------+
+----------------------+--------------------+---------+-------+

