# Exercises
## Using case.csv & dept.csv:

#### 1. Read both files into the Spark environment (df_case, df_dept)
#### 2. Write df_case and df_dept back to disk into their own directories (my_cases and my_depts)
#### 3. Write df_case and df_dept to parquet files (my_cases_parquet and my_depts_parquet)
#### 4. Read your parquet files back into your spark environment.
#### 5. Read case.csv and dept.csv into pandas dataframes (cases_pdf, depts_pdf)
#### 6. Convert the pandas dataframes into spark dataframes (cases_sdf, depts_sdf)
#### 7. Convert the spark dataframes back into pandas dataframes. (cases_pdf1, depts_pdf1)
#### 8. Write the spark dataframes (cases_sdf, depts_sdf) to Hive tables.
#### 9. Explore the Hive database/tables you have created using the methods in the lesson.
#### 10. Read from the tables into two spark dataframes (cases_sdf, depts_sdf)

In [1]:
import pyspark

In [2]:
import pandas as pd
from pyspark.sql import SparkSession

In [3]:
# Session handle used by every Spark cell below; getOrCreate() reuses an
# already-running session instead of starting a second one.
spark = SparkSession.builder.getOrCreate()

#### 1. Read both files into the Spark environment (df_case, df_dept)

In [4]:
# Preview only: the result is not assigned, so the cell simply displays the
# DataFrame repr (column names and inferred types).
spark.read.csv('./sa311/case.csv', header=True, inferSchema=True)

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int]

In [6]:
# Load the case data, taking column names from the first row and letting
# Spark infer column types.
df_case = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('./sa311/case.csv')
)

In [7]:
# Load the department lookup data with the same header/inferSchema settings
# used for the case data.
df_dept = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load('./sa311/dept.csv')
)

#### 2. Write df_case and df_dept back to disk into their own directories (my_cases and my_depts)

In [8]:
# Write each DataFrame out as a directory of CSV part files.
# mode='overwrite' keeps this cell re-runnable: the writer's default save mode
# raises an error when the target directory already exists.
# header=True writes the column names so they are not lost on disk.
df_case.write.csv('my_cases', mode='overwrite', header=True)
df_dept.write.csv('my_depts', mode='overwrite', header=True)

#### 3. Write df_case and df_dept to parquet files (my_cases_parquet and my_depts_parquet)

In [9]:
# Persist both DataFrames as Parquet, replacing any output from earlier runs.
# Parquet stores the schema inside the files, so the CSV-only 'header' option
# that used to be passed here had no effect and has been removed.
# NOTE: 'my_caes_parquet' is a misspelling of 'my_cases_parquet'. It is kept
# on purpose because the read-back cell loads from this exact path.
df_case.write.parquet('sa311/my_caes_parquet', mode='overwrite')
df_dept.write.parquet('sa311/my_depts_parquet', mode='overwrite')

#### 4. Read your parquet files back into your spark environment.

In [11]:
# Reload both datasets from Parquet, rebinding df_case and df_dept.
# The 'caes' misspelling matches the directory name used when the files were
# written. Parquet files carry their own schema, so the CSV-only 'header' and
# 'inferSchema' options are not needed here.
df_case = spark.read.parquet('sa311/my_caes_parquet')
df_dept = spark.read.parquet('sa311/my_depts_parquet')

#### 5. Read case.csv and dept.csv into pandas dataframes (cases_pdf, depts_pdf)

In [12]:
# Read the same two source CSVs with pandas (default parsing options).
cases_pdf, depts_pdf = (
    pd.read_csv('./sa311/case.csv'),
    pd.read_csv('./sa311/dept.csv'),
)

#### 6. Convert the pandas dataframes into spark dataframes (cases_sdf, depts_sdf)

In [16]:
# Take the schema Spark infers from the CSV and apply it explicitly when
# converting the pandas frame, instead of letting createDataFrame infer the
# column types from the pandas data.
schema = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('./sa311/case.csv')
    .schema
)
cases_sdf = spark.createDataFrame(cases_pdf, schema=schema)

In [19]:
# Same approach for the department data: use Spark's inferred CSV schema for
# the pandas -> Spark conversion. Note that this rebinds `schema`.
schema = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('./sa311/dept.csv')
    .schema
)
depts_sdf = spark.createDataFrame(depts_pdf, schema=schema)

#### 7. Convert the spark dataframes back into pandas dataframes. (cases_pdf1, depts_pdf1)

In [20]:
# Convert the Spark DataFrame back to pandas, then display the first rows as
# the cell output.
cases_pdf1 = cases_sdf.toPandas()
cases_pdf1.head()

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7


In [21]:
# Convert the department Spark DataFrame back to a pandas DataFrame.
depts_pdf1 = depts_sdf.toPandas()

In [22]:
# Display the first rows of the converted department frame.
depts_pdf1.head()

Unnamed: 0,dept_division,dept_name,standardized_dept_name,dept_subject_to_SLA
0,311 Call Center,Customer Service,Customer Service,YES
1,Brush,Solid Waste Management,Solid Waste,YES
2,Clean and Green,Parks and Recreation,Parks & Recreation,YES
3,Clean and Green Natural Areas,Parks and Recreation,Parks & Recreation,YES
4,Code Enforcement,Code Enforcement Services,DSD/Code Enforcement,YES


#### 8. Write the spark dataframes (cases_sdf, depts_sdf) to Hive tables.

In [23]:
import uuid

# A random hex suffix gives each run fresh table names, so saveAsTable (called
# here without an explicit save mode) does not collide with tables left over
# from an earlier run.
table_name1 = f'df_{uuid.uuid4().hex}'
table_name2 = f'df_{uuid.uuid4().hex}'

# Persist both DataFrames as tables in the current database.
cases_sdf.write.saveAsTable(name=table_name1)
depts_sdf.write.saveAsTable(name=table_name2)

#### 9. Explore the Hive database/tables you have created using the methods in the lesson.

In [28]:
# Build a DESCRIBE statement for each saved table. Only query1 is run in this
# cell; query2 is executed by the following cell.
query1 = f"DESCRIBE {table_name1}"
query2 = f"DESCRIBE {table_name2}"

# Show the column listing of the cases table, then the generated table name.
spark.sql(query1).show()
print(table_name1)

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|      int|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|   double|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|   double|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|      int|   null|
+--------------------+---------+-------+

df_c851995fdc3347fabb097dd8a33ed63d


In [29]:
# Run the DESCRIBE statement prepared in the previous cell for the departments
# table, then print that table's generated name.
spark.sql(query2).show()
print(table_name2)

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       dept_division|   string|   null|
|           dept_name|   string|   null|
|standardized_dept...|   string|   null|
| dept_subject_to_SLA|   string|   null|
+--------------------+---------+-------+

df_e7e74eda954544419c8fa5b6e6e61689


#### 10. Read from the tables into two spark dataframes (cases_sdf, depts_sdf)

In [30]:
# List the databases visible to this Spark session.
spark.sql("SHOW DATABASES").show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [31]:
# Switch to the 'default' database, then list the tables it contains.
spark.sql("USE default")
spark.sql("SHOW TABLES").show()

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|df_c851995fdc3347...|      false|
| default|df_e7e74eda954544...|      false|
+--------+--------------------+-----------+



In [32]:
# Describe the cases table again; this is the same statement as query1 in the
# exploration section, written inline as an f-string.
spark.sql(f'DESCRIBE {table_name1}').show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|      int|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|   double|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|   double|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|      int|   null|
+--------------------+---------+-------+



In [33]:
# Describe the departments table again, written inline as an f-string.
spark.sql(f'DESCRIBE {table_name2}').show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       dept_division|   string|   null|
|           dept_name|   string|   null|
|standardized_dept...|   string|   null|
| dept_subject_to_SLA|   string|   null|
+--------------------+---------+-------+



In [35]:
# Load every column of each saved table back into a Spark DataFrame,
# rebinding cases_sdf and depts_sdf.
# NOTE(review): the cleanup cell that follows drops both tables; these
# DataFrames presumably cannot be evaluated after that point -- confirm before
# reusing them.
cases_sdf = spark.table(table_name1)
depts_sdf = spark.table(table_name2)

In [36]:
# Cleanup: remove the two randomly named tables created for this exercise.
# IF EXISTS makes the statements safe to re-run after the tables are gone.
query1 = f'DROP TABLE IF EXISTS {table_name1}'
query2 = f'DROP TABLE IF EXISTS {table_name2}'

spark.sql(query1)
spark.sql(query2)

DataFrame[]

In [37]:
# Shut down the Spark session now that the exercises are complete.
spark.stop()