# Pandas vs Spark

### Pandas
- **bring in**: df = pd.read_csv("data.csv")
- **view**: df
- **rename**: df.columns =['a', 'b', 'c']
- **drop**: df.drop('mpg', axis=1)
- **filter**: df[df.mpg<20]    
- **add column**: df['gpm'] = 1/ df.mpg 
- **fill nulls**: df.fillna(0)
- **agg**: df.groupby(['cyl', 'gear'].agg)
- **datatime**: fmt = %Y%m%d'  OR  pd.to_datetime(df.date, fmt)

### PySpark
- **bring in**: df = spark.read.csv.()
- **view**: df.show()
- **rename**: df.toDF('a','b','c')    
- **drop**: df.drop('mpg')
- **filter**: 
- **add**: df.withColumn['gpm', 1/df.mpg]
- **fill Nulls**: df.fillna(0)
- **agg**:
- **datatime**: df.withColumn("date", to_timestamp("date",fmt))

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

##### 

In [2]:
# Read in CSV file 
source = (spark.read.csv("source.csv",
                     sep=",",
                     header=True,
                     inferSchema=True)
     )

In [3]:
# Another way to read in data:

(
    spark.read.format("csv")
    .option("sep", ",")
    .option("inferSchema", True)
    .option("header", True)
    .load("source.csv")
)

DataFrame[source_id: string, source_username: string]

### Data Schema
- Spark includes a concept of a data schema
- Specify the types of our data ahead of time

In [4]:
from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField("source_id", StringType()),
        StructField("source_username", StringType()),
    ]
)


# Read csv, but now we specify the schema:

source = spark.read.csv("source.csv", header=True, schema=schema)

In [5]:
source.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



<hr style="border:2px solid black"> </hr>

### Writing data

In [6]:
# write data to a destination using .write property

source.write.json("source_json", mode="overwrite")

<hr style="border:2px solid black"> </hr>

### Data Preparation

In [7]:
# Read the case.csv file
df = spark.read.csv("case.csv", header=True, inferSchema=True)

In [8]:
# look at first three records
df.show(3)

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Removal Of Obstru...|4.322222222| 

In [11]:
#mke it easier to read
df.show(3, truncate = False, vertical = True)

-RECORD 0-----------------------------------------------------
 case_id              | 1014127332                            
 case_opened_date     | 1/1/18 0:42                           
 case_closed_date     | 1/1/18 12:29                          
 SLA_due_date         | 9/26/20 0:42                          
 case_late            | NO                                    
 num_days_late        | -998.5087616000001                    
 case_closed          | YES                                   
 dept_division        | Field Operations                      
 service_request_type | Stray Animal                          
 SLA_days             | 999.0                                 
 case_status          | Closed                                
 source_id            | svcCRMLS                              
 request_address      | 2315  EL PASO ST, San Antonio, 78207  
 council_district     | 5                                     
-RECORD 1----------------------------------------------

In [9]:
#look at datatypes
df.dtypes

#dates will need to be changed to datetime, change case late to bool

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

<hr style="border:3px solid black"> </hr>

# Things to do:

### Rename Columns:
- 'SLA_due_date -> case_due_date

### Correct Data Types:
- case_closed and case_late to boolean
- council_district as a string
- case_opened_date, case_closed_date and case_due_date to datetime format

### Data Transformation:
- request_address: trim and lowercase
- format council district with leading zeros
- convert the number of days a case is late to a number of weeks

### New features:
- zip_code : extract from address
- case_age
- days_to_closed
- case_lifetime

### Join cases data with department data:


<hr style="border:2px solid black"> </hr>

## Step 1: Rename Columns:

In [14]:
df= df.withColumnRenamed('SLA_due_date', 'case_due_date')

In [17]:
df.show(1, truncate = False, vertical = True)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 case_due_date        | 9/26/20 0:42                         
 case_late            | NO                                   
 num_days_late        | -998.5087616000001                   
 case_closed          | YES                                  
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
only showing top 1 row



<hr style="border:2px solid black"> </hr>

## Step 2: Correct Dtypes

In [None]:
df.dtypes

In [None]:
# correct data types: case_closed and case_late to boolean

df.select("case_closed", "case_late").show(5)

In [19]:
# use .withColumn to change columns from string to boolean values
df = df.withColumn('case_closed', expr('case_closed == "YES"'))\
.withColumn('case_late', expr('case_late == "YES"'))

In [20]:
df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



In [21]:
# council_district cast as string
df.select('council_district').show(4)

+----------------+
|council_district|
+----------------+
|               5|
|               3|
|               3|
|               3|
+----------------+
only showing top 4 rows



In [40]:
# council_district as a string instead of int

df = df.withColumn('council_district', col('council_district').cast('string'))

In [41]:
# view the column

df.select('council_district').show(4)

+----------------+
|council_district|
+----------------+
|             005|
|             003|
|             003|
|             003|
+----------------+
only showing top 4 rows



In [23]:
# check datatypes
df.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('case_due_date', 'string'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [24]:
# convert case_opened_date, case_closed_date and case_due_date to datetime format

df.select('case_opened_date', 'case_closed_date', 'case_due_date').show(5)

+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows



In [25]:
# to_timestamp, fmt

fmt = "M/d/yy H:mm"
df = df.withColumn('case_opened_date', to_timestamp('case_opened_date', fmt))\
.withColumn('case_closed_date', to_timestamp('case_closed_date', fmt))\
.withColumn('case_due_date', to_timestamp('case_due_date', fmt))

In [26]:
# check the three columns again

df.select('case_opened_date', 'case_closed_date', 'case_due_date').show(5)

+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|
|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|
|2018-01-01 00:48:00|2018-01-02 07:57:00|2018-01-05 08:30:00|
|2018-01-01 01:29:00|2018-01-02 08:13:00|2018-01-17 08:30:00|
|2018-01-01 01:34:00|2018-01-01 13:29:00|2018-01-01 04:34:00|
+-------------------+-------------------+-------------------+
only showing top 5 rows



<hr style="border:2px solid black"> </hr>

## Step 3: Data Transformation

In [27]:
# request_address: trim and lowercase

df.select('request_address').show(5, False)

+-------------------------------------+
|request_address                      |
+-------------------------------------+
|2315  EL PASO ST, San Antonio, 78207 |
|2215  GOLIAD RD, San Antonio, 78223  |
|102  PALFREY ST W, San Antonio, 78223|
|114  LA GARDE ST, San Antonio, 78223 |
|734  CLEARVIEW DR, San Antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [28]:
df = df.withColumn('request_address', trim(lower(df.request_address)))
df.select('request_address').show(5, False)

+-------------------------------------+
|request_address                      |
+-------------------------------------+
|2315  el paso st, san antonio, 78207 |
|2215  goliad rd, san antonio, 78223  |
|102  palfrey st w, san antonio, 78223|
|114  la garde st, san antonio, 78223 |
|734  clearview dr, san antonio, 78228|
+-------------------------------------+
only showing top 5 rows



In [29]:
# convert the number of days a case is late to a number of weeks
df = df.withColumn('num_weeks_late', expr('num_days_late/7'))

df.select("num_days_late", "num_weeks_late").show(5)

+-------------------+--------------------+
|      num_days_late|      num_weeks_late|
+-------------------+--------------------+
| -998.5087616000001|        -142.6441088|
|-2.0126041669999997|-0.28751488099999994|
|       -3.022337963|-0.43176256614285713|
|       -15.01148148| -2.1444973542857144|
|0.37216435200000003|         0.053166336|
+-------------------+--------------------+
only showing top 5 rows



In [30]:
# use format_string function to pad zeros for council_district
#%03d is 3 digits
df = df.withColumn('council_district', format_string('%03d', col('council_district').cast('int')))

In [31]:
df.select('council_district').show(5)

+----------------+
|council_district|
+----------------+
|             005|
|             003|
|             003|
|             003|
|             007|
+----------------+
only showing top 5 rows



<hr style="border:2px solid black"> </hr>

## Step 4: Create New Features

In [32]:
#create a new column for zipcode:
#extract the last five digits from address using regexp
df = df.withColumn('zipcode', regexp_extract('request_address', r"(\d+$)", 1))

df.select('zipcode').show(5)

+-------+
|zipcode|
+-------+
|  78207|
|  78223|
|  78223|
|  78223|
|  78228|
+-------+
only showing top 5 rows



In [33]:
df.show(1, False, True)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6441088                         
 zipcode

### Create New Columns

- case_age: How old the case is; the difference in days between when the case was opened and the current day
- days_to_closed: The number of days between when the case was opened and when it was closed
- case_lifetime: Number of days between when the case was opened and when it was closed, if the case is still open, the number of days since the case was opened

In [34]:
#create three new columns 'case_age', 'days_to_closed', 'case_lifetime'
#datediff- takes two dates and gives back difference

df = (
    df.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

In [35]:
df.show(1, False, True)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6441088                         
 zipcode

<hr style="border:2px solid black"> </hr>

In [37]:
# read the dept.csv file:

dept = spark.read.csv("dept.csv", header=True, inferSchema=True)
dept.show(5, truncate = False, vertical = True)

-RECORD 0-----------------------------------------------
 dept_division          | 311 Call Center               
 dept_name              | Customer Service              
 standardized_dept_name | Customer Service              
 dept_subject_to_SLA    | YES                           
-RECORD 1-----------------------------------------------
 dept_division          | Brush                         
 dept_name              | Solid Waste Management        
 standardized_dept_name | Solid Waste                   
 dept_subject_to_SLA    | YES                           
-RECORD 2-----------------------------------------------
 dept_division          | Clean and Green               
 dept_name              | Parks and Recreation          
 standardized_dept_name | Parks & Recreation            
 dept_subject_to_SLA    | YES                           
-RECORD 3-----------------------------------------------
 dept_division          | Clean and Green Natural Areas 
 dept_name              | Parks

#### Foreign Key between dept and source DF appears to be 'dept_division'

In [38]:
# join the df and dept dataframe using 'dept_division' as common key
# drop columns as needed (keep standardized_dept_name)
# convert dept_subject_to_SLA to boolean

df = (
    df
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    .drop(df.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

In [39]:
df.show(1, False, True)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 2018-01-01 00:42:00                  
 case_closed_date     | 2018-01-01 12:29:00                  
 case_due_date        | 2020-09-26 00:42:00                  
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  el paso st, san antonio, 78207 
 council_district     | 005                                  
 num_weeks_late       | -142.6441088                         
 zipcode              | 78207                                
 case_ag