In [1]:
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pandas as pd

# This CATALOG_URL works for the "docker compose" testing and development environment
# Change 'lakekeeper' if you are not running on "docker compose" (f. ex. 'localhost' if Lakekeeper is running locally).
CATALOG_URL = "http://lakekeeper:8181/catalog"
WAREHOUSE = "irisa-ot"

SPARK_VERSION = pyspark.__version__
SPARK_MINOR_VERSION = '.'.join(SPARK_VERSION.split('.')[:2])
ICEBERG_VERSION = "1.9.2"

In [2]:
ICEBERG_VERSION

'1.9.2'

In [3]:
SPARK_VERSION


'3.5.6'

In [4]:
SPARK_MINOR_VERSION

'3.5'

# Connect with Spark

In [5]:
# Bring in runtime + all bundles (AWS, Azure, GCP) so Lakekeeper can decide
HADOOP_VERSION = "3.4.2"   

spark_jars_packages = (
    f"org.apache.iceberg:iceberg-spark-runtime-{SPARK_MINOR_VERSION}_2.12:{ICEBERG_VERSION},"
    f"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION},"
    f"org.apache.hadoop:hadoop-aws:{HADOOP_VERSION},"
    f"com.amazonaws:aws-java-sdk-bundle:1.12.698"
)

config = {

    f"spark.sql.catalog.lakekeeper": "org.apache.iceberg.spark.SparkCatalog",
    f"spark.sql.catalog.lakekeeper.type": "rest",
    f"spark.sql.catalog.lakekeeper.uri": CATALOG_URL,
    f"spark.sql.catalog.lakekeeper.warehouse": WAREHOUSE,
    "spark.sql.defaultCatalog": "lakekeeper",
}
    # "spark.jars.packages": spark_jars_packages,
    # we already downloaded the required jars in the spark Dockerfile 

In [6]:
spark_config = SparkConf().setMaster('spark://spark-master:7077').setAppName("Iceberg-REST-Cluster")
for k, v in config.items():
    spark_config = spark_config.set(k, v)

spark = SparkSession.builder.config(conf=spark_config).getOrCreate()

spark.sql("USE lakekeeper")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/09/09 07:51:01 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


DataFrame[]

## Read and Write Tables

In [7]:
# spark.sql(f"CREATE NAMESPACE IF NOT EXISTS spark_namespace")
spark.sql("SHOW NAMESPACES").toPandas()

Unnamed: 0,namespace
0,irisa
1,banking
2,spark_namespace


In [8]:
data = pd.DataFrame([[1, 'a-string', 2.2]], columns=['id', 'strings', 'floats'])
# sdf = spark.createDataFrame(data)
# sdf.writeTo(f"spark_namespace.cluster_table").createOrReplace()

In [9]:
spark.sql(f"SELECT * FROM banking.source_transactions").toPandas()

                                                                                

Unnamed: 0,transactionId,userId,timestamp,amount,currency,city,country,merchantName,paymentMethod,ipAddress,voucherCode,affiliateId
0,12a10068-bf4f-48d7-b1f5-6350c8d4bb6d,nmunoz,2025-09-09 07:49:19.857246,159.95,USD,Mccannborough,Ireland,Joseph PLC,debit_card,214.235.230.82,DISCOUNT10,408ab1b0-c9a9-4a2d-bc3d-0982e400b09e
1,8dd12659-0eee-422b-bf05-3283138b4189,bellandrew,2025-09-09 07:49:20.730578,555.57,USD,North Paige,Dominica,Bell-Thompson,debit_card,204.83.191.41,,eba763ab-6a10-4dbd-8f16-17e369785561
2,abcc6255-b864-492c-8bab-82404e726477,martin95,2025-09-09 07:49:20.937325,799.16,GBP,West Christianport,Cuba,Morrison-Boyd,debit_card,59.169.19.124,,46d44608-bfac-4d5c-90b0-7edc7bcd92ec
3,285dc3e6-2caf-46bd-8253-26a0b8c50158,perezmichelle,2025-09-09 07:49:21.268129,698.33,USD,East Hannah,Cuba,"Moore, Salazar and Howard",debit_card,160.157.79.243,DISCOUNT10,5dab46f6-842c-461d-97af-3d91a5b57351
4,dc38da22-8a26-404b-91c5-9becf7b880fb,garrettdennis,2025-09-09 07:49:22.033480,851.87,USD,East Gina,Latvia,Johns-Velasquez,debit_card,123.123.227.30,,313897e0-5e89-41a7-a8a4-00d60d410977
...,...,...,...,...,...,...,...,...,...,...,...,...
385,5b2e754a-bbf3-42e6-965c-fc6e39e9fd25,ogreen,2025-09-09 07:50:05.985877,836.83,GBP,North Paulfurt,Latvia,Mills LLC,credit_card,189.237.77.142,,e9d63413-427c-4e39-a7f3-fcd7406664e1
386,43a78a23-d7b9-45f7-a5a6-43c1f8da83db,wrodriguez,2025-09-09 07:50:06.744038,293.14,GBP,Lake William,Botswana,Mcdonald Inc,credit_card,186.156.134.1,DISCOUNT10,20e3080f-bcf1-441a-87f9-e9799af1971e
387,bf3c0d50-63dd-4dd8-afbd-f1acae463e6e,hyates,2025-09-09 07:50:06.945984,393.17,GBP,Michaelstad,Tokelau,Riley-Mata,credit_card,110.218.112.125,,c8fec6aa-b7f2-4271-bb0c-a071f477904a
388,41acc25a-5ba8-4541-964e-2701edbc3936,martinezpaul,2025-09-09 07:50:07.865912,292.51,GBP,Rhodesview,Anguilla,"Simmons, Lee and Mahoney",credit_card,49.140.139.134,,e0117134-5e23-4fca-96aa-188234d4d076


In [None]:
spark.sql("""
SELECT 
    merchantName, 
    SUM(amount) AS total_sales, 
    COUNT(*) AS transaction_count
FROM my_datalake.banking.source_transactions
GROUP BY merchantName
ORDER BY total_sales DESC
LIMIT 10;
""").toPandas()

In [12]:
spark.stop()