# Distributed Machine Learning
## Wrangle Exercises

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/05/23 09:09:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


1. Read the case, department, and source data into their own spark dataframes.

2. Let's see how writing to the local disk works in spark:

- Write the code necessary to store the source data in both csv and json format, store these as sources_csv and sources_json
- Inspect your folder structure. What do you notice?
3. Inspect the data in your dataframes. 
- Are the data types appropriate? 
- Write the code necessary to cast the values to the appropriate types.
---------------


In [2]:
source = spark.read.csv("source.csv", sep=",", header=True, inferSchema=True)
case = spark.read.csv("case.csv", sep=",", header=True, inferSchema=True)
dept = spark.read.csv("dept.csv", sep=",", header=True, inferSchema=True)

                                                                                

In [3]:
source.show(2)

+---------+----------------+
|source_id| source_username|
+---------+----------------+
|   100137|Merlene Blodgett|
|   103582|     Carmen Cura|
+---------+----------------+
only showing top 2 rows



In [4]:
case.show(1)

+----------+----------------+----------------+------------+---------+------------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|     num_days_late|case_closed|   dept_division|service_request_type|SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+------------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO|-998.5087616000001|        YES|Field Operations|        Stray Animal|   999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
+----------+----------------+----------------+------------+---------+------------------+-----------+----------------+--------------------+--------+-----------+---------

In [5]:
dept.show(2)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|                YES|
|          Brush|Solid Waste Manag...|           Solid Waste|                YES|
+---------------+--------------------+----------------------+-------------------+
only showing top 2 rows



In [6]:
dept.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [7]:
source.printSchema()

root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



In [11]:
case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



### Initial prep (case file)

In [14]:
# Rename column
case = case.withColumnRenamed('SLA_due_date', 'case_due_date')

# Convert to better data types
case = (
    case.withColumn('case_late', col('case_late') == 'YES')
    .withColumn('case_closed', col('case_closed') == 'YES')
)
case = case.withColumn('council_district', format_string('%04d', col('council_district')))
case = (
    case.withColumn('case_opened_date', to_timestamp(col('case_opened_date'), 'M/d/yy H:mm'))
    .withColumn('case_closed_date', to_timestamp(col('case_closed_date'), 'M/d/yy H:mm'))
    .withColumn('case_due_date', to_timestamp(col('case_due_date'), 'M/d/yy H:mm'))
)

# Cleanup text data
case = case.withColumn('request_address', lower(trim(col('request_address'))))
# Extract zipcode
case = case.withColumn('zipcode', regexp_extract(col('request_address'), r'\d+$', 0))

# Create a `case_lifetime` feature
case = (
    case.withColumn('case_age', datediff(current_timestamp(), 'case_opened_date'))
    .withColumn('days_to_closed', datediff('case_closed_date', 'case_opened_date'))
    .withColumn('case_lifetime', when(col('case_closed'), col('days_to_closed')).otherwise(col('case_age')))
    .drop('case_age', 'days_to_closed')
)

# Join departments and sources
depts = spark.read.csv('dept.csv', header=True, inferSchema=True)
sources = spark.read.csv('source.csv', header=True, inferSchema=True)

case = case.join(depts, 'dept_division', 'left').join(sources, 'source_id', 'left')

# # Train Test Split
# train, test = df.randomSplit([.8, .2], seed=123)
# train, validate, test = df.randomSplit([.7, .15, .15], seed=123)

### 1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

In [18]:
case.createOrReplaceTempView('case')


(spark.sql('''SELECT DATEDIFF(current_timestamp, case_due_date)
AS days_past_due 
FROM case 
WHERE NOT case_closed 
ORDER BY days_past_due 
DESC LIMIT 10;''').show())

[Stage 15:===>                                                    (1 + 15) / 16]

+-------------+
|days_past_due|
+-------------+
|         1952|
|         1952|
|         1952|
|         1951|
|         1949|
|         1945|
|         1945|
|         1944|
|         1943|
|         1943|
+-------------+



                                                                                

### 2. How many Stray Animal cases are there?

In [27]:
stray_count = case.filter(case.service_request_type == 'Stray Animal').count()
print(f'\n\tStray Animal Count: {stray_count}.')


	Stray Animal Count: 27361.


In [29]:
# Alternate solution

case.groupBy('service_request_type').count().filter(expr('service_request_type == "Stray Animal"')).show()

+--------------------+-----+
|service_request_type|count|
+--------------------+-----+
|        Stray Animal|27361|
+--------------------+-----+



### 3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

In [35]:
count_field_ops_not_officer_standby = case.filter(case.dept_division == 'Field Operations').filter(case.service_request_type != 'Officer Standby').count()

print(f'\n\tNon-Officer Standby Field Operatons Count: {count_field_ops_not_officer_standby}.')


	Non-Officer Standby Field Operatons Count: 116295.


4. Convert the council_district column to a string column.



In [37]:
case = case.withColumn('council_district', format_string('%04d', col('council_district')))

### 5. Extract the year from the case_closed_date column.

In [38]:
case.select('case_closed_date', year('case_closed_date')).show()

+-------------------+----------------------+
|   case_closed_date|year(case_closed_date)|
+-------------------+----------------------+
|2018-01-01 12:29:00|                  2018|
|2018-01-03 08:11:00|                  2018|
|2018-01-02 07:57:00|                  2018|
|2018-01-02 08:13:00|                  2018|
|2018-01-01 13:29:00|                  2018|
|2018-01-01 14:38:00|                  2018|
|2018-01-02 15:32:00|                  2018|
|2018-01-02 15:32:00|                  2018|
|2018-01-02 15:32:00|                  2018|
|2018-01-02 15:32:00|                  2018|
|2018-01-02 15:32:00|                  2018|
|2018-01-02 15:32:00|                  2018|
|2018-01-02 15:33:00|                  2018|
|2018-01-02 15:32:00|                  2018|
|2018-01-02 15:33:00|                  2018|
|2018-01-02 15:33:00|                  2018|
|2018-01-02 15:33:00|                  2018|
|2018-01-02 15:33:00|                  2018|
|2018-01-02 15:33:00|                  2018|
|2018-01-0

### 6. Convert num_days_late from days to hours in new columns num_hours_late.

In [44]:
case.withColumn('num_hours_late', case.num_days_late * 24).select('num_days_late', 'num_hours_late').show()

+-------------------+-------------------+
|      num_days_late|     num_hours_late|
+-------------------+-------------------+
| -998.5087616000001|     -23964.2102784|
|-2.0126041669999997|-48.302500007999996|
|       -3.022337963|      -72.536111112|
|       -15.01148148|      -360.27555552|
|0.37216435200000003|  8.931944448000001|
|       -29.74398148| -713.8555555199999|
|       -14.70673611|      -352.96166664|
|       -14.70662037|      -352.95888888|
|       -14.70662037|      -352.95888888|
|       -14.70649306|      -352.95583344|
|       -14.70649306|      -352.95583344|
|       -14.70636574|      -352.95277776|
|          -14.70625|-352.95000000000005|
|       -14.70636574|      -352.95277776|
|       -14.70623843|-352.94972232000003|
|-14.705891199999998|-352.94138879999997|
|       -14.70600694|      -352.94416656|
|       -14.70576389|      -352.93833336|
|       -14.70576389|      -352.93833336|
|       -14.70564815|       -352.9355556|
+-------------------+-------------

### 7. Join the case data with the source and department data.

In [51]:
depts = spark.read.csv('dept.csv', header=True, inferSchema=True)
sources = spark.read.csv('source.csv', header=True, inferSchema=True)

case = case.join(depts, 'dept_division', 'left').join(sources, 'source_id', 'left')


### 8. Are there any cases that do not have a request source?

In [47]:
case.select(case.source_id.isNull().cast('int').alias('is_null')).agg(sum('is_null')).show()

+------------+
|sum(is_null)|
+------------+
|           0|
+------------+



In [48]:
# Alternate way
case.filter(col('source_id').isNull()).show(vertical=True)

(0 rows)



### 9. What are the top 10 service request types in terms of number of requests?

In [56]:
case.groupby('service_request_type').count().sort(col('count').desc()).show(10, truncate=False)

+--------------------------------+------+
|service_request_type            |count |
+--------------------------------+------+
|No Pickup                       |103340|
|Overgrown Yard/Trash            |69451 |
|Damaged Cart                    |36113 |
|Bandit Signs                    |33316 |
|Stray Animal                    |30967 |
|Front Or Side Yard Parking      |29676 |
|Aggressive Animal(Non-Critical) |29152 |
|Cart Exchange Request           |26112 |
|Lost/Stolen Cart                |22707 |
|Junk Vehicle On Private Property|22705 |
+--------------------------------+------+
only showing top 10 rows





### 10. What are the top 10 service request types in terms of average days late?

In [85]:
case.where('case_late').groupBy('service_request_type').\
agg(mean('num_days_late').alias('number_days_late'), count('*').alias('number_of_cases')).\
sort(desc('number_days_late')).\
show(10, truncate=False)

+--------------------------------------+------------------+---------------+
|service_request_type                  |number_days_late  |number_of_cases|
+--------------------------------------+------------------+---------------+
|Zoning: Recycle Yard                  |210.89201994318182|132            |
|Zoning: Junk Yards                    |200.2051760849428 |262            |
|Structure/Housing Maintenance         |190.20707698509807|51             |
|Donation Container Enforcement        |171.09115313942624|122            |
|Storage of Used Mattress              |163.96812829714287|7              |
|Labeling for Used Mattress            |162.43032902285717|7              |
|Record Keeping of Used Mattresses     |153.99724039428568|7              |
|Signage Requied for Sale of Used Mattr|151.63868055333333|12             |
|Traffic Signal Graffiti               |137.64583330000002|16             |
|License Requied Used Mattress Sales   |128.79828704142858|7              |
+-----------

### 11. Does number of days late depend on department?

In [86]:
(case.filter('case_late')
    .groupby('dept_name')
    .agg(mean('num_days_late').alias('days_late'), count('num_days_late').alias('n_cases_late'))
    .sort('days_late')
    .withColumn('days_late', round(col('days_late'), 1))
    .show(truncate=False)
)

AnalysisException: Reference 'dept_name' is ambiguous, could be: dept_name, dept_name, dept_name.

### 12. How do number of days late depend on department and request type?

In [74]:
(
    case.filter("case_closed")
    .filter("case_late")
    .groupby("standardized_dept_name", "service_request_type")
    .agg(avg("num_days_late").alias("days_late"), count("*").alias("n_cases"))
    .withColumn("days_late", round(col("days_late"), 1))
    .sort(col('standardized_dept_name'), col('days_late'))
    .show(40, truncate=False)
)

AnalysisException: Reference 'standardized_dept_name' is ambiguous, could be: standardized_dept_name, standardized_dept_name, standardized_dept_name.

### 13. You might have noticed that the latest date in the dataset is fairly far off from the present day. To account for this, replace any occurances of the current time with the maximum date from the dataset.

In [79]:
case = (
    case.withColumn('case_opened_date', to_timestamp(col('case_opened_date'), 'M/d/yy H:mm'))
    .withColumn('case_closed_date', to_timestamp(col('case_closed_date'), 'M/d/yy H:mm'))
    .withColumn('case_due_date', to_timestamp(col('case_due_date'), 'M/d/yy H:mm'))
)