In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import pandas as pd
from env import get_db_url

spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/09/12 14:15:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/09/12 14:15:24 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/09/12 14:15:24 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
case_df = spark.read.csv("case.csv", header=True, inferSchema=True)
case_df.show(2, vertical=True)

                                                                                

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 1/1/18 0:42          
 case_closed_date     | 1/1/18 12:29         
 SLA_due_date         | 9/26/20 0:42         
 case_late            | NO                   
 num_days_late        | -998.5087616000001   
 case_closed          | YES                  
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 1/1/18 0:46          
 case_closed_date     | 1/3/18 8:11          
 SLA_due_date         | 1/5/18 8:30          
 case_late            | NO                   
 num_days_late        | -2.0126041

In [3]:
dept_df = spark.read.csv("dept.csv", header=True, inferSchema=True)
dept_df.show(2, vertical=True)

-RECORD 0--------------------------------------
 dept_division          | 311 Call Center      
 dept_name              | Customer Service     
 standardized_dept_name | Customer Service     
 dept_subject_to_SLA    | YES                  
-RECORD 1--------------------------------------
 dept_division          | Brush                
 dept_name              | Solid Waste Manag... 
 standardized_dept_name | Solid Waste          
 dept_subject_to_SLA    | YES                  
only showing top 2 rows



In [4]:
source_df = spark.read.csv("source.csv", header=True, inferSchema=True)
source_df.show(2, vertical=True)

-RECORD 0---------------------------
 source_id       | 100137           
 source_username | Merlene Blodgett 
-RECORD 1---------------------------
 source_id       | 103582           
 source_username | Carmen Cura      
only showing top 2 rows



In [6]:
# rename column
case_df = case_df.withColumnRenamed("SLA_due_date", "case_due_date")
# change to string 
case_df = case_df.withColumn("council_district", col("council_district").cast("string"))

In [7]:
print("--- Before handling dates")
case_df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

fmt = "M/d/yy H:mm"
case_df = (
    case_df.withColumn("case_opened_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_closed_date", to_timestamp("case_opened_date", fmt))
    .withColumn("case_due_date", to_timestamp("case_opened_date", fmt))
)

print("--- After")
case_df.select("case_opened_date", "case_closed_date", "case_due_date").show(5)

--- Before handling dates
+----------------+----------------+-------------+
|case_opened_date|case_closed_date|case_due_date|
+----------------+----------------+-------------+
|     1/1/18 0:42|    1/1/18 12:29| 9/26/20 0:42|
|     1/1/18 0:46|     1/3/18 8:11|  1/5/18 8:30|
|     1/1/18 0:48|     1/2/18 7:57|  1/5/18 8:30|
|     1/1/18 1:29|     1/2/18 8:13| 1/17/18 8:30|
|     1/1/18 1:34|    1/1/18 13:29|  1/1/18 4:34|
+----------------+----------------+-------------+
only showing top 5 rows

--- After
+-------------------+-------------------+-------------------+
|   case_opened_date|   case_closed_date|      case_due_date|
+-------------------+-------------------+-------------------+
|2018-01-01 00:42:00|2018-01-01 00:42:00|2018-01-01 00:42:00|
|2018-01-01 00:46:00|2018-01-01 00:46:00|2018-01-01 00:46:00|
|2018-01-01 00:48:00|2018-01-01 00:48:00|2018-01-01 00:48:00|
|2018-01-01 01:29:00|2018-01-01 01:29:00|2018-01-01 01:29:00|
|2018-01-01 01:34:00|2018-01-01 01:34:00|2018-01-01 01:

In [8]:
case_df = case_df.withColumn(
    "num_weeks_late", expr("num_days_late / 7 AS num_weeks_late")
)

case_df.select("num_days_late", "num_weeks_late").show(5)

+-------------------+--------------------+
|      num_days_late|      num_weeks_late|
+-------------------+--------------------+
| -998.5087616000001|        -142.6441088|
|-2.0126041669999997|-0.28751488099999994|
|       -3.022337963|-0.43176256614285713|
|       -15.01148148| -2.1444973542857144|
|0.37216435200000003|         0.053166336|
+-------------------+--------------------+
only showing top 5 rows



In [10]:
case_df = case_df.withColumn("case_closed", expr('case_closed == "YES"')).withColumn(
    "case_late", expr('case_late == "YES"')
)

case_df.select("case_closed", "case_late").show(5)

+-----------+---------+
|case_closed|case_late|
+-----------+---------+
|       true|    false|
|       true|    false|
|       true|    false|
|       true|    false|
|       true|     true|
+-----------+---------+
only showing top 5 rows



In [11]:
case_df = (
    case_df.withColumn(
        "case_age", datediff(current_timestamp(), "case_opened_date")
    )
    .withColumn(
        "days_to_closed", datediff("case_closed_date", "case_opened_date")
    )
    .withColumn(
        "case_lifetime",
        when(expr("! case_closed"), col("case_age")).otherwise(
            col("days_to_closed")
        ),
    )
)

case_df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("case_closed")).show(5)

case_df.select(
    "case_closed",
    "case_opened_date",
    "case_closed_date",
    "case_age",
    "days_to_closed",
    "case_lifetime",
).where(expr("! case_closed")).show(5)

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|case_age|days_to_closed|case_lifetime|
+-----------+-------------------+-------------------+--------+--------------+-------------+
|       true|2018-01-01 00:42:00|2018-01-01 00:42:00|    1715|             0|            0|
|       true|2018-01-01 00:46:00|2018-01-01 00:46:00|    1715|             0|            0|
|       true|2018-01-01 00:48:00|2018-01-01 00:48:00|    1715|             0|            0|
|       true|2018-01-01 01:29:00|2018-01-01 01:29:00|    1715|             0|            0|
|       true|2018-01-01 01:34:00|2018-01-01 01:34:00|    1715|             0|            0|
+-----------+-------------------+-------------------+--------+--------------+-------------+
only showing top 5 rows

+-----------+-------------------+-------------------+--------+--------------+-------------+
|case_closed|   case_opened_date|   case_closed_date|ca

In [13]:
case_df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2018-01-01 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 num_weeks_late       | -142.6441088         
 case_age             | 1715                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
-RECORD 1------------------------------------
 case_id              | 1014127333           
 case_opened_date     | 2018-01-01

In [17]:
from pyspark.sql.functions import year
from pyspark.sql.functions import to_date
 # get year from close date
case_df = case_df.withColumn('birth_year',year(case_df.case_closed_date))
case_df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2018-01-01 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 dept_division        | Field Operations     
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 num_weeks_late       | -142.6441088         
 case_age             | 1715                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 birth_year           | 2018                 
-RECORD 1------------------------------------
 case_id              | 1014127333

In [None]:
case_df = case_df.withColumn(
    "num_hours_late", expr("num_days_late / 7 AS num_weeks_late")
)

case_df.select("num_days_late", "num_weeks_late").show(5)

In [18]:
df = (
    case_df
    # left join on dept_division
    .join(dept_df, "dept_division", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(dept_df.dept_division)
    .drop(dept_df.dept_name)
    .drop(case_df.dept_division)
    .withColumnRenamed("standardized_dept_name", "department")
    # convert to a boolean
    .withColumn("dept_subject_to_SLA", col("dept_subject_to_SLA") == "YES")
)

df.show(2, vertical=True)

-RECORD 0------------------------------------
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2018-01-01 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 source_id            | svcCRMLS             
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 num_weeks_late       | -142.6441088         
 case_age             | 1715                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 birth_year           | 2018                 
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
-RECORD 1-------------------------

In [20]:
df = (
    df
    # left join on source id
    .join(source_df, "source_id", "left")
    # drop all the columns except for standardized name, as it has much fewer unique values
    .drop(source_df.source_id)

)

df.show(2, vertical=True)

-RECORD 0------------------------------------
 source_id            | svcCRMLS             
 case_id              | 1014127332           
 case_opened_date     | 2018-01-01 00:42:00  
 case_closed_date     | 2018-01-01 00:42:00  
 case_due_date        | 2018-01-01 00:42:00  
 case_late            | false                
 num_days_late        | -998.5087616000001   
 case_closed          | true                 
 service_request_type | Stray Animal         
 SLA_days             | 999.0                
 case_status          | Closed               
 request_address      | 2315  EL PASO ST,... 
 council_district     | 5                    
 num_weeks_late       | -142.6441088         
 case_age             | 1715                 
 days_to_closed       | 0                    
 case_lifetime        | 0                    
 birth_year           | 2018                 
 department           | Animal Care Services 
 dept_subject_to_SLA  | true                 
 source_username      | svcCRMLS  

In [21]:
df.filter('! case_closed')\
    .select('case_opened_date', 'case_due_date', 'num_days_late')\
    .sort(desc('num_days_late'))\
    .show(5)

[Stage 30:>                                                       (0 + 12) / 12]

+-------------------+-------------------+------------------+
|   case_opened_date|      case_due_date|     num_days_late|
+-------------------+-------------------+------------------+
|2017-01-01 13:57:00|2017-01-01 13:57:00|       348.6458333|
|2017-01-01 13:48:00|2017-01-01 13:48:00|       348.6458333|
|2017-01-02 11:26:00|2017-01-02 11:26:00|348.52356480000003|
|2017-01-03 10:01:00|2017-01-03 10:01:00|347.58256939999995|
|2017-01-05 14:39:00|2017-01-05 14:39:00|       345.3894213|
+-------------------+-------------------+------------------+
only showing top 5 rows



[Stage 30:====>                                                   (1 + 11) / 12]                                                                                

In [22]:
df.filter('! case_closed')\
    .select('case_opened_date', 'case_due_date', 'SLA_days')\
    .sort(desc('SLA_days'))\
    .show(5)

+-------------------+-------------------+-----------+
|   case_opened_date|      case_due_date|   SLA_days|
+-------------------+-------------------+-----------+
|2017-09-22 08:27:00|2017-09-22 08:27:00| 1419.00191|
|2017-11-03 14:05:00|2017-11-03 14:05:00|     1419.0|
|2017-11-21 06:57:00|2017-11-21 06:57:00|1416.063981|
|2017-12-03 12:59:00|2017-12-03 12:59:00|1414.813009|
|2017-01-06 08:24:00|2017-01-06 08:24:00|1406.003623|
+-------------------+-------------------+-----------+
only showing top 5 rows



In [23]:
df.groupby('service_request_type')\
    .count()\
    .sort(desc('count'))\
    .show(truncate=False)

[Stage 36:====>                                                   (1 + 11) / 12]

+---------------------------------+-----+
|service_request_type             |count|
+---------------------------------+-----+
|No Pickup                        |89210|
|Overgrown Yard/Trash             |66403|
|Bandit Signs                     |32968|
|Damaged Cart                     |31163|
|Front Or Side Yard Parking       |28920|
|Stray Animal                     |27361|
|Aggressive Animal(Non-Critical)  |25492|
|Cart Exchange Request            |22608|
|Junk Vehicle On Private Property |21649|
|Pot Hole Repair                  |20827|
|Alley-Way Maintenance            |20293|
|Lost/Stolen Cart                 |19299|
|Right Of Way/Sidewalk Obstruction|17836|
|Dead Animal - Dog                |17092|
|Cart Delivery                    |15761|
|Dead Animal - Cat                |15345|
|Animal Neglect                   |13851|
|Dead Animal - Misc               |13535|
|Trapped/Confined Animal          |11605|
|Public Nuisance(Own Animal)      |10969|
+---------------------------------

                                                                                

In [24]:
df.filter(df.service_request_type == lit('Stray Animal')).count()

27361

In [26]:
df.filter(df.department == 'Field Operations')\
    .filter(df.service_request_type != 'Officer Standby')\
    .count()

0

In [27]:
df.withColumn('year', year('case_closed_date'))\
    .select('service_request_type', 'case_closed_date', 'year')\
    .show(5)

+--------------------+-------------------+----+
|service_request_type|   case_closed_date|year|
+--------------------+-------------------+----+
|        Stray Animal|2018-01-01 00:42:00|2018|
|Removal Of Obstru...|2018-01-01 00:46:00|2018|
|Removal Of Obstru...|2018-01-01 00:48:00|2018|
|Front Or Side Yar...|2018-01-01 01:29:00|2018|
|Animal Cruelty(Cr...|2018-01-01 01:34:00|2018|
+--------------------+-------------------+----+
only showing top 5 rows



In [28]:
df.withColumn('num_hours_late', round(expr('num_days_late * 24'), 1))\
    .select('num_days_late', 'num_hours_late')\
    .show(10)

+-------------------+--------------+
|      num_days_late|num_hours_late|
+-------------------+--------------+
| -998.5087616000001|      -23964.2|
|-2.0126041669999997|         -48.3|
|       -3.022337963|         -72.5|
|       -15.01148148|        -360.3|
|0.37216435200000003|           8.9|
|       -29.74398148|        -713.9|
|       -14.70673611|        -353.0|
|       -14.70662037|        -353.0|
|       -14.70662037|        -353.0|
|       -14.70649306|        -353.0|
+-------------------+--------------+
only showing top 10 rows



In [29]:
df = df.withColumn('num_hours_late', round(expr('num_days_late * 24'), 1))

In [31]:
df.filter(df.source_id == 'null').count()

0

In [32]:
df.groupBy('service_request_type')\
    .count()\
    .sort(desc('count'))\
    .show(10)

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|89210|
|Overgrown Yard/Trash|66403|
|        Bandit Signs|32968|
|        Damaged Cart|31163|
|Front Or Side Yar...|28920|
|        Stray Animal|27361|
|Aggressive Animal...|25492|
|Cart Exchange Req...|22608|
|Junk Vehicle On P...|21649|
|     Pot Hole Repair|20827|
+--------------------+-----+
only showing top 10 rows

