### Data Wrangling

https://docs.google.com/presentation/d/1v54Tr4POZj9K4zsaHOn7U22rzdSlTXsyCgE5lfhd-rA/edit?usp=sharing

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

#### Read data 

We will use San Antonio's 311 call data for this lesson and the exercises. Download the CSV data from Google Classroom:

https://classroom.google.com/u/0/w/Mzg3MTg5NzU1Njk1/tc/Mzg3MTg5NzU1NzE1

In [2]:
# Read in CSV file 
source = (spark.read.csv("source.csv",
                     sep=",",
                     header=True,
                     inferSchema=True)
     )

In [3]:
# Another way to read in data:

(
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("source.csv")
)

DataFrame[source_id: string, source_username: string]

#### Data Schemas
Every Spark DataFrame has a schema: the name and data type of each column.  
Instead of letting Spark infer the types, we can specify them ahead of time.

In [4]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)


# Read csv, but now we specify the schema:

source = spark.read.csv("source.csv", header=True, schema=schema)

In [5]:
source.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



#### Writing data

In [6]:
# write data to a destination using .write property

source.write.json("source_json", mode="overwrite")

#### Data Preparation

In [7]:
# Read the case.csv file

df = spark.read.csv("case.csv", header=True, inferSchema=True)

In [8]:
# shape of dataframe
len(df.columns), df.count() 

(14, 841704)

In [10]:
# look at first three records
df.show(3, vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [11]:
# datatypes?
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

**Things to do:**

1. **Rename Columns:**
    - SLA_due_date -> case_due_date



2. **Correct Data Types:**
    - case_closed and case_late to boolean
    - council_district as a string
    - case_opened_date, case_closed_date and case_due_date to datetime format


3. **Data Transformation:**
    - request_address: trim and lowercase
    - format council district with leading zeros
    - convert the number of days a case is late to a number of weeks
    
    
4. **New features:**
    - zip_code : extract from address
    - case_age 
    - days_to_closed
    - case_lifetime
    
    
5. **Join cases data with department data:**


#### Rename Columns:

In [17]:
# Rename 'SLA_due_date' to 'case_due_date' using .withColumnRenamed

df = df.withColumnRenamed('SLA_due_date', 'case_due_date')
df.show(5, vertical = True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 case_due_date        | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 case_due_date        | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

#### Correct Data Types:

In [18]:
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('case_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [19]:
# correct data types: case_closed and case_late to boolean

df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|        YES|       NO|
|        YES|       NO|
|        YES|       NO|
|        YES|       NO|
|        YES|      YES|
+-----------+---------+
only showing top 5 rows



In [28]:
# use .withColumn to change columns from string to boolean values
df = df.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

In [29]:
# check the columns
df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows
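
The comparison inside `expr` can be checked one value at a time. The block below is a plain-Python sketch, not Spark code, and the helper name `yes_no_to_bool` is made up for illustration. It mirrors the SQL rule that comparing a null with `"YES"` yields null rather than false.

```python
from typing import Optional


def yes_no_to_bool(value: Optional[str]) -> Optional[bool]:
    """Mimic the SQL expression `value == "YES"` for a single cell."""
    if value is None:
        # In Spark SQL, comparing NULL with anything yields NULL, not false.
        return None
    # Only the exact string "YES" counts as true; "yes" or "Yes " would not.
    return value == "YES"


converted = [yes_no_to_bool(v) for v in ["YES", "NO", None]]
print(converted)
```

Because the match is exact, it is worth confirming that the column only contains `YES` and `NO` (for example with a `groupBy` count) before converting.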



In [30]:
# look at council_district before casting it to a string
df.select('council_district').show(4)

+----------------+
|council_district|
+----------------+
|               5|
|               3|
|               3|
|               3|
+----------------+
only showing top 4 rows



In [37]:
# council_district as a string instead of int

df = df.withColumn("council_district", col("council_district").cast("string"))

In [38]:
# view the column

df.show(3)

+----------+-------------------+----------------+-------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|   case_opened_date|case_closed_date|case_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+-------------------+----------------+-------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|2018-01-01 00:42:00|    1/1/18 12:29| 9/26/20 0:42|    false| -998.5087616000001|       true|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|2018-01-01 00:46:00|     1/3/18 8:11|  1/5/18 8:30|    false|-2.0126041669999997|       true|     Storm Water|Removal Of Obs

In [39]:
# check datatypes
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'string'),
 ('case_due_date', 'string'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [32]:
# convert case_opened_date, case_closed_date and case_due_date to datetime format

df.select('case_opened_date', 'case_closed_date', 'case_due_date').show(5)

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows



In [44]:
# parse the date strings into timestamps with to_timestamp
# fmt describes the input: month/day/2-digit year, then 24-hour hour:minutes

fmt = "M/d/yy H:mm"

df = (
    df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_closed_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_due_date", fmt))
)

In [45]:
# check the three columns again

df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-05 08:30:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-17 08:30:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00|2018-01-01 04:34:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows
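
To see what the pattern `M/d/yy H:mm` expects, a few of the raw strings can be parsed outside Spark. This is a plain-Python sketch: the `strptime` format below is only an approximate counterpart of the Spark pattern, chosen here for illustration, and is not part of the lesson's code.

```python
from datetime import datetime

# Approximate strptime counterpart of the Spark pattern "M/d/yy H:mm":
# month / day / two-digit year, then 24-hour hour and minutes.
PY_FMT = "%m/%d/%y %H:%M"

samples = ["1/1/18 0:42", "1/1/18 12:29", "9/26/20 0:42"]
parsed = [datetime.strptime(s, PY_FMT) for s in samples]

for raw, ts in zip(samples, parsed):
    print(f"{raw:>14} -> {ts}")
```

Month, day, and hour may be one or two digits in the input, which is why single letters (`M`, `d`, `H`) are used in the Spark pattern.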



In [46]:
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'timestamp'),
 ('case_closed_date', 'timestamp'),
 ('case_due_date', 'timestamp'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

#### Data Transformation

In [47]:
# request_address: trim and lowercase

df.select('request_address').show(5, False)

+-------------------------------------+
|request_address                      |
+-------------------------------------+
|2315  EL PASO ST, San Antonio, 78207 |
|2215  GOLIAD RD, San Antonio, 78223  |
|102  PALFREY ST W, San Antonio, 78223|
|114  LA GARDE ST, San Antonio, 78223 |
|734  CLEARVIEW DR, San Antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [50]:
# request_address: trim and lowercase

df = df.withColumn('request_address', lower(trim('request_address')))

In [51]:
df.select('request_address').show(3, truncate = False)

+-------------------------------------+
|request_address                      |
+-------------------------------------+
|2315  el paso st, san antonio, 78207 |
|2215  goliad rd, san antonio, 78223  |
|102  palfrey st w, san antonio, 78223|
+-------------------------------------+
only showing top 3 rows



In [52]:
# convert the number of days a case is late to a number of weeks

df = df.withColumn('num_weeks_late', expr('num_days_late/7'))

In [53]:
df.select('num_days_late', 'num_weeks_late').show(3)

+-------------------+--------------------+
|      num_days_late|      num_weeks_late|
+-------------------+--------------------+
| -998.5087616000001|        -142.6441088|
|-2.0126041669999997|-0.28751488099999994|
|       -3.022337963|-0.43176256614285713|
+-------------------+--------------------+
only showing top 3 rows



In [54]:
# use the format_string function to pad council_district with leading zeros

# '%03d' means at least 3 digits, padded with 0s
#
# '%d' needs an integer argument, so we cast council_district back to an
# integer inside the expression; the resulting column is a string.
df = df.withColumn(
    "council_district",
    format_string("%03d", col("council_district").cast("int")),
)

df.select("council_district").show(5)

+----------------+
|council_district|
+----------------+
|             005|
|             003|
|             003|
|             003|
|             007|
+----------------+
only showing top 5 rows
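
The `%03d` specifier behaves the same way in Python's printf-style formatting, so its effect can be tried without Spark. The values below are illustrative, and the comparison with Spark is an analogy rather than a statement about its internals.

```python
# "%03d": format an integer with at least 3 digits, padding with zeros.
districts = [5, 3, 10, 0]
padded = ["%03d" % d for d in districts]
print(padded)

# "%d" needs a number. Passing a string fails in Python, which is analogous
# to why the Spark code casts the column to int before formatting it.
try:
    "%03d" % "5"
    string_ok = True
except TypeError:
    string_ok = False
print(string_ok)
```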



#### New features:

In [55]:
# create a new column for zipcode:

df = df.withColumn("zipcode", regexp_extract("request_address", r"\d+$", 0))

df.select("zipcode").show(5)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
+-------+
only showing top 5 rows
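
The pattern `\d+$` matches a run of digits at the very end of the string. The plain-Python sketch below (the helper `extract_zip` is hypothetical) applies the same pattern with the `re` module and returns an empty string when nothing matches, which is what `regexp_extract` does.

```python
import re

# One or more digits anchored to the end of the string.
ZIP_PATTERN = re.compile(r"\d+$")


def extract_zip(address: str) -> str:
    """Return the trailing digits of an address, or "" if there are none."""
    match = ZIP_PATTERN.search(address)
    return match.group(0) if match else ""


print(extract_zip("2315  el paso st, san antonio, 78207"))
# A trailing space means the string no longer ends in digits.
print(repr(extract_zip("2315  el paso st, san antonio, 78207 ")))
```

The second call shows why trimming `request_address` first matters: with trailing whitespace the pattern finds nothing.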



case_age: how old the case is, i.e. the number of days between when the case was opened and the current day  
days_to_closed: the number of days between when the case was opened and when it was closed  
case_lifetime: for a closed case, the number of days between when it was opened and when it was closed (days_to_closed); for a case that is still open, the number of days since it was opened (case_age)  

In [57]:
# create three new columns 'case_age', 'days_to_closed', 'case_lifetime'

df = (
    df.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)
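
The logic of the three columns can be traced for a single case in plain Python. This is a sketch under stated assumptions: the helper `case_features` is made up, a missing close date stands in for `case_closed` being false, and "today" is fixed so the result is reproducible.

```python
from datetime import date, datetime
from typing import Optional


def case_features(opened: datetime, closed: Optional[datetime], today: date) -> dict:
    """Compute the three day counts for one case (illustrative helper)."""
    # Like datediff, count whole calendar days and ignore the time of day.
    case_age = (today - opened.date()).days
    days_to_closed = None if closed is None else (closed.date() - opened.date()).days
    # Assumption: a missing close date means the case is still open.
    case_lifetime = case_age if closed is None else days_to_closed
    return {
        "case_age": case_age,
        "days_to_closed": days_to_closed,
        "case_lifetime": case_lifetime,
    }


today = date(2018, 1, 10)  # fixed "current day" for a reproducible example
closed_case = case_features(datetime(2018, 1, 1, 0, 46), datetime(2018, 1, 3, 8, 11), today)
open_case = case_features(datetime(2018, 1, 1, 0, 46), None, today)
print(closed_case)
print(open_case)
```

In the DataFrame version `case_age` depends on `current_timestamp()`, so its values change every day the notebook is re-run.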

In [63]:
df.show(1, False, True)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6441088                         
 zipcode

#### Join cases data with department data:

In [64]:
# read the dept.csv file:

dept = spark.read.csv("dept.csv", header=True, inferSchema=True)
dept.show(5, False, True)

-RECORD 0-----------------------------------------------
 dept_division          | 311 Call Center               
 dept_name              | Customer Service              
 standardized_dept_name | Customer Service              
 dept_subject_to_SLA    | YES                           
-RECORD 1-----------------------------------------------
 dept_division          | Brush                         
 dept_name              | Solid Waste Management        
 standardized_dept_name | Solid Waste                   
 dept_subject_to_SLA    | YES                           
-RECORD 2-----------------------------------------------
 dept_division          | Clean and Green               
 dept_name              | Parks and Recreation          
 standardized_dept_name | Parks & Recreation            
 dept_subject_to_SLA    | YES                           
-RECORD 3-----------------------------------------------
 dept_division          | Clean and Green Natural Areas 
 dept_name              | Parks

In [66]:
dept.select('dept_division').distinct().show()
dept.select('dept_name').show()

+--------------------+
|       dept_division|
+--------------------+
|       Miscellaneous|
|         Solid Waste|
|    Field Operations|
|             Streets|
|    Waste Collection|
|          District 7|
|Code Enforcement ...|
|         District 10|
|              Vector|
|        Reservations|
|   Dangerous Premise|
|     311 Call Center|
|               Brush|
|Dangerous Premise...|
|Code Enforcement ...|
|Traffic Engineeri...|
|          District 2|
|             Signals|
|Engineering Division|
|Director's Office...|
+--------------------+
only showing top 20 rows

+--------------------+
|           dept_name|
+--------------------+
|    Customer Service|
|Solid Waste Manag...|
|Parks and Recreation|
|Parks and Recreation|
|Code Enforcement ...|
|Code Enforcement ...|
|                null|
|Code Enforcement ...|
|Code Enforcement ...|
|Trans & Cap Impro...|
|        City Council|
|        City Council|
|        City Council|
|        City Council|
|        City Council|
|       

In [67]:
# join the df and dept dataframe using 'dept_division' as common key
# drop columns as needed (keep standardized_dept_name)
# convert dept_subject_to_SLA to boolean

df = (
    df
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # of the department columns, keep only the standardized name, as it has far fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)
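
A left join keeps every row of the left table and fills the right-hand columns with nulls when no key matches. The plain-Python sketch below illustrates that behavior on toy data: the two department rows come from the `dept.csv` preview above, while the case ids, the unmatched division, and the helper `left_join` are invented for illustration.

```python
# Toy "cases" rows; the ids and the last division are made up.
cases = [
    {"case_id": 1, "dept_division": "311 Call Center"},
    {"case_id": 2, "dept_division": "Brush"},
    {"case_id": 3, "dept_division": "Not In Dept Table"},
]

# Department lookup keyed by dept_division (keys assumed unique).
depts = {
    "311 Call Center": {"standardized_dept_name": "Customer Service", "dept_subject_to_SLA": "YES"},
    "Brush": {"standardized_dept_name": "Solid Waste", "dept_subject_to_SLA": "YES"},
}


def left_join(cases, depts):
    """Attach department info to each case; unmatched cases get None."""
    joined = []
    for case in cases:
        dept = depts.get(case["dept_division"])
        joined.append({
            **case,
            "department": dept["standardized_dept_name"] if dept else None,
            "dept_subject_to_SLA": (dept["dept_subject_to_SLA"] == "YES") if dept else None,
        })
    return joined


joined = left_join(cases, depts)
for row in joined:
    print(row)
```

The row count is unchanged by the join only because each `dept_division` appears once in the lookup; duplicate keys on the right side would multiply the matching case rows.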

In [68]:
df.show(1, False, True)

-RECORD 0----------------------------------------------------
 dept_division        | Field Operations                     
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6441088                         
 zipcode