In [None]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

import re

In [2]:
spark = SparkSession.builder.getOrCreate()

In [3]:
spark

1. Read the case, department, and source data into their own spark dataframes.

In [123]:
df = spark.read.csv('data/case.csv', header=True)

In [5]:
dept = spark.read.csv('data/dept.csv', header=True)

In [6]:
source = spark.read.csv('data/source.csv', header=True)

2. Let's see how writing to the local disk works in spark:


- Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
- Inspect your folder structure. What do you notice?
    > """not saved with file extentions?"""

In [7]:
source.write.csv('data/sources_csv', mode = 'overwrite')

In [8]:
source.write.json('data/sources_json', mode = 'overwrite')

3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [9]:
source.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



In [10]:
dept.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [11]:
df.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: string (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: string (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



In [15]:
df.show(1, vertical= True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [146]:
# handle date times

fmt = "M/d/yy H:mm"

df = (
    df.withColumn('case_opened_date', to_timestamp('case_opened_date', fmt))
    .withColumn('case_closed_date', to_timestamp('case_closed_date', fmt))
    .withColumn('SLA_due_date' , to_timestamp('SLA_due_date', fmt))
)


In [147]:
# handle booleans
df = (
df.withColumn('case_late', df.case_late == "YES")
    .withColumn('case_closed', df.case_closed == "YES")
)

In [148]:
# handle numberical values

df = (
 df.withColumn('num_days_late', df.num_days_late.cast("float"))
    .withColumn('SLA_days', df.SLA_days.cast("int"))
)

In [32]:
df.printSchema()

root
 |-- case_id: string (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: float (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: integer (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



1.  How old is the latest (in terms of days past SLA) currently open issue?

In [74]:
(
    df.filter(df.case_closed == 'false')
    .withColumn("days_past_SLA", datediff(df.SLA_due_date, df.case_opened_date))
    .select(col("days_past_SLA"))
    .sort(desc('days_past_SLA'))
).show(1)

+-------------+
|days_past_SLA|
+-------------+
|         1419|
+-------------+
only showing top 1 row



1b.  How long has the oldest (in terms of days since opened) currently opened issue been open?

In [73]:
(
    df.filter(df.case_closed == "false")
    .select(df.num_days_late)
    .sort(desc('num_days_late'))
    .show(1)
)

+-------------+
|num_days_late|
+-------------+
|    348.64584|
+-------------+
only showing top 1 row



2. How many Stray Animal cases are there?

In [81]:
(
    df.filter(col('service_request_type') == "Stray Animal")
    
).count()
    

26760

3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [87]:
(
df.filter(df.dept_division == 'Field Operations')
    .filter(df.service_request_type != 'Officer Standby')
).count()

113902

4. Convert the council_district column to a string column.

In [97]:
# never turned into int, for obvious reasons, showed I can cast as int
(
    df.withColumn('council_district', df.council_district.cast('int') * 2)
)

DataFrame[case_id: string, case_opened_date: timestamp, case_closed_date: timestamp, SLA_due_date: timestamp, case_late: boolean, num_days_late: float, case_closed: boolean, dept_division: string, service_request_type: string, SLA_days: int, case_status: string, source_id: string, request_address: string, council_district: int]

5. Extract the year from the case_closed_date column.

In [104]:
df = df.withColumn('year_closed', year(df.case_closed_date))
df.select('year_closed').show()

+-----------+
|year_closed|
+-----------+
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
|       2018|
+-----------+
only showing top 20 rows



6. Convert num_days_late from days to hours in new columns num_hours_late.

In [108]:
(
    df.withColumn('num_hours_late', df.num_days_late*24)
    .select('num_hours_late')
).show()

+--------------+
|num_hours_late|
+--------------+
|     -23964.21|
|      -48.3025|
|     -72.53611|
|    -360.27554|
|      8.931944|
|     -713.8556|
|    -352.96167|
|     -352.9589|
|     -352.9589|
|    -352.95584|
|    -352.95584|
|    -352.95276|
|       -352.95|
|    -352.95276|
|    -352.94974|
|     -352.9414|
|    -352.94415|
|    -352.93832|
|    -352.93832|
|    -352.93555|
+--------------+
only showing top 20 rows



7. Join the case data with the source and department data.

In [124]:
df = (
    df.join(dept, "dept_division", "left")
    .join(source, "source_id", "left" )
    .drop(dept.dept_name)
    .drop(df.dept_division)
    .drop(dept.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    .withColumn('dept_subject_to_SLA', col('dept_subject_to_SLA') == "YES")
)

8. Are there any cases that do not have a request source?

In [135]:
(
    df.select(df.source_id.isNull()
    .cast('int')
    .alias('is_null'))
    .agg(sum('is_null'))
).show()

+------------+
|sum(is_null)|
+------------+
|           0|
+------------+



9. What are the top 10 service request types in terms of number of requests?

In [141]:
(
    df.select('service_request_type')
    .groupby(df.service_request_type)
    .count()
    .sort(desc('count'))
).show(10)

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|89210|
|Overgrown Yard/Trash|66403|
|        Bandit Signs|32968|
|        Damaged Cart|31163|
|Front Or Side Yar...|28920|
|        Stray Animal|27361|
|Aggressive Animal...|25492|
|Cart Exchange Req...|22608|
|Junk Vehicle On P...|21649|
|     Pot Hole Repair|20827|
+--------------------+-----+
only showing top 10 rows



10. What are the top 10 service request types in terms of average days late?

In [156]:
(
    df.where('case_late')
    .select('service_request_type', 'num_days_late')
    .groupBy('service_request_type')
    .agg(mean('num_days_late').alias('avg_days_late'))
    .sort(desc('avg_days_late'))
).show(10)

+--------------------+------------------+
|service_request_type|     avg_days_late|
+--------------------+------------------+
|Zoning: Recycle Yard|210.89202097690466|
|  Zoning: Junk Yards|200.20517606225633|
|Structure/Housing...|190.20707663367776|
|Donation Containe...|171.09115328554248|
|Storage of Used M...|163.96812765938895|
|Labeling for Used...|162.43032836914062|
|Record Keeping of...| 153.9972414289202|
|Signage Requied f...|151.63867886861166|
|Traffic Signal Gr...| 137.6458282470703|
|License Requied U...|128.79828480311804|
+--------------------+------------------+
only showing top 10 rows



In [149]:
df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- case_id: string (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: float (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: integer (nullable = true)
 |-- case_status: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)
 |-- department: string (nullable = true)
 |-- dept_subject_to_SLA: boolean (nullable = true)
 |-- source_username: string (nullable = true)



11. Does number of days late depend on department?

In [190]:
(
df.where('case_late')
    .groupBy('department')
    .agg(mean('num_days_late').alias('days_late'), count('num_days_late').alias('num_cases_late'))
    .sort('days_late')
    .withColumn('days_late', round(col('days_late'), 1))
).show(truncate=False)

+------------------------+---------+--------------+
|department              |days_late|num_cases_late|
+------------------------+---------+--------------+
|Metro Health            |6.5      |854           |
|Solid Waste             |7.1      |33729         |
|Trans & Cap Improvements|10.7     |5529          |
|Parks & Recreation      |22.4     |3810          |
|Animal Care Services    |23.4     |23751         |
|DSD/Code Enforcement    |49.5     |26439         |
|Customer Service        |88.2     |2035          |
+------------------------+---------+--------------+



12. How do number of days late depend on department and request type

In [197]:
(
    df.filter('case_closed')
    .where('case_late')
    .groupBy('department', 'service_request_type')
    .agg(mean('num_days_late').alias('days_late'), count('num_days_late').alias('num_cases_late'))
    .sort(desc('days_late'))
    .withColumn('days_late', round(col('days_late'), 1))
).show(40, truncate=False)

+--------------------+--------------------------------------------+---------+--------------+
|department          |service_request_type                        |days_late|num_cases_late|
+--------------------+--------------------------------------------+---------+--------------+
|DSD/Code Enforcement|Zoning: Recycle Yard                        |273.6    |75            |
|DSD/Code Enforcement|Zoning: Junk Yards                          |251.9    |146           |
|DSD/Code Enforcement|Donation Container Enforcement              |201.7    |82            |
|DSD/Code Enforcement|Structure/Housing Maintenance               |182.4    |30            |
|DSD/Code Enforcement|Graffiti: Private Property (Corridors)      |175.1    |3             |
|DSD/Code Enforcement|Storage of Used Mattress                    |164.0    |7             |
|DSD/Code Enforcement|Labeling for Used Mattress                  |162.4    |7             |
|DSD/Code Enforcement|Record Keeping of Used Mattresses           |154