# Data Wrangling with Spark

In [1]:
import pyspark
from pyspark.sql.functions import *

In [2]:
# This step could be very complicated
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [3]:
spark

## Reading / Writing Data

- `spark.read.csv`
- `spark.read.json`

These functions will produce a dataframe

important optionsa
- headers=True
- inferSchema=True

Instead of `inferSchema` is to create a custom data schema. Why?

- Speed
- Programmatic documentation of data structure 

Writing Data

- `df.write.csv(filename)`
- `df.write.json(filename)`



# Data Prep

In [4]:
df = spark.read.csv('case.csv', header=True, inferSchema=True)

In [5]:
df.show(5, vertical=True, truncate=False)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 SLA_due_date         | 9/26/20 0:42                          
 case_late            | NO                                    
 num_days_late        | -998.5087616000001                    
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

In [6]:
df = df.withColumnRenamed("SLA_due_date", "case_due_date")

In [7]:
df.withColumnRenamed("SLA_due_date", "case_due_date").show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [8]:
df.groupBy("case_late", 'case_closed').count().show()

+---------+-----------+------+
|case_late|case_closed| count|
+---------+-----------+------+
|       NO|        YES|735616|
|      YES|        YES| 87978|
|       NO|         NO| 11585|
|      YES|         NO|  6525|
+---------+-----------+------+



In [9]:
df.groupBy("case_late").pivot("case_closed").count().show()

+---------+-----+------+
|case_late|   NO|   YES|
+---------+-----+------+
|      YES| 6525| 87978|
|       NO|11585|735616|
+---------+-----+------+



In [10]:
# .withColumn to transform columns
df.withColumn("case_late", col("case_late") == "YES").show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [11]:
df = (
    df.withColumn("case_late", col("case_late") == "YES")
    .withColumn("case_closed", col("case_closed")== "YES")
)

In [12]:
df.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [13]:
df.select("council_district").show()

+----------------+
|council_district|
+----------------+
|               5|
|               3|
|               3|
|               3|
|               7|
|               7|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
|               4|
+----------------+
only showing top 20 rows



In [14]:
df.groupBy("council_district").count().show()

+----------------+------+
|council_district| count|
+----------------+------+
|               1|119309|
|               6| 74095|
|               3|102706|
|               5|114609|
|               9| 40916|
|               4| 93778|
|               8| 42345|
|               7| 72445|
|              10| 62926|
|               2|114745|
|               0|  3830|
+----------------+------+



In [15]:
df = df.withColumn("council_district", format_string("%04d", col("council_district")))

In [16]:
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 0005                 
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | false                
 num_days_late        | -2.0126041

In [17]:
df.select("case_opened_date", "case_closed_date", "case_due_date").show()

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
|     1/1/18 6:28|    1/1/18 14:38| 1/31/18 8:30|
|     1/1/18 6:57|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 6:58|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 6:58|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 6:59|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:00|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:02|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:02|    1/2/18 15:33| 1/17/18 8:30|
|     1/1/18 7:03|    1/2/18 15:32| 1/17/18 8:30|
|     1/1/18 7:04|    1/2/18 15:33| 1/17/18 8:30|
|     1/1/18 7:04|    1/2/18 15:33| 1/17/18 8:30|
|     1/1/18 7:05|    1/2/18 15:33| 1/17/18 8:30|


In [18]:
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('case_due_date', 'string'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [19]:
df = (df.withColumn("case_opened_date", to_timestamp(col("case_opened_date"), "M/d/yy H:mm"))
.withColumn("case_closed_date", to_timestamp(col("case_closed_date"), "M/d/yy H:mm"))
.withColumn("case_due_date", to_timestamp(col("case_due_date"), "M/d/yy H:mm"))
)

In [20]:
df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 0005                 
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01 00:46:00  
 case_closed_date     | 2018-01-03 08:11:00  
 case_due_date        | 2018-01-05 08:30:00  
 case_late            | false                
 num_days_late        | -2.0126041

In [21]:
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [22]:
df = df.withColumn("request_address", lower(trim(col("request_address"))))

In [23]:
df.select("request_address").show(truncate=False)

+----------------------------------------+
|request_address                         |
+----------------------------------------+
|2315  el paso st, san antonio, 78207    |
|2215  goliad rd, san antonio, 78223     |
|102  palfrey st w, san antonio, 78223   |
|114  la garde st, san antonio, 78223    |
|734  clearview dr, san antonio, 78228   |
|bandera rd and bresnahan                |
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10133  figaro canyon, san antonio, 78251|
|10129  boxing pass, san antonio, 78251  |
|10129  boxing pass, san antonio, 78251  |
|10129  boxing pass, san antonio, 78251  |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
|834  barrel point, san antonio, 78251   |
+----------

In [24]:
(
    # 0 is the capture group, 0th capture group
df.withColumn("zipcode", regexp_extract(col("request_address"), r"\d+$", 0))
.select("zipcode", 'request_address')
.show(truncate=False)
)

+-------+----------------------------------------+
|zipcode|request_address                         |
+-------+----------------------------------------+
|78207  |2315  el paso st, san antonio, 78207    |
|78223  |2215  goliad rd, san antonio, 78223     |
|78223  |102  palfrey st w, san antonio, 78223   |
|78223  |114  la garde st, san antonio, 78223    |
|78228  |734  clearview dr, san antonio, 78228   |
|       |bandera rd and bresnahan                |
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10133  figaro canyon, san antonio, 78251|
|78251  |10129  boxing pass, san antonio, 78251  |
|78251  |10129  boxing pass, san antonio, 78251  |
|78251  |10129  boxing pass, san antonio, 78251  |
|78251  |834  barrel point, san antonio, 78251   |
|78251  |834  barrel point, san

In [25]:
(
    # 0 is the capture group, 0th capture group
df.withColumn("zipcode", regexp_extract(col("request_address"), r"\d+$", 0))
.select("zipcode", 'request_address')
    
)

DataFrame[zipcode: string, request_address: string]

Case Lifetime: how long it took to close the case or how long the case has been open?

In [30]:
df.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 12:29:00  
 case_due_date        | 2020-09-26 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  el paso st,... 
 council_district     | 0005                 
only showing top 1 row



In [31]:
(
    df.withColumn("case_age", datediff(current_timestamp(), "case_opened_date"))
    .withColumn("days_closed", datediff("case_closed_date", "case_opened_date"))
    .withColumn("case_lifetime", when(col("case_closed"), col("days_to_closed").otherwise("case_age")))
    .filter(~ col("case_closed"))
    .select("case_opened_date", "case_closed_date", "case_age", "days_to_closed")
    .show()
)

IllegalArgumentException: 'otherwise() can only be applied on a Column previously generated by when()'

In [34]:
dept = spark.read.csv("dept.csv", header=True, inferSchema=True)
dept.show()

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|                null|  DSD/Code Enforcement|                YES|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Dangerous Premise...|Code Enforcement ...|

In [37]:
df = df.join(dept, "dept_division", "left")
df.show(3, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | Field Operations     
 case_id                | 1014127332           
 case_opened_date       | 2018-01-01 00:42:00  
 case_closed_date       | 2018-01-01 12:29:00  
 case_due_date          | 2020-09-26 00:42:00  
 case_late              | false                
 num_days_late          | -998.5087616000001   
 case_closed            | true                 
 service_request_type   | Stray Animal         
 SLA_days               | 999.0                
 case_status            | Closed               
 source_id              | svcCRMLS             
 request_address        | 2315  el paso st,... 
 council_district       | 0005                 
 dept_name              | Animal Care Services 
 standardized_dept_name | Animal Care Services 
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Storm Water          
 case_id                | 1014127333    

In [41]:
# data splitting with spark

train, test = df.randomSplit([.8, .2])

In [42]:
train.count()

672722

In [43]:
test.count()

168982

In [44]:
train, validate, test = df.randomSplit([.7, .15, .15], seed=123)