## Spark SQL Catalog

In [2]:
! pip install delta

Collecting delta
  Downloading delta-0.4.2.tar.gz (4.1 kB)
  Preparing metadata (setup.py) ... [?25ldone
[?25hBuilding wheels for collected packages: delta
  Building wheel for delta (setup.py) ... [?25ldone
[?25h  Created wheel for delta: filename=delta-0.4.2-py3-none-any.whl size=2928 sha256=330ea2043a65e564af99ab42082bb9448b41fdcdbc81dca746dbcb7a566dcc84
  Stored in directory: /home/jovyan/.cache/pip/wheels/06/c9/f4/15ff81c648b9fc73aae5886b41204ada25bd73cbb41b9fad78
Successfully built delta
Installing collected packages: delta
Successfully installed delta-0.4.2


In [None]:
! pip install delta-spark

Collecting delta-spark
  Downloading delta_spark-2.4.0-py3-none-any.whl (20 kB)
Collecting pyspark<3.5.0,>=3.4.0
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [91m━━━━━━━━[0m[90m╺[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m65.3/310.8 MB[0m [31m1.1 MB/s[0m eta [36m0:03:37[0m

In [3]:
import os
# Import required modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from datetime import datetime
from pyspark.sql.functions import col
from pyspark.sql import functions as f
from pyspark.sql import *
from pyspark.sql.types import *
# Delta is a storage layer for data lakes
from delta.tables import * 
# DeltaTable is the main class for Delta tables
from delta.tables import DeltaTable

ModuleNotFoundError: No module named 'delta.tables'

In [2]:
# Build (or reuse) a Hive-enabled Spark session. Every jar under
# /home/jovyan/work/jars is put on the driver classpath.
spark = (
    SparkSession.builder
    .appName("ListDatabasesExample")
    .config("spark.driver.extraClassPath", "/home/jovyan/work/jars/*")
    .enableHiveSupport()
    .getOrCreate()
)

# Ask Spark SQL for the databases it knows about and print them in full width
databases = spark.sql("SHOW DATABASES")
databases.show(truncate=False)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


+---------+
|namespace|
+---------+
|default  |
+---------+



In [9]:
# Using the Spark SQL Catalog to List Databases
# The bare expression below is the cell's output: the list returned by the
# catalog, one entry per database (name, description, locationUri).

spark.catalog.listDatabases()

[Database(name='default', description='Default Hive database', locationUri='hdfs://namenode:8020/user/hive/warehouse')]

In [7]:
from pyspark.sql import Row

# One Row per catalog database, carrying its name, description and location
rows = [
    Row(
        databaseName=database.name,
        description=database.description,
        locationUri=database.locationUri,
    )
    for database in spark.catalog.listDatabases()
]

# Turn the rows into a DataFrame so they can be rendered as a table
df = spark.createDataFrame(rows)

# Print every column at full width (no truncation)
df.show(truncate=False)

[Stage 5:=====>                                                   (1 + 10) / 11]

+------------+---------------------+----------------------------------------+
|databaseName|description          |locationUri                             |
+------------+---------------------+----------------------------------------+
|default     |Default Hive database|hdfs://namenode:8020/user/hive/warehouse|
+------------+---------------------+----------------------------------------+



                                                                                

## Import Data

In [11]:
# Load the necessary PySpark libraries
from pyspark.sql import SparkSession

# PostgreSQL JDBC settings. Each value is read from the Spark conf; the second
# argument is the fallback used when the key is not set.
# NOTE(review): the fallback user/password are literal strings in the notebook —
# consider supplying them through the Spark conf or the environment instead.
jdbc_driver = spark.conf.get("spark.jdbc.driver.class", "org.postgresql.Driver")
db_host = spark.conf.get("spark.jdbc.host", "oasispostgres")
db_port = spark.conf.get("spark.jdbc.port", "5432")
default_db = spark.conf.get("spark.jdbc.default.db", "airflow")
db_table = spark.conf.get("spark.jdbc.table", "bettercustomers")
db_user = spark.conf.get("spark.jdbc.user", "airflow")
db_pass = spark.conf.get("spark.jdbc.password", "airflow")

# JDBC URL of the form jdbc:postgresql://<host>:<port>/<database>
connection_url = f"jdbc:postgresql://{db_host}:{db_port}/{default_db}"

# Gather the reader options in one place, then load the table over JDBC
jdbc_options = {
    "url": connection_url,
    "driver": jdbc_driver,
    "dbtable": db_table,
    "user": db_user,
    "password": db_pass,
}
better_customers = spark.read.format("jdbc").options(**jdbc_options).load()

# Preview the rows that were read
better_customers.show()

# Expose the data to Spark SQL as the temporary view `customers`
better_customers.createOrReplaceTempView("customers")

+---+--------------------+--------------------+----------+---------+--------------------+
| id|             created|             updated|first_name|last_name|               email|
+---+--------------------+--------------------+----------+---------+--------------------+
|  1|2023-03-19 23:22:...|2023-03-19 23:22:...|      John|      Doe| johndoe@example.com|
|  2|2023-03-19 23:22:...|2023-03-19 23:22:...|      Jane|    Smith|janesmith@example...|
|  3|2023-03-19 23:22:...|2023-03-19 23:22:...|       Bob|  Johnson|bobjohnson@exampl...|
|  4|2023-03-19 23:22:...|2023-03-19 23:22:...|     Alice|      Lee|alicelee@example.com|
|  5|2023-03-19 23:22:...|2023-03-19 23:22:...|     David|      Kim|davidkim@example.com|
|  6|2023-03-19 23:22:...|2023-03-19 23:22:...|     Linda|   Nguyen|lindanguyen@examp...|
|  7|2023-03-19 23:22:...|2023-03-19 23:22:...|      Mike|   Garcia|mikegarcia@exampl...|
|  8|2023-03-19 23:22:...|2023-03-19 23:22:...|     Emily|     Chen|emilychen@example...|
|  9|2023-

In [19]:
db_name = "coffee_co_common"
db_description = "This database stores common information regarding inventory, stores, and customers"

# Pick the catalog entry named "default"; None if the catalog lists no such entry
default_database = next(
    (entry for entry in spark.catalog.listDatabases() if entry.name == "default"),
    None,
)

if default_database is None:
    raise ValueError("Default database not found")

# The new database is placed in a `common` directory under the default
# database's location
default_warehouse = default_database.locationUri
warehouse_prefix = f"{default_warehouse}/common"

# DDL for the database: comment, explicit location and team-ownership properties
create_database_sql = f"""
CREATE DATABASE IF NOT EXISTS {db_name}
COMMENT '{db_description}'
LOCATION '{warehouse_prefix}'
WITH DBPROPERTIES(TEAM='core', LEAD='scott', TEAM_SLACK='#help_coffee_common');
"""

# Create the database using Spark SQL
spark.sql(create_database_sql)

DataFrame[]

In [26]:
from pyspark.sql import Row

# Re-read the catalog and describe each database as a Row
# (name, description, location URI)
catalog_databases = spark.catalog.listDatabases()
rows = [
    Row(databaseName=item.name, description=item.description, locationUri=item.locationUri)
    for item in catalog_databases
]

# Build a DataFrame from those rows
df = spark.createDataFrame(rows)

# Display it with full-width columns
df.show(truncate=False)

+----------------+----------------------------------------------------------------------------------+-----------------------------------------------+
|databaseName    |description                                                                       |locationUri                                    |
+----------------+----------------------------------------------------------------------------------+-----------------------------------------------+
|coffee_co_common|This database stores common information regarding inventory, stores, and customers|hdfs://namenode:8020/user/hive/warehouse/common|
|default         |Default Hive database                                                             |hdfs://namenode:8020/user/hive/warehouse       |
+----------------+----------------------------------------------------------------------------------+-----------------------------------------------+



In [23]:
# Tables and views reported by the catalog's listTables() call
tables = spark.catalog.listTables()

# Three nullable string columns; isTemporary is kept as text, not boolean
schema = (
    StructType()
    .add("database", StringType(), True)
    .add("tableName", StringType(), True)
    .add("isTemporary", StringType(), True)
)

# One tuple per table: (database, name, isTemporary rendered as a string)
table_data = [
    (entry.database, entry.name, str(entry.isTemporary))
    for entry in tables
]

# Assemble the DataFrame and print it without truncation
tables_df = spark.createDataFrame(table_data, schema)
tables_df.show(truncate=False)



+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
|null    |customers|True       |
+--------+---------+-----------+



                                                                                

In [24]:
# Make coffee_co_common the session's current database
coffee_co_database_name = "coffee_co_common"
spark.catalog.setCurrentDatabase(coffee_co_database_name)

# Requires the `better_customers` DataFrame to be defined already.
# Persist it as the table `customers`; the table name is unqualified and the
# save mode is "error".
better_customers.write.saveAsTable("customers", mode="error")

                                                                                

In [25]:
# Ask the catalog again for its tables and views
tables = spark.catalog.listTables()

# Schema: three nullable string columns, built from the column names
schema = StructType(
    [
        StructField(column_name, StringType(), True)
        for column_name in ("database", "tableName", "isTemporary")
    ]
)

# Flatten each Table object into (database, name, str(isTemporary))
table_data = [
    (item.database, item.name, str(item.isTemporary))
    for item in tables
]

# Build the DataFrame and show it at full width
tables_df = spark.createDataFrame(table_data, schema)
tables_df.show(truncate=False)

+----------------+---------+-----------+
|database        |tableName|isTemporary|
+----------------+---------+-----------+
|coffee_co_common|customers|False      |
|null            |customers|True       |
+----------------+---------+-----------+



In [28]:
# Database and table to look up
db_name = "coffee_co_common"
table_name = "customers"

# NOTE(review): `_jcatalog` is a private attribute of the PySpark catalog
# wrapper; prefer the public `spark.catalog.tableExists` if the installed
# PySpark version provides it — confirm before switching.
jvm_catalog = spark.catalog._jcatalog

# Ask the catalog whether db_name.table_name exists
table_exists = jvm_catalog.tableExists(db_name, table_name)

In [29]:
# Display the result of the existence check computed in the previous cell
table_exists

True