In [1]:
import pyspark
import pandas as pd
import numpy as np
from pydataset import data

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [3]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/07/06 08:48:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


1) Read the case, department, and source data into their own spark dataframes.

In [4]:
import env

In [5]:
url = env.get_db_url('311_data')

In [6]:
source = pd.read_sql('''SELECT * FROM source''', url, index_col='index')
dept = pd.read_sql('''SELECT * FROM dept''', url)
case = pd.read_sql('''SELECT * FROM cases''', url)

In [7]:
source = spark.createDataFrame(source)
dept = spark.createDataFrame(dept)
case = spark.createDataFrame(case)

2) Let's see how writing to the local disk works in spark:

* Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
* Inspect your folder structure. What do you notice?

In [8]:
source.write.json('sources_json', mode='overwrite')

                                                                                

In [9]:
source.write.csv('sources_csv', mode='overwrite')

Both folders saved have partitioned the data into several files.

3) Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [10]:
source.show(2)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
+---------+----------------+
only showing top 2 rows



In [11]:
source.schema

StructType([StructField('source_id', StringType(), True), StructField('source_username', StringType(), True)])

In [12]:
dept.show(2)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|                YES|
|          Brush|Solid Waste Manag...|           Solid Waste|                YES|
+---------------+--------------------+----------------------+-------------------+
only showing top 2 rows



In [13]:
dept.schema

StructType([StructField('dept_division', StringType(), True), StructField('dept_name', StringType(), True), StructField('standardized_dept_name', StringType(), True), StructField('dept_subject_to_SLA', StringType(), True)])

In [14]:
dept = dept.withColumn('dept_subject_to_SLA', expr('dept_subject_to_SLA == "YES"'))

In [15]:
dept.show(2)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|               true|
|          Brush|Solid Waste Manag...|           Solid Waste|               true|
+---------------+--------------------+----------------------+-------------------+
only showing top 2 rows



In [16]:
case.show(2, vertical=True)

23/07/06 08:50:34 WARN TaskSetManager: Stage 5 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 5:>                                                          (0 + 1) / 1]

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616         
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

23/07/06 08:50:38 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 5 (TID 19): Attempting to kill Python Worker
                                                                                

In [17]:
case.schema

StructType([StructField('case_id', LongType(), True), StructField('case_opened_date', StringType(), True), StructField('case_closed_date', StringType(), True), StructField('SLA_due_date', StringType(), True), StructField('case_late', StringType(), True), StructField('num_days_late', DoubleType(), True), StructField('case_closed', StringType(), True), StructField('dept_division', StringType(), True), StructField('service_request_type', StringType(), True), StructField('SLA_days', DoubleType(), True), StructField('case_status', StringType(), True), StructField('source_id', StringType(), True), StructField('request_address', StringType(), True), StructField('council_district', LongType(), True)])

In [18]:
case = case.withColumnRenamed('SLA_due_date', 'case_due_date')

In [19]:
fmt = "M/d/yy H:mm"
case = (
    case.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
)

case.show(2, vertical=True)

23/07/06 08:50:39 WARN TaskSetManager: Stage 6 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 6:>                                                          (0 + 1) / 1]

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | NO                   
 num_days_late        | -998.5087616         
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-01 00:46:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | NO                   
 num_days_late        | -2.0126041

23/07/06 08:50:43 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 6 (TID 20): Attempting to kill Python Worker
                                                                                

In [20]:
case = case.withColumn('case_late', expr('case_late == "YES"'))\
            .withColumn('case_closed', expr('case_closed == "YES"'))\
            .withColumn('council_district', col('council_district').cast(StringType()))

In [21]:
case.show(2, vertical=True)

23/07/06 08:50:43 WARN TaskSetManager: Stage 7 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 7:>                                                          (0 + 1) / 1]

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616         
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-01 00:46:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | false                
 num_days_late        | -2.0126041

23/07/06 08:50:47 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 7 (TID 21): Attempting to kill Python Worker
                                                                                

---------------------------

1) How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [22]:
case.printSchema()

root
 |-- case_id: long (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



In [23]:
case.filter('! case_closed').filter('! isnan("num_days_late")')\
.select('case_opened_date', 'case_due_date', 'num_days_late').sort('num_days_late', ascending=False).count()

23/07/06 08:50:47 WARN TaskSetManager: Stage 8 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

18110

In [24]:
case.filter(case.case_closed).sort('num_days_late', ascending=False).show(vertical=True)

23/07/06 08:50:55 WARN TaskSetManager: Stage 11 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.

-RECORD 0------------------------------------
 case_id              | 1013936527           
 case_opened_date     | 2017-10-09 12:11:00  
 case_closed_date     | 2017-10-09 12:11:00  
 case_due_date        | null                 
 case_late            | false                
 num_days_late        | NaN                  
 case_closed          | true                 
 dept_division        | District 1           
 service_request_type | Request for Resea... 
 SLA_days             | NaN                  
 case_status          | Closed               
 source_id            | 140508               
 request_address      | 1013  SACRAMENTO,... 
 council_district     | 1                    
-RECORD 1------------------------------------
 case_id              | 1014565742           
 case_opened_date     | 2018-06-01 11:52:00  
 case_closed_date     | 2018-06-01 11:52:00  
 case_due_date        | null                 
 case_late            | false                
 num_days_late        | NaN       

                                                                                

In [25]:
case.filter(case.case_closed == False).filter(case.num_days_late != 'NaN')\
.show(vertical=True)

23/07/06 08:51:02 WARN TaskSetManager: Stage 12 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 12:>                                                         (0 + 1) / 1]

-RECORD 0------------------------------------
 case_id              | 1014128388           
 case_opened_date     | 2018-01-02 09:39:00  
 case_closed_date     | 2018-01-02 09:39:00  
 case_due_date        | 2018-01-09 09:39:00  
 case_late            | true                 
 num_days_late        | 211.5974884          
 case_closed          | false                
 dept_division        | 311 Call Center      
 service_request_type | Complaint            
 SLA_days             | 7.0                  
 case_status          | Open                 
 source_id            | mt13131              
 request_address      | 7326  WESTGLADE P... 
 council_district     | 6                    
-RECORD 1------------------------------------
 case_id              | 1014128790           
 case_opened_date     | 2018-01-02 10:49:00  
 case_closed_date     | 2018-01-02 10:49:00  
 case_due_date        | 2018-05-10 10:49:00  
 case_late            | true                 
 num_days_late        | 90.5492592

23/07/06 08:51:06 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 12 (TID 39): Attempting to kill Python Worker
                                                                                

2) How many Stray Animal cases are there?

In [26]:
case.filter(case.service_request_type == 'Stray Animal').count()

23/07/06 08:51:06 WARN TaskSetManager: Stage 13 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

26760

3) How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [27]:
case.filter(case.dept_division == 'Field Operations').filter(case.service_request_type != 'Officer Standby').count()

23/07/06 08:51:07 WARN TaskSetManager: Stage 16 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

113902

4) Convert the council_district column to a string column.

In [28]:
# case = case.withColumn('council_district', col('council_district').cast(StringType()))

5) Extract the year from the case_closed_date column.

In [29]:
case.withColumn('year', year('case_closed_date')).select('year').show()

23/07/06 08:51:09 WARN TaskSetManager: Stage 19 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 19:>                                                         (0 + 1) / 1]

+----+
|year|
+----+
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
|2018|
+----+
only showing top 20 rows



23/07/06 08:51:13 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 19 (TID 58): Attempting to kill Python Worker
                                                                                

6) Convert num_days_late from days to hours in new columns num_hours_late.

In [30]:
case.selectExpr('num_days_late * 24 AS num_hours_late').show(2)

23/07/06 08:51:13 WARN TaskSetManager: Stage 20 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 20:>                                                         (0 + 1) / 1]

+-------------------+
|     num_hours_late|
+-------------------+
|-23964.210278399998|
|      -48.302500008|
+-------------------+
only showing top 2 rows



23/07/06 08:51:17 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 20 (TID 59): Attempting to kill Python Worker
                                                                                

7) Join the case data with the source and department data.

In [31]:
case.join(source, on='source_id').join(dept, on='dept_division').show(1)

23/07/06 08:51:17 WARN TaskSetManager: Stage 21 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 27:>                                                         (0 + 8) / 8]

+-------------+---------+----------+-------------------+-------------------+-------------------+---------+-------------+-----------+--------------------+--------+-----------+--------------------+----------------+---------------+--------------------+----------------------+-------------------+
|dept_division|source_id|   case_id|   case_opened_date|   case_closed_date|      case_due_date|case_late|num_days_late|case_closed|service_request_type|SLA_days|case_status|     request_address|council_district|source_username|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+-------------+---------+----------+-------------------+-------------------+-------------------+---------+-------------+-----------+--------------------+--------+-----------+--------------------+----------------+---------------+--------------------+----------------------+-------------------+
|Miscellaneous|   141239|1014129438|2018-01-02 12:16:00|2018-01-02 12:16:00|2018-01-04 12:16:00|    false| -1.935416667| 

                                                                                

8) Are there any cases that do not have a request source?

In [32]:
case.filter(case.source_id == np.nan).show()

23/07/06 08:51:21 WARN TaskSetManager: Stage 32 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
23/07/06 08:51:22 WARN TaskSetManager: Stage 33 contains a task of very large size (19002 KiB). The maximum recommended task size is 1000 KiB.
23/07/06 08:51:23 WARN TaskSetManager: Stage 34 contains a task of very large size (19069 KiB). The maximum recommended task size is 1000 KiB.

+-------+----------------+----------------+-------------+---------+-------------+-----------+-------------+--------------------+--------+-----------+---------+---------------+----------------+
|case_id|case_opened_date|case_closed_date|case_due_date|case_late|num_days_late|case_closed|dept_division|service_request_type|SLA_days|case_status|source_id|request_address|council_district|
+-------+----------------+----------------+-------------+---------+-------------+-----------+-------------+--------------------+--------+-----------+---------+---------------+----------------+
+-------+----------------+----------------+-------------+---------+-------------+-----------+-------------+--------------------+--------+-----------+---------+---------------+----------------+



                                                                                

9) What are the top 10 service request types in terms of number of requests?

In [33]:
case.groupBy('service_request_type').count().sort('count', ascending=False).show(10)

23/07/06 08:51:24 WARN TaskSetManager: Stage 35 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 35:>                                                         (0 + 8) / 8]

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|86855|
|Overgrown Yard/Trash|65895|
|        Bandit Signs|32910|
|        Damaged Cart|30338|
|Front Or Side Yar...|28794|
|        Stray Animal|26760|
|Aggressive Animal...|24882|
|Cart Exchange Req...|22024|
|Junk Vehicle On P...|21473|
|     Pot Hole Repair|20616|
+--------------------+-----+
only showing top 10 rows



                                                                                

10) What are the top 10 service request types in terms of average days late?

In [34]:
case.groupBy('service_request_type').agg(mean('num_days_late').alias('avg_days_late')).sort('avg_days_late').show(10)

23/07/06 08:51:25 WARN TaskSetManager: Stage 38 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 38:>                                                         (0 + 8) / 8]

+--------------------+-------------------+
|service_request_type|      avg_days_late|
+--------------------+-------------------+
|  Engineering Design|-1399.1272334999999|
|Signal Timing Mod...|-1247.0797799732143|
|        Stray Animal| -998.8045726160839|
|Major Park Improv...| -280.2546235360405|
|Sidewalk Cost Sha...|-184.87626063647144|
|Multi Tenant Exte...|-135.71588128047625|
|   CPS Energy Towers|-129.84778717829747|
|CPS Energy Wood P...| -129.3090520272122|
|CPS Energy Metal ...|-129.17919786427768|
|Multi Tenant Inte...| -125.1431856354651|
+--------------------+-------------------+
only showing top 10 rows



                                                                                

11) Does number of days late depend on department?

In [36]:
case.groupBy('dept_division').agg(mean('num_days_late').alias('avg_days_late')).sort('avg_days_late', ascending=False).show()

23/07/06 08:54:01 WARN TaskSetManager: Stage 41 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 41:>                                                         (0 + 8) / 8]

+--------------------+-------------------+
|       dept_division|      avg_days_late|
+--------------------+-------------------+
|          District 2|                NaN|
|          District 1|                NaN|
|          District 7|                NaN|
|          District 3|                NaN|
|          District 6|                NaN|
|          District 9|                NaN|
|          District 8|                NaN|
|         District 10|                NaN|
|Code Enforcement ...| 135.92851612479797|
|        Reservations|        66.03116319|
|     311 Call Center|  59.49019459221512|
|Director's Office...|  36.87389387687183|
|Engineering Division| 13.148054014077813|
|               Shops| 10.114974371919644|
|           Tree Crew| 4.7453029838890215|
|         Solid Waste|  3.541793412154981|
|              Trades| 3.1521799543913445|
|Clean and Green N...|  1.691468919487805|
|              Vector|-1.1180635370092933|
|    Facility License| -1.433144064152706|
+----------

                                                                                

12) How do number of days late depend on department and request type?

In [37]:
case.groupBy('dept_division', 'service_request_type').agg(mean('num_days_late').alias('avg_days_late'))\
.sort('avg_days_late', ascending=False).show()

23/07/06 08:54:06 WARN TaskSetManager: Stage 44 contains a task of very large size (18868 KiB). The maximum recommended task size is 1000 KiB.
[Stage 44:>                                                         (0 + 8) / 8]

+--------------------+--------------------+------------------+
|       dept_division|service_request_type|     avg_days_late|
+--------------------+--------------------+------------------+
|          District 2|Request for Resea...|               NaN|
|          District 1|Request for Resea...|               NaN|
|          District 9|Request for Resea...|               NaN|
|          District 7|Request for Resea...|               NaN|
|          District 8|Request for Resea...|               NaN|
|          District 6|Request for Resea...|               NaN|
|          District 3|Request for Resea...|               NaN|
|         District 10|CCO_Request for R...|               NaN|
|    Code Enforcement|  Zoning: Junk Yards|175.95636210420952|
|Code Enforcement ...|Labeling for Used...|162.43032902285717|
|Code Enforcement ...|Record Keeping of...| 153.9972403942857|
|Code Enforcement ...|Signage Requied f...|151.63868055333333|
|Code Enforcement ...|Storage of Used M...|142.11255641

                                                                                