<h1>Table of Contents<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"><li><span><a href="#Imports" data-toc-modified-id="Imports-1"><span class="toc-item-num">1&nbsp;&nbsp;</span>Imports</a></span></li><li><span><a href="#Spark" data-toc-modified-id="Spark-2"><span class="toc-item-num">2&nbsp;&nbsp;</span>Spark</a></span></li></ul></div>

### Imports

In [1]:
%matplotlib inline

# disable warnings
import warnings
warnings.filterwarnings('ignore')

# data wrangling
import pandas as pd
import numpy as np
import itertools
import math
from random import randint
from datetime import datetime
from scipy import stats

# data visualization
import matplotlib.pyplot as plt
import seaborn as sns
from mpl_toolkits.mplot3d import Axes3D

# spark
import multiprocessing
import pyspark
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, rand
from pyspark.sql.types import StructType, StructField, StringType

### Spark

In [2]:
spark = SparkSession.builder.master("local").appName("read").\
    enableHiveSupport().\
    getOrCreate()
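
The session above uses `master("local")`, which runs Spark with a single worker thread. As a minimal sketch, assuming one thread per logical core is wanted, the master URL could be built from `multiprocessing.cpu_count()`. The helper name `local_master_url` and its `n_threads` parameter are hypothetical and not part of PySpark.

```python
import multiprocessing


def local_master_url(n_threads=None):
    """Build a Spark local-mode master URL such as 'local[4]'.

    Hypothetical helper: defaults to one worker thread per logical core.
    """
    if n_threads is None:
        n_threads = multiprocessing.cpu_count()
    if n_threads < 1:
        raise ValueError("n_threads must be at least 1")
    return f"local[{n_threads}]"


master = local_master_url(4)
```

The resulting string can be passed to `SparkSession.builder.master(...)`. Spark also accepts `local[*]`, which asks for as many threads as there are logical cores.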

1. Read `case.csv` and `dept.csv` into the Spark environment (`df_case`, `df_dept`)

In [3]:
df_case = spark.read.format("csv").\
    option("sep", ",").\
    option("header", True).\
    option("inferSchema", True).\
    load("sa311/case.csv")

df_dept = spark.read.format("csv").\
    option("sep", ",").\
    option("header", True).\
    option("inferSchema", True).\
    load("sa311/dept.csv")

2. Write `df_case` and `df_dept` back to disk into their own directories (`my_cases` and `my_depts`)


In [4]:
# Spark writes each path as a directory of part files, so name the directories my_cases and my_depts
df_case.write.format('csv').mode('overwrite').option('header', True).save('sa311/my_cases')
df_dept.write.format('csv').mode('overwrite').option('header', True).save('sa311/my_depts')
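
Spark writes each DataFrame as a directory of part files rather than as a single file, whatever the path is called. A minimal sketch for inspecting such an output directory with the standard library follows. `list_part_files` is a hypothetical helper, and it assumes the output lives on the local filesystem.

```python
from pathlib import Path


def list_part_files(output_dir):
    """Return the sorted names of the data files Spark wrote into output_dir.

    Marker and checksum files such as _SUCCESS or .part-*.crc are skipped
    because their names do not start with "part-".
    """
    return sorted(p.name for p in Path(output_dir).glob("part-*") if p.is_file())
```

Calling it on the path that was passed to `save` typically shows one data file per partition written.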

3. Write `df_case` and `df_dept` to parquet files (`my_cases_parquet` and `my_depts_parquet`)

In [5]:
# Parquet has no header row, so the CSV-only 'header' option is dropped
df_case.write.format('parquet').mode('overwrite').save('sa311/my_cases_parquet')
df_dept.write.format('parquet').mode('overwrite').save('sa311/my_depts_parquet')

4. Read your parquet files back into your Spark environment.

In [6]:
# Parquet files store their own schema, so header and inferSchema options are not needed
df_case = spark.read.format('parquet').\
    load("sa311/my_cases_parquet")

df_dept = spark.read.format('parquet').\
    load("sa311/my_depts_parquet")

5. Read `case.csv` and `dept.csv` into pandas dataframes (`cases_pdf`, `depts_pdf`)

In [7]:
cases_pdf = pd.read_csv('sa311/case.csv')
depts_pdf = pd.read_csv('sa311/dept.csv')

6. Convert the pandas dataframes into Spark dataframes (`cases_sdf`, `depts_sdf`)

In [None]:
depts_pdf.isnull().sum()

In [None]:
depts_pdf.info()

In [11]:
# Fill missing values and reassign; inplace fillna on a selected column may not update the frame
cases_pdf = cases_pdf.fillna({
    'case_closed_date': 'na',
    'SLA_due_date': 'na',
    'num_days_late': 0.0,
    'SLA_days': 0.0,
})

In [12]:
depts_pdf = depts_pdf.fillna({'dept_name': 'na'})

In [13]:
cases_sdf = spark.createDataFrame(cases_pdf)

In [None]:
depts_sdf = spark.createDataFrame(depts_pdf)

7. Convert the Spark dataframes back into pandas dataframes. (`cases_pdf1`, `depts_pdf1`)


In [9]:
sdf_depts = spark.read.format("csv").\
    option("sep", ",").\
    option("header", True).\
    option("inferSchema", True).\
    load("sa311/dept.csv")

In [11]:
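sdf_cases = spark.read.format("csv").\
    option("sep", ",").\
    option("header", True).\
    option("inferSchema", True).\
    load("sa311/case.csv")

# toPandas() collects every row to the driver and returns a pandas dataframe
cases_pdf1 = sdf_cases.toPandas()
depts_pdf1 = sdf_depts.toPandas()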

8. Write the Spark dataframes (`cases_sdf`, `depts_sdf`) to Hive tables.

In [12]:
import uuid

In [14]:
# uuid4().hex is already a string, so no str() call is needed
table_name = "df_" + uuid.uuid4().hex
sdf_depts.write.saveAsTable(table_name)

In [16]:
table_name2 = "df_" + uuid.uuid4().hex
sdf_cases.write.saveAsTable(table_name2)

In [17]:
table_name

'df_034f4c256c6042e3aa9dd93d8e460616'

In [18]:
table_name2

'df_b45958b336fe4b338c7aee316111b950'
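
By default `saveAsTable` raises an error when the target table already exists, which is why the cells above attach a random suffix to each name. The pattern can be wrapped in a small helper. This is a sketch only: `unique_table_name` and its `prefix` parameter are hypothetical names, not part of PySpark.

```python
import uuid


def unique_table_name(prefix="df"):
    """Return a table name of the form '<prefix>_<32 hex characters>'."""
    # uuid4().hex is a 32-character lowercase hex string with no dashes,
    # so the result contains only letters, digits, and underscores.
    return f"{prefix}_{uuid.uuid4().hex}"
```

A call such as `sdf_depts.write.saveAsTable(unique_table_name("depts"))` would then yield a table whose name also hints at its contents. Keep the returned name in a variable, as the cells above do, so the table can be found again later.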