## Overview of Spark Metastore

Let us get an overview of Spark Metastore and how we can use it to manage databases and tables on top of Big Data file systems such as HDFS, S3, etc.

* We often need to deal with structured data, and the most popular way of processing it is with databases, tables and SQL.
* Spark Metastore (similar to Hive Metastore) lets us manage databases and tables.
* The Metastore is typically set up on a traditional relational database such as **Oracle**, **MySQL** or **Postgres**.

## Exploring Spark Catalog

Let us get an overview of Spark Catalog, which we can use to manage Spark Metastore tables as well as temporary views.

* Let us say `spark` is of type `SparkSession`. It has an attribute called `catalog`, which is of type `pyspark.sql.catalog.Catalog`.
* We can access the catalog using `spark.catalog`.
* We can create permanent tables or temporary views on top of the data in a Data Frame.
* Metadata such as table names, column names and data types for permanent tables or views is stored in the Metastore. We can access this metadata using `spark.catalog`, which is exposed as part of the `SparkSession` object.
* `spark.catalog` also provides details about the temporary views that have been created. Metadata of temporary views is not stored in the Spark Metastore.
* Permanent tables are created in databases in the Spark Metastore. If no database is specified, tables are created in the **default** database.
* There are several methods available as part of `spark.catalog`. We will explore them in later topics.
* Following are some of the tasks that can be performed using the `spark.catalog` object.
  * Check the current database and switch to a different database.
  * Create a permanent table in the metastore.
  * Create or drop temporary views.
  * Register functions.
* All of the above tasks can also be performed using SQL-style commands passed to `spark.sql`.
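
As a minimal sketch of the inspection tasks listed above, the helper below collects what `spark.catalog` currently sees into a plain dictionary. The function name `summarize_catalog` is hypothetical; it only relies on `currentDatabase`, `listDatabases` and `listTables`, and it assumes `spark` is an existing `SparkSession`.

```python
def summarize_catalog(spark):
    """Return a small snapshot of what `spark.catalog` currently sees.

    `spark` is assumed to be an existing SparkSession.
    """
    catalog = spark.catalog
    # listTables() returns the tables of the current database
    # together with the temporary views of this session.
    tables = catalog.listTables()
    return {
        "current_database": catalog.currentDatabase(),
        "databases": sorted(db.name for db in catalog.listDatabases()),
        "permanent_tables": sorted(t.name for t in tables if not t.isTemporary),
        "temporary_views": sorted(t.name for t in tables if t.isTemporary),
    }
```

Calling `summarize_catalog(spark)` right after creating the session is a quick way to confirm which database the session is attached to before writing any tables.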

In [4]:
from pyspark.sql import SparkSession

import getpass
username = getpass.getuser()

spark = SparkSession. \
    builder. \
    config('spark.ui.port', '0'). \
    config("spark.sql.debug.maxToStringFields", 1000). \
    config("spark.dynamicAllocation.enabled", "false"). \
    config("spark.sql.catalogImplementation","hive"). \
    config("spark.sql.warehouse.dir", "hdfs://0.0.0.0:9000/user/hive/warehouse/"). \
    enableHiveSupport(). \
    appName(f'{username} | Python - Spark Metastore'). \
    master('yarn'). \
    getOrCreate()

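# hive.metastore.uris expects the Thrift endpoint of a running metastore service, for example:
# config("hive.metastore.uris", "thrift://METASTORE:9083"). \
# A JDBC URL such as jdbc:postgresql://localhost:6432/metastore belongs in the
# metastore's own javax.jdo.option.ConnectionURL setting, not in hive.metastore.uris.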

In [5]:
spark.catalog


<pyspark.sql.catalog.Catalog at 0x7f8cd8380f40>

## Creating Metastore Tables using catalog

Data Frames can be written into Metastore tables using APIs such as `saveAsTable` and `insertInto`, which are available through the `write` attribute of a Data Frame.

* We can create a new table from a Data Frame using `saveAsTable`. We can also create an empty table using `spark.catalog.createTable` or `spark.catalog.createExternalTable`.
* We can prefix the table name with a database name to write into a table in a particular database. If no database is specified, the table is resolved against the current database of the session, which is `default` unless we change it.
* We can attach the current session to a specific database using `spark.catalog.setCurrentDatabase`.
* Databases can be created using `spark.sql("CREATE DATABASE database_name")`. We can list databases using `spark.sql("SHOW DATABASES")` or `spark.catalog.listDatabases()`.
* We can use modes such as `append`, `overwrite`, `ignore` and `error` with `saveAsTable`. The default is `error`.
* We can use modes such as `append` and `overwrite` with `insertInto`. The default is `append`.
* When we use `saveAsTable`, the following happens:
  * Spark checks whether the table already exists. If it does, `saveAsTable` throws an exception by default.
  * If the table does not exist, it is created.
  * Data from the Data Frame is copied into the table.
  * We can alter this behavior using `mode`: we can overwrite the existing table or append to it.
* We can list tables using `spark.catalog.listTables` after switching to the appropriate database using `spark.catalog.setCurrentDatabase`.
* We can also switch the database and list tables using `spark.sql`.
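
The save modes and the optional database prefix described above can be wrapped in a small helper. This is only a sketch: `save_to_metastore` and `VALID_SAVE_MODES` are hypothetical names, and `df` is assumed to be any Data Frame. The helper uses `df.write.mode(...)` followed by `saveAsTable`, which is equivalent to passing `mode` as a keyword argument.

```python
# Save modes accepted by DataFrameWriter.mode(); "errorifexists" is an alias of "error".
VALID_SAVE_MODES = {"append", "overwrite", "ignore", "error", "errorifexists"}


def save_to_metastore(df, table_name, database=None, mode="error"):
    """Write `df` to a metastore table and return the name that was used.

    When `database` is omitted, the table is created in the current database.
    """
    if mode not in VALID_SAVE_MODES:
        raise ValueError(f"Unsupported save mode: {mode!r}")
    qualified_name = f"{database}.{table_name}" if database else table_name
    df.write.mode(mode).saveAsTable(qualified_name)
    return qualified_name
```

For example, `save_to_metastore(df, "dual", database=f"{username}_demo_db", mode="overwrite")` would replace the table regardless of which database the session is currently attached to.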

In [6]:
spark.catalog.listDatabases()

                                                                                

[Database(name='default', description='Default Hive database', locationUri='hdfs://0.0.0.0:9000/user/hive/warehouse'),
 Database(name='retail_db', description='', locationUri='hdfs://0.0.0.0:9000/user/hive/warehouse/retail_db.db')]

In [10]:
spark.sql(f"DROP DATABASE IF EXISTS {username}_demo_db CASCADE")


21/08/30 11:27:53 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException


In [11]:
spark.sql(f"CREATE DATABASE {username}_demo_db")

21/08/30 11:34:54 WARN ObjectStore: Failed to get database nghiaht7_demo_db, returning NoSuchObjectException


In [12]:
spark.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='hdfs://0.0.0.0:9000/user/hive/warehouse'),
 Database(name='nghiaht7_demo_db', description='', locationUri='hdfs://0.0.0.0:9000/user/hive/warehouse/nghiaht7_demo_db.db'),
 Database(name='retail_db', description='', locationUri='hdfs://0.0.0.0:9000/user/hive/warehouse/retail_db.db')]

In [14]:
spark.catalog.setCurrentDatabase(f'{username}_demo_db')

In [16]:
spark.catalog.currentDatabase()

'nghiaht7_demo_db'

In [17]:
l = [("X", )]
df = spark.createDataFrame(l, schema="dummy STRING")

In [18]:
spark.catalog.listTables()

[]

In [19]:
df.show()


+-----+
|dummy|
+-----+
|    X|
+-----+



                                                                                

A Data Frame created in the Spark session can be written to the Spark warehouse as a table, after which it can be managed using `spark.catalog`.

In [32]:
df.write.saveAsTable("dual", mode='overwrite')

                                                                                

In [22]:
spark.catalog.listTables()

[Table(name='dual', database='nghiaht7_demo_db', description=None, tableType='MANAGED', isTemporary=False)]

The table can also be queried using Spark SQL.

In [23]:
spark.sql("SELECT * FROM nghiaht7_demo_db.dual;")

                                                                                

dummy
X


In [24]:
spark.read.table("dual").show()


+-----+
|dummy|
+-----+
|    X|
+-----+



                                                                                

In [25]:
# No database prefix is needed because the current database is nghiaht7_demo_db

spark.sql('SELECT * FROM dual').show()


+-----+
|dummy|
+-----+
|    X|
+-----+



                                                                                

In [26]:
spark.sql("DROP TABLE dual")
spark.catalog.listTables()

[]

In [28]:
df.schema

StructType(List(StructField(dummy,StringType,true)))

## Creating and Dropping Tables using spark.catalog

In [29]:
schema = df.schema

In [30]:
spark.catalog.createTable('dual2', schema=schema)

dummy


In [33]:
spark.catalog.listTables()

[Table(name='dual', database='nghiaht7_demo_db', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='dual2', database='nghiaht7_demo_db', description=None, tableType='MANAGED', isTemporary=False)]

In [34]:
df.write.insertInto?

In [36]:
df.write.insertInto('dual2')
df.write.insertInto('dual')

                                                                                

In [37]:
spark.read.table("dual").show()

+-----+
|dummy|
+-----+
|    X|
|    X|
+-----+



In [38]:
spark.sql('SELECT * FROM dual2').show()

+-----+
|dummy|
+-----+
|    X|
|    X|
+-----+



In [39]:
spark.sql("DROP TABLE dual")
spark.sql("DROP TABLE dual2")

In [40]:
spark.sql(f"DROP DATABASE {username}_demo_db")

In [7]:
# We can use CASCADE to drop database along with tables.
spark.sql(f"DROP DATABASE IF EXISTS {username}_demo_db CASCADE")

21/08/30 20:22:43 WARN ObjectStore: Failed to get database nghiaht7_demo_db, returning NoSuchObjectException
21/08/30 20:22:43 WARN ObjectStore: Failed to get database nghiaht7_demo_db, returning NoSuchObjectException


## Inferring Schema for Tables

When we create a table using `spark.catalog.createTable` or `spark.catalog.createExternalTable`, the table needs a schema. We can either pass one explicitly or let Spark infer it from the data.

* The schema can be inferred from a Data Frame (`df.schema`) and then passed as a `StructType` object while creating the table.
* `StructType` takes a list of objects of type `StructField`.
* `StructField` is built using a column name and a data type. All the data types are available under `pyspark.sql.types`.
* We need to pass the table name and schema to `spark.catalog.createTable`.
* We have to pass the path along with the name to `spark.catalog.createExternalTable`. The schema is optional when the source can infer it, for example CSV with `inferSchema`.
* We can use `source` to define the file format along with applicable options. For example, to create a table on top of CSV files, `source` will be `csv` and we can pass applicable CSV options such as `sep` and `header`.
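
`createExternalTable` has been deprecated since Spark 2.2 in favor of `createTable`, which creates an external table when a `path` is given. The sketch below shows the same CSV setup through `createTable`. The helper name `create_csv_table` and its defaults are assumptions made for illustration; the options are passed as strings, as in the rest of this notebook.

```python
def create_csv_table(spark, table_name, path, sep=",", header=True):
    """Create an external metastore table on top of CSV files at `path`.

    The schema is not passed explicitly; Spark infers it from the files
    because `inferSchema` is enabled.
    """
    return spark.catalog.createTable(
        table_name,
        path=path,
        source="csv",
        sep=sep,
        header=str(header).lower(),  # "true" or "false"
        inferSchema="true",
    )
```

With the session used here, `create_csv_table(spark, "airport_codes", airport_codes_path)` would be the counterpart of the `createExternalTable` call in this section.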

In [None]:
spark.catalog.createExternalTable?

# Signature:
# spark.catalog.createExternalTable(
#     tableName,
#     path=None,
#     source=None,
#     schema=None,
#     **options,
# )

In [9]:
spark.sql(f"CREATE DATABASE IF NOT EXISTS {username}_airtraffic")

21/08/30 20:24:01 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
21/08/30 20:24:01 WARN ObjectStore: Failed to get database nghiaht7_airtraffic, returning NoSuchObjectException


In [10]:
spark.catalog.setCurrentDatabase(f"{username}_airtraffic")

In [11]:
spark.catalog.currentDatabase()

'nghiaht7_airtraffic'

In [21]:
!hdfs dfs -put /home/nghiaht7/data-engineer/data-engineering-essentials/data/airport-codes.csv /user/nghiaht7/

In [23]:
!hdfs dfs -ls -R /user | grep airport

-rw-r--r--   1 nghiaht7 hadoop       6791 2021-08-30 20:33 /user/nghiaht7/airport-codes.csv


In [25]:
airport_codes_path = "hdfs://0.0.0.0:9000/user/nghiaht7/airport-codes.csv"

In [24]:
spark.sql('DROP TABLE IF EXISTS airport_codes')

In [26]:
# The table is created through spark.catalog, so the path should also be under hdfs://0.0.0.0:9000/user ...

spark.catalog. \
    createExternalTable("airport_codes",
                        path=airport_codes_path,
                        source="csv",
                        sep=",",
                        header="true",
                        inferSchema="true"
                       )

21/08/30 20:34:59 WARN HiveExternalCatalog: Couldn't find corresponding Hive SerDe for data source provider csv. Persisting data source table `nghiaht7_airtraffic`.`airport_codes` into Hive metastore in Spark SQL specific format, which is NOT compatible with Hive.
                                                                                

IATA_CODE,CITY,STATE,COUNTRY
ABE,Allentown,PA,USA
ABI,Abilene,TX,USA
ABQ,Albuquerque,NM,USA
ABR,Aberdeen,SD,USA
ABY,Albany,GA,USA
ACK,Nantucket,MA,USA
ACT,Waco,TX,USA
ACV,Arcata/Eureka,CA,USA
ACY,Atlantic City,NJ,USA
ADK,Adak,AK,USA


In [27]:
spark.catalog.listTables()

[Table(name='airport_codes', database='nghiaht7_airtraffic', description=None, tableType='EXTERNAL', isTemporary=False)]

In [28]:
spark.read.table("airport_codes").show()


+---------+-------------+-----+-------+
|IATA_CODE|         CITY|STATE|COUNTRY|
+---------+-------------+-----+-------+
|      ABE|    Allentown|   PA|    USA|
|      ABI|      Abilene|   TX|    USA|
|      ABQ|  Albuquerque|   NM|    USA|
|      ABR|     Aberdeen|   SD|    USA|
|      ABY|       Albany|   GA|    USA|
|      ACK|    Nantucket|   MA|    USA|
|      ACT|         Waco|   TX|    USA|
|      ACV|Arcata/Eureka|   CA|    USA|
|      ACY|Atlantic City|   NJ|    USA|
|      ADK|         Adak|   AK|    USA|
|      ADQ|       Kodiak|   AK|    USA|
|      AEX|   Alexandria|   LA|    USA|
|      AGS|      Augusta|   GA|    USA|
|      AKN|  King Salmon|   AK|    USA|
|      ALB|       Albany|   NY|    USA|
|      ALO|     Waterloo|   IA|    USA|
|      AMA|     Amarillo|   TX|    USA|
|      ANC|    Anchorage|   AK|    USA|
|      APN|       Alpena|   MI|    USA|
|      ASE|        Aspen|   CO|    USA|
+---------+-------------+-----+-------+
only showing top 20 rows



                                                                                

In [29]:
spark.sql('DESCRIBE FORMATTED airport_codes').show(100, False)

+----------------------------+---------------------------------------------------------+-------+
|col_name                    |data_type                                                |comment|
+----------------------------+---------------------------------------------------------+-------+
|IATA_CODE                   |string                                                   |null   |
|CITY                        |string                                                   |null   |
|STATE                       |string                                                   |null   |
|COUNTRY                     |string                                                   |null   |
|                            |                                                         |       |
|# Detailed Table Information|                                                         |       |
|Database                    |nghiaht7_airtraffic                                      |       |
|Table                       |

In [30]:
spark.catalog.listColumns('airport_codes')

[Column(name='IATA_CODE', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='CITY', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='STATE', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='COUNTRY', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]

## Define Schema for Tables using StructType

When we want full control over column names and data types, we can define the schema ourselves and pass it to `spark.catalog.createTable` or `spark.catalog.createExternalTable`.

* The schema can be inferred, or we can pass it as a `StructType` object while creating the table.
* `StructType` takes a list of objects of type `StructField`.
* `StructField` is built using a column name and a data type. All the data types are available under `pyspark.sql.types`.
* We need to pass the table name and schema to `spark.catalog.createTable`.
* We have to pass the path along with the name and schema to `spark.catalog.createExternalTable`.

In [31]:
from pyspark.sql.types import StructField, StructType, \
    IntegerType, StringType, FloatType

In [32]:
employeesSchema = StructType([
    StructField("employee_id", IntegerType()),
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("salary", FloatType()),
    StructField("nationality", StringType())
])

In [33]:
employeesSchema

StructType(List(StructField(employee_id,IntegerType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(salary,FloatType,true),StructField(nationality,StringType,true)))

In [34]:
employeesSchema.simpleString()

'struct<employee_id:int,first_name:string,last_name:string,salary:float,nationality:string>'

In [35]:
spark.sql('DROP TABLE IF EXISTS employees')

In [36]:
spark.catalog.createTable("employees", schema=employeesSchema)

employee_id,first_name,last_name,salary,nationality


In [37]:
spark.catalog.listTables()

[Table(name='airport_codes', database='nghiaht7_airtraffic', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='employees', database='nghiaht7_airtraffic', description=None, tableType='MANAGED', isTemporary=False)]

In [38]:
spark.catalog.listColumns('employees')

[Column(name='employee_id', description=None, dataType='int', nullable=True, isPartition=False, isBucket=False),
 Column(name='first_name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='last_name', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False),
 Column(name='salary', description=None, dataType='float', nullable=True, isPartition=False, isBucket=False),
 Column(name='nationality', description=None, dataType='string', nullable=True, isPartition=False, isBucket=False)]

## Inserting into Existing Tables

Let us understand how we can insert data into existing tables using `insertInto`.

* `insertInto` can either append to a table or overwrite its data. By default it appends.
* When we use `insertInto`, the following happens:
  * If the table does not exist, `insertInto` throws an exception.
  * If the table exists, data is appended by default.
  * We can alter this behavior using the keyword argument `overwrite`. Pass `overwrite=True` to replace the existing data.
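
One detail worth keeping in mind is that `insertInto` matches columns by position, not by name, so a Data Frame whose columns are in a different order than the table would be inserted into the wrong columns. The sketch below reorders the Data Frame to the table's column order before inserting. The helper name `insert_aligned` is hypothetical, and it assumes the Data Frame contains every column of the table.

```python
def insert_aligned(spark, df, table_name, overwrite=False):
    """Reorder `df` to the column order of `table_name`, then insert into it.

    Returns the column order that was used for the insert.
    """
    table_columns = [column.name for column in spark.catalog.listColumns(table_name)]
    # Compare names case-insensitively, which matches Spark's default resolution.
    available = {name.lower() for name in df.columns}
    missing = [name for name in table_columns if name.lower() not in available]
    if missing:
        raise ValueError(f"Data Frame is missing columns of {table_name}: {missing}")
    df.select(*table_columns).write.insertInto(table_name, overwrite=overwrite)
    return table_columns
```

For the table in this section, `insert_aligned(spark, employeesDF, "employees", overwrite=True)` would behave like the plain `insertInto` call, while also guarding against a reordered Data Frame.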

In [39]:
employees = [(1, "Scott", "Tiger", 1000.0, "united states"),
             (2, "Henry", "Ford", 1250.0, "India"),
             (3, "Nick", "Junior", 750.0, "united KINGDOM"),
             (4, "Bill", "Gomes", 1500.0, "AUSTRALIA")
            ]

In [40]:
spark.read.table('employees').schema

StructType(List(StructField(employee_id,IntegerType,true),StructField(first_name,StringType,true),StructField(last_name,StringType,true),StructField(salary,FloatType,true),StructField(nationality,StringType,true)))

In [42]:
employeesDF = spark.createDataFrame(employees,
    schema="""employee_id INT, first_name STRING, last_name STRING,
              salary FLOAT, nationality STRING
           """
)

employeesDF.show()


+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          1|     Scott|    Tiger|1000.0| united states|
|          2|     Henry|     Ford|1250.0|         India|
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
+-----------+----------+---------+------+--------------+



                                                                                

In [44]:
employeesDF.write.insertInto("employees", overwrite=True)

                                                                                

In [45]:
spark.read.table("employees").show()


+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
|          1|     Scott|    Tiger|1000.0| united states|
|          2|     Henry|     Ford|1250.0|         India|
+-----------+----------+---------+------+--------------+



                                                                                

In [46]:
spark.sql('SELECT * FROM employees').show()


+-----------+----------+---------+------+--------------+
|employee_id|first_name|last_name|salary|   nationality|
+-----------+----------+---------+------+--------------+
|          3|      Nick|   Junior| 750.0|united KINGDOM|
|          4|      Bill|    Gomes|1500.0|     AUSTRALIA|
|          1|     Scott|    Tiger|1000.0| united states|
|          2|     Henry|     Ford|1250.0|         India|
+-----------+----------+---------+------+--------------+



                                                                                

## Read and Process data from Metastore Tables

Let us see how we can read tables using functions such as `spark.read.table` and process data using Data Frame APIs.

* We can read a table into a Data Frame using `spark.read.table("table_name")`.
* We can prefix the table name with a database name to read a table belonging to a particular database.
* Reading a table results in a Data Frame.
* Once the Data Frame is created, we can use functions such as `filter` or `where`, `groupBy`, and `sort` or `orderBy` to process its data.

In [52]:
airport_codes_df = spark. \
    read. \
    csv(airport_codes_path,
        sep=",",
        header=True,
        inferSchema=True
       )

                                                                                

In [49]:
spark.catalog.listTables()

[Table(name='airport_codes', database='nghiaht7_airtraffic', description=None, tableType='EXTERNAL', isTemporary=False),
 Table(name='employees', database='nghiaht7_airtraffic', description=None, tableType='MANAGED', isTemporary=False)]

In [50]:
airport_codes_catalog = spark.read.table("airport_codes")

In [51]:
type(airport_codes_catalog)

pyspark.sql.dataframe.DataFrame

In [53]:
airport_codes_catalog.printSchema()

root
 |-- IATA_CODE: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- STATE: string (nullable = true)
 |-- COUNTRY: string (nullable = true)



In [56]:
from pyspark.sql.functions import count, lit, col

airport_codes_catalog. \
    groupBy("state"). \
    agg(count(lit(1)).alias('airport_count')). \
    orderBy(col('airport_count').desc()). \
    show()



+-----+-------------+
|state|airport_count|
+-----+-------------+
|   TX|           24|
|   CA|           22|
|   AK|           19|
|   FL|           17|
|   MI|           15|
|   NY|           14|
|   CO|           10|
|   MN|            8|
|   PA|            8|
|   ND|            8|
|   NC|            8|
|   WI|            8|
|   MT|            8|
|   LA|            7|
|   IL|            7|
|   GA|            7|
|   VA|            7|
|   WY|            6|
|   ID|            6|
|   AL|            5|
+-----+-------------+
only showing top 20 rows



                                                                                

## Creating Partitioned Tables

We can also create partitioned tables as part of Spark Metastore Tables.

* There are some challenges in creating partitioned tables directly using `spark.catalog.createTable`.
* However, if the data is already laid out in directories that follow the partitioned table convention, we should be able to create a partitioned table on top of it.
* Let us create a partitioned table for `orders`, partitioned by `order_month`.
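
A possible way to do this, assuming the data already sits in directories such as `order_month=201307/` under a single base path, is sketched below. The helper name `register_partitioned_table`, the Parquet default and the directory layout are assumptions, and the behavior should be verified on the Spark version in use. For external data source tables, partition information is not collected when the table is created, so the sketch calls `spark.catalog.recoverPartitions` afterwards.

```python
def register_partitioned_table(spark, table_name, path, source="parquet"):
    """Create an external table over an existing partitioned directory layout.

    Returns the partition column names reported by the catalog.
    """
    # No schema is passed: Spark infers the columns, including the partition
    # columns encoded in the directory names (for example order_month=...).
    spark.catalog.createTable(table_name, path=path, source=source)
    # Register the partition directories with the metastore.
    spark.catalog.recoverPartitions(table_name)
    return [c.name for c in spark.catalog.listColumns(table_name) if c.isPartition]
```

If the layout is picked up as expected, the returned list should contain `order_month`.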

## Saving as Partitioned Tables

We can also create partitioned tables while using the `saveAsTable` function to write data from a Data Frame into a metastore table.

* Let us create a partitioned table for `orders`, partitioned by `order_month`.
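
A minimal sketch of this, assuming an `orders` Data Frame with an `order_date` column that holds a date or timestamp, could look as follows. The function name `save_orders_by_month`, the table name `orders_part` and the `yyyyMM` month format are all assumptions made for illustration.

```python
def save_orders_by_month(orders_df, table_name="orders_part"):
    """Derive `order_month` from `order_date` and save a partitioned table."""
    # Add the partition column, for example 201307 for July 2013.
    with_month = orders_df.selectExpr(
        "*", "date_format(order_date, 'yyyyMM') AS order_month"
    )
    # partitionBy writes one sub directory per distinct order_month value.
    with_month.write.partitionBy("order_month").saveAsTable(table_name, mode="overwrite")
    return table_name
```

After the write, `spark.catalog.listColumns("orders_part")` should report `isPartition=True` for `order_month`.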

## Creating Temp Views

So far we spoke about permanent metastore tables. Now let us understand how to create temporary views using a Data Frame.

* We can create a temporary view for a Data Frame using `createTempView` or `createOrReplaceTempView`.
* `createOrReplaceTempView` will replace the view if it already exists.
* While tables in the Metastore are permanent, views are temporary.
* Once the application exits, temporary views are deleted.
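
Temporary views can also be dropped explicitly with `spark.catalog.dropTempView` instead of waiting for the application to exit. The sketch below wraps creation and cleanup in a context manager; `temp_view` is a hypothetical helper name, and `spark` and `df` are assumed to be an existing session and Data Frame.

```python
from contextlib import contextmanager


@contextmanager
def temp_view(spark, df, view_name):
    """Expose `df` as a temporary view for the duration of a `with` block."""
    df.createOrReplaceTempView(view_name)
    try:
        yield view_name
    finally:
        # Drop the view even if the block raised an exception.
        spark.catalog.dropTempView(view_name)
```

Inside a block such as `with temp_view(spark, df, "dual_v") as name:`, the view can be queried with `spark.sql(f"SELECT * FROM {name}")`, and `spark.catalog.listTables()` should list it with `isTemporary=True`.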

## Using Spark SQL

Let us understand how we can use Spark SQL to process data in Metastore Tables and Temporary Views.

* Once tables in metastore or temporary views are created, we can run queries against the tables or temporary views to perform all standard transformations.
* We will create metastore tables for orders and order_items data sets. We will also create temporary view for products data set.
* We will create metastore tables using `spark.sql` by passing `CREATE TABLE` statements as strings.
* Using Spark SQL, we will join metastore tables and temporary view in the same query.
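
As a rough sketch of the last point, the query below joins two metastore tables with a temporary view. The view name `products_v` and all column names (`order_id`, `order_status`, `order_item_order_id`, `order_item_product_id`, `order_item_subtotal`, `product_id`, `product_name`) are assumptions about the data sets and may need to be adjusted.

```python
# Assumed names: metastore tables `orders` and `order_items`,
# temporary view `products_v`, and the column names used below.
REVENUE_QUERY = """
SELECT p.product_name,
       round(sum(oi.order_item_subtotal), 2) AS revenue
FROM orders o
  JOIN order_items oi ON o.order_id = oi.order_item_order_id
  JOIN products_v p ON p.product_id = oi.order_item_product_id
WHERE o.order_status IN ('COMPLETE', 'CLOSED')
GROUP BY p.product_name
ORDER BY revenue DESC
"""


def revenue_per_product(spark):
    """Run the join across metastore tables and the temporary view."""
    return spark.sql(REVENUE_QUERY)
```

Since `spark.sql` returns a Data Frame, the result can be inspected with `revenue_per_product(spark).show()` or processed further with Data Frame APIs.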