In [1]:
import pyspark
from pyspark.sql.functions import *
spark = pyspark.sql.SparkSession.builder.getOrCreate()

22/05/23 11:15:12 WARN Utils: Your hostname, Brandyns-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.3.168 instead (on interface en0)
22/05/23 11:15:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/23 11:15:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Read the case, department, and source data into their own spark dataframes.

In [2]:
case = spark.read.csv('case.csv', sep=",", header=True, inferSchema=True)

                                                                                

In [3]:
department = spark.read.csv('dept.csv', sep=",", header=True, inferSchema=True)

In [4]:
source = spark.read.csv('source.csv', sep=",", header=True, inferSchema=True)

Store the source data in both csv and json format

In [5]:
source.write.json('source_json', mode='overwrite')
source.write.csv('source_csv', mode='overwrite')

Inspect the folder structure, what do you notice?

- The above are not files but directories

Inspect the data in the dataframes, are the data types appropriate? Write code necessary to cast the values to the appropriate types

In [6]:
# Showing the first row of case
case.show(1, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
only showing top 1 row



In [7]:
# Showing case dtypes
case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

In [8]:
# Rename SLA_due_date column to better match other date-type columns
case = case.withColumnRenamed('SLA_due_date', 'case_due_date')

In [9]:
# Columns case_closed and case_late store yes/no values, they are currently strings and should be converted to boolean
# Confirming they are just yes/no
case.groupby('case_closed', 'case_late').count().show()

[Stage 9:>                                                          (0 + 8) / 8]

+-----------+---------+------+
|case_closed|case_late| count|
+-----------+---------+------+
|         NO|      YES|  6525|
|        YES|      YES| 87978|
|         NO|       NO| 11585|
|        YES|       NO|735616|
+-----------+---------+------+



                                                                                

In [10]:
# Changing case_closed and case_late to boolean
case = (case.withColumn("case_closed", expr('case_closed == "Yes"')).withColumn("case_late", 
                                                                               expr('case_late == "Yes"')))

In [13]:
# Changing council_district to string
case = case.withColumn("council_district", col('council_district').cast('string'))

In [15]:
# Dealing with timestamp changes
# Establishing our date format
fmt = "M/d/yy H:mm"

In [16]:
case = (
case.withColumn('case_opened_date', to_timestamp('case_opened_date', fmt))
    .withColumn('case_closed_date', to_timestamp('case_closed_date', fmt))
    .withColumn('case_due_date', to_timestamp('case_due_date', fmt))

)

In [18]:
# Cleaning some text data
case = case.withColumn('request_address', lower(trim(col('request_address'))))
# Extracting zip code
case = case.withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))

In [19]:
# Create a 'case_lifetime' feature
case = (
case.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .drop('case_age', 'days_to_closed')
)

In [20]:
# Join departments and sources
case = case.join(department, 'dept_division', 'left').join(source, 'source_id', 'left')

In [24]:
# Provide ability to query with Spark SQL
case.createOrReplaceTempView('case')

1. How old is the latest currently open issue? How long has the oldest currently opened issue been open?

In [25]:
spark.sql('''
SELECT DATEDIFF(current_timestamp, case_due_date) AS days_past_due
FROM case
WHERE NOT case_closed
ORDER BY days_past_due DESC
LIMIT 15
''').show()

[Stage 19:>                                                         (0 + 8) / 8]

+-------------+
|days_past_due|
+-------------+
|         1968|
|         1968|
|         1968|
|         1968|
|         1968|
|         1968|
|         1968|
|         1968|
|         1968|
|         1968|
|         1968|
|         1968|
|         1968|
|         1968|
|         1968|
+-------------+



                                                                                

In [27]:
open_cases = case.filter(~col('case_closed'))

In [29]:
case.select(max('case_lifetime')).show()

[Stage 23:>                                                         (0 + 8) / 8]

+------------------+
|max(case_lifetime)|
+------------------+
|              1968|
+------------------+



                                                                                

2. How many stray animal cases are there?

In [30]:
case.filter(case.service_request_type == 'Stray Animal').count()

                                                                                

27361

In [31]:
(
case.groupby('service_request_type')
    .count()
    .filter(expr('service_request_type == "Stray Animal"'))
    .show()

)

[Stage 33:>                                                         (0 + 8) / 8]

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|        Stray Animal|27361|
+--------------------+-----+



                                                                                

3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [32]:
field_ops = case.filter(col('dept_division') == 'Field Operations')

In [33]:
field_ops.filter(col('service_request_type')  != 'Officer Standby').count()

                                                                                

116295

In [34]:
(
case.filter(case.dept_division == 'Field Operations')
    .filter(case.service_request_type != 'Officer Standby')
    .count()
)

                                                                                

116295

In [35]:
(
case.filter(expr("dept_division == 'Field Operations'"))
    .groupby('service_request_type')
    .count()
    .show(truncate=False)
)

[Stage 48:>                                                         (0 + 8) / 8]

+-------------------------------+-----+
|service_request_type           |count|
+-------------------------------+-----+
|Animal Permits Request         |3083 |
|Injured Animal(Critical)       |9779 |
|Officer Standby                |3067 |
|Animal Bite(Non-Critical)      |4783 |
|Stray Animal                   |27361|
|Trapped/Confined Animal        |11605|
|City Council Animal Request    |365  |
|Aggressive Animal(Non-Critical)|25492|
|Animal Bite(Critical)          |717  |
|Public Nuisance(Own Animal)    |10969|
|Aggressive Animal(Critical)    |5280 |
|Animal Cruelty(Critical)       |3009 |
|Animal Neglect                 |13851|
|Spay/Neuter Request Response   |1    |
+-------------------------------+-----+



                                                                                

4. Convert council_district column to a string column
- Already done

5. Extract the year from the case_closed_date column

In [38]:
case.select('case_closed_date', year('case_closed_date')).show(5)

+-------------------+----------------------+
|   case_closed_date|year(case_closed_date)|
+-------------------+----------------------+
|2018-01-01 12:29:00|                  2018|
|2018-01-03 08:11:00|                  2018|
|2018-01-02 07:57:00|                  2018|
|2018-01-02 08:13:00|                  2018|
|2018-01-01 13:29:00|                  2018|
+-------------------+----------------------+
only showing top 5 rows



In [39]:
case = case.withColumn('year_closed', year('case_closed_date'))

6. Convert num_days_late from days to hours in new columns num_hours_late

In [40]:
(
case.withColumn('num_hours_late', case.num_days_late * 24)
.select('num_days_late', 'num_hours_late')
.show()
)

+-------------------+-------------------+
|      num_days_late|     num_hours_late|
+-------------------+-------------------+
| -998.5087616000001|     -23964.2102784|
|-2.0126041669999997|-48.302500007999996|
|       -3.022337963|      -72.536111112|
|       -15.01148148|      -360.27555552|
|0.37216435200000003|  8.931944448000001|
|       -29.74398148| -713.8555555199999|
|       -14.70673611|      -352.96166664|
|       -14.70662037|      -352.95888888|
|       -14.70662037|      -352.95888888|
|       -14.70649306|      -352.95583344|
|       -14.70649306|      -352.95583344|
|       -14.70636574|      -352.95277776|
|          -14.70625|-352.95000000000005|
|       -14.70636574|      -352.95277776|
|       -14.70623843|-352.94972232000003|
|-14.705891199999998|-352.94138879999997|
|       -14.70600694|      -352.94416656|
|       -14.70576389|      -352.93833336|
|       -14.70576389|      -352.93833336|
|       -14.70564815|       -352.9355556|
+-------------------+-------------

In [41]:
case = case.withColumn('num_hours_late', col('num_days_late')*24)

7. Join the case data with the source and department data
- This was done previously

8. Are there any cases that do not have a request source?

In [43]:
case.filter(col('source_id').isNull()
).count()

                                                                                

0

9. What are the top 10 service request types in terms of number of requests?

In [44]:
(
case.groupby('service_request_type')
    .count()
    .sort(col('count').desc())
    .show(10, truncate=False)
)

[Stage 64:>                                                         (0 + 8) / 8]

+--------------------------------+-----+
|service_request_type            |count|
+--------------------------------+-----+
|No Pickup                       |89210|
|Overgrown Yard/Trash            |66403|
|Bandit Signs                    |32968|
|Damaged Cart                    |31163|
|Front Or Side Yard Parking      |28920|
|Stray Animal                    |27361|
|Aggressive Animal(Non-Critical) |25492|
|Cart Exchange Request           |22608|
|Junk Vehicle On Private Property|21649|
|Pot Hole Repair                 |20827|
+--------------------------------+-----+
only showing top 10 rows



                                                                                

10. What are the top 10 service request types in terms of average days late?

In [45]:
(
case.groupby('service_request_type')
    .agg(mean('num_days_late'))
    .sort(col('avg(num_days_late)').desc())
    .show(10, truncate=False)
)

[Stage 69:>                                                         (0 + 8) / 8]

+--------------------------------------+------------------+
|service_request_type                  |avg(num_days_late)|
+--------------------------------------+------------------+
|Zoning: Junk Yards                    |175.95636210420943|
|Labeling for Used Mattress            |162.43032902285717|
|Record Keeping of Used Mattresses     |153.99724039428568|
|Signage Requied for Sale of Used Mattr|151.63868055333333|
|Storage of Used Mattress              |142.11255641500003|
|Zoning: Recycle Yard                  |135.92851612479797|
|Donation Container Enforcement        |131.75610506358706|
|License Requied Used Mattress Sales   |128.79828704142858|
|Traffic Signal Graffiti               |101.79846062200002|
|Complaint                             |72.8705023031169  |
+--------------------------------------+------------------+
only showing top 10 rows



                                                                                

11. Does number of days late depend on department?