In [57]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# Root HDFS directory under which managed databases and tables are stored by default.
warehouse_location = 'hdfs://hdfs-nn:9000/warehouse'

# Build (or reuse) a Hive-enabled Spark session that talks to the remote
# Hive Metastore over Thrift.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL Hive integration TABD project")
    .config("spark.sql.warehouse.dir", warehouse_location)
    .config("hive.metastore.uris", "thrift://hive-metastore:9083")
    .enableHiveSupport()
    .getOrCreate()
)

In [58]:
# List the databases currently registered in the metastore.
databases_df = spark.sql(
    """
    SHOW DATABASES
    """
)
databases_df.show()

+---------+
|namespace|
+---------+
|  default|
|     demo|
+---------+



In [59]:
# Start from a clean slate: remove the project database if it is already there.
# CASCADE drops the tables it contains as well.
drop_database_stmt = """
    DROP DATABASE IF EXISTS project_tabd CASCADE
    """
spark.sql(drop_database_stmt)

DataFrame[]

In [60]:
# Any HDFS location works for the database, but pick it deliberately:
# a data lake keeps growing, and without a consistent directory layout
# it turns into a "data swamp".
create_database_stmt = """
    CREATE DATABASE project_tabd LOCATION 'hdfs://hdfs-nn:9000/warehouse/tabd.db/'
    """
spark.sql(create_database_stmt)

DataFrame[]

In [61]:
# Confirm that project_tabd now appears among the registered databases.
show_databases_stmt = """
    SHOW DATABASES
    """
spark.sql(show_databases_stmt).show()

+------------+
|   namespace|
+------------+
|     default|
|        demo|
|project_tabd|
+------------+



In [62]:
# List the tables registered in project_tabd at this point.
tables_df = spark.sql(
    """
    SHOW TABLES FROM project_tabd
    """
)
tables_df.show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [63]:
# Drop any previous definition of the table created below so this cell can be
# re-run on its own. The table is EXTERNAL, so dropping it only removes the
# metastore entry; the files under LOCATION are left in place.
spark.sql(
    """
    DROP TABLE IF EXISTS project_tabd.parquet_table_weather
    """
)

# Register an external, Parquet-backed weather table partitioned by year.
# Only metadata is created here; the data files are expected under LOCATION.
# NOTE(review): latitude/longitude are DECIMAL(10,0), i.e. zero fractional
# digits. If the source files carry fractional coordinates, a scale > 0 is
# probably intended -- confirm against the Parquet schema before changing it.
spark.sql(
    """
    CREATE EXTERNAL TABLE project_tabd.parquet_table_weather (
        station_id INT,
        month INT,
        day INT,
        station_name VARCHAR(30),
        latitude DECIMAL(10,0),
        longitude DECIMAL(10,0),
        air_temperature_avg FLOAT,
        air_temperature_max FLOAT,
        air_temperature_min FLOAT,
        rain_precipitation_qty FLOAT,
        global_radiation_total FLOAT
    )
    STORED AS PARQUET
    PARTITIONED BY (
        year INT
    )
    LOCATION 'hdfs://hdfs-nn:9000/warehouse/tabd.db/parquet_table/'
    """
)

DataFrame[]

In [64]:
# Check that the weather table is now registered in project_tabd.
show_tables_stmt = """
    SHOW TABLES FROM project_tabd
    """
spark.sql(show_tables_stmt).show()

+------------+--------------------+-----------+
|    database|           tableName|isTemporary|
+------------+--------------------+-----------+
|project_tabd|parquet_table_wea...|      false|
+------------+--------------------+-----------+



In [None]:
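# Manual step (outside this notebook): inspect the table's directory in HDFS,
# hdfs://hdfs-nn:9000/warehouse/tabd.db/parquet_table/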

In [66]:
# Query the table before any data files have been added to its location.
select_weather_stmt = """
    SELECT *
    FROM project_tabd.parquet_table_weather
    """
spark.sql(select_weather_stmt).show()

+----------+-----+---+------------+--------+---------+-------------------+-------------------+-------------------+----------------------+----------------------+----+
|station_id|month|day|station_name|latitude|longitude|air_temperature_avg|air_temperature_max|air_temperature_min|rain_precipitation_qty|global_radiation_total|year|
+----------+-----+---+------------+--------+---------+-------------------+-------------------+-------------------+----------------------+----------------------+----+
+----------+-----+---+------------+--------+---------+-------------------+-------------------+-------------------+----------------------+----------------------+----+



In [67]:
# Show the table's column and storage metadata, converted to pandas for display.
describe_stmt = """
    DESCRIBE FORMATTED project_tabd.parquet_table_weather
    """
spark.sql(describe_stmt).toPandas()

Unnamed: 0,col_name,data_type,comment
0,station_id,int,
1,month,int,
2,day,int,
3,station_name,varchar(30),
4,latitude,"decimal(10,0)",
5,longitude,"decimal(10,0)",
6,air_temperature_avg,float,
7,air_temperature_max,float,
8,air_temperature_min,float,
9,rain_precipitation_qty,float,


In [None]:
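# Manual step (outside this notebook): upload the Parquet files into the table's
# HDFS location, one `year=<value>` sub-directory per partition.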

In [68]:
# Files copied straight into HDFS are not registered in the Hive Metastore
# (catalog). Recovering partitions updates the catalog; without it, Hive and
# the query engines do not know that the partitioned table has new partitions.
spark.catalog.recoverPartitions("project_tabd.parquet_table_weather")

# Query the table again now that its partitions have been refreshed.
weather_df = spark.sql(
    """
    SELECT *
    FROM project_tabd.parquet_table_weather
    """
)
weather_df.show()

+----------+-----+---+------------+--------+---------+-------------------+-------------------+-------------------+----------------------+----------------------+----+
|station_id|month|day|station_name|latitude|longitude|air_temperature_avg|air_temperature_max|air_temperature_min|rain_precipitation_qty|global_radiation_total|year|
+----------+-----+---+------------+--------+---------+-------------------+-------------------+-------------------+----------------------+----------------------+----+
+----------+-----+---+------------+--------+---------+-------------------+-------------------+-------------------+----------------------+----------------------+----+



In [69]:
# Stop the Spark session now that the notebook is finished.
spark.stop()