In [1]:
import pyspark
import multiprocessing
import uuid   # Create unique table name

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.types as T

### 1. Read into spark environment (df_case, df_dept)

In [2]:
spark = SparkSession.builder.master("local").appName("read").\
    enableHiveSupport().\
    getOrCreate()

In [3]:
df_case = spark.read.csv('./sa311/case.csv', sep=",", header=True, inferSchema=True)

In [4]:
df_case.show()

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|          SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|             999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Remova

In [5]:
df_dept = spark.read.csv('./sa311/dept.csv', sep=",", header=True, inferSchema=True)

In [6]:
df_dept.show()

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Code Enforcement ...|                null|  DSD/Code Enforcement|                YES|
|   Dangerous Premise|Code Enforcement ...|  DSD/Code Enforcement|                YES|
|Dangerous Premise...|Code Enforcement ...|

### 2. Write df_case and df_dept back to disk into their own directories (my_cases and my_depts) 

In [7]:
df_case.write.format('csv').mode("overwrite").option("header","true").save("sa311/my_cases")

In [8]:
df_dept.write.format('csv').mode("overwrite").option("header","true").save("sa311/my_depts")

### 3. Write df_case and df_dept to parquet files (my_cases_parquet and my_depts_parquet) 

In [9]:
df_case.write.format('parquet').mode('overwrite').option('header','true').save('sa311/my_cases_parquet')

In [10]:
df_dept.write.format('parquet').mode('overwrite').option('header','true').save('sa311/my_depts_parquet')

### 4. Read your parquet files back into your spark environment. 


In [11]:
spark.read.parquet('sa311/my_cases_parquet')

DataFrame[case_id: int, case_opened_date: string, case_closed_date: string, SLA_due_date: string, case_late: string, num_days_late: double, case_closed: string, dept_division: string, service_request_type: string, SLA_days: double, case_status: string, source_id: string, request_address: string, council_district: int]

In [12]:
spark.read.parquet('sa311/my_depts_parquet')

DataFrame[dept_division: string, dept_name: string, standardized_dept_name: string, dept_subject_to_SLA: string]

### 5. Read case.csv and dept.csv into a pandas dataframe. (cases_pdf, depts_pdf) 

In [13]:
cases_pdf = pd.read_csv('sa311/case.csv', sep=',')
cases_pdf.head()

Unnamed: 0,case_id,case_opened_date,case_closed_date,SLA_due_date,case_late,num_days_late,case_closed,dept_division,service_request_type,SLA_days,case_status,source_id,request_address,council_district
0,1014127332,1/1/18 0:42,1/1/18 12:29,9/26/20 0:42,NO,-998.508762,YES,Field Operations,Stray Animal,999.0,Closed,svcCRMLS,"2315 EL PASO ST, San Antonio, 78207",5
1,1014127333,1/1/18 0:46,1/3/18 8:11,1/5/18 8:30,NO,-2.012604,YES,Storm Water,Removal Of Obstruction,4.322222,Closed,svcCRMSS,"2215 GOLIAD RD, San Antonio, 78223",3
2,1014127334,1/1/18 0:48,1/2/18 7:57,1/5/18 8:30,NO,-3.022338,YES,Storm Water,Removal Of Obstruction,4.320729,Closed,svcCRMSS,"102 PALFREY ST W, San Antonio, 78223",3
3,1014127335,1/1/18 1:29,1/2/18 8:13,1/17/18 8:30,NO,-15.011481,YES,Code Enforcement,Front Or Side Yard Parking,16.291887,Closed,svcCRMSS,"114 LA GARDE ST, San Antonio, 78223",3
4,1014127336,1/1/18 1:34,1/1/18 13:29,1/1/18 4:34,YES,0.372164,YES,Field Operations,Animal Cruelty(Critical),0.125,Closed,svcCRMSS,"734 CLEARVIEW DR, San Antonio, 78228",7


In [14]:
cases_pdf.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 841704 entries, 0 to 841703
Data columns (total 14 columns):
case_id                 841704 non-null int64
case_opened_date        841704 non-null object
case_closed_date        823594 non-null object
SLA_due_date            841671 non-null object
case_late               841704 non-null object
num_days_late           841671 non-null float64
case_closed             841704 non-null object
dept_division           841704 non-null object
service_request_type    841704 non-null object
SLA_days                841671 non-null float64
case_status             841704 non-null object
source_id               841704 non-null object
request_address         841704 non-null object
council_district        841704 non-null int64
dtypes: float64(2), int64(2), object(10)
memory usage: 89.9+ MB


In [15]:
# cases_pdf.case_opened_date = pd.to_datetime(cases_pdf.case_opened_date, infer_datetime_format=True)
# cases_pdf.case_closed_date = pd.to_datetime(cases_pdf.case_closed_date, infer_datetime_format=True)
# cases_pdf.SLA_due_date = pd.to_datetime(cases_pdf.SLA_due_date, infer_datetime_format=True)

In [16]:
depts_pdf = pd.read_csv('sa311/dept.csv', sep=',')
depts_pdf.head()

Unnamed: 0,dept_division,dept_name,standardized_dept_name,dept_subject_to_SLA
0,311 Call Center,Customer Service,Customer Service,YES
1,Brush,Solid Waste Management,Solid Waste,YES
2,Clean and Green,Parks and Recreation,Parks & Recreation,YES
3,Clean and Green Natural Areas,Parks and Recreation,Parks & Recreation,YES
4,Code Enforcement,Code Enforcement Services,DSD/Code Enforcement,YES


In [17]:

mySchema = T.StructType([ StructField("case_id", IntegerType(), True)\
                       ,StructField("case_opened_date", StringType(), True)\
                       ,StructField("case_closed_date", StringType(), True)\
                       ,StructField("SLA_due_date", StringType(), True)\
                       ,StructField("case_late", StringType(), True)\
                       ,StructField("num_days_late", IntegerType(), True)\
                       ,StructField("case_closed", StringType(), True)\
                       ,StructField("dept_division", StringType(), True)\
                       ,StructField("service_request_type", StringType(), True)\
                       ,StructField("SLA_days", IntegerType(), True)\
                       ,StructField("case_status", StringType(), True)\
                       ,StructField("source_id", StringType(), True)\
                       ,StructField("request_address", StringType(), True)\
                       ,StructField("council_district", IntegerType(), True)])

### 6. Convert the pandas dataframes into spark dataframes (cases_sdf, depts_sdf) 


In [61]:
cases_sdf = spark.createDataFrame(cases_pdf, schema=mySchema)
cases_sdf.show(5)

TypeError: field num_days_late: IntegerType can not accept object -998.5087616 in type <class 'float'>

### 7. Convert the spark dataframes back into pandas dataframes. (cases_pdf1, depts_pdf1)

### 8. Write the spark dataframes (cases_sdf, depts_sdf) to Hive tables. 

In [19]:
df_case.show()

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|          SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|             999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Remova

In [20]:
table_name = "df_" + str(uuid.uuid4().hex)  
df_case.write.saveAsTable(table_name)
table_name

'df_228c7d59e3b440dc994c578cda233aca'

In [21]:
query = "DESCRIBE %s" % table_name
spark.sql(query).show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|      int|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|   double|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|   double|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|      int|   null|
+--------------------+---------+-------+



In [41]:
table_name2 = "df_" + str(uuid.uuid4().hex)  
df_dept.write.saveAsTable(table_name2)
table_name2

'df_f38d1bdc3f2649298545b7e0fd9e055e'

In [23]:
query = "DESCRIBE %s" % table_name2
spark.sql(query).show()

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
|             case_id|      int|   null|
|    case_opened_date|   string|   null|
|    case_closed_date|   string|   null|
|        SLA_due_date|   string|   null|
|           case_late|   string|   null|
|       num_days_late|   double|   null|
|         case_closed|   string|   null|
|       dept_division|   string|   null|
|service_request_type|   string|   null|
|            SLA_days|   double|   null|
|         case_status|   string|   null|
|           source_id|   string|   null|
|     request_address|   string|   null|
|    council_district|      int|   null|
+--------------------+---------+-------+



### 9. Explore the Hive database/tables you have created using the methods in the lesson. 

In [9]:
spark.sql("SHOW DATABASES").show()


+------------+
|databaseName|
+------------+
|     default|
+------------+



In [10]:
spark.sql("USE default")
spark.sql("SHOW TABLES").show()



+--------+--------------------+-----------+
|database|           tableName|isTemporary|
+--------+--------------------+-----------+
| default|df_228c7d59e3b440...|      false|
| default|df_f38d1bdc3f2649...|      false|
+--------+--------------------+-----------+



In [11]:
spark.catalog.listTables()

[Table(name='df_228c7d59e3b440dc994c578cda233aca', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='df_f38d1bdc3f2649298545b7e0fd9e055e', database='default', description=None, tableType='MANAGED', isTemporary=False)]

In [14]:
spark.catalog.listTables()

[Table(name='df_228c7d59e3b440dc994c578cda233aca', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='df_f38d1bdc3f2649298545b7e0fd9e055e', database='default', description=None, tableType='MANAGED', isTemporary=False)]

### Accidentally created multiple tables, so deleted them using the DROP TABLE spark sql query above...

### 10. Read from the tables into two spark dataframes (cases_sdf, depts_sdf)

In [15]:
cases_sdf = spark.sql(f"SELECT * FROM {'df_228c7d59e3b440dc994c578cda233aca'}")
cases_sdf.show(5)

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|   SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+-----------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|      999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Removal Of Obstru...|4.322222222| 

In [16]:
dept_sdf = spark.sql(f"SELECT * FROM {'df_f38d1bdc3f2649298545b7e0fd9e055e'}")
dept_sdf.show(5)

+--------------------+--------------------+----------------------+-------------------+
|       dept_division|           dept_name|standardized_dept_name|dept_subject_to_SLA|
+--------------------+--------------------+----------------------+-------------------+
|     311 Call Center|    Customer Service|      Customer Service|                YES|
|               Brush|Solid Waste Manag...|           Solid Waste|                YES|
|     Clean and Green|Parks and Recreation|    Parks & Recreation|                YES|
|Clean and Green N...|Parks and Recreation|    Parks & Recreation|                YES|
|    Code Enforcement|Code Enforcement ...|  DSD/Code Enforcement|                YES|
+--------------------+--------------------+----------------------+-------------------+
only showing top 5 rows



### Change the data types of these dataframes...

In [17]:
cases_sdf.show()

+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|   case_id|case_opened_date|case_closed_date|SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|          SLA_days|case_status|source_id|     request_address|council_district|
+----------+----------------+----------------+------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|1014127332|     1/1/18 0:42|    1/1/18 12:29|9/26/20 0:42|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|             999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|     1/1/18 0:46|     1/3/18 8:11| 1/5/18 8:30|       NO|-2.0126041669999997|        YES|     Storm Water|Remova

In [19]:
cases_sdf.select('case_opened_date').show(3)

+----------------+
|case_opened_date|
+----------------+
|     1/1/18 0:42|
|     1/1/18 0:46|
|     1/1/18 0:48|
+----------------+
only showing top 3 rows



In [22]:
from pyspark.sql import functions as f

cases_sdf = cases_sdf.withColumn("case_opened_date", 
                   f.to_timestamp(f.col("case_opened_date"), 
                                  "M/d/yy H:mm")).\
        withColumn("case_closed_date", 
                   f.to_timestamp(f.col("case_closed_date"),
                                  "M/d/yy H:mm")).\
        withColumn("SLA_due_date", 
                   f.to_timestamp(f.col("SLA_due_date"),
                                  "M/d/yy H:mm"))

In [23]:
cases_sdf.show()

+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|   case_id|   case_opened_date|   case_closed_date|       SLA_due_date|case_late|      num_days_late|case_closed|   dept_division|service_request_type|          SLA_days|case_status|source_id|     request_address|council_district|
+----------+-------------------+-------------------+-------------------+---------+-------------------+-----------+----------------+--------------------+------------------+-----------+---------+--------------------+----------------+
|1014127332|2018-01-01 00:42:00|2018-01-01 12:29:00|2020-09-26 00:42:00|       NO| -998.5087616000001|        YES|Field Operations|        Stray Animal|             999.0|     Closed| svcCRMLS|2315  EL PASO ST,...|               5|
|1014127333|2018-01-01 00:46:00|2018-01-03 08:11:00|2018-01-05 08:30:00|

In [24]:
cases_sdf.printSchema()

root
 |-- case_id: integer (nullable = true)
 |-- case_opened_date: timestamp (nullable = true)
 |-- case_closed_date: timestamp (nullable = true)
 |-- SLA_due_date: timestamp (nullable = true)
 |-- case_late: string (nullable = true)
 |-- num_days_late: double (nullable = true)
 |-- case_closed: string (nullable = true)
 |-- dept_division: string (nullable = true)
 |-- service_request_type: string (nullable = true)
 |-- SLA_days: double (nullable = true)
 |-- case_status: string (nullable = true)
 |-- source_id: string (nullable = true)
 |-- request_address: string (nullable = true)
 |-- council_district: integer (nullable = true)



In [26]:
dept_sdf.printSchema()

root
 |-- dept_division: string (nullable = true)
 |-- dept_name: string (nullable = true)
 |-- standardized_dept_name: string (nullable = true)
 |-- dept_subject_to_SLA: string (nullable = true)

