In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

These exercises use the `case.csv`, `dept.csv`, and `source.csv` files from the San Antonio 311 call dataset.

# Exercise 1

Read the case, department, and source data into their own spark dataframes.

In [2]:
# read in data
case = spark.read.csv('case.csv', sep=',', header=True, inferSchema=True)
dept = spark.read.csv('dept.csv', sep=',', header=True, inferSchema=True)
source = spark.read.csv('source.csv', sep=',', header=True, inferSchema=True)

# Exercise 2

Let's see how writing to the local disk works in spark:

* Write the code necessary to store the source data in both `csv` and `json` format, store these as `sources_csv` and `sources_json`
* Inspect your folder structure. What do you notice?

In [3]:
# write source file to json and csv
source.write.json('sources_json', mode='overwrite')
source.write.csv('sources_csv', mode='overwrite')
# files are stored within their own folders

# Exercise 3

Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [4]:
# look at first two records for case
case.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [5]:
# check data types
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

* case_opened_date, case_closed_date, SLA_due_date will be converted to date format, from string to allow for time calculations
* case_late, case_closed will be converted to boolean values rather than 'Yes' or 'No' for ease of working with the data
* council_district will be converted from int to string since it represents a category

In [6]:
# use withColumn to make these changes
case = case.withColumn('case_opened_date', to_timestamp('case_opened_date', 'M/d/yy H:mm'))\
.withColumn('case_closed_date', to_timestamp('case_closed_date', 'M/d/yy H:mm'))\
.withColumn('case_due_date', to_timestamp('SLA_due_date', 'M/d/yy H:mm'))\
.withColumn('case_late', expr('case_late == "YES"'))\
.withColumn('case_closed', expr('case_closed == "YES"'))\
.withColumn('council_district', col('council_district').cast('string'))

In [7]:
# check data types to confirm changes were made
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('SLA_due_date', 'string'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string'),
 ('case_due_date', 'timestamp')]

# Part II

# Exercise 1

How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [45]:
# use select to choose the latest cases submitted that are still open. There are quite a few that were opened on the same day and are still open
case.select(col('case_id'), col('service_request_type'), col('case_opened_date'),
            (datediff(current_timestamp(), 'case_opened_date').alias('days_since_opened')))\
.where(col('case_closed') == False)\
.sort(col('days_since_opened')).show(5)

+----------+--------------------+-------------------+-----------------+
|   case_id|service_request_type|   case_opened_date|days_since_opened|
+----------+--------------------+-------------------+-----------------+
|1014758594|Overgrown Yard/Trash|2018-08-08 01:13:00|             1287|
|1014758601|Front Or Side Yar...|2018-08-08 01:50:00|             1287|
|1014758596|           No Pickup|2018-08-08 01:50:00|             1287|
|1014758597|Minimum Housing-T...|2018-08-08 01:50:00|             1287|
|1014758598|Junk Vehicle On P...|2018-08-08 01:50:00|             1287|
+----------+--------------------+-------------------+-----------------+
only showing top 5 rows



In [73]:
# use select to choose the oldest cases that are still open
case.select(col('case_id'), col('service_request_type'), col('case_opened_date'),
            (datediff(current_timestamp(), 'case_opened_date').alias('days_since_opened')))\
.where(col('case_closed') == False)\
.sort(col('days_since_opened').desc()).show(5)

+----------+--------------------+-------------------+-----------------+
|   case_id|service_request_type|   case_opened_date|days_since_opened|
+----------+--------------------+-------------------+-----------------+
|1013225651|   No Address Posted|2017-01-01 13:57:00|             1871|
|1013225646|   No Address Posted|2017-01-01 13:48:00|             1871|
|1013226813|   No Address Posted|2017-01-02 11:26:00|             1870|
|1013229328|        Bandit Signs|2017-01-03 10:01:00|             1869|
|1013232331|Street Light Exis...|2017-01-04 10:16:00|             1868|
+----------+--------------------+-------------------+-----------------+
only showing top 5 rows



In [74]:
# check that these are accurate by getting max and min dates for cases that are still open
case.where(col('case_closed') == False)\
.select(min(col('case_opened_date')), max(col('case_opened_date'))).show()

+---------------------+---------------------+
|min(case_opened_date)|max(case_opened_date)|
+---------------------+---------------------+
|  2017-01-01 13:48:00|  2018-08-08 10:38:00|
+---------------------+---------------------+



# Exercise 2

How many Stray Animal cases are there?

In [47]:
# filter cases where service request type was stray animal and get the count
case.filter(case.service_request_type == 'Stray Animal').count()

26760

# Exercise 3

How many service requests that are assigned to the Field Operations department (`dept_division`) are not classified as "Officer Standby" request type (`service_request_type`)?

In [50]:
# filter by dept division and service request type and get count
case.filter(case.dept_division == 'Field Operations')\
.filter(case.service_request_type != 'Officer Standby').count()

113902

# Exercise 4

Convert the `council_district` column to a string column.

In [None]:
# already completed in Part I Exercise 3

# Exercise 5

Extract the year from the `case_closed_date` column.

In [53]:
# select year from timestamp data
case.select(year('case_closed_date')).show()

+----------------------+
|year(case_closed_date)|
+----------------------+
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
+----------------------+
only showing top 20 rows



# Exercise 6

Convert `num_days_late` from days to hours in new columns `num_hours_late`.

In [60]:
# create a new column that calculates hours late by multiplying days times 24
case.withColumn('num_hours_late', col('num_days_late') * 24).show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 SLA_due_date         | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 case_due_date        | 2020-09-26 00:42:00  
 num_hours_late       | -23964.2102784       
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 SLA_due_date         | 1/5/18 8:3

# Exercise 7

Join the case data with the source and department data.

In [61]:
# view source data
source.show(2, vertical=True)

-RECORD 0---------------------------
 source_id       | 100137           
 source_username | Merlene Blodgett 
-RECORD 1---------------------------
 source_id       | 103582           
 source_username | Carmen Cura      
only showing top 2 rows



In [62]:
# view dept data
dept.show(2, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | 311 Call Center      
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Brush                
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | YES                  
only showing top 2 rows



In [64]:
# join all three tables together
df = case.join(dept, 'dept_division', 'left').join(source, 'source_id', 'left')
df.show(2, vertical=True)

-RECORD 0--------------------------------------
 source_id              | svcCRMLS             
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 SLA_due_date           | 9/26/20 0:42         
 case_late              | false                
 num_days_late          | -998.5087616000001   
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 request_address        | 2315  EL PASO ST,... 
 council_district       | 5                    
 case_due_date          | 2020-09-26 00:42:00  
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | YES                  
 source_username        | svcCRMLS             
-RECORD 1-------------------------------

# Exercise 8

Are there any cases that do not have a request source?

In [66]:
# filter source_id to see if there are any null values
df.filter(col('source_id').isNull()).show(vertical=True)

(0 rows)



# Exercise 9

What are the top 10 service request types in terms of number of requests?

In [75]:
# group by service request type getting the counts for each and sorting to get the top 10
df.groupby(col('service_request_type')).count()\
.sort(col('count').desc()).show(10)

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|89210|
|Overgrown Yard/Trash|66403|
|        Bandit Signs|32968|
|        Damaged Cart|31163|
|Front Or Side Yar...|28920|
|        Stray Animal|27361|
|Aggressive Animal...|25492|
|Cart Exchange Req...|22608|
|Junk Vehicle On P...|21649|
|     Pot Hole Repair|20827|
+--------------------+-----+
only showing top 10 rows



# Exercise 10

What are the top 10 service request types in terms of average days late?

In [76]:
# group by service request type getting the counts for each and sorting to get the top 10
df.groupby(col('service_request_type'))\
.agg(mean('num_days_late').alias('days_late'))\
.sort(col('days_late').desc()).show(10)

+--------------------+------------------+
|service_request_type|         days_late|
+--------------------+------------------+
|  Zoning: Junk Yards|175.95636210420943|
|Labeling for Used...|162.43032902285717|
|Record Keeping of...|153.99724039428568|
|Signage Requied f...|151.63868055333333|
|Storage of Used M...|142.11255641500003|
|Zoning: Recycle Yard|135.92851612479797|
|Donation Containe...|131.75610506358706|
|License Requied U...|128.79828704142858|
|Traffic Signal Gr...|101.79846062200002|
|           Complaint|  72.8705023031169|
+--------------------+------------------+
only showing top 10 rows



# Exercise 11

Does number of days late depend on department?

In [93]:
# filter by cases that were late, group by dept_name and get average days late by dept, sort with values descending
df.filter('case_late').groupby('dept_name')\
.agg(round(mean('num_days_late')).alias('days_late'))\
.sort(desc('days_late')).show()

+--------------------+---------+
|           dept_name|days_late|
+--------------------+---------+
|                null|    211.0|
|    Customer Service|     88.0|
|Development Services|     67.0|
|Code Enforcement ...|     48.0|
|Animal Care Services|     23.0|
|Parks and Recreation|     22.0|
|Trans & Cap Impro...|     11.0|
|Solid Waste Manag...|      7.0|
|        Metro Health|      6.0|
+--------------------+---------+



# Exercise 12

How do number of days late depend on department and request type?

In [88]:
# filter by cases that were late, group by dept_name and service request type, get average days late by dept, sort with values descending
df.filter('case_late').groupby('dept_name', 'service_request_type')\
.agg(round(mean('num_days_late')).alias('days_late'))\
.sort(desc('days_late')).show(40, truncate=False)

+-------------------------+--------------------------------------------+---------+
|dept_name                |service_request_type                        |days_late|
+-------------------------+--------------------------------------------+---------+
|null                     |Zoning: Recycle Yard                        |211.0    |
|Code Enforcement Services|Zoning: Junk Yards                          |200.0    |
|Code Enforcement Services|Structure/Housing Maintenance               |190.0    |
|Code Enforcement Services|Donation Container Enforcement              |171.0    |
|Code Enforcement Services|Storage of Used Mattress                    |164.0    |
|Code Enforcement Services|Labeling for Used Mattress                  |162.0    |
|Code Enforcement Services|Record Keeping of Used Mattresses           |154.0    |
|Code Enforcement Services|Signage Requied for Sale of Used Mattr      |152.0    |
|Trans & Cap Improvements |Traffic Signal Graffiti                     |138.0    |
|Cod