In [None]:
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql.functions import col, expr
from pyspark.sql.functions import *
from pyspark.sql.functions import lit
from pyspark.sql.functions import asc, desc
from pyspark.sql.functions import month, year, quarter
from pyspark.sql.functions import current_date, datediff
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from sqlalchemy import text, create_engine
from env import host, user, password
from pyspark.sql.functions import avg

# The SparkSession is where you would specify the JDBC driver and additional connection details.
# We'll use pd.read_sql to simplify here so we can focus on the Spark API and not the IT setup.
# When using Spark on the job, you'll work with the operations team to install the right Java drivers and configure your connection
spark = SparkSession.builder.getOrCreate()

from pydataset import data

# Spark Wrangle Exercises

## 1. Read the case, department, and source data into their own spark dataframes.



In [6]:
def get_db_url(database, host=host, user=user, password=password):
    return f'mysql+pymysql://{user}:{password}@{host}/{database}'

In [14]:
from pyspark.sql import SparkSession

# Create SparkSession
spark = SparkSession.builder \
    .appName("311 Data Assignment") \
    .getOrCreate()


23/07/05 13:55:22 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [16]:
query = """SELECT * FROM cases"""
url = get_db_url("311_data")
engine = create_engine(url)
df = pd.read_sql(text(query), engine.connect())
df = spark.createDataFrame(df)


In [18]:
df.show()

23/07/05 13:59:36 WARN TaskSetManager: Stage 1 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.
23/07/05 13:59:41 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 1 (TID 1): Attempting to kill Python Worker
                                                                                

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|          SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO|       -998.5087616|        YES|Field Operations|        Stray Animal|             999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|       -2.012604167|        YES|     Storm Water|Remova

In [23]:
query = """SELECT * FROM dept"""
url = get_db_url("311_data")
engine = create_engine(url)
dept_df = pd.read_sql(text(query), engine.connect())
dept_df = spark.createDataFrame(dept_df)


In [25]:
dept_df.show()

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|                null|  DSD/Code Enforcement|                YES|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Dangerous Premise...|Code Enforcement ...|

In [26]:
query = """SELECT * FROM source"""
url = get_db_url("311_data")
engine = create_engine(url)
source_df = pd.read_sql(text(query), engine.connect())
source_df = spark.createDataFrame(source_df)

In [30]:
source_df.show()

+-----+---------+--------------------+
|index|source_id|     source_username|
+-----+---------+--------------------+
|    0|   100137|    Merlene Blodgett|
|    1|   103582|         Carmen Cura|
|    2|   106463|     Richard Sanchez|
|    3|   119403|      Betty De Hoyos|
|    4|   119555|      Socorro Quiara|
|    5|   119868| Michelle San Miguel|
|    6|   120752|      Eva T. Kleiber|
|    7|   124405|           Lori Lara|
|    8|   132408|       Leonard Silva|
|    9|   135723|        Amy Cardenas|
|   10|   136202|    Michelle Urrutia|
|   11|   136979|      Leticia Garcia|
|   12|   137943|    Pamela K. Baccus|
|   13|   138605|        Marisa Ozuna|
|   14|   138650|      Kimberly Green|
|   15|   138650|Kimberly Green-Woods|
|   16|   138793| Guadalupe Rodriguez|
|   17|   138810|       Tawona Martin|
|   18|   139342|     Jessica Mendoza|
|   19|   139344|        Isis Mendoza|
+-----+---------+--------------------+
only showing top 20 rows



## 2. Let's see how writing to the local disk works in spark:

* Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
* Inspect your folder structure. What do you notice?


In [31]:
# Write the 'source' DataFrame to CSV format
source_df.write.csv("sources_csv", mode="overwrite", header=True)

# Write the 'source' DataFrame to JSON format
source_df.write.json("sources_json", mode="overwrite")


                                                                                

#It added 2 new folders with contents

## 3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [33]:
# Inspect folder structure
import os
folder_contents = os.listdir(".")
print(folder_contents)

# Inspect DataFrame data types
df.printSchema()
dept_df.printSchema()
source_df.printSchema()


['sources_csv', 'spark101.ipynb', '.DS_Store', 'env.py', 'wrangle-exercise.ipynb', '__pycache__', 'sources_json', 'README.md', '.gitignore', '.ipynb_checkpoints', '.git']
root
 |-- case_id: long (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: long (nullable = true)

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)

root
 

In [34]:
from pyspark.sql.functions import col
from pyspark.sql.types import DateType

# Cast columns to appropriate data types
df = df.withColumn("case_opened_date", col("case_opened_date").cast(DateType()))
df = df.withColumn("case_closed_date", col("case_closed_date").cast(DateType()))
df = df.withColumn("SLA_due_date", col("SLA_due_date").cast(DateType()))
df = df.withColumn("case_late", col("case_late").cast("boolean"))
df = df.withColumn("case_closed", col("case_closed").cast("boolean"))
df = df.withColumn("council_district", col("council_district").cast("integer"))

# Print the updated schema
df.printSchema()


root
 |-- case_id: long (nullable = true)
 |-- case_opened_date: date (nullable = true)
 |-- case_closed_date: date (nullable = true)
 |-- SLA_due_date: date (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



## 1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [37]:
from pyspark.sql.functions import current_date, datediff

# Filter the DataFrame for currently open issues
open_issues_df = df.filter(df.case_closed == False)

In [38]:
# Calculate the age of the latest (in terms of days past SLA) currently open issue
latest_age = open_issues_df.select(datediff(current_date(), "SLA_due_date").alias("age")).orderBy(col("age").desc()).first()[0]

23/07/05 15:07:53 WARN TaskSetManager: Stage 11 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [39]:
# Calculate the age of the oldest (in terms of days since opened) currently open issue
oldest_age = open_issues_df.select(datediff(current_date(), "case_opened_date").alias("age")).orderBy(col("age")).first()[0]

print("The latest currently open issue is", latest_age, "days past SLA.")
print("The oldest currently open issue has been open for", oldest_age, "days.")

23/07/05 15:08:20 WARN TaskSetManager: Stage 12 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.

The latest currently open issue is None days past SLA.
The oldest currently open issue has been open for None days.


                                                                                

## 2. How many Stray Animal cases are there?

In [40]:
# Filter the DataFrame for Stray Animal cases
stray_animal_cases = df.filter(col("service_request_type") == "Stray Animal")

# Count the number of Stray Animal cases
stray_animal_case_count = stray_animal_cases.count()

print("The number of Stray Animal cases is:", stray_animal_case_count)


23/07/05 15:10:07 WARN TaskSetManager: Stage 13 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.

The number of Stray Animal cases is: 26760


                                                                                

## 3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [41]:
# Filter the DataFrame for service requests assigned to Field Operations department
field_operations_requests = df.filter(col("dept_division") == "Field Operations")

# Filter the Field Operations requests that are not classified as "Officer Standby"
non_officer_standby_requests = field_operations_requests.filter(col("service_request_type") != "Officer Standby")

# Count the number of non-Officer Standby requests in Field Operations department
non_officer_standby_count = non_officer_standby_requests.count()

print("The number of service requests in Field Operations department that are not Officer Standby:", non_officer_standby_count)


23/07/05 15:11:52 WARN TaskSetManager: Stage 16 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.

The number of service requests in Field Operations department that are not Officer Standby: 113902


                                                                                

## 4. Convert the council_district column to a string column.

In [42]:
# Convert council_district column to string
df = df.withColumn("council_district", col("council_district").cast("string"))

# Print the schema to verify the column type conversion
df.printSchema()


root
 |-- case_id: long (nullable = true)
 |-- case_opened_date: date (nullable = true)
 |-- case_closed_date: date (nullable = true)
 |-- SLA_due_date: date (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)



## 5. Extract the year from the case_closed_date column.


In [54]:
from pyspark.sql.functions import year

# Extract the year from case_closed_date
df = df.withColumn("closed_year", year("case_closed_date"))

# Show the updated DataFrame with the extracted year
df.show(vertical=True)


23/07/05 15:38:11 WARN TaskSetManager: Stage 95 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.
23/07/05 15:38:16 WARN PythonRunner: Detected deadlock while completing task 5.0 in stage 95 (TID 435): Attempting to kill Python Worker
23/07/05 15:38:16 WARN PythonRunner: Detected deadlock while completing task 10.0 in stage 95 (TID 440): Attempting to kill Python Worker
23/07/05 15:38:16 WARN PythonRunner: Detected deadlock while completing task 9.0 in stage 95 (TID 439): Attempting to kill Python Worker
23/07/05 15:38:16 WARN PythonRunner: Detected deadlock while completing task 6.0 in stage 95 (TID 436): Attempting to kill Python Worker
23/07/05 15:38:16 WARN PythonRunner: Detected deadlock while completing task 11.0 in stage 95 (TID 441): Attempting to kill Python Worker
23/07/05 15:38:16 WARN PythonRunner: Detected deadlock while completing task 8.0 in stage 95 (TID 438): Attempting to kill Python Worker
23/07/05 15:38:16 WARN PythonRunn

-RECORD 0--------------------------------------
 dept_division          | Storm Water          
 source_id              | svcCRMSS             
 case_id                | 1014127333           
 case_opened_date       | null                 
 case_closed_date       | null                 
 SLA_due_date           | null                 
 case_late              | false                
 num_days_late          | -2.012604167         
 case_closed            | true                 
 service_request_type   | Removal Of Obstru... 
 SLA_days               | 4.322222222          
 case_status            | Closed               
 request_address        | 2215  GOLIAD RD, ... 
 council_district       | 3                    
 closed_year            | null                 
 num_hours_late         | -48.302500008        
 index                  | 133                  
 source_username        | svcCRMSS             
 dept_name              | Trans & Cap Impro... 
 standardized_dept_name | Trans & Cap Im

## 6. Convert num_days_late from days to hours in new columns num_hours_late.

In [44]:
# Convert num_days_late to hours
df = df.withColumn("num_hours_late", col("num_days_late") * 24)

# Show the updated DataFrame with the converted column
df.show()


23/07/05 15:15:23 WARN TaskSetManager: Stage 20 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.
[Stage 20:>                                                         (0 + 1) / 1]

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+-----------+-------------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|          SLA_days|case_status|source_id|     request_address|council_district|closed_year|     num_hours_late|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+-----------+-------------------+
|1014127332|            null|            null|        null|    false|       -998.5087616|       true|Field Operations|        Stray Animal|             999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|       null|-23964.2102783999

23/07/05 15:15:27 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 20 (TID 101): Attempting to kill Python Worker
                                                                                

In [None]:
## 7. Join the case data with the source and department data.

In [45]:
# Join case data with source data
df = df.join(source_df, "source_id", "left")

# Join case data with department data
df = df.join(dept_df, "dept_division", "left")

# Show the joined DataFrame
df.show()


23/07/05 15:18:07 WARN TaskSetManager: Stage 21 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.
23/07/05 15:18:11 WARN PythonRunner: Detected deadlock while completing task 8.0 in stage 21 (TID 110): Attempting to kill Python Worker
23/07/05 15:18:11 WARN PythonRunner: Detected deadlock while completing task 4.0 in stage 21 (TID 106): Attempting to kill Python Worker
23/07/05 15:18:11 WARN PythonRunner: Detected deadlock while completing task 9.0 in stage 21 (TID 111): Attempting to kill Python Worker
23/07/05 15:18:11 WARN PythonRunner: Detected deadlock while completing task 1.0 in stage 21 (TID 103): Attempting to kill Python Worker
23/07/05 15:18:11 WARN PythonRunner: Detected deadlock while completing task 5.0 in stage 21 (TID 107): Attempting to kill Python Worker
23/07/05 15:18:11 WARN PythonRunner: Detected deadlock while completing task 11.0 in stage 21 (TID 113): Attempting to kill Python Worker
23/07/05 15:18:11 WARN PythonRunne

+----------------+---------+----------+----------------+----------------+------------+---------+-------------------+-----------+--------------------+------------------+-----------+--------------------+----------------+-----------+-------------------+-----+---------------+--------------------+----------------------+-------------------+
|   dept_division|source_id|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|service_request_type|          SLA_days|case_status|     request_address|council_district|closed_year|     num_hours_late|index|source_username|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+----------------+---------+----------+----------------+----------------+------------+---------+-------------------+-----------+--------------------+------------------+-----------+--------------------+----------------+-----------+-------------------+-----+---------------+--------------------+----------------------+----------

## 8. Are there any cases that do not have a request source?

In [46]:
from pyspark.sql.functions import isnull

# Filter cases with null request source
cases_without_source = df.filter(isnull(df.source_id))

# Check if there are any cases without a request source
if cases_without_source.count() > 0:
    print("There are cases without a request source.")
    cases_without_source.show()
else:
    print("All cases have a request source.")


23/07/05 15:19:44 WARN TaskSetManager: Stage 30 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.

All cases have a request source.


                                                                                

## 9. What are the top 10 service request types in terms of number of requests?

In [47]:
from pyspark.sql.functions import desc

# Group by service_request_type and count the number of requests
top_service_request_types = df.groupBy("service_request_type").count()

# Sort in descending order based on the count
top_service_request_types = top_service_request_types.orderBy(desc("count"))

# Show the top 10 service request types
top_service_request_types.show(10)


23/07/05 15:20:39 WARN TaskSetManager: Stage 37 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|89210|
|Overgrown Yard/Trash|66403|
|        Bandit Signs|32968|
|        Damaged Cart|31163|
|Front Or Side Yar...|28920|
|        Stray Animal|27361|
|Aggressive Animal...|25492|
|Cart Exchange Req...|22608|
|Junk Vehicle On P...|21649|
|     Pot Hole Repair|20827|
+--------------------+-----+
only showing top 10 rows



                                                                                

## 10. What are the top 10 service request types in terms of average days late?


In [48]:
from pyspark.sql.functions import desc

# Group by service_request_type and calculate the average days late
top_service_request_types = df.groupBy("service_request_type").mean("num_days_late")

# Sort in descending order based on the average days late
top_service_request_types = top_service_request_types.orderBy(desc("avg(num_days_late)"))

# Show the top 10 service request types
top_service_request_types.show(10)


23/07/05 15:21:34 WARN TaskSetManager: Stage 49 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.
[Stage 57:=====>                                                  (1 + 10) / 11]

+--------------------+------------------+
|service_request_type|avg(num_days_late)|
+--------------------+------------------+
|Request for Resea...|               NaN|
|CCO_Request for R...|               NaN|
|  Zoning: Junk Yards|175.95636210420943|
|Labeling for Used...|162.43032902285717|
|Record Keeping of...| 153.9972403942857|
|Signage Requied f...|151.63868055333333|
|Storage of Used M...|142.11255641500003|
|Zoning: Recycle Yard|  135.928516124798|
|Donation Containe...|131.75610506358706|
|License Requied U...|128.79828704142858|
+--------------------+------------------+
only showing top 10 rows



                                                                                

## 11. Does number of days late depend on department?

In [52]:
# Group by dept_division and calculate the average num_days_late
avg_days_late_by_dept = df.groupBy("dept_division").agg(avg("num_days_late").alias("avg_days_late"))

In [53]:
# Show the average number of days late by department
avg_days_late_by_dept.show()

23/07/05 15:28:50 WARN TaskSetManager: Stage 83 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+--------------------+-------------------+
|       dept_division|      avg_days_late|
+--------------------+-------------------+
|       Miscellaneous|-1.7218576838926687|
|         Solid Waste| 3.5190239198762256|
|    Field Operations| -226.5178394055037|
|    Waste Collection|-2.1706522384798994|
|             Streets|-24.037123211992807|
|Code Enforcement ...|-44.468611946863874|
|              Vector|-1.1206532993223444|
|   Dangerous Premise|-42.234884997542245|
|     311 Call Center| 59.737091496300756|
|               Brush|-3.9857905714570996|
|Dangerous Premise...| -43.20829057269444|
|Traffic Engineeri...| -33.44708001774808|
|Code Enforcement ...| 135.92851612479797|
|          District 2|                NaN|
|             Signals|-28.809523320694538|
|Engineering Division| 13.433724555869716|
|Director's Office...|  37.57064670295008|
|         Storm Water|-14.055678397031896|
|               Shops|   9.64126176872269|
|Storm Water Engin...|-15.075511976006071|
+----------

## 12. How do number of days late depend on department and request type?


In [50]:
from pyspark.sql.functions import avg

# Group by dept_division and service_request_type, and calculate the average num_days_late
avg_days_late_by_dept_request = df.groupBy("dept_division", "service_request_type").agg(avg("num_days_late").alias("avg_days_late"))

# Show the average number of days late by department and request type
avg_days_late_by_dept_request.show()


23/07/05 15:24:57 WARN TaskSetManager: Stage 64 contains a task of very large size (12597 KiB). The maximum recommended task size is 1000 KiB.
[Stage 72:>                                                       (0 + 11) / 11]

+--------------------+--------------------+--------------------+
|       dept_division|service_request_type|       avg_days_late|
+--------------------+--------------------+--------------------+
|    Code Enforcement|Overgrown Yard/Trash|  -46.35747521842478|
|    Shops (Internal)|Major Park Improv...|  -280.2546235360405|
|    Waste Collection|Organics Info / L...|  14.251441689631402|
|Traffic Engineeri...|       Parking Issue|  -15.07585572790985|
|    Code Enforcement|     Permits, Fences| -14.198422204579083|
|             Streets| Rebarb Sticking Out|-0.41349038699999996|
|Traffic Engineeri...|Flashing Beacon N...|  -68.60608196375308|
|    Shops (Internal)|         Playgrounds|  -6.624961567266341|
|    Code Enforcement|Used/Scrap Tire F...|  -71.23635577898261|
|             Signals|Signal Timing Mod...|  -80.29628127803407|
| Food Establishments|      Food Poisoning|  -7.783637740071429|
|    Code Enforcement|Temporary Obstruc...| -119.75801638405595|
|    Field Operations|Agg

                                                                                