# Imports

In [22]:
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import env

## Spark sesh

In [2]:
#create enviroment
spark = SparkSession.builder.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/26 13:54:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/10/26 13:54:08 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


# Questions
## 1. Read the cases, department, and source data into their own Spark dataframes.

In [3]:
#sql query
url = env.get_db_url(env.user,env.password,env.host,'311_data')

In [4]:
#sql query
cases_query = 'select * from cases limit 100000'
dept_query = 'select * from dept'
source_query = 'select * from source'

In [5]:
#pandas df
cases_df = pd.read_sql(cases_query, url)
dept_df = pd.read_sql(dept_query, url)
source_df = pd.read_sql(source_query, url)

In [7]:
#spark df
cases = spark.createDataFrame(cases_df)
dept = spark.createDataFrame(dept_df)
source = spark.createDataFrame(source_df)

In [9]:
cases.show(1)

23/10/26 13:54:36 WARN TaskSetManager: Stage 0 contains a task of very large size (1656 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|   999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+---

## 2. Let's see how writing to the local disk works in Spark:

    Write the code necessary to store the source data in both csv and json format.
- Store these as sources_csv and sources_json.

`Inspect your folder structure.`
- What do you notice?

In [10]:
source.show(1)

+-----+---------+----------------+
|index|source_id| source_username|
+-----+---------+----------------+
|    0|   100137|Merlene Blodgett|
+-----+---------+----------------+
only showing top 1 row



In [11]:
# CSV
source.write.csv('data/source_csv',mode='overwrite')

                                                                                

In [13]:
# JSON
source.write.json('data/source_json',mode='overwrite')

In [None]:
# 2 Answer:
# The files are saved in different instances and if using traditional pandas / python tools would require a different to tool to be able to obtain.

## 3. Inspect the data in your dataframes. Are the data types appropriate? Write the code necessary to cast the values to the appropriate types.

In [14]:
source.dtypes

[('index', 'bigint'), ('source_id', 'string'), ('source_username', 'string')]

In [29]:
dfs = [source,dept,cases]
for df in dfs:
    print(f'+ + + + + + + + + + + + + + + + +\nData types for {df}:\n')
    df.dtypes
    df.show(1)
    #print('+ + + + + + + + + + + + + + + + +')

+ + + + + + + + + + + + + + + + +
Data types for DataFrame[index: bigint, source_id: string, source_username: string]:

+-----+---------+----------------+
|index|source_id| source_username|
+-----+---------+----------------+
|    0|   100137|Merlene Blodgett|
+-----+---------+----------------+
only showing top 1 row

+ + + + + + + + + + + + + + + + +
Data types for DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: int]:

+---------------+----------------+----------------------+-------------------+
|  dept_division|       dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+----------------+----------------------+-------------------+
|311 Call Center|Customer Service|      Customer Service|               NULL|
+---------------+----------------+----------------------+-------------------+
only showing top 1 row

+ + + + + + + + + + + + + + + + +
Data types for DataFrame[case_id: bigint, case_opened_date: string, case

23/10/26 14:54:53 WARN TaskSetManager: Stage 19 contains a task of very large size (1656 KiB). The maximum recommended task size is 1000 KiB.


In [28]:
# source is fine

# dept could change 'dept_subject_to_SLA' as bool
dept = dept.withColumn('dept_subject_to_SLA', (F.col('dept_subject_to_SLA') == 'YES').cast('int'))
# cases could change anything with date as a date time rather than a string
cases = cases.withColumns(
{
    'case_closed': (F.col('case_closed') == 'YES').cast('int'),
    'case_late': (F.col('case_late') == 'YES').cast('int')
})
    # changing d type of council district
cases = cases.withColumn(
'council_district',
F.format_string(
    '%03d',
    cases.council_district.cast('int')
))

## Data Exploratory Questions

### 1. How old is the latest (in terms of days past SLA) currently open issue? How long has the oldest (in terms of days since opened) currently opened issue been open?

### 2. How many Stray Animal cases are there?

### 3. How many service requests that are assigned to the Field Operations department (dept_division) are not classified as "Officer Standby" request type (service_request_type)?

### 4. Convert the council_district column to a string column.

### 5. Extract the year from the case_closed_date column.

### 6. Convert num_days_late from days to hours in a new column num_hours_late.

### 7. Join the cases data with the source and department data.

### 8. Are there any cases that do not have a request source?

### 9. What are the top 10 service request types in terms of number of requests?



### 10. What are the top 10 service request types in terms of average days late?


### 11. Does number of days late depend on department? (Answer without the use of a stats test)


### 12. How do number of days late depend on department and request type?