In [1]:
from pyspark.sql import SparkSession
# from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

In [108]:
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType
# from pyspark.sql.functions import concat, sum, avg, min, max, count, mean, round
# from pyspark.sql.functions import lit
# from pyspark.sql.functions import when
# from pyspark.sql.functions import col, expr, lower, trim
# from pyspark.sql.functions import format_string, to_timestamp
# from pyspark.sql.functions import regexp_extract


In [3]:
spark

## Data Acquisision

1. Read the case, department, and source data into their own spark dataframes.

2. Let's see how writing to the local disk works in spark:
    - Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
    - Inspect your folder structure. What do you notice?    
3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

### 1. Read the case, department, and source data into their own spark dataframes.

In [92]:
schema = StructType([
    StructField("source_id", StringType()),
    StructField("source_username", StringType()),
])

source = spark.read.csv("data/source.csv", header=True, schema=schema)
source.show(5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



In [94]:
# case dataset
df = spark.read.csv('data/case.csv', header=True, inferSchema=True)
df.printSchema()
df.show(2, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  


In [95]:
dept = spark.read.csv("data/dept.csv", header=True, inferSchema=True)
dept.show(2)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|                YES|
|          Brush|Solid Waste Manag...|           Solid Waste|                YES|
+---------------+--------------------+----------------------+-------------------+
only showing top 2 rows



### 2. Let's see how writing to the local disk works in spark:

- Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
- Inspect your folder structure. What do you notice?

In [98]:
source.write.csv('source.csv', mode = 'overwrite')
source.write.json('source.json', mode = 'overwrite')
df.write.csv('case.csv', mode = 'overwrite')
df.write.json('case.json', mode = 'overwrite')
dept.write.csv('dept.csv', mode = 'overwrite')
dept.write.json('dept.json', mode = 'overwrite')

### 3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

### Prep for main dataframe

In [7]:
df = spark.read.csv('data/case.csv', header=True, inferSchema=True)
df.printSchema()
df.show(5, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  


In [8]:
df = df.withColumnRenamed("SLA_due_date", "case_due_date")

In [9]:
df.explain()

== Physical Plan ==
*(1) Project [case_id#37, case_opened_date#38, case_closed_date#39, SLA_due_date#40 AS case_due_date#136, case_late#41, num_days_late#42, case_closed#43, dept_division#44, service_request_type#45, SLA_days#46, case_status#47, source_id#48, request_address#49, council_district#50]
+- FileScan csv [case_id#37,case_opened_date#38,case_closed_date#39,SLA_due_date#40,case_late#41,num_days_late#42,case_closed#43,dept_division#44,service_request_type#45,SLA_days#46,case_status#47,source_id#48,request_address#49,council_district#50] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/luke/codeup-data-science/spark-exercises/data/case.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<case_id:int,case_opened_date:string,case_closed_date:string,SLA_due_date:string,case_late:...




In [10]:
# Changing these case columns to booleans:
df = df.withColumn("case_late", expr("case_late == 'YES'"))
df = df.withColumn("case_closed", expr("case_closed == 'YES'"))

In [14]:
# standardizing the district column
df = df.withColumn("council_district", format_string("%03d", col("council_district")))

In [16]:
fmt = "M/d/yy H:mm"

df.select(
    "case_opened_date",
    to_timestamp("case_opened_date", fmt)
).show(5)

+----------------+-----------------------------------------------+
|case_opened_date|to_timestamp(`case_opened_date`, 'M/d/yy H:mm')|
+----------------+-----------------------------------------------+
|     1/1/18 0:42|                            2018-01-01 00:42:00|
|     1/1/18 0:46|                            2018-01-01 00:46:00|
|     1/1/18 0:48|                            2018-01-01 00:48:00|
|     1/1/18 1:29|                            2018-01-01 01:29:00|
|     1/1/18 1:34|                            2018-01-01 01:34:00|
+----------------+-----------------------------------------------+
only showing top 5 rows



In [17]:
# Adjusting date columns:

fmt = "M/d/yy H:mm"

df = df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
df = df.withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
df = df.withColumn("case_due_date", to_timestamp("case_due_date", fmt))

In [18]:
df.explain()

== Physical Plan ==
*(1) Project [case_id#37, gettimestamp(case_opened_date#38, M/d/yy H:mm, Some(America/Chicago)) AS case_opened_date#493, gettimestamp(case_closed_date#39, M/d/yy H:mm, Some(America/Chicago)) AS case_closed_date#508, gettimestamp(SLA_due_date#40, M/d/yy H:mm, Some(America/Chicago)) AS case_due_date#523, (case_late#41 = YES) AS case_late#151, num_days_late#42, (case_closed#43 = YES) AS case_closed#166, dept_division#44, service_request_type#45, SLA_days#46, case_status#47, source_id#48, request_address#49, format_string(%03d, council_district#50) AS council_district#394]
+- FileScan csv [case_id#37,case_opened_date#38,case_closed_date#39,SLA_due_date#40,case_late#41,num_days_late#42,case_closed#43,dept_division#44,service_request_type#45,SLA_days#46,case_status#47,source_id#48,request_address#49,council_district#50] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex[file:/Users/luke/codeup-data-science/spark-exercises/data/case.csv], PartitionFi

In [19]:
df.printSchema()
df.show(3, vertical=True)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true      

#### Standardizing addresses

In [22]:
df = df.withColumn("request_address", lower(trim("request_address")))

In [24]:
df.printSchema()
df.show(2, vertical=True, truncate=False)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                 

In [29]:
df = df.withColumn("zipcode", regexp_extract("request_address", r"(\d+)$", 1))

In [38]:
# case_lifetime
#   - if case is closed then diff between close and open dates -- days_to_close
#   - else diff between open date and now -- case_age

df.select(
    "case_opened_date",
    "case_closed_date",
    "case_closed",
    datediff(current_timestamp(), "case_opened_date").alias("case_age"),
    datediff("case_closed_date", "case_opened_date").alias("days_to_close"),
).withColumn(
    "case_lifetime",
    when(col("case_closed"), col("days_to_close")).otherwise(col("case_age")),
).show(10)

+-------------------+-------------------+-----------+--------+-------------+-------------+
|   case_opened_date|   case_closed_date|case_closed|case_age|days_to_close|case_lifetime|
+-------------------+-------------------+-----------+--------+-------------+-------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|       true|    1064|            0|            0|
|2018-01-01 00:46:00|2018-01-03 08:11:00|       true|    1064|            2|            2|
|2018-01-01 00:48:00|2018-01-02 07:57:00|       true|    1064|            1|            1|
|2018-01-01 01:29:00|2018-01-02 08:13:00|       true|    1064|            1|            1|
|2018-01-01 01:34:00|2018-01-01 13:29:00|       true|    1064|            0|            0|
|2018-01-01 06:28:00|2018-01-01 14:38:00|       true|    1064|            0|            0|
|2018-01-01 06:57:00|2018-01-02 15:32:00|       true|    1064|            1|            1|
|2018-01-01 06:58:00|2018-01-02 15:32:00|       true|    1064|            1|            1|

In [52]:
df = df.withColumn("case_age", datediff(current_timestamp(), "case_opened_date"))
df = df.withColumn("days_to_close", datediff("case_closed_date", "case_opened_date"))
df = df.withColumn("case_lifetime", when(col("case_closed"), col("days_to_close")).otherwise(col("case_age")))

Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

- I believe so, just need to join the dept dataframe into this regular df.

#### Joining the spark df's

In [39]:
dept = spark.read.csv("data/dept.csv", header=True, inferSchema=True)
dept.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



In [46]:
df = df.join(dept, "dept_division", "left")
df.show(2, vertical = True)

-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 case_due_date          | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5087616000001   
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  el paso st,... 
 council_district       | 005                  
 zipcode                | 78207                
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Storm Water   

In [97]:
source.printSchema(), dept.printSchema(), df.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



(None, None, None)

#### Splitting the data:

In [54]:
train, test = df.randomSplit([0.8, 0.2])
#
train, validate, test = df.randomSplit([0.6, 0.2, 0.2])

### Acquire and Prep complete

In [55]:
df.write.json("wrangle-json")

## Explore Questions:

#### 1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [64]:
df.select("SLA_days", "case_status", "case_opened_date", "case_closed_date", "case_due_date", "case_late").where(df.case_status == 'Open').sort(df.case_opened_date.desc()).show(5)

+--------+-----------+-------------------+----------------+-------------------+---------+
|SLA_days|case_status|   case_opened_date|case_closed_date|      case_due_date|case_late|
+--------+-----------+-------------------+----------------+-------------------+---------+
|    14.0|       Open|2018-08-08 10:38:00|            null|2018-08-22 10:38:00|    false|
|    14.0|       Open|2018-08-08 10:38:00|            null|2018-08-22 10:38:00|    false|
|     9.0|       Open|2018-08-08 10:38:00|            null|2018-08-17 10:38:00|    false|
|    64.0|       Open|2018-08-08 10:37:00|            null|2018-10-11 10:37:00|    false|
|    64.0|       Open|2018-08-08 10:37:00|            null|2018-10-11 10:37:00|    false|
+--------+-----------+-------------------+----------------+-------------------+---------+
only showing top 5 rows



#### 2. How many Stray Animal cases are there?

In [69]:
# How many Stray Animal cases are there?

# 26760 is what george got as well.

df.select("service_request_type").where(df.service_request_type == "Stray Animal").count()

26760

#### 3. How many service requests

In [73]:

# How many service requests that are assigned to the Field Operations department (dept_division) 
# are not classified as "Officer Standby" request type (service_request_type)?

df.filter((col('service_request_type') != 'Officer Standby') & (col("dept_division") == 'Field Operations')).count()

113902

#### 4. Convert the `council_district` column to a string column.

In [74]:
df.printSchema()


root
 |-- dept_division: string (nullable = true)
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = false)
 |-- zipcode: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)
 |-- case_age: integer (nullable = true)
 |-- days_to_close: integer (nullable = true)
 |-- case_lifetime: integer (nullable = true)



#### 5. Extract the year from the `case_closed_date` column.

In [79]:
df = df.withColumn("year", (year("case_closed_date")))
df.show(2, vertical = True)

-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 case_due_date          | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5087616000001   
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  el paso st,... 
 council_district       | 005                  
 zipcode                | 78207                
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | YES                  
 case_age               | 1065                 
 days_to_close          | 0             

#### 6. Convert `num_days_late` from days to hours in new columns `num_hours_late`.

In [90]:
df = df.withColumn("num_hours_late", (round(df.num_days_late * 24, 2)))
df.show(2, vertical = True)
# figured this one out.

-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 case_due_date          | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5087616000001   
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  el paso st,... 
 council_district       | 005                  
 zipcode                | 78207                
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | YES                  
 case_age               | 1065                 
 days_to_close          | 0             

#### 7. Join the case data with the source and department data.

In [91]:
schema = StructType([
    StructField("source_id", StringType()),
    StructField("source_username", StringType()),
])

df_source = spark.read.csv("data/source.csv", header=True, schema=schema)
df_source.show(5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



In [None]:
df.show(2, vertical)

#### 8. Are there any cases that do not have a request source?

# Need to go through and clean up my code using these guides from Zach. My df isn't the same column types as his, so I'll need to adjust the guides accordingly.

#### 12. How do number of days late depend on department and request type?

In [102]:

# How do number of days late depend on department and request type?
(
    df.filter("case_closed")
    .filter("case_late")
    .groupby("standardized_dept_name", "service_request_type")
    .agg(avg("num_days_late").alias("days_late"), count("*").alias("n_cases"))
    .withColumn("days_late", round(col("days_late"), 1))
    .sort(desc("days_late"))
    .show(40, truncate=False)
)

AnalysisException: filter expression '`case_closed`' of type string is not a boolean.;;
Filter case_closed#3600: string
+- Relation[case_id#3594,case_opened_date#3595,case_closed_date#3596,SLA_due_date#3597,case_late#3598,num_days_late#3599,case_closed#3600,dept_division#3601,service_request_type#3602,SLA_days#3603,case_status#3604,source_id#3605,request_address#3606,council_district#3607] csv


In [105]:
df.select(max('case_opened_date')).collect()

[Row(max(case_opened_date)='9/9/17 9:59')]

In [106]:
max_date = df.select(max('case_opened_date'), max('case_closed_date')).first()[0]
max_date

'9/9/17 9:59'

In [109]:
max_date = max_date.strftime('%Y-%m-%d %H:%M:%S')

AttributeError: 'str' object has no attribute 'strftime'

In [103]:
df = (
    df.withColumn('case_age', datediff(lit(max_date), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .drop('case_age', 'days_to_closed')
)

NameError: name 'max_date' is not defined