In [40]:
import pandas as pd
import numpy as np

import pyspark
import datetime

from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import to_timestamp
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import datediff
from pyspark.sql.functions import current_timestamp

In [2]:
# Obtain the Spark session for this notebook: getOrCreate() hands back an
# existing session when there is one and otherwise builds a new one.
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)

In [47]:
# Read the case records from the local CSV file into pandas, then preview
# the first five rows.
case_df = pd.read_csv(filepath_or_buffer='./sa311/case.csv')
case_df.head(n=5)

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7


In [48]:
# List the column labels of the pandas frame loaded from case.csv.
case_df.columns

Index(['case_id', 'case_opened_date', 'case_closed_date', 'SLA_due_date',
       'case_late', 'num_days_late', 'case_closed', 'dept_division',
       'service_request_type', 'SLA_days', 'case_status', 'source_id',
       'request_address', 'council_district'],
      dtype='object')

In [49]:
# Count the missing values in each column (isna is the same check as isnull).
case_df.isna().sum()

case_id                     0
case_opened_date            0
case_closed_date        18110
SLA_due_date               33
case_late                   0
num_days_late              33
case_closed                 0
dept_division               0
service_request_type        0
SLA_days                   33
case_status                 0
source_id                   0
request_address             0
council_district            0
dtype: int64

In [50]:
# Replace missing values column by column: the two date columns (held as
# strings) get the placeholder 'na', the two numeric columns get 0.0.
#
# This is done with a single DataFrame-level fillna and a reassignment rather
# than `case_df.<column>.fillna(..., inplace=True)`: the latter fills an
# intermediate Series, which pandas warns about (chained in-place assignment)
# and which does not update `case_df` when Copy-on-Write is active.
# Re-running this cell is harmless because already-filled values are left
# untouched.
case_df = case_df.fillna({
    'case_closed_date': 'na',
    'SLA_due_date': 'na',
    'num_days_late': 0.0,
    'SLA_days': 0.0,
})

In [55]:
# Convert the cleaned pandas frame into a Spark DataFrame; the column types
# are inferred from the pandas data.
df = spark.createDataFrame(data=case_df)

In [56]:
# Inspect the schema Spark inferred for the converted frame.
df.schema

StructType(List(StructField(case_id,LongType,true),StructField(case_opened_date,StringType,true),StructField(case_closed_date,StringType,true),StructField(SLA_due_date,StringType,true),StructField(case_late,StringType,true),StructField(num_days_late,DoubleType,true),StructField(case_closed,StringType,true),StructField(dept_division,StringType,true),StructField(service_request_type,StringType,true),StructField(SLA_days,DoubleType,true),StructField(case_status,StringType,true),StructField(source_id,StringType,true),StructField(request_address,StringType,true),StructField(council_district,LongType,true)))

In [57]:
# Parse each case's opening time and compute how many days ago it was opened.
# The source strings look like '1/1/18 0:42', i.e. a two-digit year, so the
# pattern uses 'yy': with the Spark 3 datetime parser a single 'y' reads "18"
# as the literal year 0018, while 'yy' resolves it to a year in 2000-2099.
# The result is a lazy DataFrame; nothing is computed until an action is run.
(df
 .select(to_timestamp(df.case_opened_date, 'M/d/yy H:mm').alias('timestamp'))
 .select(col('timestamp'),
     datediff(current_timestamp(), col('timestamp')).alias('days_since_now')))

DataFrame[timestamp: timestamp, days_since_now: int]

In [60]:
# Build a view of the data with num_days_late cast to single-precision float.
# withColumn returns a new DataFrame; it is only displayed here and not
# assigned, so `df` itself keeps its original column type.
df.withColumn('num_days_late', df['num_days_late'].cast('float'))

DataFrame[case_id: bigint, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: float, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: bigint]

In [62]:
# Re-inspect the schema of `df`; the recorded output still lists
# num_days_late as DoubleType.
df.schema

StructType(List(StructField(case_id,LongType,true),StructField(case_opened_date,StringType,true),StructField(case_closed_date,StringType,true),StructField(SLA_due_date,StringType,true),StructField(case_late,StringType,true),StructField(num_days_late,DoubleType,true),StructField(case_closed,StringType,true),StructField(dept_division,StringType,true),StructField(service_request_type,StringType,true),StructField(SLA_days,DoubleType,true),StructField(case_status,StringType,true),StructField(source_id,StringType,true),StructField(request_address,StringType,true),StructField(council_district,LongType,true)))

How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently open issue been open?

In [64]:
# Count the cases whose request type is exactly 'Stray Animal'.
(
    df.select('service_request_type')
    .filter(col('service_request_type') == 'Stray Animal')
    .count()
)

26760

How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [65]:
# Count Field Operations cases whose request type is anything other than
# 'Officer Standby'. Both conditions are applied as one combined predicate.
(
    df.select('dept_division', 'service_request_type')
    .filter(
        (col('dept_division') == 'Field Operations')
        & (col('service_request_type') != 'Officer Standby')
    )
    .count()
)

113902

Create a new DataFrame without any information related to dates or location.

In [66]:
# Keep every column except the three date fields and the street address,
# preserving the original column order.
no_dates_address_df = df.select(
    [name for name in df.columns
     if name not in ('case_opened_date',
                     'case_closed_date',
                     'SLA_due_date',
                     'request_address')]
)

In [67]:
# Print the first two rows of the reduced frame as a text table.
no_dates_address_df.show(n=2)

+----------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+----------------+
|   case_id|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|council_district|
+----------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+----------------+
|1014127332|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|               5|
|1014127333|       NO| -2.012604167|        YES|     Storm Water|Removal Of Obstru...|4.322222222|     Closed| svcCRMSS|               3|
+----------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+----------------+
only showing top 2 rows



Read dept.csv into a Spark DataFrame. Inspect the dept_name column. Replace the missing values with "other".