Ingestion data to Iceberg using Pyspark

In [1]:
import subprocess


# Function to install Maven (Linux/macOS only; Windows users must install manually)
def install_maven():
    try:
        # Check if Maven is already installed
        subprocess.run(["mvn", "-version"], check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print("Maven is already installed.")
    except FileNotFoundError:
        print("Maven not found. Installing Maven...")
        subprocess.run(["sudo", "apt", "update"], check=True)
        subprocess.run(["sudo", "apt", "install", "-y", "maven"], check=True)  # For Linux (apt)
        print("Maven installed successfully.")


# Function to download a specific JAR using Maven
def download_jar(group_id, artifact_id, version):
    try:
        print(f"Downloading JAR: {group_id}:{artifact_id}:{version}")
        subprocess.run([
            "mvn", "dependency:copy",
            f"-Dartifact={group_id}:{artifact_id}:{version}"
        ], check=True)
        print(f"JAR downloaded successfully")
    except subprocess.CalledProcessError as e:
        print(f"Failed to download JAR: {e}")


# Ensure Maven is installed
install_maven()
# Download required JAR(s)
download_jar("org.slf4j", "slf4j-api", "1.7.30")


Maven is already installed.
Downloading JAR: org.slf4j:slf4j-api:1.7.30
[INFO] Scanning for projects...
[INFO] 
[INFO] ------------------< org.apache.maven:standalone-pom >-------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] --------------------------------[ pom ]---------------------------------
[INFO] 
[INFO] --- dependency:3.7.0:copy (default-cli) @ standalone-pom ---
[INFO] Configured Artifact: org.slf4j:slf4j-api:1.7.30:jar
[INFO] org.slf4j:slf4j-api:1.7.30:jar already exists in /Users/truongngocson/Documents/Projects/apache-iceberg/notebooks/${project.basedir}/target/dependency
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  0.428 s
[INFO] Finished at: 2025-08-25T14:16:47+07:00
[INFO] ------------------------------------------------------------------------
JAR downloaded successfully


In [2]:
import pyspark
import os
from pyspark.sql import SparkSession

## DEFINE VARIABLES
catalog_uri = os.getenv('CATALOG_URI', "http://localhost:19120/api/v1")
warehouse = "s3://warehouse/"
storage_uri = os.getenv('STORAGE_URI', "http://127.0.0.1:9000")
# Define the JDBC connection properties
jdbc_url = os.getenv('JDBC_URL', "jdbc:postgresql://localhost:5435/mydb")
properties = {
    "user": "myuser",
    "password": "mypassword",
    "driver": "org.postgresql.Driver"
}
local_jars = ','.join([
    'slf4j-api-1.7.30.jar'
])

## CONFIGURE SPARK SESSION
conf = (
    pyspark.SparkConf()
    .setAppName('Iceberg Ingestion')
    .set('spark.jars.packages',
         'org.postgresql:postgresql:42.7.3,'
         'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.0,'
         'org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.77.1,'
         'software.amazon.awssdk:bundle:2.24.8,'
         'software.amazon.awssdk:url-connection-client:2.24.8')
    .set('spark.sql.extensions',
         'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,'
         'org.projectnessie.spark.extensions.NessieSparkSessionExtensions')
    .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
    .set('spark.sql.catalog.nessie.uri', catalog_uri)
    .set('spark.sql.catalog.nessie.ref', 'main')
    .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
    .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
    .set('spark.sql.catalog.nessie.s3.endpoint', storage_uri)
    .set('spark.sql.catalog.nessie.warehouse', warehouse)
    .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')

)

## START SPARK SESSION
spark = SparkSession.builder.config(conf=conf).getOrCreate()

25/08/25 14:16:48 WARN Utils: Your hostname, MacBook-Air-cua-Ngoc-2.local resolves to a loopback address: 127.0.0.1; using 192.168.1.5 instead (on interface en0)
25/08/25 14:16:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /Users/truongngocson/.ivy2/cache
The jars for the packages stored in: /Users/truongngocson/.ivy2/jars
org.postgresql#postgresql added as a dependency
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c0c733b7-eb17-4e03-885c-b9e9fbc33731;1.0
	confs: [default]
	found org.postgresql#postgresql;42.7.3 in central
	found org.checkerframework#checker-qual;3.42.0 in local-m2-cache
	found org.apache.iceberg#iceberg-spark-runtime-

:: loading settings :: url = jar:file:/Users/truongngocson/Documents/Projects/apache-iceberg/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


	found software.amazon.awssdk#url-connection-client;2.24.8 in central
	found software.amazon.awssdk#utils;2.24.8 in central
	found org.reactivestreams#reactive-streams;1.0.4 in local-m2-cache
	found software.amazon.awssdk#annotations;2.24.8 in central
	found org.slf4j#slf4j-api;1.7.30 in local-m2-cache
	found software.amazon.awssdk#http-client-spi;2.24.8 in central
	found software.amazon.awssdk#metrics-spi;2.24.8 in central
:: resolution report :: resolve 195ms :: artifacts dl 9ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.5.0 from central in [default]
	org.checkerframework#checker-qual;3.42.0 from local-m2-cache in [default]
	org.postgresql#postgresql;42.7.3 from central in [default]
	org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.77.1 from central in [default]
	org.reactivestreams#reactive-streams;1.0.4 from local-m2-cache in [default]
	org.slf4j#slf4j-api;1.7.30 from local-m2-cache in [default]
	software.amazon.awssdk#annotatio

Start the ingestion

In [4]:
print("Spark Running")

# Read the sales_data table from Postgres into a Spark DataFrame
sales_df = spark.read.jdbc(url=jdbc_url, table="fashion_sales", properties=properties)

# Show the first few rows of the dataset
sales_df.show()

#Create a namespace
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.sales;")

# Write the DataFrame to an Iceberg table in the Nessie catalog
sales_df.writeTo("nessie.sales.fashion_sales").createOrReplace()

# Verify that the data was written to Iceberg by reading the table
spark.read.table("nessie.sales.fashion_sales").show()

print("Ingested data successfully into Iceberg")

Spark Running
+---+----------------+-----------+------------+----------+--------------+------------------+--------------+
| id|    product_name|   category|sales_amount|sales_date|store_location|customer_age_group| campaign_name|
+---+----------------+-----------+------------+----------+--------------+------------------+--------------+
|  1|  Slim Fit Jeans|      Denim|       89.99|2024-03-01|      New York|             18-24| Spring Launch|
|  2|  Leather Jacket|  Outerwear|      249.99|2024-03-01|   Los Angeles|             25-34| Spring Launch|
|  3| Graphic T-Shirt|       Tops|       39.99|2024-03-02|       Chicago|             18-24| March Madness|
|  4|    Summer Dress|    Dresses|      129.99|2024-03-03|      New York|             35-44| March Madness|
|  5| Casual Sneakers|   Footwear|       99.99|2024-03-03|   Los Angeles|             25-34| Spring Launch|
|  6|      Silk Scarf|Accessories|      303.30|2025-04-23|       Phoenix|             55-64| Spring Launch|
|  7|  Leather