In [1]:
import pandas as pd
import numpy as np
import pyspark
import pyspark.sql.functions as F
 
from pyspark.sql import SparkSession
from pyspark.sql.functions import *   
from pyspark.sql.types import StructType, StructField, StringType


In [2]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()
spark

In [3]:
df1 = spark.read.csv('case.csv', header = True, inferSchema = True)
df1.show(vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [4]:
df2 = spark.read.csv('dept.csv', header = True, inferSchema = True)
df2.show(vertical = True)

-RECORD 0--------------------------------------
 dept_division          | 311 Call Center      
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Brush                
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | YES                  
-RECORD 2--------------------------------------
 dept_division          | Clean and Green      
 dept_name              | Parks and Recreation 
 standardized_dept_name | Parks & Recreation   
 dept_subject_to_SLA    | YES                  
-RECORD 3--------------------------------------
 dept_division          | Clean and Green N... 
 dept_name              | Parks and Recreation 
 standardized_dept_name | Parks & Recreation   
 dept_subject_to_SLA    | YES                  
-RECORD 4-------------------------------

In [5]:
df3 = spark.read.csv('source.csv', header = True, inferSchema = True)
df3.show(vertical = True)

-RECORD 0-------------------------------
 source_id       | 100137               
 source_username | Merlene Blodgett     
-RECORD 1-------------------------------
 source_id       | 103582               
 source_username | Carmen Cura          
-RECORD 2-------------------------------
 source_id       | 106463               
 source_username | Richard Sanchez      
-RECORD 3-------------------------------
 source_id       | 119403               
 source_username | Betty De Hoyos       
-RECORD 4-------------------------------
 source_id       | 119555               
 source_username | Socorro Quiara       
-RECORD 5-------------------------------
 source_id       | 119868               
 source_username | Michelle San Miguel  
-RECORD 6-------------------------------
 source_id       | 120752               
 source_username | Eva T. Kleiber       
-RECORD 7-------------------------------
 source_id       | 124405               
 source_username | Lori Lara            
-RECORD 8-------

In [6]:
df3.write.json("sources_json", mode="overwrite")

In [7]:
df3.write.csv('sources_csv', mode = 'overwrite')

In [8]:
df3.describe().show()

+-------+------------------+---------------+
|summary|         source_id|source_username|
+-------+------------------+---------------+
|  count|               140|            140|
|   mean|136973.92727272728|           null|
| stddev| 10275.63643312297|           null|
|    min|            100137|  Alex Franklin|
|    max|           yh24110|       svcCRMSS|
+-------+------------------+---------------+



In [9]:
df3.show()

+---------+--------------------+
|source_id|     source_username|
+---------+--------------------+
|   100137|    Merlene Blodgett|
|   103582|         Carmen Cura|
|   106463|     Richard Sanchez|
|   119403|      Betty De Hoyos|
|   119555|      Socorro Quiara|
|   119868| Michelle San Miguel|
|   120752|      Eva T. Kleiber|
|   124405|           Lori Lara|
|   132408|       Leonard Silva|
|   135723|        Amy Cardenas|
|   136202|    Michelle Urrutia|
|   136979|      Leticia Garcia|
|   137943|    Pamela K. Baccus|
|   138605|        Marisa Ozuna|
|   138650|      Kimberly Green|
|   138650|Kimberly Green-Woods|
|   138793| Guadalupe Rodriguez|
|   138810|       Tawona Martin|
|   139342|     Jessica Mendoza|
|   139344|        Isis Mendoza|
+---------+--------------------+
only showing top 20 rows



In [10]:
df2.describe()

DataFrame[summary: string, dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [11]:
df2.describe().show()

+-------+----------------+--------------------+----------------------+-------------------+
|summary|   dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+-------+----------------+--------------------+----------------------+-------------------+
|  count|              39|                  38|                    39|                 39|
|   mean|            null|                null|                  null|               null|
| stddev|            null|                null|                  null|               null|
|    min| 311 Call Center|Animal Care Services|  Animal Care Services|                 NO|
|    max|Waste Collection|Trans & Cap Impro...|  Trans & Cap Impro...|                YES|
+-------+----------------+--------------------+----------------------+-------------------+



In [12]:
df1.describe()

DataFrame[summary: string, case_id: string, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: string, case_closed: string, dept_division: string, service_request_type: string, SLA_days: string, case_status: string, source_id: string, request_address: string, council_district: string]

In [13]:
df1.describe().show()

+-------+--------------------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+------------------+--------------------+-----------------+
|summary|             case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|          SLA_days|case_status|         source_id|     request_address| council_district|
+-------+--------------------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+------------------+--------------------+-----------------+
|  count|              841704|          841704|          823594|      841671|   841704|             841671|     841704|          841704|              841704|            841671|     841704|            841704|              841704|           841704|
|   mean|1.0

#### Clean up data

In [14]:
df1 = df1.withColumnRenamed("SLA_due_date", "case_due_date")

In [15]:
df1.groupBy("case_late", "case_closed").count().show()

+---------+-----------+------+
|case_late|case_closed| count|
+---------+-----------+------+
|       NO|        YES|735616|
|      YES|        YES| 87978|
|       NO|         NO| 11585|
|      YES|         NO|  6525|
+---------+-----------+------+



In [16]:
df1 = df1.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

df1.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



In [17]:
fmt = "M/d/yy H:mm"
df1 = (
    df1.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt)))

In [18]:
df1.select("case_opened_date", "case_closed_date", "case_due_date")

DataFrame[case_opened_date: timestamp, case_closed_date: timestamp, case_due_date: timestamp]

In [24]:
df1 = df1.withColumn("request_address", trim(lower(df1.request_address)))

df1.select("request_address").show(5)

+--------------------+
|     request_address|
+--------------------+
|2315  el paso st,...|
|2215  goliad rd, ...|
|102  palfrey st w...|
|114  la garde st,...|
|734  clearview dr...|
+--------------------+
only showing top 5 rows



In [33]:
df1 = df1.withColumn('num_days_late', col('num_days_late').cast('int'))

In [35]:
df1 = df1.withColumn('SLA_days', col('SLA_days').cast('int'))

How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [36]:
df1.show(vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998                 
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999                  
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | false                
 num_days_late        | -2        

In [48]:
recent = df1.filter(df1.case_status == 'Open').sort(desc('num_days_late')).show()

+----------+-------------------+----------------+-------------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+------------+--------------------+----------------+
|   case_id|   case_opened_date|case_closed_date|      case_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|SLA_days|case_status|   source_id|     request_address|council_district|
+----------+-------------------+----------------+-------------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+------------+--------------------+----------------+
|1013226813|2017-01-02 11:26:00|            null|2017-01-17 11:26:00|     true|          348|      false|Code Enforcement|   No Address Posted|      15|       Open|    svcCRMSS|highfield and mil...|               6|
|1013225651|2017-01-01 13:57:00|            null|2017-01-17 08:30:00|     true|          348|      false|Code Enforcement|   No Address 

In [50]:
df1.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'int'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'int'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [54]:
df1.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: integer (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: integer (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



How many Stray Animal cases are there?

In [57]:
df1.filter(df1.service_request_type == 'Stray Animal').count()

26760

How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [58]:
df1.filter(df1.dept_division == 'Field Operations').where(df1.service_request_type != 'Officer Standby').count()

113902

Convert the council_district column to a string column.

In [60]:
df1 = df1.withColumn('council_district', col('council_district').cast('string'))

In [61]:
df1.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: integer (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: integer (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



Extract the year from the case_closed_date column.

In [None]:
df1 = df1.withColumn('case_closed_date', regex_extract())

In [66]:
df1.select('case_closed_date').show()

+-------------------+
|   case_closed_date|
+-------------------+
|2018-01-01 12:29:00|
|2018-01-03 08:11:00|
|2018-01-02 07:57:00|
|2018-01-02 08:13:00|
|2018-01-01 13:29:00|
|2018-01-01 14:38:00|
|2018-01-02 15:32:00|
|2018-01-02 15:32:00|
|2018-01-02 15:32:00|
|2018-01-02 15:32:00|
|2018-01-02 15:32:00|
|2018-01-02 15:32:00|
|2018-01-02 15:33:00|
|2018-01-02 15:32:00|
|2018-01-02 15:33:00|
|2018-01-02 15:33:00|
|2018-01-02 15:33:00|
|2018-01-02 15:33:00|
|2018-01-02 15:33:00|
|2018-01-02 15:33:00|
+-------------------+
only showing top 20 rows



In [69]:
df1.select(year("case_closed_date")).show()

+----------------------+
|year(case_closed_date)|
+----------------------+
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
|                  2018|
+----------------------+
only showing top 20 rows



Convert num_days_late from days to hours in new columns num_hours_late.

In [70]:
df1 = (df1.withColumn('num_hours_late'), expr('num_days_late * 24')).show()

TypeError: withColumn() missing 1 required positional argument: 'col'

In [None]:
df = df.withColumn(
    "num_weeks_late", expr("num_days_late / 7 AS num_weeks_late")
)