Using case.csv & dept.csv: 
1. Read both files into the Spark environment (df_case, df_dept) 
2. Write df_case and df_dept back to disk into their own directories (my_cases and my_depts) 
3. Write df_case and df_dept to parquet files (my_cases_parquet and my_depts_parquet) 
4. Read your parquet files back into your spark environment. 
5. Read case.csv and dept.csv into a pandas dataframe. (cases_pdf, depts_pdf) 
6. Convert the pandas dataframes into spark dataframes (cases_sdf, depts_sdf) 
7. Convert the spark dataframes back into pandas dataframes. (cases_pdf1, depts_pdf1) 
8. Write the spark dataframes (cases_sdf, depts_sdf) to Hive tables. 
9. Explore the Hive database/tables you have created using the methods in the lesson. 
10. Read from the tables into two spark dataframes (cases_sdf, depts_sdf)

read into environment <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

In [1]:
import pyspark
# Obtain the SparkSession used by every later cell (an existing session is
# reused if there is one, otherwise a new one is created).
spark = (
    pyspark.sql.SparkSession
    .builder
    .getOrCreate()
)

In [2]:
# Read the cases CSV into a Spark DataFrame, taking column names from the
# first line of the file.
df_case = (
    spark.read
    .format('csv')
    .option('header', True)
    .load('./sa311/case.csv')
)

In [3]:
# Peek at the first record of the cases DataFrame (displayed as a single Row).
df_case.head()

Row(case_id='1014127332', case_opened_date='1/1/18 0:42', case_closed_date='1/1/18 12:29', SLA_due_date='9/26/20 0:42', case_late='NO', num_days_late='-998.5087616000001', case_closed='YES', dept_division='Field Operations', service_request_type='Stray Animal', SLA_days='999.0', case_status='Closed', source_id='svcCRMLS', request_address='2315  EL PASO ST, San Antonio, 78207', council_district='5')

In [4]:
# Read the departments CSV into a Spark DataFrame, taking column names from
# the first line of the file.
df_dept = (
    spark.read
    .format('csv')
    .option('header', True)
    .load('./sa311/dept.csv')
)

In [5]:
# Peek at the first record of the departments DataFrame (displayed as a single Row).
df_dept.head()

Row(dept_division='311 Call Center', dept_name='Customer Service', standardized_dept_name='Customer Service', dept_subject_to_SLA='YES')

write back into their own directories <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

In [6]:
# Write the cases DataFrame back to disk as CSV (with a header row),
# replacing any output left by a previous run.
(
    df_case.write
    .format('csv')
    .mode("overwrite")
    .option("header", "true")
    .save("sa311/df_case")
)

In [7]:
# Write the departments DataFrame back to disk as CSV (with a header row),
# replacing any output left by a previous run.
(
    df_dept.write
    .format('csv')
    .mode("overwrite")
    .option("header", "true")
    .save("sa311/df_dept")
)

write to parquet files <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

In [8]:
# Write the cases DataFrame to Parquet, replacing any output left by a
# previous run.
# No `header` option is set: that is a CSV setting, and Parquet keeps the
# column names and types in the file itself.
(
    df_case.write
    .format('parquet')
    .mode('overwrite')
    .save('sa311/df_case_parquet')
)

In [9]:
# Write the departments DataFrame to Parquet, replacing any output left by a
# previous run.
# No `header` option is set: that is a CSV setting, and Parquet keeps the
# column names and types in the file itself.
(
    df_dept.write
    .format('parquet')
    .mode('overwrite')
    .save('sa311/df_dept_parquet')
)

read parquet back into spark <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

In [10]:
# Load the cases Parquet output back into Spark. This rebinds `df_case`, so
# later cells work with the Parquet-backed DataFrame.
# `header` / `inferSchema` are CSV reader options and are not needed here:
# the schema is read from the Parquet files themselves.
df_case = (
    spark.read
    .format('parquet')
    .load("sa311/df_case_parquet")
)

In [11]:
# Load the departments Parquet output back into Spark. This rebinds
# `df_dept`, so later cells work with the Parquet-backed DataFrame.
# `header` / `inferSchema` are CSV reader options and are not needed here:
# the schema is read from the Parquet files themselves.
df_dept = (
    spark.read
    .format('parquet')
    .load("sa311/df_dept_parquet")
)

read csv files to pandas df <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

In [12]:
import pandas as pd
# Load the cases CSV with pandas and preview the first rows.
cases_pdf = pd.read_csv(
    "sa311/case.csv",
    sep=",",
)
cases_pdf.head()

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7


In [13]:
# Load the departments CSV with pandas and preview the first rows.
depts_pdf = pd.read_csv(
    "sa311/dept.csv",
    sep=",",
)
depts_pdf.head()

Unnamed: 0,dept_division,dept_name,standardized_dept_name,dept_subject_to_SLA
0,311 Call Center,Customer Service,Customer Service,YES
1,Brush,Solid Waste Management,Solid Waste,YES
2,Clean and Green,Parks and Recreation,Parks & Recreation,YES
3,Clean and Green Natural Areas,Parks and Recreation,Parks & Recreation,YES
4,Code Enforcement,Code Enforcement Services,DSD/Code Enforcement,YES


In [14]:
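# TODO: pandas -> Spark conversion (cases_sdf, depts_sdf) and Spark -> pandas (cases_pdf1, depts_pdf1), i.e. steps 6-7 of the task list, are still to be implemented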

In [15]:
# import pyspark.sql.types as T

In [16]:
# schema = T.StructType([T.StructField('x', T.StringType())])

convert spark df to Hive tables <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

In [17]:
import uuid   # Create unique table name
# Build a fresh, random table name ("df_" + 32 hex characters) on every run,
# then save the cases DataFrame as a table under that name.
table_name = f"df_{uuid.uuid4().hex}"
df_case.write.saveAsTable(table_name)

In [18]:
# Build a fresh, random table name ("df_" + 32 hex characters) on every run,
# then save the departments DataFrame as a table under that name.
table_name2 = f"df_{uuid.uuid4().hex}"
df_dept.write.saveAsTable(table_name2)

explore Hive tables <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

In [26]:
# Show the column names and data types of the saved cases table.
query1 = f"DESCRIBE {table_name}"
hive_cases = spark.sql(query1)
hive_cases.show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|   string|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|   string|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|   string|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|   string|   null|
+--------------------+---------+-------+



In [27]:
# Show the column names and data types of the saved departments table.
query2 = f"DESCRIBE {table_name2}"
hive_depts = spark.sql(query2)
hive_depts.show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|       dept_division|   string|   null|
|           dept_name|   string|   null|
|standardized_dept...|   string|   null|
| dept_subject_to_SLA|   string|   null|
+--------------------+---------+-------+



read Hive tables to spark df <<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

In [31]:
# List the databases visible to this Spark session.
(
    spark
    .sql("SHOW DATABASES")
    .show()
)

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [32]:
# Switch to the `default` database, then list the tables it contains.
spark.sql("USE default")
(
    spark
    .sql("SHOW TABLES")
    .show()
)

+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|df_52994663b53044...|      false|
| default|df_bcd53088db2d44...|      false|
+--------+--------------------+-----------+



In [33]:
# Show the column names and data types of the cases table.
(
    spark
    .sql(f"DESCRIBE {table_name}")
    .show()
)

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|   string|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|   string|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|   string|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|   string|   null|
+--------------------+---------+-------+



In [34]:
# Preview up to ten rows of the cases table straight from SQL.
(
    spark
    .sql(f"SELECT * FROM {table_name} LIMIT 10")
    .show()
)

+----------+----------------+----------------+-------------+---------+------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date| SLA_due_date|case_late|     num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+-------------+---------+------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014551581|   5/28/18 13:14|   5/28/18 14:23|5/28/18 16:14|       NO|      -0.077511574|        YES|Field Operations|     Officer Standby|      0.125|     Closed|  NO10960|7003  RAVENSDALE,...|               6|
|1014551583|   5/28/18 13:15|   5/29/18 14:38|  6/1/18 8:30|       NO|      -2.743912037|        YES|Waste Collection|           No Pickup|   3.801875| 

In [35]:
# Read the whole cases table back into a Spark DataFrame and preview
# its first five rows.
cases_sdf = spark.sql(
    f"SELECT * FROM {table_name}"
)
cases_sdf.show(n=5)

+----------+----------------+----------------+-------------+---------+------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date| SLA_due_date|case_late|     num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+-------------+---------+------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014551581|   5/28/18 13:14|   5/28/18 14:23|5/28/18 16:14|       NO|      -0.077511574|        YES|Field Operations|     Officer Standby|      0.125|     Closed|  NO10960|7003  RAVENSDALE,...|               6|
|1014551583|   5/28/18 13:15|   5/29/18 14:38|  6/1/18 8:30|       NO|      -2.743912037|        YES|Waste Collection|           No Pickup|   3.801875| 