In [1]:
# To get rid of those blocks of red warnings
import warnings
warnings.filterwarnings("ignore")

# Standard Imports
import numpy as np
from scipy import stats
import pandas as pd
import os
from scipy.stats import spearmanr
from sklearn import metrics
from random import randint
from typing import Dict, List, Optional, Union, cast
from time import sleep
import pyspark
from pydataset import data
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Vis Imports
import matplotlib.pyplot as plt
import seaborn as sns
from wordcloud import WordCloud

# Modeling Imports
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression, LassoLars, TweedieRegressor
from sklearn.metrics import mean_squared_error, r2_score, explained_variance_score
from sklearn.feature_selection import f_regression 
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import PolynomialFeatures
from sklearn.feature_selection import SelectKBest, f_regression, RFE
import sklearn.preprocessing
import statsmodels.api as sm
from sklearn.cluster import DBSCAN
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, accuracy_score
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB

# NLP Imports
import unicodedata
import re
import json
import requests
from bs4 import BeautifulSoup
import nltk
from nltk.tokenize.toktok import ToktokTokenizer
from nltk.corpus import stopwords

# Custom Module Imports
import env

## These exercises use the cases, dept, and source tables from the 311_data database on the Codeup MySQL server.

- Read the case, department, and source data into their own spark dataframes.

- Let's see how writing to the local disk works in spark:

- Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
- Inspect your folder structure. What do you notice?
- Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

- How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?
- How many Stray Animal cases are there?
- How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

- Convert the council_district column to a string column.

- Extract the year from the case_closed_date column.
- Convert num_days_late from days to hours in a new column, num_hours_late.

- Join the case data with the source and department data.

- Are there any cases that do not have a request source?

- What are the top 10 service request types in terms of number of requests?

- What are the top 10 service request types in terms of average days late?
- Does the number of days late depend on department?
- How does the number of days late depend on department and request type?

In [4]:
# Reuse the active Spark session if one exists; otherwise start a new one.
spark = SparkSession.builder.getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/21 11:36:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/10/21 11:36:54 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
def get_connection(db, user=None, host=None, password=None):
    """Build a SQLAlchemy-style connection URL for a MySQL database via PyMySQL.

    Parameters
    ----------
    db : str
        Name of the database (schema) to connect to.
    user, host, password : str, optional
        Connection credentials. Any value left as ``None`` is read from the
        local ``env`` module when the function is called, so ``env`` is only
        needed if a credential is omitted.

    Returns
    -------
    str
        URL of the form ``mysql+pymysql://user:password@host/db``.
    """
    from urllib.parse import quote_plus

    # Resolve defaults lazily instead of binding them at definition time.
    if user is None:
        user = env.user
    if host is None:
        host = env.host
    if password is None:
        password = env.password

    # Characters such as '@', ':' and '/' in the credentials would otherwise be
    # parsed as URL delimiters, so percent-encode them.
    safe_user = quote_plus(str(user))
    safe_password = quote_plus(str(password))
    return f'mysql+pymysql://{safe_user}:{safe_password}@{host}/{db}'

In [5]:
# Pull the joined cases/source/dept rows from MySQL with pandas, then hand the
# result to Spark.
# NOTE(review): both joins are plain (inner) JOINs, so any case without a
# matching source or dept row is not returned -- confirm this is intended
# before checking for cases that lack a request source.
url = get_connection("311_data")
query = """SELECT * FROM cases
JOIN source
USING(`source_id`)
JOIN dept
USING(`dept_division`);
"""
df = spark.createDataFrame(pd.read_sql(sql=query, con=url))

In [8]:
# Print the first five rows one field per line, without truncating values.
df.show(n=5, truncate=False, vertical=True)

22/10/21 11:53:37 WARN TaskSetManager: Stage 2 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.


[Stage 2:>                                                          (0 + 1) / 1]

22/10/21 11:53:41 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 2 (TID 2): Attempting to kill Python Worker
-RECORD 0----------------------------------------------------------------
 dept_division          | Miscellaneous                                  
 source_id              | 139344                                         
 case_id                | 1014127378                                     
 case_opened_date       | 1/1/18 8:24                                    
 case_closed_date       | 1/2/18 7:37                                    
 SLA_due_date           | 1/3/18 8:24                                    
 case_late              | NO                                             
 num_days_late          | -1.03287037                                    
 case_closed            | YES                                            
 service_request_type   | Dead Animal - Cat                              
 SLA_days               | 2.0                       

                                                                                

In [10]:
# List the (column name, data type) pairs of the Spark DataFrame.
df.dtypes

[('dept_division', 'string'),
 ('source_id', 'string'),
 ('case_id', 'bigint'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('request_address', 'string'),
 ('council_district', 'bigint'),
 ('index', 'bigint'),
 ('source_username', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

In [11]:
# Timestamp pattern used by the three date columns (e.g. "1/1/18 8:24").
fmt = 'M/d/yy H:mm'

# One pipeline: rename, retype, tidy text, then derive new features.
df = (
    df
    # Consistent naming with the other case_* date columns.
    .withColumnRenamed('SLA_due_date', 'case_due_date')
    # 'YES'/'NO' strings -> booleans.
    .withColumn('case_late', col('case_late') == 'YES')
    .withColumn('case_closed', col('case_closed') == 'YES')
    # District is an identifier, not a quantity: store it as a zero-padded string.
    .withColumn('council_district', format_string('%03d', col('council_district')))
    # Date strings -> timestamps.
    .withColumn('case_opened_date', to_timestamp(col('case_opened_date'), fmt))
    .withColumn('case_closed_date', to_timestamp(col('case_closed_date'), fmt))
    .withColumn('case_due_date', to_timestamp(col('case_due_date'), fmt))
    # Normalise the address text (strip surrounding whitespace, lowercase).
    .withColumn('request_address', lower(trim(col('request_address'))))
    # Trailing run of digits in the cleaned address is taken as the zipcode.
    .withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))
    # case_lifetime: days from open to close for closed cases, otherwise days
    # from open until the moment this cell runs (so it changes between runs).
    .withColumn(
        'case_lifetime',
        when(
            col('case_closed'),
            datediff(col('case_closed_date'), col('case_opened_date')),
        ).otherwise(datediff(current_timestamp(), col('case_opened_date'))),
    )
)

In [13]:
# Open cases ranked by how far past their SLA they are; rows whose
# num_days_late is NaN are excluded so they do not float to the top.
(
    df.where('! case_closed')
    .where('num_days_late != "nan"')
    .select('case_closed', 'case_lifetime', 'num_days_late')
    .orderBy(col('num_days_late').desc())
    .show(n=5)
)

22/10/24 10:08:41 WARN TaskSetManager: Stage 4 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.


[Stage 4:>                                                        (0 + 10) / 10]

+-----------+-------------+------------------+
|case_closed|case_lifetime|     num_days_late|
+-----------+-------------+------------------+
|      false|         2122|       348.6458333|
|      false|         2122|       348.6458333|
|      false|         2121|348.52356480000003|
|      false|         2120|347.58256939999995|
|      false|         2118|       345.3894213|
+-----------+-------------+------------------+
only showing top 5 rows



                                                                                

In [14]:
# Oldest currently open case in terms of days since it was opened.
# This cell was previously an exact copy of the "days past SLA" query above, so
# the second question was never answered. It now ranks by case_lifetime (days
# open for unclosed cases) and no longer filters on num_days_late, since an open
# case without a lateness value still counts as open.
(df.select('case_closed', 'case_lifetime', 'num_days_late')
    .filter('! case_closed')
    .sort(desc('case_lifetime'))
).show(5)

22/10/24 10:09:01 WARN TaskSetManager: Stage 5 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.




+-----------+-------------+------------------+
|case_closed|case_lifetime|     num_days_late|
+-----------+-------------+------------------+
|      false|         2122|       348.6458333|
|      false|         2122|       348.6458333|
|      false|         2121|348.52356480000003|
|      false|         2120|347.58256939999995|
|      false|         2118|       345.3894213|
+-----------+-------------+------------------+
only showing top 5 rows



                                                                                

In [15]:
# Number of cases whose request type is exactly "Stray Animal".
df.where('service_request_type == "Stray Animal"').count()


22/10/24 10:20:13 WARN TaskSetManager: Stage 6 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

27361

In [16]:
# Field Operations requests that are not "Officer Standby".
df.where(
    (col('dept_division') == 'Field Operations')
    & (col('service_request_type') != 'Officer Standby')
).count()

22/10/24 10:20:25 WARN TaskSetManager: Stage 9 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

116295

In [17]:
# Add the calendar year in which each case was closed.
df = df.withColumn(colName='case_closed_year', col=year(col('case_closed_date')))

In [18]:
# Spot-check the extracted year against the source timestamp.
df.select(['case_closed_date', 'case_closed_year']).show(n=5)


22/10/24 10:20:47 WARN TaskSetManager: Stage 12 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.


[Stage 12:>                                                         (0 + 1) / 1]

22/10/24 10:20:51 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 12 (TID 55): Attempting to kill Python Worker
+-------------------+----------------+
|   case_closed_date|case_closed_year|
+-------------------+----------------+
|2018-01-02 07:37:00|            2018|
|2018-01-01 12:30:00|            2018|
|2018-01-01 10:12:00|            2018|
|2018-01-03 08:54:00|            2018|
|2018-01-01 10:54:00|            2018|
+-------------------+----------------+
only showing top 5 rows



                                                                                

In [19]:
# Express lateness in hours as well (24 hours per day).
df = df.withColumn(colName='num_hours_late', col=expr('num_days_late * 24'))

In [20]:
# Spot-check the day -> hour conversion.
df.select(['num_days_late', 'num_hours_late']).show(n=5)


22/10/24 10:21:10 WARN TaskSetManager: Stage 13 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.


[Stage 13:>                                                         (0 + 1) / 1]

22/10/24 10:21:15 WARN PythonRunner: Detected deadlock while completing task 0.0 in stage 13 (TID 56): Attempting to kill Python Worker
+-------------+--------------+
|num_days_late|num_hours_late|
+-------------+--------------+
|  -1.03287037|  -24.78888888|
| -998.8396875|   -23972.1525|
| -1.928912037| -46.293888888|
| -5.983298611|-143.599166664|
|    -1.899375|       -45.585|
+-------------+--------------+
only showing top 5 rows



                                                                                

In [21]:
# Count rows where `index` is null.
# NOTE(review): `index` appears to come from the source table, and the data was
# loaded with inner joins, so cases lacking a source would already have been
# dropped -- confirm before reading 0 as "every case has a source".
df.where('index is null').count()


22/10/24 10:21:29 WARN TaskSetManager: Stage 14 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

0

In [23]:
# Ten most frequent service request types.
(
    df.groupBy('service_request_type')
    .count()
    .orderBy(col('count').desc())
    .show(n=10)
)

22/10/24 10:23:03 WARN TaskSetManager: Stage 18 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.




+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|           No Pickup|89210|
|Overgrown Yard/Trash|66403|
|        Bandit Signs|32968|
|        Damaged Cart|31163|
|Front Or Side Yar...|28920|
|        Stray Animal|27361|
|Aggressive Animal...|25492|
|Cart Exchange Req...|22608|
|Junk Vehicle On P...|21649|
|     Pot Hole Repair|20827|
+--------------------+-----+
only showing top 10 rows





In [24]:
# Ten request types with the highest average lateness (rounded to 2 places).
# `round` and `avg` here are the pyspark.sql.functions versions brought in by
# the star import, not the Python builtins.
(
    df.groupBy('service_request_type')
    .agg(round(avg('num_days_late'), 2).alias('mean_days_late'))
    .orderBy(col('mean_days_late').desc())
    .dropna()
    .show(n=10, truncate=False)
)

22/10/24 10:23:10 WARN TaskSetManager: Stage 21 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.




+--------------------------------------+--------------+
|service_request_type                  |mean_days_late|
+--------------------------------------+--------------+
|Zoning: Junk Yards                    |175.96        |
|Labeling for Used Mattress            |162.43        |
|Record Keeping of Used Mattresses     |154.0         |
|Signage Requied for Sale of Used Mattr|151.64        |
|Storage of Used Mattress              |142.11        |
|Zoning: Recycle Yard                  |135.93        |
|Donation Container Enforcement        |131.76        |
|License Requied Used Mattress Sales   |128.8         |
|Traffic Signal Graffiti               |101.8         |
|Complaint                             |72.87         |
+--------------------------------------+--------------+
only showing top 10 rows





In [25]:
# Average days late for each department.
df.groupBy('dept_name').avg('num_days_late').show()

22/10/24 10:23:53 WARN TaskSetManager: Stage 24 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.




+--------------------+-------------------+
|           dept_name| avg(num_days_late)|
+--------------------+-------------------+
|Animal Care Services|-226.51783940550334|
|Solid Waste Manag...|-2.2000575136721547|
|Development Services| 13.433724555869729|
|Trans & Cap Impro...| -20.61283735405272|
|    Customer Service| 59.737091496300806|
|        Metro Health| -4.911766979607003|
|Parks and Recreation| -5.251521960055171|
|Code Enforcement ...| -38.70133068329586|
|        City Council|                NaN|
|                null|   135.928516124798|
+--------------------+-------------------+





In [26]:
# Average days late for each (department, request type) pair, listed
# alphabetically by request type.
(
    df.groupBy(['dept_name', 'service_request_type'])
    .avg('num_days_late')
    .orderBy('service_request_type')
    .show()
)

22/10/24 10:24:11 WARN TaskSetManager: Stage 27 contains a task of very large size (20881 KiB). The maximum recommended task size is 1000 KiB.




+--------------------+--------------------+-------------------+
|           dept_name|service_request_type| avg(num_days_late)|
+--------------------+--------------------+-------------------+
|Solid Waste Manag...|1st Call Bagged L...| -3.868991989858198|
|Solid Waste Manag...|2nd Call Bagged L...| -4.212107162343529|
|Solid Waste Manag...|3rd Call Bagged L...|       -3.152896412|
|Trans & Cap Impro...|ADA Infrastructur...|      -11.841508137|
|Code Enforcement ...|    ADA Obstructions|       -13.22349537|
|Code Enforcement ...|Absentee Property...|-14.231748829519528|
|Trans & Cap Impro...|Accident Problem ...|-22.971030459931214|
|Solid Waste Manag...|Additional Cart R...| -5.063509766847672|
|Solid Waste Manag...|Additional Garbag...| -2.920198210360159|
|Solid Waste Manag...|Additional Organi...| -5.997874052909091|
|Animal Care Services|Aggressive Animal...|  16.69636881189224|
|Animal Care Services|Aggressive Animal...|  2.645033883163267|
|Trans & Cap Impro...|  All-Way Stop Stu

