## Wrangle Data in Spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
 
spark = SparkSession.builder.getOrCreate()

import numpy as np
import pandas as pd

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/20 13:42:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


### Data Acquisition

These exercises should go in a notebook or script named wrangle. Add, commit, and push your changes.

This exercises uses the case.csv, dept.csv, and source.csv files from the san antonio 311 call dataset.

1. Read the case, department, and source data into their own spark dataframes.
 
2. Let's see how writing to the local disk works in spark:

  - Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
  - Inspect your folder structure. What do you notice?
  
3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

***

In [2]:
# Read the case, department, and source data into their own spark dataframes.
case = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)

dept = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)

source = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)

                                                                                

In [4]:
# write the code necessary to store the source data in both csv and json format, 
# store these as sources_csv and sources_json

# write to csv
source.write.csv("sources_csv", mode="overwrite")

# write to json
source.write.json("sources_json", mode="overwrite")

In [5]:
# Inspect the data in your dataframes. Are the data types appropriate? 
# Write the code necessary to cast the values to the appropriate types.

source.dtypes

[('source_id', 'string'), ('source_username', 'string')]

In [6]:
source.show(5)
# appropriate dtypes

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
|   106463| Richard Sanchez|
|   119403|  Betty De Hoyos|
|   119555|  Socorro Quiara|
+---------+----------------+
only showing top 5 rows



In [8]:
print(dept.dtypes)
dept.show(5)
# all are technically appropriate. However, there would be more usefulness in converting
# the final column to a boolean type. 

[('dept_division', 'string'), ('dept_name', 'string'), ('standardized_dept_name', 'string'), ('dept_subject_to_SLA', 'string')]
+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



In [9]:
# string to boolean conversion
dept = dept.withColumn('dept_subject_to_SLA', expr('dept_subject_to_SLA == "YES"'))

dept.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|               true|
|               Brush|Solid Waste Manag...|           Solid Waste|               true|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|               true|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|               true|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|               true|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



In [14]:
def display(spark_df, rows=5):
    '''takes in a spark dataframe and displays a given number of rows using pandas formatting (for convenience)'''
    return spark_df.limit(rows).toPandas()

In [15]:
print(case.dtypes)
display(case)

[('case_id', 'int'), ('case_opened_date', 'string'), ('case_closed_date', 'string'), ('SLA_due_date', 'string'), ('case_late', 'string'), ('num_days_late', 'double'), ('case_closed', 'string'), ('dept_division', 'string'), ('service_request_type', 'string'), ('SLA_days', 'double'), ('case_status', 'string'), ('source_id', 'string'), ('request_address', 'string'), ('council_district', 'int')]


Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7


In [3]:
# change case closed and case late to bool values
case = case.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

case.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



In [4]:
# convert council_district to a string since it's a unique identifier.
case = case.withColumn("council_district", col("council_district").cast("string"))
# same with case_id
case = case.withColumn("case_id", col("case_id").cast("string"))

In [9]:
##print("--- Before handling dates")
##case.select("case_opened_date", "case_closed_date", "case_due_date").show(5)
#
#fmt = "M/d/yy H:mm"
#case = (case.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt)).withColumn\
#      ("case_closed_date", to_timestamp("case_closed_date", fmt)).withColumn\
#      ("case_due_date", to_timestamp("case_due_date", fmt))
#)
#
#print("--- After")
#case.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

AnalysisException: cannot resolve 'case_due_date' given input columns: [SLA_days, SLA_due_date, case_closed, case_closed_date, case_id, case_late, case_opened_date, case_status, council_district, dept_division, num_days_late, request_address, service_request_type, source_id];
'Project [case_id#146, case_opened_date#161, case_closed_date#176, SLA_due_date#19, case_late#103, num_days_late#21, case_closed#88, dept_division#23, service_request_type#24, SLA_days#25, case_status#26, source_id#27, request_address#28, council_district#131, to_timestamp('case_due_date, Some(M/d/yy H:mm)) AS case_due_date#191]
+- Project [case_id#146, case_opened_date#161, to_timestamp('case_closed_date, Some(M/d/yy H:mm)) AS case_closed_date#176, SLA_due_date#19, case_late#103, num_days_late#21, case_closed#88, dept_division#23, service_request_type#24, SLA_days#25, case_status#26, source_id#27, request_address#28, council_district#131]
   +- Project [case_id#146, to_timestamp('case_opened_date, Some(M/d/yy H:mm)) AS case_opened_date#161, case_closed_date#18, SLA_due_date#19, case_late#103, num_days_late#21, case_closed#88, dept_division#23, service_request_type#24, SLA_days#25, case_status#26, source_id#27, request_address#28, council_district#131]
      +- Project [cast(case_id#16 as string) AS case_id#146, case_opened_date#17, case_closed_date#18, SLA_due_date#19, case_late#103, num_days_late#21, case_closed#88, dept_division#23, service_request_type#24, SLA_days#25, case_status#26, source_id#27, request_address#28, council_district#131]
         +- Project [case_id#16, case_opened_date#17, case_closed_date#18, SLA_due_date#19, case_late#103, num_days_late#21, case_closed#88, dept_division#23, service_request_type#24, SLA_days#25, case_status#26, source_id#27, request_address#28, cast(council_district#29 as string) AS council_district#131]
            +- Project [case_id#16, case_opened_date#17, case_closed_date#18, SLA_due_date#19, (case_late#20 = YES) AS case_late#103, num_days_late#21, case_closed#88, dept_division#23, service_request_type#24, SLA_days#25, case_status#26, source_id#27, request_address#28, council_district#29]
               +- Project [case_id#16, case_opened_date#17, case_closed_date#18, SLA_due_date#19, case_late#20, num_days_late#21, (case_closed#22 = YES) AS case_closed#88, dept_division#23, service_request_type#24, SLA_days#25, case_status#26, source_id#27, request_address#28, council_district#29]
                  +- Relation [case_id#16,case_opened_date#17,case_closed_date#18,SLA_due_date#19,case_late#20,num_days_late#21,case_closed#22,dept_division#23,service_request_type#24,SLA_days#25,case_status#26,source_id#27,request_address#28,council_district#29] csv


***
1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?
1. How many Stray Animal cases are there?
1. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?
1. Convert the council_district column to a string column.
1. Extract the year from the case_closed_date column.
1. Convert num_days_late from days to hours in new columns num_hours_late.
1. Join the case data with the source and department data.
1. Are there any cases that do not have a request source?
1. What are the top 10 service request types in terms of number of requests?
1. What are the top 10 service request types in terms of average days late?
1. Does number of days late depend on department?
1. How do number of days late depend on department and request type?

In [None]:
# 1. How old is the latest (in terms of days past SLA) currently open issue? 
#.  How long has the oldest (in terms of days since opened) currently opened issue been open?

In [10]:
# 2. How many Stray Animal cases are there?
# total cases of stray animal
case.filter(case.service_request_type == 'Stray Animal').count()

                                                                                

26760

In [11]:
# 3. How many service requests that are assigned to the Field Operations department (dept_division)
#.   are not classified as "Officer Standby" request type (service_request_type)?
case.filter(case.dept_division == "Field Operations")\
.where(case.service_request_type != 'Officer Standby').count()

                                                                                

113902

In [12]:
# 4. Convert the council_district column to a string column.
case = case.withColumn('council_district', col('council_district').cast('string'))

In [13]:
case.dtypes

[('case_id', 'string'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'boolean'),
 ('num_days_late', 'double'),
 ('case_closed', 'boolean'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'string')]

In [14]:
# 5. Extract the year from the case_closed_date column.
case = case.withColumn("case_closed_year", year("case_closed_date"))

case.show(2, False, True)

-RECORD 0----------------------------------------------------
 case_id              | 1014127332                           
 case_opened_date     | 1/1/18 0:42                          
 case_closed_date     | 1/1/18 12:29                         
 SLA_due_date         | 9/26/20 0:42                         
 case_late            | false                                
 num_days_late        | -998.5087616000001                   
 case_closed          | true                                 
 dept_division        | Field Operations                     
 service_request_type | Stray Animal                         
 SLA_days             | 999.0                                
 case_status          | Closed                               
 source_id            | svcCRMLS                             
 request_address      | 2315  EL PASO ST, San Antonio, 78207 
 council_district     | 5                                    
 case_closed_year     | null                                 
-RECORD 

In [15]:
# 6. Convert num_days_late from days to hours in new columns num_hours_late.
case = case.withColumn('num_hours_late', expr('num_days_late*24'))

case.select("num_days_late", "num_hours_late").show(5)

+-------------------+-------------------+
|      num_days_late|     num_hours_late|
+-------------------+-------------------+
| -998.5087616000001|     -23964.2102784|
|-2.0126041669999997|-48.302500007999996|
|       -3.022337963|      -72.536111112|
|       -15.01148148|      -360.27555552|
|0.37216435200000003|  8.931944448000001|
+-------------------+-------------------+
only showing top 5 rows



In [16]:
# 7. Join the case data with the source and department data.
# join the case data with the source and department data.
df = (
    case
    # left join on dept_division
    .join(dept, "dept_division", "left")
    # drop  the columns : dept_division, dept_name  standardized name, as it has much fewer unique values
    .drop(dept.dept_division)
    .drop(dept.dept_name)
    #rename standardized name,  
    .withColumnRenamed("standardized_dept_name", "department")
)

(
    case.join(source, "source_id", "left")
    .sort(col("source_username"))
    .show(5, vertical=True)
)

df.show(1, False, True)

                                                                                

-RECORD 0------------------------------------
 source_id            | af26445              
 case_id              | 1014220226           
 case_opened_date     | 2/5/18 15:07         
 case_closed_date     | 2/5/18 17:59         
 SLA_due_date         | 6/13/18 15:07        
 case_late            | false                
 num_days_late        | -127.88046299999999  
 case_closed          | true                 
 dept_division        | Streets              
 service_request_type | Base/Pavement Repair 
 SLA_days             | 128.0                
 case_status          | Closed               
 request_address      | BAYWATER DR and L... 
 council_district     | 7                    
 case_closed_year     | null                 
 num_hours_late       | -3069.131112         
 source_username      | Alex Franklin        
-RECORD 1------------------------------------
 source_id            | af26445              
 case_id              | 1014218339           
 case_opened_date     | 2/5/18 9:1

In [17]:
# 8. Are there any cases that do not have a request source?
df.where(df.source_id.isNull()).show(3, False, True)

(0 rows)



In [19]:
# 9. What are the top 10 service request types in terms of number of requests?
df.groupBy("service_request_type").count().orderBy(desc("count")).show(10, truncate= False)

                                                                                

+--------------------------------+-----+
|service_request_type            |count|
+--------------------------------+-----+
|No Pickup                       |86855|
|Overgrown Yard/Trash            |65895|
|Bandit Signs                    |32910|
|Damaged Cart                    |30338|
|Front Or Side Yard Parking      |28794|
|Stray Animal                    |26760|
|Aggressive Animal(Non-Critical) |24882|
|Cart Exchange Request           |22024|
|Junk Vehicle On Private Property|21473|
|Pot Hole Repair                 |20616|
+--------------------------------+-----+
only showing top 10 rows



In [20]:
# 10. What are the top 10 service request types in terms of average days late
(
df.where('case_late')# just the rows where case_late == true
.groupBy("service_request_type").mean("num_days_late").orderBy(desc('avg(num_days_late)')).show(10, truncate= False)
)

[Stage 32:>                                                         (0 + 8) / 8]

+--------------------------------------+------------------+
|service_request_type                  |avg(num_days_late)|
+--------------------------------------+------------------+
|Zoning: Recycle Yard                  |210.89201994318182|
|Zoning: Junk Yards                    |200.20517608494276|
|Structure/Housing Maintenance         |190.20707698509807|
|Donation Container Enforcement        |171.09115313942615|
|Storage of Used Mattress              |163.96812829714287|
|Labeling for Used Mattress            |162.43032902285717|
|Record Keeping of Used Mattresses     |153.99724039428568|
|Signage Requied for Sale of Used Mattr|151.63868055333333|
|Traffic Signal Graffiti               |137.64583330000002|
|License Requied Used Mattress Sales   |128.79828704142858|
+--------------------------------------+------------------+
only showing top 10 rows



                                                                                

In [21]:
# 11. Does number of days late depend on department?
df.where('case_late').groupBy("department").agg(mean("num_days_late")).sort('avg(num_days_late)').show(truncate=False)

[Stage 36:>                                                         (0 + 8) / 8]

+------------------------+------------------+
|department              |avg(num_days_late)|
+------------------------+------------------+
|Metro Health            |6.5438133155476494|
|Solid Waste             |7.186821906120899 |
|Trans & Cap Improvements|10.603064680316946|
|Parks & Recreation      |22.348910457867518|
|Animal Care Services    |23.458633245820124|
|DSD/Code Enforcement    |49.38428705358908 |
|Customer Service        |87.68385942150394 |
+------------------------+------------------+



                                                                                

In [22]:
df.where('case_late').groupBy("department").count().show(10)

+--------------------+-----+
|          department|count|
+--------------------+-----+
|         Solid Waste|32945|
|Animal Care Services|23276|
|Trans & Cap Impro...| 5411|
|  Parks & Recreation| 3797|
|    Customer Service| 2010|
|        Metro Health|  829|
|DSD/Code Enforcement|26235|
+--------------------+-----+



                                                                                

In [23]:
(df.filter('case_late')
 .groupby('department')
 .agg(mean('num_days_late')
      .alias('days_late'), count('num_days_late')
      .alias('n_cases_late'))
 .sort('days_late')
 .withColumn('days_late', round(col('days_late'), 1))
 .show(truncate=False)
)

                                                                                

+------------------------+---------+------------+
|department              |days_late|n_cases_late|
+------------------------+---------+------------+
|Metro Health            |6.5      |829         |
|Solid Waste             |7.2      |32945       |
|Trans & Cap Improvements|10.6     |5411        |
|Parks & Recreation      |22.3     |3797        |
|Animal Care Services    |23.5     |23276       |
|DSD/Code Enforcement    |49.4     |26235       |
|Customer Service        |87.7     |2010        |
+------------------------+---------+------------+



In [25]:
# 12. How do number of days late depend on department and request type?
(df.filter("case_closed")
 .groupBy("department", "service_request_type")
 .mean("num_days_late")
 .orderBy(desc('avg(num_days_late)'))
 .show(50, truncate= False)
)



+------------------------+----------------------------------------+------------------+
|department              |service_request_type                    |avg(num_days_late)|
+------------------------+----------------------------------------+------------------+
|DSD/Code Enforcement    |Zoning: Junk Yards                      |209.39675114281033|
|DSD/Code Enforcement    |Labeling for Used Mattress              |162.43032902285717|
|DSD/Code Enforcement    |Record Keeping of Used Mattresses       |153.99724039428568|
|DSD/Code Enforcement    |Signage Requied for Sale of Used Mattr  |151.63868055333333|
|DSD/Code Enforcement    |Storage of Used Mattress                |142.11255641500003|
|DSD/Code Enforcement    |Donation Container Enforcement          |141.27462658777188|
|DSD/Code Enforcement    |Zoning: Recycle Yard                    |138.9798982976596 |
|DSD/Code Enforcement    |License Requied Used Mattress Sales     |128.79828704142858|
|DSD/Code Enforcement    |Vendors          

                                                                                