# Big Data - Acquisition Exercise

In [1]:
import pyspark
import pandas as pd
import pyspark.sql.types as T
from pyspark.sql.functions import col, round as spark_round

import uuid

In [2]:
nprocs = 2

spark = (pyspark.sql.SparkSession.builder
 .master('local')
 .config('spark.jars.packages', 'mysql:mysql-connector-java:8.0.16')
 .config('spark.driver.memory', '4G')
 .config('spark.driver.cores', nprocs)
 .config('spark.sql.shuffle.partitions', nprocs)
 .appName('acquire')
 .enableHiveSupport()
 .getOrCreate())

## 1. read into spark environment (df_case, df_dept)**

In [3]:
df_case = spark.read.format("csv")\
                    .option("header", True)\
                    .option("inferSchema", True)\
                    .load("sa311/case.csv")

df_case.printSchema()
print()
df_case.show(5)

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)


+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case

In [4]:
df_dept = spark.read.format("csv")\
                    .option("header", True)\
                    .option("inferSchema", True)\
                    .load("sa311/dept.csv")

df_dept.printSchema()
print()
df_dept.show(5)

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)


+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 

## 2. write df_case and df_dept back to disk into their own directories (my_cases and my_depts)

In [5]:
df_case.write.format("csv").mode("overwrite")\
                           .option("header", True)\
                           .save("sa311/df_case")

In [6]:
df_dept.write.format("csv").mode("overwrite")\
                           .option("header", True)\
                           .save("sa311/df_dept")

## 3. Write df_case and df_dept to parquet files (my_cases_parquet and my_depts_parquet)

In [7]:
df_case.write.format("parquet").mode("overwrite")\
                               .option("header", True)\
                               .save("sa311/df_case_parquet")

In [8]:
df_dept.write.format("parquet").mode("overwrite")\
                               .option("header", True)\
                               .save("sa311/df_dept_parquet")

## 4. Read your parquet files back into your spark environment.

In [9]:
df_case = spark.read.parquet("sa311/df_case_parquet")
df_case.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: string (nullable = true)
 |-- case_closed_date: string (nullable = true)
 |-- SLA_due_date: string (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [10]:
df_dept = spark.read.parquet("sa311/df_dept_parquet")
df_dept.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)



## 5. Read case.csv and dept.csv into a pandas dataframe. (cases_pdf, depts_pdf)

In [11]:
cases_pdf = pd.read_csv("sa311/case.csv")
cases_pdf.head()

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7


In [12]:
depts_pdf = pd.read_csv("sa311/dept.csv")
depts_pdf.head()

Unnamed: 0,dept_division,dept_name,standardized_dept_name,dept_subject_to_SLA
0,311 Call Center,Customer Service,Customer Service,YES
1,Brush,Solid Waste Management,Solid Waste,YES
2,Clean and Green,Parks and Recreation,Parks & Recreation,YES
3,Clean and Green Natural Areas,Parks and Recreation,Parks & Recreation,YES
4,Code Enforcement,Code Enforcement Services,DSD/Code Enforcement,YES


## 6. Convert the pandas dataframes into spark dataframes (cases_sdf, depts_sdf)

In [13]:
cases_pdf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 841704 entries, 0 to 841703
Data columns (total 14 columns):
case_id                 841704 non-null int64
case_opened_date        841704 non-null object
case_closed_date        823594 non-null object
SLA_due_date            841671 non-null object
case_late               841704 non-null object
num_days_late           841671 non-null float64
case_closed             841704 non-null object
dept_division           841704 non-null object
service_request_type    841704 non-null object
SLA_days                841671 non-null float64
case_status             841704 non-null object
source_id               841704 non-null object
request_address         841704 non-null object
council_district        841704 non-null int64
dtypes: float64(2), int64(2), object(10)
memory usage: 89.9+ MB


In [14]:
cases_schema = T.StructType([T.StructField("case_id", T.IntegerType()),
                             T.StructField("case_opened_date", T.StringType()),
                             T.StructField("case_closed_date", T.StringType()),
                             T.StructField("SLA_due_date", T.StringType()),
                             T.StructField("case_late", T.StringType()),
                             T.StructField("num_days_late", T.FloatType()),
                             T.StructField("case_closed", T.StringType()),
                             T.StructField("dept_division", T.StringType()),
                             T.StructField("service_request_Type()", T.StringType()),
                             T.StructField("SLA_days", T.FloatType()),
                             T.StructField("case_status", T.StringType()),
                             T.StructField("source_id", T.StringType()),
                             T.StructField("request_address", T.StringType()),
                             T.StructField("council_district", T.IntegerType())])
cases_sdf = spark.createDataFrame(cases_pdf, schema=cases_schema)
cases_sdf.show(5)

+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+----------------------+---------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_Type()| SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+----------------------+---------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO|    -998.5088|        YES|Field Operations|          Stray Animal|    999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|   -2.0126042|        YES|     Storm Water|  Removal Of Obstru...| 4.322222|     Closed| svcCRMSS|2215  GOL

In [15]:
depts_pdf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 39 entries, 0 to 38
Data columns (total 4 columns):
dept_division             39 non-null object
dept_name                 38 non-null object
standardized_dept_name    39 non-null object
dept_subject_to_SLA       39 non-null object
dtypes: object(4)
memory usage: 1.3+ KB


In [16]:
depts_schema = T.StructType([T.StructField("dept_division", T.StringType()),
                             T.StructField("dept_name", T.StringType()),
                             T.StructField("standardized_dept_name", T.StringType()),
                             T.StructField("dept_subject_to_SLA", T.StringType())])

depts_sdf = spark.createDataFrame(depts_pdf, schema=depts_schema)
depts_sdf.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



## 7. Convert the spark dataframes back into pandas dataframes. (cases_pdf1, depts_pdf1)

In [17]:
cases_pdf1 = cases_sdf.toPandas()
cases_pdf1.head()

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_Type(),SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508789,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7


In [18]:
depts_pdf1 = depts_sdf.toPandas()
depts_pdf1.head()

Unnamed: 0,dept_division,dept_name,standardized_dept_name,dept_subject_to_SLA
0,311 Call Center,Customer Service,Customer Service,YES
1,Brush,Solid Waste Management,Solid Waste,YES
2,Clean and Green,Parks and Recreation,Parks & Recreation,YES
3,Clean and Green Natural Areas,Parks and Recreation,Parks & Recreation,YES
4,Code Enforcement,Code Enforcement Services,DSD/Code Enforcement,YES


## 8. Write the spark dataframes (cases_sdf, depts_sdf) to Hive tables.

In [19]:
cases_table_name = f"df_{uuid.uuid4().hex}"
df_case.write.saveAsTable(cases_table_name)

In [20]:
spark.sql(f"DESCRIBE {cases_table_name}").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|      int|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|   double|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|   double|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|      int|   null|
+--------------------+---------+-------+



In [21]:
cases_table_name

'df_6f9e6ac9f44545d2a7536ebbeb2d41ff'

In [22]:
depts_table_name = f"df_{uuid.uuid4().hex}"
df_dept.write.saveAsTable(depts_table_name)

In [23]:
spark.sql(f"DESCRIBE {depts_table_name}").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       dept_division|   string|   null|
|           dept_name|   string|   null|
|standardized_dept...|   string|   null|
| dept_subject_to_SLA|   string|   null|
+--------------------+---------+-------+



In [24]:
depts_table_name

'df_36c1cb346767480796a710473d43c58d'

## 9 Explore the Hive database/tables you have created using the methods in the lesson.

### cases table

**Average number of days late for all cases**

In [25]:
spark.sql(f"""
SELECT MEAN({cases_table_name}.num_days_late)
FROM {cases_table_name}
""").show()

+------------------+
|avg(num_days_late)|
+------------------+
|-49.07486758369696|
+------------------+



Negative means non-late

**Average number of days late for late cases**

In [26]:
spark.sql(f"""
SELECT MEAN({cases_table_name}.num_days_late)
FROM {cases_table_name}
WHERE {cases_table_name}.case_late = "YES"
""").show()

+------------------+
|avg(num_days_late)|
+------------------+
|25.420263278915435|
+------------------+



**Council districts ranked by number of 311 filings**

In [27]:
spark.sql(f"""
SELECT {cases_table_name}.council_district, COUNT({cases_table_name}.council_district)
FROM {cases_table_name}
GROUP BY {cases_table_name}.council_district
ORDER BY COUNT({cases_table_name}.council_district) DESC
""").show()

+----------------+-----------------------+
|council_district|count(council_district)|
+----------------+-----------------------+
|               1|                 119309|
|               2|                 114745|
|               5|                 114609|
|               3|                 102706|
|               4|                  93778|
|               6|                  74095|
|               7|                  72445|
|              10|                  62926|
|               8|                  42345|
|               9|                  40916|
|               0|                   3830|
+----------------+-----------------------+



**Council districts ranked by most late cases**

In [28]:
spark.sql(f"""
SELECT {cases_table_name}.council_district, COUNT(*)
FROM {cases_table_name}
WHERE {cases_table_name}.case_late = "YES"
GROUP BY {cases_table_name}.council_district
ORDER BY COUNT(*) DESC
""").show()

+----------------+--------+
|council_district|count(1)|
+----------------+--------+
|               2|   14572|
|               3|   11799|
|               5|   11681|
|               1|   11515|
|               6|   10037|
|               7|    9351|
|               4|    9147|
|              10|    6901|
|               8|    4717|
|               9|    4229|
|               0|     554|
+----------------+--------+



**Council districts with the most late cases as a percentage of filings**

In [29]:
cases_per_dist = spark.sql(f"""
SELECT {cases_table_name}.council_district, COUNT(*) AS total_cases
FROM {cases_table_name}
GROUP BY {cases_table_name}.council_district
ORDER BY total_cases DESC
""")

cases_per_dist.show()
cases_per_dist.count()

+----------------+-----------+
|council_district|total_cases|
+----------------+-----------+
|               1|     119309|
|               2|     114745|
|               5|     114609|
|               3|     102706|
|               4|      93778|
|               6|      74095|
|               7|      72445|
|              10|      62926|
|               8|      42345|
|               9|      40916|
|               0|       3830|
+----------------+-----------+



11

In [30]:
late_per_dist = spark.sql(f"""
SELECT {cases_table_name}.council_district, COUNT(*) AS late_cases
FROM {cases_table_name}
WHERE {cases_table_name}.case_late = "YES"
GROUP BY {cases_table_name}.council_district
ORDER BY late_cases DESC
""")

late_per_dist.show()
late_per_dist.count()

+----------------+----------+
|council_district|late_cases|
+----------------+----------+
|               2|     14572|
|               3|     11799|
|               5|     11681|
|               1|     11515|
|               6|     10037|
|               7|      9351|
|               4|      9147|
|              10|      6901|
|               8|      4717|
|               9|      4229|
|               0|       554|
+----------------+----------+



11

In [31]:
cases_per_dist.alias("total_by_dist")\
    .join(late_per_dist.alias("late_by_dist"), ["council_district"])\
    .select(col("council_district"), spark_round((col("late_by_dist.late_cases") / col("total_by_dist.total_cases")), 2).alias("ratio"))\
    .orderBy("ratio", ascending=False)\
    .show()

+----------------+-----+
|council_district|ratio|
+----------------+-----+
|               6| 0.14|
|               0| 0.14|
|               2| 0.13|
|               7| 0.13|
|              10| 0.11|
|               3| 0.11|
|               8| 0.11|
|               1|  0.1|
|               5|  0.1|
|               4|  0.1|
|               9|  0.1|
+----------------+-----+



### depts table

**The department divisions**

In [32]:
spark.sql(f"""
SELECT DISTINCT {depts_table_name}.dept_division
FROM {depts_table_name}
""").show(100)

+--------------------+
|       dept_division|
+--------------------+
|     311 Call Center|
|               Brush|
|     Clean and Green|
|Code Enforcement ...|
|   Dangerous Premise|
|Dangerous Premise...|
|         District 10|
|          District 2|
|          District 3|
|          District 7|
|          District 9|
|Engineering Division|
|    Facility License|
|    Field Operations|
| Food Establishments|
|  Signs and Markings|
|Traffic Engineeri...|
|           Tree Crew|
|              Vector|
|Clean and Green N...|
|    Code Enforcement|
|Code Enforcement ...|
|Director's Office...|
|          District 1|
|          District 6|
|          District 8|
|            Graffiti|
|   Graffiti (IntExp)|
|       Miscellaneous|
|        Reservations|
|               Shops|
|    Shops (Internal)|
|             Signals|
|         Solid Waste|
|         Storm Water|
|Storm Water Engin...|
|             Streets|
|              Trades|
|    Waste Collection|
+--------------------+



## 10. Read from the tables into two spark dataframes (cases_sdf, depts_sdf)

In [33]:
cases_df_from_sql = spark.sql(f"""
SELECT *
FROM {cases_table_name}
""")

cases_df_from_sql.show(5)

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Removal Of Obstru...|4.322222222| 

In [34]:
depts_df_from_sql = spark.sql(f"""
SELECT *
FROM {depts_table_name}
""")

depts_df_from_sql.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows

