# Zach's Wrangling with Spark lecture

In [1]:
import pyspark
from pyspark.sql.functions import *

In [2]:
# to get back the original sum function
from builtins import sum as builtin_sum

In [3]:
sum

<function pyspark.sql.functions._create_function.<locals>._(col)>

In [4]:
builtin_sum([1, 2, 3, 4, 5])

15

In [5]:
# This step could get very complicated
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [6]:
spark

## Reading / Writing Data

- spark.read.csv

- spark.read.json

These functions will produce a dataframe

**Important Options**

- inferSchema = tells spark to go through the data and correctly read the datatypes

- (takes time, especially with a larger dataset)

header=True

Instead of inferSchema you could create a custom data schema.  Why?  For control of the datatypes / data structures in the df that's returned.  This speeds Spark up, AND programmatically documents my data structure / reasoning.  Instead of using MarkDown, we're doing it in Python

**In the real world, we'll be setting our own schemas (per format in the curriculum), but for the sake of this course, we'll just be using 'inferSchema.'

### Writing Data:

- "df.write.csv(filename)"

- "df.read.csv(filename, header=True, inferSchema=True)" (for the curriculum)

## Data Prep

In [7]:
df = spark.read.csv("case.csv", inferSchema=True, header=True)

In [8]:
df.show(5, vertical=True, truncate=False)

# 'SLA' = Service Level Agreement 

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 SLA_due_date         | 9/26/20 0:42                          
 case_late            | NO                                    
 num_days_late        | -998.5087616000001                    
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

In [9]:
df = df.withColumnRenamed("SLA_due_date", "case_due_date")

In [10]:
df.withColumnRenamed("SLA_due_date", "case_due_date").show(1, vertical=True)

# Shows that 'SLA_due_date' has been renamed with 'case_due_date:'

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [11]:
# To make sure it's always a yes or no in case_closed:

df.groupby("case_late").pivot("case_closed").count().show()

+---------+-----+------+
|case_late|   NO|   YES|
+---------+-----+------+
|      YES| 6525| 87978|
|       NO|11585|735616|
+---------+-----+------+



In [12]:
# .withColumn to transform columns
df.withColumn('case_late', col('case_late') == 'YES').show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [13]:
# .withColumn to transform columns inplace
df = (
    df.withColumn('case_late', col('case_late') == 'YES')
    .withColumn('case_closed', col('case_closed') == 'YES')
)

df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [14]:
df.select('council_district').show(5)

+----------------+
|council_district|
+----------------+
|               5|
|               3|
|               3|
|               3|
|               7|
+----------------+
only showing top 5 rows



In [15]:
df.groupby('council_district').count().show()

+----------------+------+
|council_district| count|
+----------------+------+
|               1|119309|
|               6| 74095|
|               3|102706|
|               5|114609|
|               9| 40916|
|               4| 93778|
|               8| 42345|
|               7| 72445|
|              10| 62926|
|               2|114745|
|               0|  3830|
+----------------+------+



**^^ Tells me that there's only districts 0-10, but loads of counts within each**

In [16]:
df.show(5, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [17]:
df = df.withColumn('council_district', format_string('%04d', col('council_district')))

In [18]:
df.select('case_opened_date', 'case_closed_date', 'case_due_date').show()

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
|     1/1/18 6:28|    1/1/18 14:38| 1/31/18 8:30|
|     1/1/18 6:57|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 6:58|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 6:58|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 6:59|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:00|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:02|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:02|    1/2/18 15:33| 1/17/18 8:30|
|     1/1/18 7:03|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:04|    1/2/18 15:33| 1/17/18 8:30|
|     1/1/18 7:04|    1/2/18 15:33| 1/17/18 8:30|
|     1/1/18 7:05|    1/2/18 15:33| 1/17/18 8:30|


### Java SimpleDateFormat

In [19]:
df = (
    df.withColumn('case_opened_date', to_timestamp(col('case_opened_date'), 'M/d/yy H:mm'))
    .withColumn('case_closed_date', to_timestamp(col('case_closed_date'), 'M/d/yy H:mm'))
    .withColumn('case_due_date', to_timestamp(col('case_due_date'), 'M/d/yy H:mm'))
)

In [20]:
df.show(3, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 0005                 
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | false                
 num_days_late        | -2.0126041

In [21]:
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [22]:
'    a   b    c    '.strip()

'a   b    c'

In [23]:
df = df.withColumn('request_address', lower(trim(col('request_address'))))

df.select('request_address').show(truncate=False)

+----------------------------------------+
|request_address                         |
+----------------------------------------+
|2315  el paso st, san antonio, 78207    |
|2215  goliad rd, san antonio, 78223     |
|102  palfrey st w, san antonio, 78223   |
|114  la garde st, san antonio, 78223    |
|734  clearview dr, san antonio, 78228   |
|bandera rd and bresnahan                |
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10129  boxing pass, san antonio, 78251  |
|10129  boxing pass, san antonio, 78251  |
|10129  boxing pass, san antonio, 78251  |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
+----------

In [24]:
(
    # 0 is the capture group, 0th capture group is the entire match
    df.withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))
    .select('zipcode', 'request_address')
    .show(truncate=False)
)

+-------+----------------------------------------+
|zipcode|request_address                         |
+-------+----------------------------------------+
|78207  |2315  el paso st, san antonio, 78207    |
|78223  |2215  goliad rd, san antonio, 78223     |
|78223  |102  palfrey st w, san antonio, 78223   |
|78223  |114  la garde st, san antonio, 78223    |
|78228  |734  clearview dr, san antonio, 78228   |
|       |bandera rd and bresnahan                |
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10129  boxing pass, san antonio, 78251  |
|78251  |10129  boxing pass, san antonio, 78251  |
|78251  |10129  boxing pass, san antonio, 78251  |
|78251  |834  barrel point, san antonio, 78251   |
|78251  |834  barrel point, san

In [25]:
df = df.withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))

df.show(3, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 2018-01-01 00:42:00                   
 case_closed_date     | 2018-01-01 12:29:00                   
 case_due_date        | 2020-09-26 00:42:00                   
 case_late            | false                                 
 num_days_late        | -998.5087616000001                    
 case_closed          | true                                  
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  el paso st, san antonio, 78207  
 council_district     | 0005                                  
 zipcode              | 78207                          

In [26]:
(
    df.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .filter(~ col('case_closed'))
    .select('case_opened_date', 'case_closed_date', 'case_age', 'days_to_closed', 'case_lifetime')
    .show()
)

+-------------------+----------------+--------+--------------+-------------+
|   case_opened_date|case_closed_date|case_age|days_to_closed|case_lifetime|
+-------------------+----------------+--------+--------------+-------------+
|2018-01-02 09:39:00|            null|     889|          null|          889|
|2018-01-02 10:49:00|            null|     889|          null|          889|
|2018-01-02 13:45:00|            null|     889|          null|          889|
|2018-01-02 14:09:00|            null|     889|          null|          889|
|2018-01-02 14:34:00|            null|     889|          null|          889|
|2018-01-02 15:22:00|            null|     889|          null|          889|
|2018-01-02 15:58:00|            null|     889|          null|          889|
|2018-01-03 08:04:00|            null|     888|          null|          888|
|2018-01-03 09:18:00|            null|     888|          null|          888|
|2018-01-03 09:26:00|            null|     888|          null|          888|

In [27]:
df = (
    df.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .drop('case_age', 'days_to_closed')
)

df.show(3, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 0005                 
 zipcode              | 78207                
 case_lifetime        | 0                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05

In [28]:
df.sample(fraction=.01).show(5, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127410           
 case_opened_date     | 2018-01-01 10:13:00  
 case_closed_date     | 2018-02-21 20:06:00  
 case_due_date        | 2018-01-06 10:13:00  
 case_late            | true                 
 num_days_late        | 46.41153935          
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Animal Neglect       
 SLA_days             | 5.0                  
 case_status          | Closed               
 source_id            | ns16326              
 request_address      | 339  brandywine a... 
 council_district     | 0007                 
 zipcode              | 78228                
 case_lifetime        | 51                   
-RECORD 1------------------------------------
 case_id              | 1014127721           
 case_opened_date     | 2018-01-01 18:13:00  
 case_closed_date     | 2018-01-02 08:20:00  
 case_due_date        | 2018-03-07

### Using the 'Dept' csv file

In [29]:
dept = spark.read.csv("dept.csv", header=True, inferSchema=True)
dept.show(10)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|                null|  DSD/Code Enforcement|                YES|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Dangerous Premise...|Code Enforcement ...|

In [30]:
df.join(dept, "dept_division", "left")

DataFrame[dept_division: string, case_id: int, case_opened_date: timestamp, case_closed_date: timestamp, case_due_date: timestamp, case_late: boolean, num_days_late: double, case_closed: boolean, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, zipcode: string, case_lifetime: int, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

In [31]:
df

DataFrame[case_id: int, case_opened_date: timestamp, case_closed_date: timestamp, case_due_date: timestamp, case_late: boolean, num_days_late: double, case_closed: boolean, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: string, zipcode: string, case_lifetime: int]

In [32]:
# Train / Test Split Using Spark

train, test = df.randomSplit([.8, .2])

In [33]:
train.count()

673081

In [34]:
test.count()

168623

In [35]:
train, validate, test = df.randomSplit([.7, .15, .15], seed=123)

In [36]:
train.count()

588665

In [37]:
validate.count()

126618

In [38]:
test.count()

126421

In [39]:
# Checking totals

total_train_test = 673322 + 168382

total_train_validate_test = 588665 + 126618 + 126421

print(total_train_test)
print(total_train_validate_test)

841704
841704


**^^ Cool.  Checks out.**

# Exercises

## Data Acquisition

Read the case, department, and source data into their own spark dataframes.

Let's see how writing to the local disk works in spark:

- Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json

- Inspect your folder structure. What do you notice?

- Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.


In [40]:
case_df = spark.read.csv("case.csv", inferSchema=True, header=True)

In [41]:
dept_df = spark.read.csv("dept.csv", inferSchema=True, header=True)

In [42]:
source_df = spark.read.csv("source.csv", inferSchema=True, header=True)

In [43]:
source_df.show(5)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



**^^ Ran case_df and dept_df to make sure they were read in, and they were.**

In [44]:
# storing as 'sources.csv'

csv_sources = source_df.toPandas().to_csv("sources_csv")

In [45]:
# storing as 'sources_json'

json_sources = source_df.toPandas().to_json("sources_json")

**Inspecting the folder structure, I notice:**

- 'sources_csv' is MUCH easier to read naturally, and is already in a sort of table format

- 'sources_json' is harder to read, but looks like a dictionary of dictionaries


In [59]:
df_csv = spark.read.csv("sources_csv", inferSchema=True, header=True)
df_csv.head(5)

[Row(_c0=0, source_id='100137', source_username='Merlene Blodgett'),
 Row(_c0=1, source_id='103582', source_username='Carmen Cura'),
 Row(_c0=2, source_id='106463', source_username='Richard Sanchez'),
 Row(_c0=3, source_id='119403', source_username='Betty De Hoyos'),
 Row(_c0=4, source_id='119555', source_username='Socorro Quiara')]

In [60]:
df_csv.dtypes

[('_c0', 'int'), ('source_id', 'string'), ('source_username', 'string')]

In [65]:
df_json = spark.read.json("sources_json")
df_json.head(5)

[Row(source_id=Row(0='100137', 1='103582', 10='136202', 100='ih24384', 101='jg06389', 102='js12254', 103='js26451', 104='jw10936', 105='lb07325', 106='lb24838', 107='ld04403', 108='lg26383', 109='ls26247', 11='136979', 110='mc21309', 111='me05816', 112='mg26820', 113='mp21218', 114='mp26640', 115='mt13131', 116='mt23152', 117='mt26294', 118='np26458', 119='ns16326', 12='137943', 120='ps01944', 121='rb05270', 122='rs16746', 123='ru26699', 124='sg22264', 125='sg26196', 126='sp26368', 127='ss09159', 128='ss21394', 129='ss26317', 13='138605', 130='sv24848', 131='svcCFlag', 132='svcCRMLS', 133='svcCRMSS', 134='sw26367', 135='ts15690', 136='vb22265', 137='vk26526', 138='yc16753', 139='yh24110', 14='138650', 15='138650', 16='138793', 17='138810', 18='139342', 19='139344', 2='106463', 20='139345', 21='139807', 22='139868', 23='140436', 24='140507', 25='140508', 26='140509', 27='140637', 28='140987', 29='140991', 3='119403', 30='140992', 31='141239', 32='141240', 33='141241', 34='141256', 35='1

In [66]:
df_json.dtypes

[('source_id',
  'struct<0:string,1:string,10:string,100:string,101:string,102:string,103:string,104:string,105:string,106:string,107:string,108:string,109:string,11:string,110:string,111:string,112:string,113:string,114:string,115:string,116:string,117:string,118:string,119:string,12:string,120:string,121:string,122:string,123:string,124:string,125:string,126:string,127:string,128:string,129:string,13:string,130:string,131:string,132:string,133:string,134:string,135:string,136:string,137:string,138:string,139:string,14:string,15:string,16:string,17:string,18:string,19:string,2:string,20:string,21:string,22:string,23:string,24:string,25:string,26:string,27:string,28:string,29:string,3:string,30:string,31:string,32:string,33:string,34:string,35:string,36:string,37:string,38:string,39:string,4:string,40:string,41:string,42:string,43:string,44:string,45:string,46:string,47:string,48:string,49:string,5:string,50:string,51:string,52:string,53:string,54:string,55:string,56:string,57:string,5

**^^ In 'sources_csv,' the index is an integer and the rest of the data is in string format.  In 'sources_json,' all the data is in string format.**

## Zach's Data Prep

In [81]:
df = spark.read.csv('case.csv', header=True, inferSchema=True)

# Rename column
df = df.withColumnRenamed('SLA_due_date', 'case_due_date')

# Convert to better data types
df = (
    df.withColumn('case_late', col('case_late') == 'YES')
    .withColumn('case_closed', col('case_closed') == 'YES')
)
df = df.withColumn('council_district', format_string('%04d', col('council_district')))
df = (
    df.withColumn('case_opened_date', to_timestamp(col('case_opened_date'), 'M/d/yy H:mm'))
    .withColumn('case_closed_date', to_timestamp(col('case_closed_date'), 'M/d/yy H:mm'))
    .withColumn('case_due_date', to_timestamp(col('case_due_date'), 'M/d/yy H:mm'))
)

# Cleanup text data
df = df.withColumn('request_address', lower(trim(col('request_address'))))
# Extract zipcode
df = df.withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))

# Create a `case_lifetime` feature
df = (
    df.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .drop('case_age', 'days_to_closed')
)

# Join departments and sources
depts = spark.read.csv('dept.csv', header=True, inferSchema=True)
sources = spark.read.csv('source.csv', header=True, inferSchema=True)

df = df.join(depts, 'dept_division', 'left').join(sources, 'source_id', 'left')

# # Train Test Split
# train, test = df.randomSplit([.8, .2])
# train, validate, test = df.randomSplit([.7, .15, .15], seed=123)

AnalysisException: 'Path does not exist: file:/Users/DataScience/codeup-data-science/spark/data/dept.csv;'

### 1.) How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [68]:
(
    df.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .filter(~ col('case_closed'))
    .select('case_opened_date', 'case_closed_date', 'case_age', 'days_to_closed', 'case_lifetime')
    .show()
)

+-------------------+----------------+--------+--------------+-------------+
|   case_opened_date|case_closed_date|case_age|days_to_closed|case_lifetime|
+-------------------+----------------+--------+--------------+-------------+
|2018-01-02 09:39:00|            null|     889|          null|          889|
|2018-01-02 10:49:00|            null|     889|          null|          889|
|2018-01-02 13:45:00|            null|     889|          null|          889|
|2018-01-02 14:09:00|            null|     889|          null|          889|
|2018-01-02 14:34:00|            null|     889|          null|          889|
|2018-01-02 15:22:00|            null|     889|          null|          889|
|2018-01-02 15:58:00|            null|     889|          null|          889|
|2018-01-03 08:04:00|            null|     888|          null|          888|
|2018-01-03 09:18:00|            null|     888|          null|          888|
|2018-01-03 09:26:00|            null|     888|          null|          888|

In [74]:
df = spark.read.csv("sources_csv", inferSchema=True, header=True)

df.head(5)

[Row(_c0=0, source_id='100137', source_username='Merlene Blodgett'),
 Row(_c0=1, source_id='103582', source_username='Carmen Cura'),
 Row(_c0=2, source_id='106463', source_username='Richard Sanchez'),
 Row(_c0=3, source_id='119403', source_username='Betty De Hoyos'),
 Row(_c0=4, source_id='119555', source_username='Socorro Quiara')]

### 2.) How many Stray Animal cases are there?

In [77]:
df_case = spark.read.csv("case.csv", inferSchema=True, header=True)

df_case.head(5)

[Row(case_id=1014127332, case_opened_date='1/1/18 0:42', case_closed_date='1/1/18 12:29', SLA_due_date='9/26/20 0:42', case_late='NO', num_days_late=-998.5087616000001, case_closed='YES', dept_division='Field Operations', service_request_type='Stray Animal', SLA_days=999.0, case_status='Closed', source_id='svcCRMLS', request_address='2315  EL PASO ST, San Antonio, 78207', council_district=5),
 Row(case_id=1014127333, case_opened_date='1/1/18 0:46', case_closed_date='1/3/18 8:11', SLA_due_date='1/5/18 8:30', case_late='NO', num_days_late=-2.0126041669999997, case_closed='YES', dept_division='Storm Water', service_request_type='Removal Of Obstruction', SLA_days=4.322222222, case_status='Closed', source_id='svcCRMSS', request_address='2215  GOLIAD RD, San Antonio, 78223', council_district=3),
 Row(case_id=1014127334, case_opened_date='1/1/18 0:48', case_closed_date='1/2/18 7:57', SLA_due_date='1/5/18 8:30', case_late='NO', num_days_late=-3.022337963, case_closed='YES', dept_division='Stor

In [79]:
from pyspark.sql.functions import col, countDistinct

df_case.agg(*(countDistinct(col("service_request_type")).alias("service_type") for c in df_case.columns))

DataFrame[service_type: bigint, service_type: bigint, service_type: bigint, service_type: bigint, service_type: bigint, service_type: bigint, service_type: bigint, service_type: bigint, service_type: bigint, service_type: bigint, service_type: bigint, service_type: bigint, service_type: bigint, service_type: bigint]

### 3.) How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

### 4.) Convert the council_district column to a string column.

### 5.) Extract the year from the case_closed_date column.

### 6.) Convert num_days_late from days to hours in new columns num_hours_late.

### 7.) Join the case data with the source and department data.

### 8.) Are there any cases that do not have a request source?

### 9.) What are the top 10 service request types in terms of number of requests?

### 10.) What are the top 10 service request types in terms of average days late?

### 11.) Does number of days late depend on department?

### 12.) How do number of days late depend on department and request type?