# Acquire

Using case.csv & dept.csv:

1. Read case.csv and dept.csv into the Spark environment (df_case, df_dept).
1. Write df_case and df_dept back to disk, each into its own directory (my_cases and my_depts).
1. Write df_case and df_dept to Parquet files (my_cases_parquet and my_depts_parquet).
1. Read your Parquet files back into your Spark environment.
1. Read case.csv and dept.csv into pandas dataframes (cases_pdf, depts_pdf).
1. Convert the pandas dataframes into Spark dataframes (cases_sdf, depts_sdf).
1. Convert the Spark dataframes back into pandas dataframes (cases_pdf1, depts_pdf1).
1. Write the Spark dataframes (cases_sdf, depts_sdf) to Hive tables.
1. Explore the Hive database and tables you created, using the methods from the lesson.
1. Read from the tables into two Spark dataframes (cases_sdf, depts_sdf).

In [1]:
import pyspark
import pandas as pd
from pyspark.sql.types import *

In [2]:
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [3]:
spark.read.csv('./sa311/case.csv')

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string]

In [4]:
spark.read.format('csv').load('./sa311/case.csv')

DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string, _c8: string, _c9: string, _c10: string, _c11: string, _c12: string, _c13: string]

In [5]:
df_dept = (spark.read
           .option('header', True)
           .option('inferSchema', True)
           .format('csv')
           .load('./sa311/dept.csv'))

df_dept.show(1)

+---------------+----------------+----------------------+-------------------+
|  dept_division|       dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+----------------+----------------------+-------------------+
|311 Call Center|Customer Service|      Customer Service|                YES|
+---------------+----------------+----------------------+-------------------+
only showing top 1 row



In [6]:
df_case = (spark.read
 .option('header', True)
 .option('inferSchema', True)
 .format('csv')
 .load('./sa311/case.csv'))

df_case.show(1)

+----------+----------------+----------------+------------+---------+------------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|     num_days_late|case_closed|   dept_division|service_request_type|SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+------------------+-----------+----------------+--------------------+--------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO|-998.5087616000001|        YES|Field Operations|        Stray Animal|   999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
+----------+----------------+----------------+------------+---------+------------------+-----------+----------------+--------------------+--------+-----------+---------

In [7]:
df_dept.dtypes

[('dept_division', 'string'),
 ('dept_name', 'string'),
 ('standardized_dept_name', 'string'),
 ('dept_subject_to_SLA', 'string')]

In [8]:
df_case.dtypes

[('case_id', 'int'),
 ('case_opened_date', 'string'),
 ('case_closed_date', 'string'),
 ('SLA_due_date', 'string'),
 ('case_late', 'string'),
 ('num_days_late', 'double'),
 ('case_closed', 'string'),
 ('dept_division', 'string'),
 ('service_request_type', 'string'),
 ('SLA_days', 'double'),
 ('case_status', 'string'),
 ('source_id', 'string'),
 ('request_address', 'string'),
 ('council_district', 'int')]

### Saving as CSV files

In [9]:
(df_case.write
 .format('csv')
 .mode('overwrite')
 .option('header', True)
 .save('my_cases'))

In [10]:
spark.read.option('header', True).csv('my_cases').show(3)

+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date| SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014551581|   5/28/18 13:14|   5/28/18 14:23|5/28/18 16:14|       NO| -0.077511574|        YES|Field Operations|     Officer Standby|      0.125|     Closed|  NO10960|7003  RAVENSDALE,...|               6|
|1014551583|   5/28/18 13:15|   5/29/18 14:38|  6/1/18 8:30|       NO| -2.743912037|        YES|Waste Collection|           No Pickup|   3.801875|     Closed|   138793|1906

In [11]:
(df_dept.write
 .format('csv')
 .mode('overwrite')
 .option('header', True)
 .save('my_depts'))

In [12]:
spark.read.option('header', True).csv('my_depts').show(3)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|                YES|
|          Brush|Solid Waste Manag...|           Solid Waste|                YES|
|Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
+---------------+--------------------+----------------------+-------------------+
only showing top 3 rows



### Saving as parquet files

In [13]:
# Parquet stores column names and types itself, so the CSV 'header' option is not needed
df_case.write.format('parquet').mode('overwrite').save('my_cases_parquet')

In [14]:
df_dept.write.format('parquet').mode('overwrite').save('my_depts_parquet')

### Reading in parquet files

In [15]:
spark.read.parquet('my_cases_parquet').show(3)

+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date| SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014551581|   5/28/18 13:14|   5/28/18 14:23|5/28/18 16:14|       NO| -0.077511574|        YES|Field Operations|     Officer Standby|      0.125|     Closed|  NO10960|7003  RAVENSDALE,...|               6|
|1014551583|   5/28/18 13:15|   5/29/18 14:38|  6/1/18 8:30|       NO| -2.743912037|        YES|Waste Collection|           No Pickup|   3.801875|     Closed|   138793|1906

In [16]:
spark.read.parquet('my_depts_parquet').show(3)

+---------------+--------------------+----------------------+-------------------+
|  dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+---------------+--------------------+----------------------+-------------------+
|311 Call Center|    Customer Service|      Customer Service|                YES|
|          Brush|Solid Waste Manag...|           Solid Waste|                YES|
|Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
+---------------+--------------------+----------------------+-------------------+
only showing top 3 rows



### Read each CSV into a pandas dataframe

In [17]:
cases_pdf = pd.read_csv('./sa311/case.csv')
depts_pdf = pd.read_csv('./sa311/dept.csv')

### Convert the pandas dataframes into spark dataframes

In [63]:
import pyspark.sql.types as T
schema = T.StructType([T.StructField('case_id', T.StringType()),
                      T.StructField('case_opened_date', T.StringType()),
                      T.StructField('case_closed_date', T.StringType()),
                      T.StructField('SLA_due_date', T.StringType()),
                      T.StructField('case_late', T.StringType()),
                      T.StructField('num_days_late', T.StringType()),
                      T.StructField('case_closed', T.StringType()),
                      T.StructField('dept_division', T.StringType()),
                      T.StructField('service_request_type', T.StringType()),
                      T.StructField('SLA_days', T.StringType()),
                      T.StructField('case_status', T.StringType()),
                      T.StructField('source_id', T.StringType()),
                      T.StructField('request_address', T.StringType()),
                      T.StructField('council_district', T.StringType())])

# Keep the converted DataFrame under the name the task asks for
cases_sdf = spark.createDataFrame(cases_pdf, schema)
cases_sdf.show(5)

+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO| -2.012604167|        YES|     Storm Water|Removal Of Obstru...|4.322222222|     Closed| svcCRMSS|2215  GOL

In [66]:
schema = T.StructType([T.StructField('dept_division', T.StringType()),
                      T.StructField('dept_name', T.StringType()),
                      T.StructField('standardized_dept_name', T.StringType()),
                      T.StructField('dept_subject_to_SLA', T.StringType())])

# Keep the converted DataFrame under the name the task asks for
depts_sdf = spark.createDataFrame(depts_pdf, schema)
depts_sdf.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



### Convert the spark dataframes back into pandas dataframes

In [19]:
# Re-read the CSV directories written earlier; without inferSchema every column is a string
cases_sdf = spark.read.option('header', True).csv('my_cases')
depts_sdf = spark.read.option('header', True).csv('my_depts')

In [20]:
cases_pdf1 = cases_sdf.toPandas()
depts_pdf1 = depts_sdf.toPandas()

In [21]:
cases_pdf1.head(3)

| | case_id | case_opened_date | case_closed_date | SLA_due_date | case_late | num_days_late | case_closed | dept_division | service_request_type | SLA_days | case_status | source_id | request_address | council_district |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1014551581 | 5/28/18 13:14 | 5/28/18 14:23 | 5/28/18 16:14 | NO | -0.077511574 | YES | Field Operations | Officer Standby | 0.125 | Closed | NO10960 | 7003 RAVENSDALE, San Antonio, 78250 | 6 |
| 1 | 1014551583 | 5/28/18 13:15 | 5/29/18 14:38 | 6/1/18 8:30 | NO | -2.743912037 | YES | Waste Collection | No Pickup | 3.801875 | Closed | 138793 | 1906 MOSSY CREEK DR, San Antonio, 78245 | 4 |
| 2 | 1014551584 | 5/28/18 13:16 | 5/29/18 14:31 | 6/5/18 8:30 | NO | -6.749131944 | YES | Waste Collection | Lost/Stolen Cart | 7.801087963 | Closed | 142989 | 103 SPRINGWOOD LN, San Antonio, 78216 | 1 |


In [22]:
depts_pdf1.head(3)

| | dept_division | dept_name | standardized_dept_name | dept_subject_to_SLA |
|---|---|---|---|---|
| 0 | 311 Call Center | Customer Service | Customer Service | YES |
| 1 | Brush | Solid Waste Management | Solid Waste | YES |
| 2 | Clean and Green | Parks and Recreation | Parks & Recreation | YES |


### Write the spark dataframes (cases_sdf, depts_sdf) to Hive tables

In [23]:
# mode('overwrite') lets this cell be re-run after the table already exists
cases_sdf.write.mode('overwrite').saveAsTable('df_cases_hive')

In [25]:
spark.sql('DESCRIBE df_cases_hive').show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|   string|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|   string|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|   string|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|   string|   null|
+--------------------+---------+-------+



In [26]:
# mode('overwrite') lets this cell be re-run after the table already exists
depts_sdf.write.mode('overwrite').saveAsTable('df_depts_hive')

In [27]:
spark.sql('DESCRIBE df_depts_hive').show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       dept_division|   string|   null|
|           dept_name|   string|   null|
|standardized_dept...|   string|   null|
| dept_subject_to_SLA|   string|   null|
+--------------------+---------+-------+



### Explore the Hive database/tables you have created using the methods in the lesson

In [38]:
spark.sql('SELECT case_closed_date, case_closed FROM df_cases_hive').show()

+----------------+-----------+
|case_closed_date|case_closed|
+----------------+-----------+
|   5/28/18 14:23|        YES|
|   5/29/18 14:38|        YES|
|   5/29/18 14:31|        YES|
|   5/29/18 12:13|        YES|
|   7/10/18 11:42|        YES|
|   5/29/18 14:38|        YES|
|   5/31/18 10:15|        YES|
|    6/5/18 13:40|        YES|
|   5/30/18 13:21|        YES|
|     6/5/18 8:37|        YES|
|   5/28/18 14:23|        YES|
|   5/28/18 14:17|        YES|
|   5/29/18 15:43|        YES|
|    5/30/18 6:32|        YES|
|   5/28/18 15:43|        YES|
|    5/30/18 8:31|        YES|
|   5/29/18 14:06|        YES|
|    5/29/18 9:51|        YES|
|   5/28/18 14:13|        YES|
|   5/29/18 12:18|        YES|
+----------------+-----------+
only showing top 20 rows



In [41]:
spark.sql('USE default')
spark.sql('SHOW TABLES').show()

+--------+-------------+-----------+
|database|    tableName|isTemporary|
+--------+-------------+-----------+
| default|df_cases_hive|      false|
| default|df_depts_hive|      false|
+--------+-------------+-----------+



In [44]:
for table in ['df_cases_hive', 'df_depts_hive']:
    spark.sql(f"DESCRIBE {table}").show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|   string|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|   string|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|   string|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|   string|   null|
+--------------------+---------+-------+

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       dept_division|   string|   null|
|           dept_name|   string|   null|
|standardized_dept...|   string|   null|
| dept_subject_

In [46]:
for table in ['df_cases_hive', 'df_depts_hive']:
    spark.sql(f"SELECT * FROM {table} LIMIT 3").show()

+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date| SLA_due_date|case_late|num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+-------------+---------+-------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014551581|   5/28/18 13:14|   5/28/18 14:23|5/28/18 16:14|       NO| -0.077511574|        YES|Field Operations|     Officer Standby|      0.125|     Closed|  NO10960|7003  RAVENSDALE,...|               6|
|1014551583|   5/28/18 13:15|   5/29/18 14:38|  6/1/18 8:30|       NO| -2.743912037|        YES|Waste Collection|           No Pickup|   3.801875|     Closed|   138793|1906

In [113]:
(spark
 .sql("""
     SELECT dept_division, service_request_type, request_address,
            case_opened_date, case_closed_date
     FROM df_cases_hive
     WHERE request_address LIKE '1950  DONALDSON%'
 """)
 .show(50)
)

+----------------+--------------------+--------------------+----------------+----------------+
|   dept_division|service_request_type|     request_address|case_opened_date|case_closed_date|
+----------------+--------------------+--------------------+----------------+----------------+
|Waste Collection|           No Pickup|1950  DONALDSON A...|   9/25/17 17:31|    9/28/17 7:30|
+----------------+--------------------+--------------------+----------------+----------------+

