<h1 id="exercises">Exercises - Data Acquisition</h1>


These exercises use the <code>case.csv</code>, <code>dept.csv</code>, and <code>source.csv</code> files from the
San Antonio 311 call dataset.

In [1]:
import pandas as pd
import numpy as np

from os import path
from pydataset import data

import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T

In [2]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [3]:
def get_spark_shape(df):
    """Return (row count, column count) for a Spark dataframe, like pandas' df.shape."""
    return (df.count(), len(df.columns))

## i.1
Read the case, department, and source data into their own Spark dataframes.

#### Create case

In [4]:
case = spark.read.csv('spark_csv/case.csv', header=True, inferSchema=True)
print(get_spark_shape(case))
case = case.distinct()
print(get_spark_shape(case))
case.printSchema()

(841704, 14)
(841630, 14)
root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



#### Register case

In [5]:
case.createOrReplaceTempView('case')

#### Create dept

In [6]:
dept=spark.read.csv('spark_csv/dept.csv', header=True, inferSchema=True)
print(get_spark_shape(dept))
dept.printSchema()

(39, 4)
root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



In [7]:
dept.show(39, truncate=False)

+-----------------------------+-------------------------+------------------------+-------------------+
|dept_division                |dept_name                |standardized_dept_name  |dept_subject_to_SLA|
+-----------------------------+-------------------------+------------------------+-------------------+
|311 Call Center              |Customer Service         |Customer Service        |YES                |
|Brush                        |Solid Waste Management   |Solid Waste             |YES                |
|Clean and Green              |Parks and Recreation     |Parks & Recreation      |YES                |
|Clean and Green Natural Areas|Parks and Recreation     |Parks & Recreation      |YES                |
|Code Enforcement             |Code Enforcement Services|DSD/Code Enforcement    |YES                |
|Code Enforcement (IntExp)    |Code Enforcement Services|DSD/Code Enforcement    |YES                |
|Code Enforcement (Internal)  |null                     |DSD/Code Enforce

#### Register dept

In [8]:
dept.createOrReplaceTempView('dept')

#### Create source

In [9]:
source=spark.read.csv('spark_csv/source.csv', header=True, inferSchema=True)
print(get_spark_shape(source))
source.printSchema()

(140, 2)
root
 |-- source_id: string (nullable = true)
 |-- source_username: string (nullable = true)



#### Register source

In [10]:
source.createOrReplaceTempView('source')

## i.2
Let's see how writing to the local disk works in Spark:

### i.2.a
Write the code necessary to store the source data in both csv and json format. Store these as <code>sources_csv</code> and <code>sources_json</code>.

In [11]:
csv_file = 'data/sources_csv'

source.write.csv(csv_file, mode='overwrite')
if path.exists(csv_file):
    print('file saved')

file saved


In [12]:
json_file = 'data/sources_json'

source.write.json(json_file, mode='overwrite')
if path.exists(json_file):
    print('file saved')

file saved


### i.2.b
Inspect your folder structure. What do you notice?

**Spark makes directories, not just files.**
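
One way to check this from Python is to list what ended up inside the output path. The sketch below uses only the standard library; the helper names `list_output_files` and `data_files` are hypothetical, and the `part-` prefix is an assumption about how Spark usually names its partition files (a `_SUCCESS` marker is also typical, but exact names vary with the Spark version and the number of partitions).

```python
from pathlib import Path


def list_output_files(directory):
    """Return the sorted names of the entries inside a Spark output directory."""
    root = Path(directory)
    if not root.is_dir():
        raise NotADirectoryError(f"{directory} is not a directory")
    return sorted(p.name for p in root.iterdir())


def data_files(directory):
    """Keep only the partition files, assuming Spark's usual `part-` prefix."""
    return [name for name in list_output_files(directory) if name.startswith("part-")]
```

Calling `list_output_files('data/sources_csv')` after the write above should show that `sources_csv` is a folder holding several files rather than a single csv file.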

## i.3
Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

#### inspect case

In [13]:
case.show(3, vertical=True, truncate=False)

-RECORD 0-------------------------------------------------------
 case_id              | 1014127780                              
 case_opened_date     | 1/1/18 22:22                            
 case_closed_date     | 2/14/18 16:42                           
 SLA_due_date         | 1/6/18 22:22                            
 case_late            | YES                                     
 num_days_late        | 38.76371528                             
 case_closed          | YES                                     
 dept_division        | Field Operations                        
 service_request_type | Animal Neglect                          
 SLA_days             | 5.0                                     
 case_status          | Closed                                  
 source_id            | 124405                                  
 request_address      | 5335  NW 410, San Antonio, 78229        
 council_district     | 7                                       
-RECORD 1----------------

In [14]:
case.createOrReplaceTempView('case')

In [15]:
dupes = spark.sql('''
    SELECT -- DISTINCT
        *
    FROM
        case
    WHERE
--        case_id = '1013247747'
        case_id IN 
            (
            SELECT case_id FROM case GROUP BY case_id HAVING count(*) > 1
            )
--        AND
--        case_id NOT IN 
--            (
--            SELECT case_id FROM (SELECT DISTINCT * FROM case) distincts GROUP BY case_id HAVING count(*) > 1
--            )
--    GROUP BY
--        case_id
--        ,council_district 
--        ,request_address
--    HAVING
--        count(*) > 1
--        AND
--        (
--        min(council_district) = max(council_district)
--        OR
--        min(request_address) <> max(request_address)
--        )
    ORDER BY
        case_id
--        ,council_district
--        ,request_address
''')


print(dupes.count())
dupes.show(10, vertical=True, truncate=False)

76
-RECORD 0---------------------------------------------------
 case_id              | 1013247747                          
 case_opened_date     | 1/10/17 17:25                       
 case_closed_date     | 1/10/17 20:26                       
 SLA_due_date         | 10/6/19 17:25                       
 case_late            | NO                                  
 num_days_late        | -998.8738194                        
 case_closed          | YES                                 
 dept_division        | Field Operations                    
 service_request_type | Stray Animal                        
 SLA_days             | 999.0                               
 case_status          | Closed                              
 source_id            | dl05036                             
 request_address      | NB LOOP 410 SW and VALLEY HI DR     
 council_district     | 0                                   
-RECORD 1---------------------------------------------------
 case_id             

#### Modify case

- Change 'SLA' to 'case' in the column names
- Set dates to timestamps
- Set YES/NO fields to boolean
- Change council district field to string
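
Before applying these conversions in Spark, it can help to check the intended results on a couple of sample values in plain Python. This is only a sketch: the `strptime` pattern `%m/%d/%y %H:%M` is assumed to be a close counterpart of the Spark pattern `M/d/yy H:mm` (the two pattern languages are not the same), and the helper names `parse_case_date` and `yes_no_to_bool` are hypothetical.

```python
from datetime import datetime

# Assumed strptime counterpart of the Spark pattern "M/d/yy H:mm"
PY_FMT = "%m/%d/%y %H:%M"


def parse_case_date(text):
    """Parse a date string such as '1/1/18 22:22'; return None for missing values."""
    if text is None:
        return None
    return datetime.strptime(text, PY_FMT)


def yes_no_to_bool(text):
    """Map 'YES' to True and any other string to False, keeping None as None."""
    if text is None:
        return None
    return text == "YES"


print(parse_case_date("1/1/18 22:22"))              # 2018-01-01 22:22:00
print(yes_no_to_bool("YES"), yes_no_to_bool("NO"))  # True False
```

The sample date is the `case_opened_date` of the first record shown earlier, so the parsed value can be compared against the timestamp Spark produces for that row.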

In [16]:
fmt = "M/d/yy H:mm"
case = (case
        .withColumnRenamed('SLA_due_date', 'case_due_date')
        .withColumnRenamed('SLA_days', 'case_days')
        .withColumn("case_opened_date", F.to_timestamp("case_opened_date", fmt))
        .withColumn("case_closed_date", F.to_timestamp("case_closed_date", fmt))
        .withColumn("case_due_date", F.to_timestamp("case_due_date", fmt))
        .withColumn("case_closed", F.expr('case_closed == "YES"'))
        .withColumn("case_late", F.expr('case_late == "YES"'))
        .withColumn("council_district", F.col("council_district").cast("string"))
)
case.printSchema()
case.show(3, vertical=True, truncate=False)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- case_due_date: timestamp (nullable = true)
 |-- case_late: boolean (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: boolean (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- case_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: string (nullable = true)

-RECORD 0-------------------------------------------------------
 case_id              | 1014127780                              
 case_opened_date     | 2018-01-01 22:22:00                     
 case_closed_date     | 2018-02-14 16:42:00                     
 case_due_date        | 2018-01-06 22:22:00                     
 case_late            | true   

#### Inspect dept

In [17]:
dept.show(3, truncate=False)

+---------------+----------------------+----------------------+-------------------+
|dept_division  |dept_name             |standardized_dept_name|dept_subject_to_SLA|
+---------------+----------------------+----------------------+-------------------+
|311 Call Center|Customer Service      |Customer Service      |YES                |
|Brush          |Solid Waste Management|Solid Waste           |YES                |
|Clean and Green|Parks and Recreation  |Parks & Recreation    |YES                |
+---------------+----------------------+----------------------+-------------------+
only showing top 3 rows



#### Modify dept

- Change dept_subject_to_SLA to boolean

In [18]:
dept = (dept
        .withColumn("dept_subject_to_SLA", F.expr('dept_subject_to_SLA == "YES"'))
        
)
dept.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: boolean (nullable = true)



#### Inspect source

In [19]:
source.show(3, truncate=False)

+---------+----------------+
|source_id|source_username |
+---------+----------------+
|100137   |Merlene Blodgett|
|103582   |Carmen Cura     |
|106463   |Richard Sanchez |
+---------+----------------+
only showing top 3 rows



#### Modify source

*No modifications necessary.*

You might have noticed that the latest date in the dataset is fairly far from the present day. To account for this, replace any occurrences of the current time with the maximum date from the dataset.

In [20]:
case.createOrReplaceTempView('case')

In [21]:
spark.sql('''
    SELECT MAX(case_opened_date) MaxOpen, MAX(case_closed_date) MaxClose FROM case
''').show()

+-------------------+-------------------+
|            MaxOpen|           MaxClose|
+-------------------+-------------------+
|2018-08-08 10:38:00|2018-08-08 10:38:00|
+-------------------+-------------------+



In [22]:
case.createOrReplaceTempView('case')

In [23]:
spark.sql('''
    SELECT 
        case_id,
        date_add(case_opened_date, 0) date_opened, 
        hour(case_opened_date) * 60 + minute(case_opened_date) min_opened,
        date_add(case_due_date, 0) date_due, 
        hour(case_due_date) * 60 + minute(case_due_date) min_due,
        date_add(case_closed_date, 0) date_closed, 
        hour(case_closed_date) * 60 + minute(case_closed_date) min_closed,
--        case_closed_date - case_opened_date case_dur,
--        round(case_days, 3) case_age, 
--        round(num_days_late, 3) days_late, 
--        case_days + num_days_late case_daysd,
        cast(case_days + num_days_late as int) case_days,
        cast(1440 * ((case_days + num_days_late) - cast(case_days + num_days_late as int)) as int) case_mins,
--        date_add(case_due_date, num_days_late) calcdue, 
        date_add(case_opened_date, round(case_days + num_days_late)) calcage
    FROM 
        case
    WHERE 
        case_closed_date < case_opened_date
--        (case_closed = True 
--            AND 
--            date_add(case_opened_date, round(case_days + num_days_late)) <> date_add(case_closed_date, 0))
--        OR
--        (case_closed = False
--            AND
--            date_add(case_opened_date, int(case_days) + int(num_days_late)) <> '2018-08-08')
''').show()

+----------+-----------+----------+----------+-------+-----------+----------+---------+---------+----------+
|   case_id|date_opened|min_opened|  date_due|min_due|date_closed|min_closed|case_days|case_mins|   calcage|
+----------+-----------+----------+----------+-------+-----------+----------+---------+---------+----------+
|1013627878| 2017-06-07|      1037|2017-06-22|    510| 2017-06-07|       720|        0|     -317|2017-06-07|
|1013760291| 2017-07-31|       899|2017-08-14|    899| 2017-07-31|       720|        0|     -179|2017-07-31|
|1013776977| 2017-08-07|       879|2017-10-10|    879| 2017-08-07|       720|        0|     -159|2017-08-07|
|1013780219| 2017-08-08|       879|2017-08-22|    879| 2017-08-08|       720|        0|     -159|2017-08-08|
|1013780868| 2017-08-08|      1032|2017-08-11|    510| 2017-08-08|       720|        0|     -312|2017-08-08|
|1013783316| 2017-08-09|       939|2017-08-23|    939| 2017-08-09|       720|        0|     -219|2017-08-09|
|1013805774| 2017-0

## ii.1
How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently open issue been open?

In [24]:
print('Latest case in open caselog')
(case
 .filter(F.col('case_closed')==False)
 .select('case_id', 'case_due_date', 'service_request_type', 'num_days_late')
 .sort(F.col('num_days_late').desc())
 .show(1)
)

Latest case in open caselog
+----------+-------------------+--------------------+-------------+
|   case_id|      case_due_date|service_request_type|num_days_late|
+----------+-------------------+--------------------+-------------+
|1013225651|2017-01-17 08:30:00|   No Address Posted|  348.6458333|
+----------+-------------------+--------------------+-------------+
only showing top 1 row



In [25]:
print('Oldest case in open caselog')
(case
 .filter(F.col('case_closed')==False)
 .sort(F.col('case_opened_date'))
 .select('case_id', 'case_opened_date', 'service_request_type', 'num_days_late')
 .show(1)
)

Oldest case in open caselog
+----------+-------------------+--------------------+-------------+
|   case_id|   case_opened_date|service_request_type|num_days_late|
+----------+-------------------+--------------------+-------------+
|1013225646|2017-01-01 13:48:00|   No Address Posted|  348.6458333|
+----------+-------------------+--------------------+-------------+
only showing top 1 row
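
The cell above finds the oldest open case but does not say how long it has been open. A minimal sketch of that arithmetic in plain Python follows, using the dataset's maximum date (2018-08-08 10:38, from the `MaxOpen` query earlier) in place of the current time. The names `AS_OF` and `days_open` are hypothetical. In Spark, something along the lines of `F.datediff` against a literal date would be the likely counterpart, though it counts whole days only.

```python
from datetime import datetime

# Reference "now": the latest case_opened_date reported for the dataset
AS_OF = datetime(2018, 8, 8, 10, 38)


def days_open(opened, as_of=AS_OF):
    """Return the time elapsed between `opened` and `as_of` in fractional days."""
    return (as_of - opened).total_seconds() / 86400


oldest_opened = datetime(2017, 1, 1, 13, 48)  # case_opened_date of case 1013225646
print(round(days_open(oldest_opened), 2))     # 583.87
```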



In [35]:
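(case
 .groupBy('service_request_type')
 .pivot('case_closed')
 .count()
 .na.fill(0)  # request types with no open (or no closed) cases get 0 instead of null
 .withColumn('total', F.col('false') + F.col('true'))  # withColumn needs a Column, not a str
).show(100, truncate=False)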

## ii.2
How many Stray Animal cases are there?

## ii.3
How many service requests that are assigned to the Field Operations department (<code>dept_division</code>) are not classified as "Officer Standby" request type (<code>service_request_type</code>)?

## ii.4
Convert the <code>council_district</code> column to a string column.

## ii.5
Extract the year from the <code>case_closed_date</code> column.


## ii.6
Convert <code>num_days_late</code> from days to hours in a new column, <code>num_hours_late</code>.

## ii.7
Join the case data with the source and department data.

## ii.8
Are there any cases that do not have a request source?

## ii.9
What are the top 10 service request types in terms of number of requests?

## ii.10
What are the top 10 service request types in terms of average days late?

## ii.11
Does the number of days late depend on department?

## ii.12
How does the number of days late depend on department and request type?