In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

In [2]:
df = spark.read.csv("case.csv", header=True, inferSchema=True)
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [50]:
source = spark.read.csv("source.csv", header=True, inferSchema=True)
source.show(4)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
+---------+----------------+
only showing top 4 rows



In [4]:
dept = spark.read.csv("dept.csv", header=True, inferSchema=True)
dept.show(2, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | 311 Call Center      
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Brush                
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | YES                  
only showing top 2 rows



Let's see how writing to the local disk works in spark:

Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json\
Inspect your folder structure. What do you notice?

In [5]:
source.write.json("source_json", mode="overwrite")
source.write.csv("source_csv", mode="overwrite")

Two folders were created each with a csv/json file and a 'success' file

Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [6]:
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [7]:
df = df.withColumnRenamed('SLA_due_date', 'case_due_date')

In [8]:
df = (
    df.withColumn('case_late', col('case_late') == 'YES').\
    withColumn('case_closed', col('case_closed') == 'YES')
)

In [9]:
df.show(1, vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [10]:
df = (
    df.withColumn('case_opened_date', to_timestamp(('case_opened_date'), 'M/d/yy H:mm')).\
withColumn('case_closed_date', to_timestamp(('case_closed_date'), 'M/d/yy H:mm')).\
withColumn('case_due_date', to_timestamp(('case_due_date'), 'M/d/yy H:mm'))
)

In [11]:
dept.show(2)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|                YES|
|          Brush|Solid Waste Manag...|           Solid Waste|                YES|
+---------------+--------------------+----------------------+-------------------+
only showing top 2 rows



In [12]:
dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

In [13]:
#convert column dept_subject_to_SLA to bool
dept = dept.withColumn('dept_subject_to_SLA', col('dept_subject_to_SLA')== 'YES')

In [14]:
source.dtypes

[('source_id', 'string'), ('source_username', 'string')]

#### How old is the latest (in terms of days past SLA) currently open issue? 

In [121]:
(
    df.withColumn('case_age', datediff(current_timestamp(), 'case_due_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .filter(~ col('case_closed'))
    .select('case_opened_date', 'case_closed_date', 'case_due_date','case_age', 'days_to_closed', 'case_lifetime')
    .sort(col('case_age').desc()).show(1)
)

+-------------------+----------------+-------------------+--------+--------------+-------------+
|   case_opened_date|case_closed_date|      case_due_date|case_age|days_to_closed|case_lifetime|
+-------------------+----------------+-------------------+--------+--------------+-------------+
|2017-01-01 13:48:00|            null|2017-01-17 08:30:00|    1239|          null|         1239|
+-------------------+----------------+-------------------+--------+--------------+-------------+
only showing top 1 row



#### How long has the oldest (in terms of days since opened) currently opened issue been open?

In [16]:
(
    df.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .filter(~ col('case_closed'))
    .select('case_opened_date', 'case_closed_date', 'case_due_date','case_age', 'days_to_closed', 'case_lifetime')
    .sort(col('case_age').desc()).show(2)
)

+-------------------+----------------+-------------------+--------+--------------+-------------+
|   case_opened_date|case_closed_date|      case_due_date|case_age|days_to_closed|case_lifetime|
+-------------------+----------------+-------------------+--------+--------------+-------------+
|2017-01-01 13:57:00|            null|2017-01-17 08:30:00|    1255|          null|         1255|
|2017-01-01 13:48:00|            null|2017-01-17 08:30:00|    1255|          null|         1255|
+-------------------+----------------+-------------------+--------+--------------+-------------+
only showing top 2 rows



#### How many Stray Animal cases are there?

In [17]:
(df.filter(df.service_request_type == 'Stray Animal')).count()

26760

#### How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [18]:
(df.filter(df.dept_division == "Field Operations").filter(df.service_request_type != "Officer Standby")).count()

113902

#### Convert the council_district column to a string column.

In [19]:
df = df.withColumn('council_district', format_string('%04d', col('council_district')))

In [21]:
df.show(1, vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 0005                 
only showing top 1 row



#### Extract the year from the case_closed_date column.

In [31]:
df.select( year('case_closed_date')).show(1, vertical = True)

-RECORD 0----------------------
 year(case_closed_date) | 2018 
only showing top 1 row



#### Convert num_days_late from days to hours in new columns num_hours_late.

In [35]:
df.withColumn('num_days_late', col('num_days_late')*24).show(1, vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -23964.2102784       
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 0005                 
only showing top 1 row



#### Join the case data with the source and department data.

In [44]:
df = df.join(dept, "dept_division", "left").drop(dept.dept_division)\
.drop(dept.dept_name)\
.drop(df.dept_division).withColumnRenamed("standardized_dept_name", "department")

In [51]:
df = df.join(source, "source_id", "left")

#### Are there any cases that do not have a request source?

In [61]:
df.groupBy('source_id').agg(count('source_id')).show(150)

+------------+----------------+
|   source_id|count(source_id)|
+------------+----------------+
|      136202|            3640|
|     MW16328|           11010|
|      141239|            7407|
|     df03076|              17|
|      141549|            3125|
|     mp21218|           15817|
|     sp26368|             124|
|      142989|            2827|
|     sg26196|               2|
|     ec25702|               1|
|     bo26471|              11|
|     js26451|               3|
|     sg22264|            1841|
|     js12254|              99|
|     ss26317|               5|
|     bn26322|              89|
|     BA10591|              12|
|     jg06389|              42|
|      140509|            1705|
|     MR25792|               1|
|     ns16326|           17234|
|     MB16118|             152|
|     ls26247|              17|
|     rs16746|           11111|
|      143900|             296|
|      141256|             327|
|     ag19640|               2|
|     cc17850|            3909|
|     RG

Seems like that all the cases have a request source

#### What are the top 10 service request types in terms of number of requests?

In [69]:
df.groupBy('service_request_type').agg(count('service_request_type')).sort(col('count(service_request_type)').desc()).show(10)

+--------------------+---------------------------+
|service_request_type|count(service_request_type)|
+--------------------+---------------------------+
|           No Pickup|                      89210|
|Overgrown Yard/Trash|                      66403|
|        Bandit Signs|                      32968|
|        Damaged Cart|                      31163|
|Front Or Side Yar...|                      28920|
|        Stray Animal|                      27361|
|Aggressive Animal...|                      25492|
|Cart Exchange Req...|                      22608|
|Junk Vehicle On P...|                      21649|
|     Pot Hole Repair|                      20827|
+--------------------+---------------------------+
only showing top 10 rows



#### What are the top 10 service request types in terms of average days late?

In [79]:
df.filter(df.num_days_late > 0).groupby('service_request_type').agg(mean('num_days_late').alias('count')).sort(col('count').desc()).show(10)

+--------------------+------------------+
|service_request_type|             count|
+--------------------+------------------+
|Zoning: Recycle Yard|210.89201994318182|
|  Zoning: Junk Yards|200.20517608494276|
|Structure/Housing...|190.20707698509804|
|Donation Containe...|171.09115313942618|
|Storage of Used M...|163.96812829714287|
|Labeling for Used...|162.43032902285717|
|Record Keeping of...|153.99724039428568|
|Signage Requied f...|151.63868055333333|
|Traffic Signal Gr...|137.64583330000002|
|License Requied U...|128.79828704142858|
+--------------------+------------------+
only showing top 10 rows



#### Does number of days late depend on department?

In [80]:
df.filter(df.num_days_late > 0).groupby('department').agg(mean('num_days_late').alias('count')).sort(col('count').desc()).show(10)


+--------------------+------------------+
|          department|             count|
+--------------------+------------------+
|    Customer Service| 88.18248182589824|
|DSD/Code Enforcement| 49.50633998635033|
|Animal Care Services| 23.44672963473822|
|  Parks & Recreation|22.427807192724128|
|Trans & Cap Impro...| 10.66295045507867|
|         Solid Waste| 7.147172789557422|
|        Metro Health| 6.494699602827868|
+--------------------+------------------+



#### How do number of days late depend on department and request type?

In [82]:
df.filter(df.num_days_late > 0).groupby('service_request_type', 'department').agg(mean('num_days_late').alias('count')).sort(col('count').desc()).show(10)

+--------------------+--------------------+------------------+
|service_request_type|          department|             count|
+--------------------+--------------------+------------------+
|Zoning: Recycle Yard|DSD/Code Enforcement|210.89201994318182|
|  Zoning: Junk Yards|DSD/Code Enforcement|200.20517608494276|
|Structure/Housing...|DSD/Code Enforcement|190.20707698509804|
|Donation Containe...|DSD/Code Enforcement|171.09115313942618|
|Storage of Used M...|DSD/Code Enforcement|163.96812829714287|
|Labeling for Used...|DSD/Code Enforcement|162.43032902285717|
|Record Keeping of...|DSD/Code Enforcement|153.99724039428568|
|Signage Requied f...|DSD/Code Enforcement|151.63868055333333|
|Traffic Signal Gr...|Trans & Cap Impro...|137.64583330000002|
|License Requied U...|DSD/Code Enforcement|128.79828704142858|
+--------------------+--------------------+------------------+
only showing top 10 rows



In [131]:
latest_date = df.select(max(df.case_closed_date)).alias('latest_date').first()[0]

In [132]:
latest_date = latest_date.strftime('%Y-%m-%d %H:%M:%S')

In [133]:
(
    df.withColumn('case_age', datediff(lit('latest_date'), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .filter(~ col('case_closed'))
    .select('case_opened_date', 'case_closed_date', 'case_due_date', 'case_lifetime')
    .sort(col('case_age').desc()).show(2)
)

+-------------------+----------------+-------------------+-------------+
|   case_opened_date|case_closed_date|      case_due_date|case_lifetime|
+-------------------+----------------+-------------------+-------------+
|2017-08-11 09:24:00|            null|2017-12-18 09:24:00|         null|
|2017-08-11 10:01:00|            null|2017-12-18 10:01:00|         null|
+-------------------+----------------+-------------------+-------------+
only showing top 2 rows

