## Project : Housing Data Delta Table Load

#### Module Import

In [0]:
import pandas as pd
import sklearn
from pyspark.sql import SparkSession



#### Creating Spark Session

In [0]:
spark = SparkSession.builder.appName('Housing_Delta_Table').getOrCreate()

#### Defining Schema for Delta Table

In [0]:
columns=sklearn.datasets.fetch_california_housing().feature_names

In [0]:
columns

Out[8]: ['MedInc',
 'HouseAge',
 'AveRooms',
 'AveBedrms',
 'Population',
 'AveOccup',
 'Latitude',
 'Longitude']

In [0]:
schema = [i+' DOUBLE' for i in columns]
schema

Out[12]: ['MedInc DOUBLE',
 'HouseAge DOUBLE',
 'AveRooms DOUBLE',
 'AveBedrms DOUBLE',
 'Population DOUBLE',
 'AveOccup DOUBLE',
 'Latitude DOUBLE',
 'Longitude DOUBLE']

In [0]:
schema=','.join(schema)

#### Creating Delta Table

In [0]:
spark.sql("CREATE TABLE IF NOT EXISTS housing_dataset ({}) USING DELTA " \
          "OPTIONS (PATH '/FileStore/tables/delta_table') ".format(schema) )

Out[27]: DataFrame[]

In [0]:
spark.catalog.listTables()

Out[40]: [Table(name='housing_dataset', catalog='spark_catalog', namespace=['default'], description=None, tableType='EXTERNAL', isTemporary=False)]

In [0]:
spark.sql('select * from housing_dataset').show()

+------+--------+--------+---------+----------+--------+--------+---------+
|MedInc|HouseAge|AveRooms|AveBedrms|Population|AveOccup|Latitude|Longitude|
+------+--------+--------+---------+----------+--------+--------+---------+
+------+--------+--------+---------+----------+--------+--------+---------+



#### Fetching Housing Dataset

In [0]:
data = pd.DataFrame(sklearn.datasets.fetch_california_housing().data,
                    columns=sklearn.datasets.fetch_california_housing().feature_names)

In [0]:
from pprint import pprint
pprint(data)

       MedInc  HouseAge  AveRooms  AveBedrms  Population  AveOccup  Latitude  \
0      8.3252      41.0  6.984127   1.023810       322.0  2.555556     37.88   
1      8.3014      21.0  6.238137   0.971880      2401.0  2.109842     37.86   
2      7.2574      52.0  8.288136   1.073446       496.0  2.802260     37.85   
3      5.6431      52.0  5.817352   1.073059       558.0  2.547945     37.85   
4      3.8462      52.0  6.281853   1.081081       565.0  2.181467     37.85   
...       ...       ...       ...        ...         ...       ...       ...   
20635  1.5603      25.0  5.045455   1.133333       845.0  2.560606     39.48   
20636  2.5568      18.0  6.114035   1.315789       356.0  3.122807     39.49   
20637  1.7000      17.0  5.205543   1.120092      1007.0  2.325635     39.43   
20638  1.8672      18.0  5.329513   1.171920       741.0  2.123209     39.43   
20639  2.3886      16.0  5.254717   1.162264      1387.0  2.616981     39.37   

       Longitude  
0        -122.23  
1

#### Creation of Spark Dataframe

In [0]:
spark_df = spark.createDataFrame(data)

#### Ingest Data into Delta Table

In [0]:
spark_df.write.format("delta").mode("overwrite").save("/FileStore/tables/delta_table")


#### Visualize Data from Delta Table

In [0]:
spark.sql('select * from housing_dataset').show()

+------+--------+------------------+------------------+----------+------------------+--------+---------+
|MedInc|HouseAge|          AveRooms|         AveBedrms|Population|          AveOccup|Latitude|Longitude|
+------+--------+------------------+------------------+----------+------------------+--------+---------+
|8.3252|    41.0| 6.984126984126984|1.0238095238095237|     322.0|2.5555555555555554|   37.88|  -122.23|
|8.3014|    21.0| 6.238137082601054|0.9718804920913884|    2401.0| 2.109841827768014|   37.86|  -122.22|
|7.2574|    52.0| 8.288135593220339| 1.073446327683616|     496.0|2.8022598870056497|   37.85|  -122.24|
|5.6431|    52.0|5.8173515981735155|1.0730593607305936|     558.0| 2.547945205479452|   37.85|  -122.25|
|3.8462|    52.0| 6.281853281853282|1.0810810810810811|     565.0|2.1814671814671813|   37.85|  -122.25|
|4.0368|    52.0| 4.761658031088083|1.1036269430051813|     413.0| 2.139896373056995|   37.85|  -122.25|
|3.6591|    52.0|4.9319066147859925|0.9513618677042801|

In [0]:
spark.sql('DROP TABLE housing_dataset')

Out[25]: DataFrame[]

In [0]:
pip list

Package                           Version
--------------------------------- -----------
argon2-cffi                       20.1.0
async-generator                   1.10
attrs                             21.2.0
backcall                          0.2.0
backports.entry-points-selectable 1.1.1
black                             22.3.0
bleach                            4.0.0
boto3                             1.21.18
botocore                          1.24.18
certifi                           2021.10.8
cffi                              1.14.6
chardet                           4.0.0
charset-normalizer                2.0.4
click                             8.0.3
cryptography                      3.4.8
cycler                            0.10.0
Cython                            0.29.24
dbus-python                       1.2.16
debugpy                           1.4.1
decorator                         5.1.0
defusedxml                        0.7.1
distlib                           0.3.6
distro           