# Demo 1: HMS Catalog with Apache Iceberg

This lesson demonstrates using the **Hive Metastore (HMS)** as the catalog for Apache Iceberg tables.

**Architecture:**
- Spark (Query Engine)
- Hive Metastore via Thrift (Catalog)
- PostgreSQL (HMS backing database - private network)
- MinIO (S3-compatible storage)

**Network Configuration:**
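- HMS: `hive-metastore:9083` (internal Docker network; the name `hive_metastore` also resolves)
- MinIO: `minio-hms:9000` (internal Docker network; DNS-safe alias for the `minio_hms` container)
- The host can access MinIO at `localhost:9400` (port mapping to the container's port 9000)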
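
The difference between internal addresses and host-mapped addresses can be sketched as a small lookup. This is only an illustration and not part of the lesson's code: `endpoint_for`, `INTERNAL_ENDPOINTS`, and `HOST_PORTS` are hypothetical names, and the hostnames are the hyphenated ones that the configuration cell connects to. Only MinIO has a host port listed in this lesson, so the sketch assumes HMS is not published to the host.

```python
# Hypothetical lookup restating the network layout described above.
INTERNAL_ENDPOINTS = {
    "hms": ("hive-metastore", 9083),   # Thrift port inside the Docker network
    "minio": ("minio-hms", 9000),      # S3 API port inside the Docker network
}
# Host-side port mappings; only MinIO is listed in this lesson (assumption: HMS is not mapped).
HOST_PORTS = {"minio": 9400}


def endpoint_for(service, inside_docker):
    """Return 'host:port' for a service, depending on where the client runs."""
    host, port = INTERNAL_ENDPOINTS[service]
    if inside_docker:
        # Containers on the same Docker network use the service name and container port.
        return f"{host}:{port}"
    if service not in HOST_PORTS:
        raise ValueError(f"{service} has no host port mapping")
    # From the host, the mapped port on localhost is used instead.
    return f"localhost:{HOST_PORTS[service]}"


print(endpoint_for("minio", inside_docker=True))
print(endpoint_for("minio", inside_docker=False))
```

Spark runs inside the Docker network in this demo, which is why the configuration below uses port 9000 rather than 9400.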

**Benefits:**
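- Widely deployed in production data platforms
- Supported by most query engines (Spark, Trino, Flink, Hive)
- Compatible with existing Hive infrastructure
- ACID guarantees (provided by Iceberg; HMS commits each new table metadata pointer atomically)
- Easy migration from MinIO to AWS S3, since both expose the S3 API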

## 1. Configure Spark with HMS Catalog

In [1]:
import pyspark
from pyspark.sql import SparkSession
import os

# Configuration
MINIO_ACCESS_KEY = "admin"
MINIO_SECRET_KEY = "password"
MINIO_HOST = "minio-hms"  # DNS-safe alias for MinIO container
MINIO_PORT = "9000"  # Internal container port (NOT 9400 which is host port)
MINIO_ENDPOINT = f"{MINIO_HOST}:{MINIO_PORT}"
MINIO_HTTP_ENDPOINT = f"http://{MINIO_HOST}:{MINIO_PORT}"
WAREHOUSE_PATH = "s3a://warehouse"

# HMS configuration
HMS_HOST = "hive-metastore"  # Docker service name for Hive Metastore
HMS_PORT = "9083"  # Internal container port
HMS_URI = f"thrift://{HMS_HOST}:{HMS_PORT}"

conf = (
    pyspark.SparkConf()
        .setAppName('iceberg_hms_catalog')
        .set('spark.jars.packages', 
             'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,'
             'org.apache.iceberg:iceberg-hive-metastore:1.5.0,'
             'org.apache.hadoop:hadoop-aws:3.3.4,'
             'com.amazonaws:aws-java-sdk-bundle:1.12.262'
            )
        .set('spark.sql.extensions', 
             'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions')
        
        # HMS Catalog Configuration (Iceberg)
        .set('spark.sql.catalog.spark_catalog', 
             'org.apache.iceberg.spark.SparkSessionCatalog')
        .set('spark.sql.catalog.spark_catalog.type', 'hive')
        .set('spark.sql.catalog.spark_catalog.uri', HMS_URI)
        .set('spark.sql.catalog.spark_catalog.warehouse', WAREHOUSE_PATH)
        .set('spark.sql.catalog.spark_catalog.io-impl',
             'org.apache.iceberg.hadoop.HadoopFileIO')

        # HMS Configuration (Hadoop/Spark Hive Client)
        .set('spark.hadoop.hive.metastore.uris', HMS_URI)
        .set('spark.hadoop.hive.metastore.client.connect.retry.delay', '5')
        .set('spark.hadoop.hive.metastore.client.socket.timeout', '1800')
        
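        # MinIO S3 Configuration for Iceberg S3FileIO (AWS SDK v2).
        # These s3.* properties only take effect if io-impl is switched to S3FileIO;
        # with HadoopFileIO (set above), the s3a settings below are the ones in use.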
        .set('spark.sql.catalog.spark_catalog.s3.endpoint', MINIO_HTTP_ENDPOINT)
        .set('spark.sql.catalog.spark_catalog.s3.path-style-access', 'true')
        .set('spark.sql.catalog.spark_catalog.s3.access-key-id', MINIO_ACCESS_KEY)
        .set('spark.sql.catalog.spark_catalog.s3.secret-access-key', MINIO_SECRET_KEY)
        
        # MinIO S3 Configuration for Hadoop s3a filesystem
        .set('spark.hadoop.fs.s3a.access.key', MINIO_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', MINIO_SECRET_KEY)
        .set('spark.hadoop.fs.s3a.endpoint', MINIO_ENDPOINT)
        .set('spark.hadoop.fs.s3a.path.style.access', 'true')
        .set('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
        .set('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')

        # Point Spark's and Hive's default warehouse at the same path as the Iceberg catalog
        .set('spark.sql.warehouse.dir', WAREHOUSE_PATH)
        .set('spark.hadoop.hive.metastore.warehouse.dir', WAREHOUSE_PATH)
)

# Stop any session left over from a previous run of this cell
try:
    spark.stop()
    print("Stopped existing Spark session")
except NameError:
    pass  # no session exists yet in this kernel

# NOW create new session
spark = SparkSession.builder.config(conf=conf).getOrCreate()

# Also set the Hadoop config explicitly on the live context, so these values
# apply even if the spark.hadoop.* settings above were not picked up
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("hive.metastore.uris", HMS_URI)
hadoop_conf.set("hive.metastore.client.connect.retry.delay", "5")
hadoop_conf.set("hive.metastore.client.socket.timeout", "1800")
hadoop_conf.set("fs.s3a.endpoint", MINIO_ENDPOINT)
hadoop_conf.set("fs.s3a.access.key", MINIO_ACCESS_KEY)
hadoop_conf.set("fs.s3a.secret.key", MINIO_SECRET_KEY)
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

print("Hadoop config explicitly set")
print(f"  HMS URI: {hadoop_conf.get('hive.metastore.uris')}")
print(f"  S3 endpoint: {hadoop_conf.get('fs.s3a.endpoint')}")

Hadoop config explicitly set
  HMS URI: thrift://hive-metastore:9083
  S3 endpoint: minio-hms:9000
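
The s3a settings above are the MinIO-specific part of the configuration. As a rough sketch of what would change when pointing the same setup at AWS S3, the hypothetical helper below (`s3a_settings` is not part of the lesson) builds the Hadoop properties as a plain dictionary. It assumes that AWS S3 would simply rely on the s3a defaults (no custom endpoint, virtual-hosted-style URLs, HTTPS), and that a custom endpoint always means a MinIO-style deployment over plain HTTP, as in this demo.

```python
def s3a_settings(access_key, secret_key, endpoint=None):
    """Build Hadoop s3a properties; endpoint=None leaves the s3a defaults for AWS S3."""
    settings = {
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    }
    if endpoint is not None:
        # Assumption: a custom endpoint is a MinIO-style service reached over plain
        # HTTP with path-style URLs, matching the values used in this demo.
        settings.update({
            "fs.s3a.endpoint": endpoint,
            "fs.s3a.path.style.access": "true",
            "fs.s3a.connection.ssl.enabled": "false",
        })
    return settings


# Show which keys the MinIO setup adds on top of the common ones.
minio = s3a_settings("admin", "password", endpoint="minio-hms:9000")
aws = s3a_settings("admin", "password")
print(sorted(set(minio) - set(aws)))
```

In the notebook, the resulting dictionary could be applied with a loop such as `for key, value in minio.items(): hadoop_conf.set(key, value)`.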


## 2. Connectivity Tests

Run these tests to verify HMS and MinIO are working before creating tables.
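
The port checks in this section all follow the same pattern, which could be factored into one helper. The sketch below is a minimal version using only the standard library; `can_connect` is a hypothetical name, and it treats DNS failures, refused connections, and timeouts alike as "not reachable".

```python
import socket


def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        # create_connection resolves the name and connects; the context manager
        # closes the socket again as soon as the check is done.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers resolution errors, refused connections, and timeouts.
        return False
```

For example, `can_connect("hive-metastore", 9083)` and `can_connect("minio-hms", 9000)` would cover the two services this notebook depends on.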


In [2]:
import os, socket, subprocess

print("PID:", os.getpid())
print("Hostname:", os.uname().nodename)

print("Python DNS:", socket.gethostbyname("hive_metastore"))

# shell commands from the same kernel
print(subprocess.check_output(["getent", "hosts", "hive_metastore"]).decode().strip())
print(subprocess.check_output(["cat", "/etc/resolv.conf"]).decode().strip())

PID: 118
Hostname: a3b111b7aa3f
Python DNS: 172.23.0.3
172.23.0.3      hive_metastore
# Generated by Docker Engine.
# This file can be edited; Docker Engine will not make further changes once it
# has been modified.

nameserver 127.0.0.11
search local
options ndots:0

# Based on host file: '/etc/resolv.conf' (internal resolver)
# ExtServers: [host(10.0.0.109) host(1.1.1.1) host(2001:558:feed::1) host(2001:558:feed::2)]
# Overrides: []
# Option ndots from: internal


In [3]:
spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()

'3.3.4'

In [4]:
import socket

print("=" * 60)
print("CONNECTIVITY TESTS")
print("=" * 60)

# Test 1: Network DNS Resolution
print("\n1. DNS Resolution:")
for service in ['hive_metastore', 'minio-hms']:
    try:
        ip = socket.gethostbyname(service)
        print(f"   OK: {service} -> {ip}")
    except Exception as e:
        print(f"   FAIL: {service}: {e}")

# Test 2: Port Connectivity
print("\n2. Port Connectivity:")
for service, port in [('hive_metastore', 9083), ('minio-hms', 9000)]:
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(5)
        result = sock.connect_ex((service, port))
        sock.close()
        status = "OK" if result == 0 else "FAIL"
        print(f"   {status}: {service}:{port}")
    except Exception as e:
        print(f"   FAIL: {service}:{port}: {e}")

# Test 3: HMS from Spark
print("\n3. HMS Connection:")
try:
    ns = spark.sql("SHOW NAMESPACES").collect()
    print(f"   OK: Connected ({len(ns)} namespace(s))")
except Exception as e:
    print(f"   FAIL: {e}")

# Test 4: Hadoop Config Check
print("\n4. Hadoop Configuration:")
hconf = spark.sparkContext._jsc.hadoopConfiguration()
print(f"   hive.metastore.uris: {hconf.get('hive.metastore.uris') or 'NOT SET'}")
print(f"   fs.s3a.endpoint: {hconf.get('fs.s3a.endpoint') or 'NOT SET'}")

# Test 5: S3A Write/Read
print("\n5. S3A Write/Read Test:")
try:
    test_df = spark.createDataFrame([(1, "test")], ["id", "val"])
    test_path = "s3a://warehouse/_test"
    test_df.write.mode("overwrite").parquet(test_path)
    count = spark.read.parquet(test_path).count()
    print(f"   OK: Write and read successful (count: {count})")
except Exception as e:
    print(f"   FAIL: {str(e)[:100]}")



# Test 6: Create Iceberg Table
print("\n6. HMS + Iceberg + S3 Integration:")

hconf = spark.sparkContext._jsc.hadoopConfiguration()
print("hive.metastore.uris =", hconf.get("hive.metastore.uris"))
print("spark.sql.catalog.spark_catalog.uri =", spark.conf.get("spark.sql.catalog.spark_catalog.uri", "NOT SET"))

try:
    spark.sql("DROP TABLE IF EXISTS spark_catalog.default._test_table")
    spark.sql("""
        CREATE TABLE spark_catalog.default._test_table (id INT, msg STRING)
        USING iceberg
        LOCATION 's3a://warehouse/_test_table'
    """)
    spark.sql("INSERT INTO spark_catalog.default._test_table VALUES (1, 'success')")
    result = spark.sql("SELECT * FROM spark_catalog.default._test_table").collect()
    spark.sql("DROP TABLE spark_catalog.default._test_table")
    print(f"   OK: HMS + Iceberg + S3 working. Result: {result[0].msg}")
except Exception as e:
    print(f"   FAIL: {str(e)}")

print("\n" + "=" * 60)


CONNECTIVITY TESTS

1. DNS Resolution:
   OK: hive_metastore -> 172.23.0.3
   OK: minio-hms -> 172.23.0.2

2. Port Connectivity:
   OK: hive_metastore:9083
   OK: minio-hms:9000

3. HMS Connection:
   OK: Connected (1 namespace(s))

4. Hadoop Configuration:
   hive.metastore.uris: thrift://hive-metastore:9083
   fs.s3a.endpoint: minio-hms:9000

5. S3A Write/Read Test:
   OK: Write and read successful (count: 1)

6. HMS + Iceberg + S3 Integration:
hive.metastore.uris = thrift://hive-metastore:9083
spark.sql.catalog.spark_catalog.uri = thrift://hive-metastore:9083
   OK: HMS + Iceberg + S3 working. Result: success



## 3. Load Sample Data


In [5]:
# Create default namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS spark_catalog.default")

# Load CSV data
df_2011 = spark.read.csv(
    "../datasets/df_open_2011.csv",
    header=True,
    inferSchema=True
)

df_2011.createOrReplaceTempView("csv_open_2011")

print(f"Loaded {df_2011.count()} rows")
df_2011.show(5)


Loaded 13126 rows
+------------+---------------+---------+--------+------+------+-------------------+-------------------+--------+-------------------+-----------+-------------------+---+------+------+-----------+------------+--------+----+
|competitorId| competitorName|firstName|lastName|status|gender|countryOfOriginCode|countryOfOriginName|regionId|         regionName|affiliateId|      affiliateName|age|height|weight|overallRank|overallScore|genderId|year|
+------------+---------------+---------+--------+------+------+-------------------+-------------------+--------+-------------------+-----------+-------------------+---+------+------+-----------+------------+--------+----+
|       47661|     Dan Bailey|      Dan|  Bailey|   ACT|     M|               NULL|               NULL|       6|       Central East|          0|    CrossFit Legacy| 27|  NULL|  NULL|          1|          43|       1|2011|
|      124483| Joshua Bridges|   Joshua| Bridges|   ACT|     M|               NULL|           

## 4. Create Iceberg Table with HMS + MinIO


In [8]:
# Create Iceberg table from CSV data
spark.sql("""
    CREATE TABLE IF NOT EXISTS spark_catalog.default.df_open_2011_hms
    USING iceberg 
    AS SELECT * FROM csv_open_2011
""")

print("✓ Created Iceberg table: spark_catalog.default.df_open_2011_hms")

✓ Created Iceberg table: spark_catalog.default.df_open_2011_hms


## 5. Query the Table


In [9]:
spark.sql("SELECT * FROM spark_catalog.default.df_open_2011_hms LIMIT 10").show()

spark.sql("SELECT COUNT(*) as total FROM spark_catalog.default.df_open_2011_hms").show()


+------------+--------------------+---------+--------------------+------+------+-------------------+-------------------+--------+-------------------+-----------+-------------------+---+------+------+-----------+------------+--------+----+
|competitorId|      competitorName|firstName|            lastName|status|gender|countryOfOriginCode|countryOfOriginName|regionId|         regionName|affiliateId|      affiliateName|age|height|weight|overallRank|overallScore|genderId|year|
+------------+--------------------+---------+--------------------+------+------+-------------------+-------------------+--------+-------------------+-----------+-------------------+---+------+------+-----------+------------+--------+----+
|       47661|          Dan Bailey|      Dan|              Bailey|   ACT|     M|               NULL|               NULL|       6|       Central East|          0|    CrossFit Legacy| 27|  NULL|  NULL|          1|          43|       1|2011|
|      124483|      Joshua Bridges|   Joshua

## 6. Verify Table in HMS


In [10]:
# Show all tables in HMS
spark.sql("SHOW TABLES IN spark_catalog.default").show()

# Show table details
spark.sql("DESCRIBE EXTENDED spark_catalog.default.df_open_2011_hms").show(truncate=False)


+---------+-------------+-----------+
|namespace|    tableName|isTemporary|
+---------+-------------+-----------+
|         |csv_open_2011|      false|
+---------+-------------+-----------+

+-------------------+---------+-------+
|col_name           |data_type|comment|
+-------------------+---------+-------+
|competitorId       |int      |NULL   |
|competitorName     |string   |NULL   |
|firstName          |string   |NULL   |
|lastName           |string   |NULL   |
|status             |string   |NULL   |
|gender             |string   |NULL   |
|countryOfOriginCode|string   |NULL   |
|countryOfOriginName|string   |NULL   |
|regionId           |int      |NULL   |
|regionName         |string   |NULL   |
|affiliateId        |int      |NULL   |
|affiliateName      |string   |NULL   |
|age                |int      |NULL   |
|height             |string   |NULL   |
|weight             |string   |NULL   |
|overallRank        |int      |NULL   |
|overallScore       |int      |NULL   |
|genderId

## 7. HMS Catalog Connectivity Tests

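These tests check the HMS catalog's connections to its dependencies (PostgreSQL and MinIO). Because PostgreSQL sits on a private network, the notebook verifies that link indirectly: if HMS answers metadata queries, it must have reached its backing database.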
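
One way to make the final verdict depend on the individual checks is to collect each outcome and derive the result from them. The sketch below is a hypothetical helper (`summarize` and the check names are illustrative, not part of the lesson) that takes a mapping of check name to pass/fail and builds the report lines.

```python
def summarize(checks):
    """Turn {check name: passed?} into (all_passed, report lines)."""
    lines = [f"   {'OK' if ok else 'FAIL'}: {name}" for name, ok in checks.items()]
    all_passed = all(checks.values())
    verdict = "all required connections verified" if all_passed else "one or more checks failed"
    lines.append(f"RESULT: {verdict}")
    return all_passed, lines


# Illustrative input; in the notebook the booleans would come from the tests themselves.
ok, report = summarize({"HMS Thrift port": True, "HMS -> MinIO": False})
print("\n".join(report))
```

Each test in the cell below would then record `True` or `False` instead of only printing, and the closing banner would print the derived verdict.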


In [None]:
import socket

print("=" * 60)
print("HMS CATALOG CONNECTIVITY TESTS")
print("=" * 60)

# Test 1: Can HMS reach PostgreSQL?
print("\n1. HMS -> PostgreSQL Connection:")
print("   HMS uses PostgreSQL at postgres_hms:5432 (private network)")
try:
    # Try to resolve postgres_hms (may not be accessible from notebook container)
    try:
        ip = socket.gethostbyname('postgres_hms')
        print(f"   OK: Can resolve postgres_hms -> {ip}")
        # Try to connect
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(2)
        result = sock.connect_ex(('postgres_hms', 5432))
        sock.close()
        if result == 0:
            print("   OK: PostgreSQL port 5432 is accessible from notebook")
        else:
            print("   INFO: PostgreSQL not directly accessible (private network)")
            print("         Expected: HMS can access it, notebook cannot")
    except socket.gaierror:
        print("   INFO: Cannot resolve postgres_hms from notebook container")
        print("         Expected: postgres_hms is on HMS private network")
except Exception as e:
    print(f"   INFO: {e}")

# Test 2: Verify HMS is running and accessible
print("\n2. HMS Service Health:")
try:
    # Check if we can connect to HMS Thrift port
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)
    result = sock.connect_ex(('hive-metastore', 9083))
    sock.close()
    if result == 0:
        print("   OK: HMS Thrift service (9083) is running")
    else:
        print("   FAIL: HMS Thrift service not accessible")
    
    # Test HMS via Spark
    spark.sql("SHOW NAMESPACES").collect()
    print("   OK: HMS responds to Spark queries")
    
    # Check if HMS can list tables (means it connected to PostgreSQL)
    tables = spark.sql("SHOW TABLES IN spark_catalog.default").collect()
    print(f"   OK: HMS can query metadata ({len(tables)} tables)")
    print("   OK: HMS -> PostgreSQL connection works")
except Exception as e:
    print(f"   FAIL: HMS health check failed: {e}")

# Test 3: HMS -> MinIO Connection
print("\n3. HMS -> MinIO S3 Connection:")
print("   HMS configured with:")
print("   - HIVE_METASTORE_WAREHOUSE_DIR=s3a://warehouse/")
print("   - AWS_ACCESS_KEY_ID=admin")
print("   - MinIO endpoint via spark.hadoop.fs.s3a.endpoint")

try:
    # Create a table to force HMS to interact with S3
    spark.sql("DROP TABLE IF EXISTS spark_catalog.default._hms_s3_test")
    
    spark.sql("""
        CREATE TABLE spark_catalog.default._hms_s3_test (
            id INT,
            test STRING
        )
        USING iceberg
        LOCATION 's3a://warehouse/test/_hms_s3_test'
    """)
    
    # Insert data (this will write to MinIO via Iceberg)
    spark.sql("INSERT INTO spark_catalog.default._hms_s3_test VALUES (1, 'hms-s3-test')")
    
    # Read back (Spark looks up the metadata location in HMS, then reads metadata and data from MinIO)
    result = spark.sql("SELECT * FROM spark_catalog.default._hms_s3_test").collect()
    
    # Check table location in HMS metadata
    location_df = spark.sql("""
        DESCRIBE EXTENDED spark_catalog.default._hms_s3_test
    """).filter("col_name = 'Location'")
    location = location_df.collect()[0].data_type
    
    print(f"   OK: Created table with S3 location: {location}")
    print("   OK: Wrote data to MinIO via Iceberg")
    print(f"   OK: Read data back: {result[0].test}")
    print("   OK: HMS -> MinIO connection working")
    
    # Cleanup
    spark.sql("DROP TABLE spark_catalog.default._hms_s3_test")
    print("   OK: Test table cleaned up")
    
except Exception as e:
    print(f"   FAIL: HMS -> MinIO test failed: {e}")
    import traceback
    print(f"   Error: {traceback.format_exc()[:300]}")

# Test 4: Verify HMS Warehouse Configuration
print("\n4. HMS Warehouse Configuration:")
try:
    # Report the catalog type configured for spark_catalog
    catalog_type = spark.conf.get("spark.sql.catalog.spark_catalog.type", "N/A")
    
    print(f"   Catalog type: {catalog_type}")
    
    # Check a table's location to verify S3 path
    tables = spark.sql("SHOW TABLES IN spark_catalog.default").collect()
    if len(tables) > 0:
        first_table = tables[0].tableName
        loc_result = spark.sql(f"""
            DESCRIBE EXTENDED spark_catalog.default.{first_table}
        """).filter("col_name = 'Location'").collect()
        
        if loc_result:
            location = loc_result[0].data_type
            print(f"   Sample table location: {location}")
            if location.startswith("s3a://warehouse"):
                print("   OK: Tables are using correct S3 warehouse path")
            else:
                print("   INFO: Unexpected warehouse path")
except Exception as e:
    print(f"   INFO: {e}")

# Test 5: Network Topology Summary
# These lines are printed unconditionally: they describe the expected topology,
# which holds only if tests 1-4 above reported no FAIL.
print("\n5. Expected Network Topology:")
print("   [Notebook] -> [hive-metastore] -> [postgres_hms]")
print("   [Notebook] -> [minio-hms]")
print("   [hive-metastore] -> [minio-hms]")
print("   [hive-metastore] -> [postgres_hms] (private network)")
print("   Expected: HMS can reach PostgreSQL and MinIO")
print("   Expected: Notebook can reach HMS and MinIO")
print("   Expected: Notebook cannot reach PostgreSQL")

print("\n" + "=" * 60)
print("RESULT: HMS catalog has required connections if no test above printed FAIL")
print("=" * 60)
