# Data Wrangling

# Exercises
Data Acquisition
These exercises should go in a notebook or script named wrangle. Add, commit, and push your changes.

These exercises use the cases, dept, and source tables from the 311_data database on the Codeup MySQL server.

# Imports

In [101]:
# standard imports
import pandas as pd


import os
import pyspark
from pydataset import data
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import month, year, quarter, round


#create the spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()

# local helper that returns the database connection URL
from env import get_db_url

# ignore warnings
import warnings
warnings.filterwarnings('ignore')

In [2]:
url = get_db_url("311_data")

### 1. Read the case, department, and source data into their own spark dataframes.

In [3]:
query = """SELECT * FROM cases"""
cases = pd.read_sql(query, url)
cases = spark.createDataFrame(cases)
cases.show(4)

23/07/05 14:56:50 WARN TaskSetManager: Stage 0 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.
23/07/05 14:56:54 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 0 (TID 0): Attempting to kill Python Worker
                                                                                

+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO| -2.012604167|        YES|     Storm Water|Removal Of Obstru...|4.322222222|     Closed| svcCRMSS|2215  GOL

In [4]:
query = """SELECT * FROM dept"""
dept = pd.read_sql(query, url)
dept = spark.createDataFrame(dept)
dept.show(4)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 4 rows



In [5]:
query = """SELECT * FROM source"""
source = pd.read_sql(query, url)
source = spark.createDataFrame(source)
source.show(4)

+-----+---------+----------------+
|index|source_id| source_username|
+-----+---------+----------------+
|    0|   100137|Merlene Blodgett|
|    1|   103582|     Carmen Cura|
|    2|   106463| Richard Sanchez|
|    3|   119403|  Betty De Hoyos|
+-----+---------+----------------+
only showing top 4 rows



### 2. Let's see how writing to the local disk works in spark:

 >• Write the code necessary to store the source data in both CSV and JSON format. Store these as sources_csv and sources_json.  
 >• Inspect your folder structure. What do you notice?

#### write file to json

In [6]:
cases.write.json('cases_json', mode='overwrite')

23/07/05 14:56:58 WARN TaskSetManager: Stage 4 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [7]:
dept.write.json('dept_json', mode='overwrite')

In [8]:
source.write.json('source_json', mode='overwrite')

#### write dataframe to csv

In [9]:
# Equivalent builder-style syntax: write cases as CSV with a header row
(
    cases.write.format('csv')
    .mode('overwrite')
    .option('header', True)
    .save('cases_csv')
)

23/07/05 14:57:06 WARN TaskSetManager: Stage 7 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [10]:
# Same pattern for dept: write as CSV with a header row
(
    dept.write.format('csv')
    .mode('overwrite')
    .option('header', True)
    .save('dept_csv')
)

In [11]:
# Same pattern for source: write as CSV with a header row
(
    source.write.format('csv')
    .mode('overwrite')
    .option('header', True)
    .save('source_csv')
)

In [113]:
os.listdir('source_json')

['.part-00011-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json.crc',
 '.part-00008-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json.crc',
 'part-00001-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json',
 '.part-00001-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json.crc',
 'part-00011-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json',
 'part-00009-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json',
 'part-00004-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json',
 '.part-00002-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json.crc',
 '._SUCCESS.crc',
 'part-00006-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json',
 '.part-00005-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json.crc',
 'part-00003-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json',
 '.part-00006-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json.crc',
 'part-00002-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json',
 '.part-00010-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json.crc',
 '.part-00009-15f6914e-5a8e-45d8-8639-f87de4531a31-c000.json.crc',
 '.part-0000

In [114]:
os.listdir('source_csv')

['part-00002-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv',
 'part-00003-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv',
 '.part-00000-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv.crc',
 'part-00000-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv',
 '._SUCCESS.crc',
 '.part-00001-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv.crc',
 'part-00001-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv',
 '.part-00003-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv.crc',
 '.part-00002-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv.crc',
 'part-00011-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv',
 '.part-00005-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv.crc',
 '.part-00004-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv.crc',
 'part-00006-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv',
 '.part-00006-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv.crc',
 'part-00010-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv',
 '.part-00007-6506bf7b-6b9f-436d-b659-9d0c2204209d-c000.csv.crc',
 'part-00007-6506bf7b-6b9f-
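
The listings above answer the "What do you notice?" prompt: Spark writes each output as a directory holding one `part-...` file per partition, plus hidden `.crc` checksum files and a `_SUCCESS` marker, rather than a single file. As a minimal sketch in plain Python (not a Spark API), the data files can be separated from the bookkeeping files by name. The `list_part_files` helper and the file names created in the temporary folder are hypothetical stand-ins, not the files written above.

```python
import os
import tempfile


def list_part_files(directory):
    """Return the data files in a Spark-style output directory, sorted by name."""
    return sorted(
        name for name in os.listdir(directory)
        if name.startswith("part-")  # skips _SUCCESS and the hidden .crc files
    )


# Hypothetical stand-in for a Spark output folder, built in a temp directory
with tempfile.TemporaryDirectory() as tmp:
    for name in [
        "part-00000-abc.csv",
        ".part-00000-abc.csv.crc",
        "part-00001-abc.csv",
        "_SUCCESS",
        "._SUCCESS.crc",
    ]:
        open(os.path.join(tmp, name), "w").close()
    data_files = list_part_files(tmp)

print(data_files)
```

Only the two `part-` files are kept; everything else in the folder is bookkeeping.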

### 3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

#### CASES

In [115]:
cases.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string'),
 ('num_weeks_late', 'double'),
 ('zipcode', 'string'),
 ('case_age', 'int'),
 ('days_to_closed', 'int'),
 ('case_lifetime', 'int'),
 ('num_hours_late', 'double'),
 ('department', 'string'),
 ('dept_subject_to_SLA', 'boolean')]

In [116]:
cases.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- num_weeks_late: double (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- case_age: integer (nullable = true)
 |-- days_to_closed: integer (nullable = true)
 |-- case_lifetime: integer (nullable = true)
 |-- num_hours_late: double (nullable = true)
 |-- department: string (nullable = true)
 |-- dept_subject_to_SLA: boolean (nullable = true)



In [12]:
cases.show(2, vertical=True)

23/07/05 14:57:10 WARN TaskSetManager: Stage 10 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616         
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

23/07/05 14:57:14 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 10 (TID 76): Attempting to kill Python Worker
                                                                                

In [13]:
cases.summary()

DataFrame[summary: string, case_id: string, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: string, case_closed: string, dept_division: string, service_request_type: string, SLA_days: string, case_status: string, source_id: string, request_address: string, council_district: string]

In [14]:
cols = ["council_district", "SLA_days", "num_days_late"]

In [15]:
for col_name in cols:
    cases = cases.withColumn(col_name, col(col_name).cast('float'))

In [17]:
cases.show(2, vertical=True)

23/07/05 14:57:15 WARN TaskSetManager: Stage 11 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5088            
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5.0                  
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126042

23/07/05 14:57:19 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 11 (TID 77): Attempting to kill Python Worker
                                                                                

#### DEPT

In [18]:
dept.show(2, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | 311 Call Center      
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Brush                
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | YES                  
only showing top 2 rows



In [19]:
dept

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [20]:
dept = dept.withColumn('dept_subject_to_SLA', col('dept_subject_to_SLA').cast('boolean'))

In [21]:
dept

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: boolean]

#### SOURCE

In [22]:
source.show(2, vertical=True)

-RECORD 0---------------------------
 index           | 0                
 source_id       | 100137           
 source_username | Merlene Blodgett 
-RECORD 1---------------------------
 index           | 1                
 source_id       | 103582           
 source_username | Carmen Cura      
only showing top 2 rows



In [23]:
source

DataFrame[index: bigint, source_id: string, source_username: string]

In [49]:
cases = spark.read.csv("case.csv", header=True, inferSchema=True)
cases.show(2, vertical=True, truncate=False) #Default: True = truncate strings longer than 20 chars

                                                                                

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 SLA_due_date         | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616000001                   
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
-RECORD 1----------------------------------------------------
 case_id

We will now cover several data preparation steps and how to perform each one with Spark.

## Rename Columns

We'll rename `SLA_due_date` to `case_due_date` so that it matches the other date columns.

In [50]:
cases = cases.withColumnRenamed('SLA_due_date', 'case_due_date')
cases.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

<div class="alert alert-info" role="alert">

Two columns, `case_closed` and `case_late`, store yes/no values. Spark currently treats them as strings.
- Let's turn them into booleans:

</div>

In [51]:
# demonstrating we only have yes/no in each field
cases.groupBy('case_closed', 'case_late').count().show()


+-----------+---------+------+
|case_closed|case_late| count|
+-----------+---------+------+
|         NO|      YES|  6525|
|        YES|      YES| 87978|
|         NO|       NO| 11585|
|        YES|       NO|735616|
+-----------+---------+------+



                                                                                

In [52]:
# Overwrite both columns with booleans: true where the original value is "YES"
cases = cases.withColumn('case_closed', expr('case_closed == "YES"')).withColumn(
        'case_late', expr('case_late == "YES"'))

#Select just the two columns
cases.select('case_closed', 'case_late').show()

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
+-----------+---------+
only showing top 20 rows



In [53]:
cases.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- case_due_date: string (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [55]:
cases.groupBy('council_district').count().show()


+----------------+------+
|council_district| count|
+----------------+------+
|               1|119309|
|               6| 74095|
|               3|102706|
|               5|114609|
|               9| 40916|
|               4| 93778|
|               8| 42345|
|               7| 72445|
|              10| 62926|
|               2|114745|
|               0|  3830|
+----------------+------+



                                                                                

In [57]:
cases = cases.withColumn('council_district', col('council_district').cast('string'))

<div class="alert alert-info" role="alert">

Now we'll use Spark's `to_timestamp` function to handle the three columns that contain dates.

For this to work properly, we need to pass the date format to `to_timestamp`.
The format syntax differs from the date formatting we've used in pandas because it uses Java-style pattern letters, as in [Java's `SimpleDateFormat`][1].

[1]: https://docs.oracle.com/javase/7/docs/api/java/text/SimpleDateFormat.html

</div>

In [58]:
#Before
cases.select('case_opened_date', 'case_closed_date', 'case_due_date').show(5)

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows



In [60]:
#After

fmt = "M/d/yy H:mm"

cases = (
    cases.withColumn('case_opened_date', to_timestamp('case_opened_date', fmt))
    .withColumn('case_closed_date', to_timestamp('case_closed_date', fmt))
    .withColumn('case_due_date', to_timestamp('case_due_date', fmt))
)

cases.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-05 08:30:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-17 08:30:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00|2018-01-01 04:34:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows
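
For readers more used to Python's format codes, the sketch below parses one of the raw values with the standard library. It assumes that the Spark pattern `M/d/yy H:mm` corresponds roughly to the `strptime` format `%m/%d/%y %H:%M`. The name `PY_FMT` is introduced here for illustration only.

```python
from datetime import datetime

# Spark pattern used above:   M/d/yy H:mm
# Rough strptime counterpart: %m/%d/%y %H:%M  (assumed mapping, for comparison)
PY_FMT = "%m/%d/%y %H:%M"

# Same raw string as the first case_due_date value
parsed = datetime.strptime("9/26/20 0:42", PY_FMT)

print(parsed)  # 2020-09-26 00:42:00
```

The parsed value matches the first `case_due_date` shown in the Spark output.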



<div class="alert alert-info" role="alert">

Next, we'll normalize the request address field.

Using the `trim` and `lower` functions, let's strip any leading or trailing whitespace and convert everything to lowercase.

</div>

In [61]:
#Before
cases.select("request_address").show(5, 50)

+-------------------------------------+
|                      request_address|
+-------------------------------------+
| 2315  EL PASO ST, San Antonio, 78207|
|  2215  GOLIAD RD, San Antonio, 78223|
|102  PALFREY ST W, San Antonio, 78223|
| 114  LA GARDE ST, San Antonio, 78223|
|734  CLEARVIEW DR, San Antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [62]:
#After
cases = cases.withColumn('request_address', trim(lower(cases.request_address)))

cases.select("request_address").show(5, 50)

+-------------------------------------+
|                      request_address|
+-------------------------------------+
| 2315  el paso st, san antonio, 78207|
|  2215  goliad rd, san antonio, 78223|
|102  palfrey st w, san antonio, 78223|
| 114  la garde st, san antonio, 78223|
|734  clearview dr, san antonio, 78228|
+-------------------------------------+
only showing top 5 rows
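
As a rough plain-Python counterpart of `trim(lower(...))`, the sketch below applies the same two steps to a single string. The helper name `normalize_address` and the padded sample value are assumptions for illustration. Python's `strip` removes any surrounding whitespace, so it is only an approximate stand-in for Spark's `trim`.

```python
def normalize_address(address):
    """Strip surrounding whitespace and lowercase, like trim(lower(col))."""
    return address.strip().lower()


# Hypothetical raw value with extra padding on both sides
raw = "  2315  EL PASO ST, San Antonio, 78207 "
clean = normalize_address(raw)

print(repr(clean))
```

Only the outer whitespace is removed. The double space after the street number survives, as it does in the Spark output above.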



<div class="alert alert-info" role="alert">

Here we will convert the number of days a case is late to a number of weeks.

</div>

In [63]:
cases = cases.withColumn('num_weeks_late', expr('num_days_late/7'))

cases.select("num_days_late", "num_weeks_late").show(5)

+-------------------+--------------------+
|      num_days_late|      num_weeks_late|
+-------------------+--------------------+
| -998.5087616000001|        -142.6441088|
|-2.0126041669999997|-0.28751488099999994|
|       -3.022337963|-0.43176256614285713|
|       -15.01148148| -2.1444973542857144|
|0.37216435200000003|         0.053166336|
+-------------------+--------------------+
only showing top 5 rows



<div class="alert alert-info" role="alert">

Lastly, we can format the `council_district` column a little differently. 
- We'll add leading 0s to it:

</div>

In [64]:
# '%03d' means at least 3 digits, padded with leading 0s.
#
# format_string needs an integer for the %d placeholder, so we temporarily
# cast council_district back to an integer; the final output is a string.
cases = cases.withColumn('council_district',
                  format_string('%03d', col('council_district').cast('int')))

cases.select("council_district").show(5)

+----------------+
|council_district|
+----------------+
|             005|
|             003|
|             003|
|             003|
|             007|
+----------------+
only showing top 5 rows
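
The `%03d` placeholder is printf-style, and Python's `%` operator accepts the same code. The sketch below applies it to a few sample district numbers chosen for illustration, not read from the dataframe.

```python
# Sample district numbers (illustrative values)
districts = [5, 3, 10, 0]

# %03d pads each integer with leading zeros to a width of at least 3
padded = ["%03d" % d for d in districts]

print(padded)  # ['005', '003', '010', '000']
```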



## New Features

Let's now create some new features based on our existing data.

We will first extract the zipcode from the address:

In [65]:
cases.select("request_address").show(2, 50)

+------------------------------------+
|                     request_address|
+------------------------------------+
|2315  el paso st, san antonio, 78207|
| 2215  goliad rd, san antonio, 78223|
+------------------------------------+
only showing top 2 rows



In [66]:
# regexp_extract arguments:
#   column:  request_address
#   pattern: r"\d+$" (one or more digits at the end of the string)
#   idx:     0 returns the entire match, since the pattern has no capture groups
# cases.select(regexp_extract('request_address', r"\d+$", 0)).show(2)

In [67]:
# \d+ matches one or more digits (greedy); $ anchors the match to the end of the string
cases = cases.withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))

cases.select("zipcode").show(5)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
+-------+
only showing top 5 rows
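
The same pattern can be tried on plain strings with Python's `re` module. This is a minimal sketch: the helper `extract_zipcode` is hypothetical and returns an empty string when nothing matches, which mirrors what `regexp_extract` does.

```python
import re


def extract_zipcode(address):
    """Return the run of digits at the end of the address, or '' if none."""
    match = re.search(r"\d+$", address)
    return match.group(0) if match else ""


zips = [
    extract_zipcode(address)
    for address in [
        "2315  el paso st, san antonio, 78207",
        "2215  goliad rd, san antonio, 78223",
        "no zip here",  # illustrative value with no trailing digits
    ]
]

print(zips)  # ['78207', '78223', '']
```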



In [68]:
cases = (
    # case_age: days between the current timestamp and case_opened_date
    cases.withColumn(
        'case_age', datediff(current_timestamp(), 'case_opened_date')
    )
    # days_to_closed: days between case_closed_date and case_opened_date
    .withColumn(
        'days_to_closed', datediff('case_closed_date', 'case_opened_date')
    )
    # case_lifetime: case_closed is boolean, so open cases take case_age
    # and closed cases take days_to_closed
    .withColumn(
        'case_lifetime',
        when(expr("! case_closed"), col("case_age")).otherwise(col('days_to_closed'))
    )
)
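
The logic of the three columns can be sketched in plain Python. The sketch assumes that `datediff` compares calendar dates and ignores the time of day. The helpers `day_diff` and `case_lifetime`, and the fixed `now` value, are hypothetical names and values chosen for illustration.

```python
from datetime import datetime


def day_diff(end, start):
    """Whole calendar days between two datetimes, ignoring the time of day."""
    return (end.date() - start.date()).days


def case_lifetime(opened, closed, is_closed, now):
    """Closed cases use days-to-closed; open cases use their age as of `now`."""
    return day_diff(closed, opened) if is_closed else day_diff(now, opened)


opened = datetime(2018, 1, 1, 0, 46)
closed = datetime(2018, 1, 3, 8, 11)
now = datetime(2023, 7, 5)  # assumed "current" date, fixed so the result is repeatable

closed_lifetime = case_lifetime(opened, closed, True, now)
open_lifetime = case_lifetime(opened, None, False, now)

print(closed_lifetime, open_lifetime)  # 2 2011
```

Because `case_age` depends on the current timestamp, its value changes every day the notebook is re-run.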

In [69]:
cases.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr('case_closed')).show(5)

cases.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("! case_closed")).show(5)

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|case_age|days_to_closed|case_lifetime|
+-----------+-------------------+-------------------+--------+--------------+-------------+
|       true|2018-01-01 00:42:00|2018-01-01 12:29:00|    2011|             0|            0|
|       true|2018-01-01 00:46:00|2018-01-03 08:11:00|    2011|             2|            2|
|       true|2018-01-01 00:48:00|2018-01-02 07:57:00|    2011|             1|            1|
|       true|2018-01-01 01:29:00|2018-01-02 08:13:00|    2011|             1|            1|
|       true|2018-01-01 01:34:00|2018-01-01 13:29:00|    2011|             0|            0|
+-----------+-------------------+-------------------+--------+--------------+-------------+
only showing top 5 rows

+-----------+-------------------+----------------+--------+--------------+-------------+
|case_closed|   case_opened_date|case_closed_date|case_age

### 1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [71]:
# case_closed is a boolean column, so negate it to keep only the open cases
days_open = cases.filter(~col('case_closed'))

In [117]:
# sort open cases from most to least days past their SLA due date
days_open.select(
    "case_id",
    "case_status",
    "num_days_late",
    "case_lifetime",
    ).sort(desc('num_days_late')).show()

                                                                                

+----------+-----------+------------------+-------------+
|   case_id|case_status|     num_days_late|case_lifetime|
+----------+-----------+------------------+-------------+
|1013225646|       Open|       348.6458333|         2376|
|1013225651|       Open|       348.6458333|         2376|
|1013226813|       Open|348.52356480000003|         2375|
|1013229328|       Open|347.58256939999995|         2374|
|1013236238|       Open|       345.3894213|         2372|
|1013257313|       Open|       341.6458333|         2363|
|1013263418|       Open|       341.4006481|         2360|
|1013245251|       Open|       340.6083333|         2367|
|1013247888|       Open|       339.6458333|         2367|
|1013249877|       Open|       339.3817361|         2366|
|1013256068|       Open|335.38761569999997|         2364|
|1013257022|       Open|       334.6458333|         2363|
|1013260804|       Open|       334.6458333|         2361|
|1013261951|       Open|        334.570544|         2360|
|1013289892|  

In [80]:
# case_closed is a boolean column, so negate it to keep only the open cases
days_since_opened = cases.filter(~col('case_closed'))

In [83]:
# sort open cases from oldest to newest opening date
days_since_opened.select(
    "case_id",
    "case_status",
    "case_opened_date",
    "case_age",
    ).sort(asc('case_opened_date')).show()


+----------+-----------+-------------------+--------+
|   case_id|case_status|   case_opened_date|case_age|
+----------+-----------+-------------------+--------+
|1013225646|       Open|2017-01-01 13:48:00|    2376|
|1013225651|       Open|2017-01-01 13:57:00|    2376|
|1013226813|       Open|2017-01-02 11:26:00|    2375|
|1013229328|       Open|2017-01-03 10:01:00|    2374|
|1013232133|       Open|2017-01-04 09:42:00|    2373|
|1013232331|       Open|2017-01-04 10:16:00|    2373|
|1013232521|       Open|2017-01-04 10:56:00|    2373|
|1013232883|       Open|2017-01-04 12:19:00|    2373|
|1013233425|       Open|2017-01-04 14:45:00|    2373|
|1013233467|       Open|2017-01-04 14:52:00|    2373|
|1013233485|       Open|2017-01-04 14:55:00|    2373|
|1013233497|       Open|2017-01-04 14:58:00|    2373|
|1013233516|       Open|2017-01-04 15:02:00|    2373|
|1013234263|       Open|2017-01-04 18:49:00|    2373|
|1013236238|       Open|2017-01-05 14:39:00|    2372|
|1013236365|       Open|2017

                                                                                

### 2. How many Stray Animal cases are there?

In [90]:
cases.filter(col('service_request_type')=='Stray Animal').count()

                                                                                

26760

### 3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [93]:
cases.filter(col('dept_division')=='Field Operations')\
    .filter(col('service_request_type')!='Officer Standby').count()

                                                                                

113902

### 4. Convert the council_district column to a string column.

In [95]:
cases = cases.withColumn('council_district', col('council_district').cast('string'))

In [97]:
cases.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- num_weeks_late: double (nullable = true)
 |-- zipcode: string (nullable = true)
 |-- case_age: integer (nullable = true)
 |-- days_to_closed: integer (nullable = true)
 |-- case_lifetime: integer (nullable = true)



### 5. Extract the year from the case_closed_date column.

In [100]:
cases.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 2011                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
-RECORD 1------------------------------------
 case_id              | 1014127333

In [105]:
# overwrite case_closed_date with its year for display only; cases itself is not reassigned
cases.withColumn('case_closed_date', year("case_closed_date"))\
.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018                 
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 2011                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
-RECORD 1------------------------------------
 case_id              | 1014127333

### 6. Convert num_days_late from days to hours in a new column, num_hours_late.

In [119]:
cases = cases.withColumn('num_hours_late', round(expr('num_days_late*24'),1))

cases.select("num_days_late", "num_hours_late").show(5)

+-------------------+--------------+
|      num_days_late|num_hours_late|
+-------------------+--------------+
| -998.5087616000001|      -23964.2|
|0.37216435200000003|           8.9|
|       -29.74398148|        -713.9|
|-2.0126041669999997|         -48.3|
|       -3.022337963|         -72.5|
+-------------------+--------------+
only showing top 5 rows
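
As a quick sanity check on the conversion, the same arithmetic can be reproduced in plain Python for the five rows printed above. This is only a sketch: the inputs are copied from the output table, and the helper name `days_to_hours` is hypothetical and not part of the notebook.

```python
def days_to_hours(days, ndigits=1):
    """Convert a duration in days to hours, rounded to ndigits decimal places."""
    return round(days * 24, ndigits)

# num_days_late values copied from the output table above
sample_days = [
    -998.5087616000001,
    0.37216435200000003,
    -29.74398148,
    -2.0126041669999997,
    -3.022337963,
]

sample_hours = [days_to_hours(d) for d in sample_days]
print(sample_hours)
```

The printed list should match the num_hours_late column above, row for row.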



### 7. Join the case data with the source and department data.

In [None]:
# joining on the column name keeps a single source_id column, so no drop is needed
cases = cases.join(source, 'source_id', 'left')

In [110]:
cases = (
    cases
    # left join on dept_division
    .join(dept, 'dept_division', 'left')
    # drop every dept column except the standardized name, which has far fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(cases.dept_division)
    .withColumnRenamed('standardized_dept_name', 'department')
    # convert last col to a boolean
    .withColumn('dept_subject_to_SLA', col('dept_subject_to_SLA')=="YES")
)

cases.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 005                  
 num_weeks_late       | -142.6441088         
 zipcode              | 78207                
 case_age             | 2011                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 num_hours_late       | -23964.2102784       
 department           | Animal Care Services 
 dept_subject_to_SLA  | true      
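
A left join keeps every row from the left table and fills in the right table's columns wherever the key matches. The sketch below mimics that behavior in plain Python on made-up rows. The `left_join` helper, the sample values, and the `source_username` column are hypothetical; they illustrate the semantics only, not how Spark implements the join.

```python
def left_join(left_rows, right_rows, key):
    """Left-join two lists of dicts on `key`.

    Left rows without a match get None for the right-only columns.
    Assumes `key` is unique among right_rows.
    """
    lookup = {r[key]: r for r in right_rows}
    right_cols = sorted({c for r in right_rows for c in r if c != key})
    joined = []
    for row in left_rows:
        match = lookup.get(row[key], {})
        merged = dict(row)
        for c in right_cols:
            merged[c] = match.get(c)
        joined.append(merged)
    return joined

# made-up sample rows, not taken from the 311 data
cases_sample = [
    {"case_id": 1, "source_id": "A"},
    {"case_id": 2, "source_id": "Z"},
]
source_sample = [{"source_id": "A", "source_username": "alice"}]

result = left_join(cases_sample, source_sample, "source_id")
print(result)
```

Both sample cases survive the join; the one whose key has no match simply carries None in the joined column.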

### 8. Are there any cases that do not have a request source?

In [127]:
# test for actual nulls; comparing against the string 'null' only matches that literal text
cases.filter(col('source_id').isNull()).count()

0
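
A missing value and the literal text 'null' are easy to confuse. The plain-Python sketch below uses made-up rows (not the 311 data) to show the two checks giving different answers. In Spark the analogous tests are `Column.isNull()` and an equality comparison against a string.

```python
# Hypothetical rows for illustration; None stands in for a missing source_id
rows = [
    {"case_id": 1, "source_id": "svcCRMLS"},
    {"case_id": 2, "source_id": None},
    {"case_id": 3, "source_id": "null"},
]

# rows where the value is actually missing
missing = sum(1 for r in rows if r["source_id"] is None)

# rows where the value is the four-character string "null"
literal_null = sum(1 for r in rows if r["source_id"] == "null")

print(missing, literal_null)
```

Each check counts a different row here, so a string comparison alone cannot confirm that no values are missing.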

### 9. What are the top 10 service request types in terms of number of requests?

In [121]:
cases.groupBy('service_request_type').count().sort(desc('count')).show(10, 50)

+--------------------------------+-----+
|            service_request_type|count|
+--------------------------------+-----+
|                       No Pickup|86855|
|            Overgrown Yard/Trash|65895|
|                    Bandit Signs|32910|
|                    Damaged Cart|30338|
|      Front Or Side Yard Parking|28794|
|                    Stray Animal|26760|
| Aggressive Animal(Non-Critical)|24882|
|           Cart Exchange Request|22024|
|Junk Vehicle On Private Property|21473|
|                 Pot Hole Repair|20616|
+--------------------------------+-----+
only showing top 10 rows



### 10. What are the top 10 service request types in terms of average days late?

In [123]:
cases.groupBy('service_request_type').agg(round(mean('num_days_late'), 1).alias('avg_late')).sort(desc('avg_late')).show(10, 50)

+--------------------------------------+--------+
|                  service_request_type|avg_late|
+--------------------------------------+--------+
|                    Zoning: Junk Yards|   176.0|
|            Labeling for Used Mattress|   162.4|
|     Record Keeping of Used Mattresses|   154.0|
|Signage Requied for Sale of Used Mattr|   151.6|
|              Storage of Used Mattress|   142.1|
|                  Zoning: Recycle Yard|   135.9|
|        Donation Container Enforcement|   131.8|
|   License Requied Used Mattress Sales|   128.8|
|               Traffic Signal Graffiti|    77.9|
|                             Complaint|    72.5|
+--------------------------------------+--------+
only showing top 10 rows



### 11. Does the number of days late depend on department?

In [124]:
cases.groupby('department').agg(round(mean('num_days_late'), 1).alias('avg_late_again')).sort(desc('avg_late_again')).show(10, 50)

+------------------------+--------------+
|              department|avg_late_again|
+------------------------+--------------+
|        Customer Service|          59.5|
|             Solid Waste|          -2.2|
|            Metro Health|          -4.9|
|      Parks & Recreation|          -5.3|
|Trans & Cap Improvements|         -20.5|
|    DSD/Code Enforcement|         -38.3|
|    Animal Care Services|        -226.2|
|            City Council|          null|
+------------------------+--------------+
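
The null average for City Council follows from how averaging treats missing values: nulls are skipped, so a group whose values are all null has nothing to average. The plain-Python sketch below reproduces that behavior on made-up rows. The `mean_by_group` helper and the sample data are hypothetical.

```python
from collections import defaultdict

def mean_by_group(rows, group_key, value_key):
    """Average value_key per group, skipping None.

    A group with no non-missing values yields None.
    """
    values = defaultdict(list)
    for row in rows:
        group = values[row[group_key]]  # registers the group even if the value is missing
        if row[value_key] is not None:
            group.append(row[value_key])
    return {g: (sum(v) / len(v) if v else None) for g, v in values.items()}

# made-up sample rows, not taken from the 311 data
rows = [
    {"department": "A", "num_days_late": 2.0},
    {"department": "A", "num_days_late": -4.0},
    {"department": "B", "num_days_late": None},
]

averages = mean_by_group(rows, "department", "num_days_late")
print(averages)
```

Department "B" has only a missing value, so its average comes back as None rather than 0.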



### 12. How does the number of days late depend on department and request type?

In [126]:
cases.groupby('department', 'service_request_type').agg(round(avg('num_days_late'), 1).alias('avg_days_late')).sort(desc('department')).show(10, 50)

+------------------------+----------------------------------------------+-------------+
|              department|                          service_request_type|avg_days_late|
+------------------------+----------------------------------------------+-------------+
|Trans & Cap Improvements|                Accident Problem Investigation|        -23.0|
|Trans & Cap Improvements|           Herbicide - Grass in curb or street|        -12.4|
|Trans & Cap Improvements|                    Signal Timing Modification|        -80.2|
|Trans & Cap Improvements|                              Speed Limit Sign|        -12.1|
|Trans & Cap Improvements|                      Tree- Visual Obstruction|         -2.6|
|Trans & Cap Improvements|                  Left-Turn Signal New Request|        -54.2|
|Trans & Cap Improvements|         Street/Drainage Problem Investigation|        -36.9|
|Trans & Cap Improvements|                               Street Sweeping|         -5.5|
|Trans & Cap Improvements|Traffi