In [1]:
import pyspark
from pyspark.sql import SparkSession
import os

In [3]:
## DEFINE SENSITIVE VARIABLES
# Each setting is read from an environment variable of the same name so that
# real credentials never have to be committed together with the notebook.
# The fallbacks are the exact values this notebook previously hard-coded, so
# behaviour is unchanged when no environment variable is set.
NESSIE_URI = os.environ.get("NESSIE_URI", "http://nessie:19120/api/v1")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "admin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "password")

In [4]:
# Spark configuration for an Iceberg catalog named `nessie`: the catalog is
# backed by Nessie (NESSIE_URI) and stores table files through Iceberg's
# S3FileIO pointed at the MinIO endpoint.
conf = (
    pyspark.SparkConf()
        .setAppName('iceberg-demo')
        # Packages: resolved at session start-up (comma-separated Maven coordinates)
        .set('spark.jars.packages', ','.join([
            'org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.1',
            'org.projectnessie.nessie-integrations:nessie-spark-extensions-3.3_2.12:0.67.0',
            'software.amazon.awssdk:bundle:2.17.178',
            'software.amazon.awssdk:url-connection-client:2.17.178',
        ]))
        # SQL extensions: Iceberg and Nessie additions to Spark SQL
        .set('spark.sql.extensions', ','.join([
            'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions',
            'org.projectnessie.spark.extensions.NessieSparkSessionExtensions',
        ]))
        # Catalog: register `nessie` as an Iceberg SparkCatalog on the `main` ref
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', NESSIE_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        .set('spark.sql.catalog.nessie.warehouse', 's3a://warehouse')
        .set('spark.sql.catalog.nessie.s3.endpoint', 'http://minio:9000')
        .set('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO')
        # MinIO credentials for the catalog's S3FileIO. S3FileIO is configured
        # through catalog properties (like `s3.endpoint` above), not through the
        # Hadoop `fs.s3a.*` options, so the keys are passed here explicitly
        # instead of relying on whatever the AWS SDK finds in the environment.
        .set('spark.sql.catalog.nessie.s3.access-key-id', MINIO_ACCESS_KEY)
        .set('spark.sql.catalog.nessie.s3.secret-access-key', MINIO_SECRET_KEY)
        # Same credentials for Hadoop's S3A filesystem (used for s3a:// paths
        # that are read or written outside the Iceberg catalog).
        .set('spark.hadoop.fs.s3a.access.key', MINIO_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', MINIO_SECRET_KEY)
)

In [5]:
## Create the Spark session from `conf`, or reuse one that is already active
spark = (
    SparkSession.builder
    .config(conf=conf)
    .getOrCreate()
)
print("Spark Running")

:: loading settings :: url = jar:file:/home/docker/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/docker/.ivy2/cache
The jars for the packages stored in: /home/docker/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.3_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.3_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
software.amazon.awssdk#url-connection-client added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9436d9e4-e557-4c5b-a8bd-4e1246ec1274;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.3_2.12;1.3.1 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.3_2.12;0.67.0 in central
	found software.amazon.awssdk#bundle;2.17.178 in central
	found software.amazon.eventstream#eventstream;1.0.1 in central
	found software.amazon.awssdk#url-connection-client;2.17.178 in central
	found software.amazon.awssdk#utils;2.17.178 in central
	found org.reactivestreams#reactive-streams;1.0.3 in central

24/02/14 08:27:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


Spark Running


In [10]:
## Make sure the `db1` namespace exists in the `nessie` catalog
spark.sql(
    "CREATE database if not exists nessie.db1;"
)

DataFrame[]

In [11]:
## Smoke test: create a one-column Iceberg table, add a row, read it back.
## Statements run in order; each result is printed with .show().
for smoke_sql in (
    "CREATE TABLE IF NOT EXISTS nessie.db1.test1 (name string) USING iceberg;",
    "INSERT INTO nessie.db1.test1 VALUES ('test');",
    "SELECT * FROM nessie.db1.test1;",
):
    spark.sql(smoke_sql).show()

++
||
++
++



                                                                                

++
||
++
++

+----+
|name|
+----+
|test|
+----+



In [12]:
## Read the CSV (first row is the header) and expose it to SQL as a temp view
csv_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .load("../datasets/df_open_2023.csv")
)
csv_df.createOrReplaceTempView("csv_open_2023")

In [13]:
## Preview the competitor id and name columns from the CSV-backed temp view
(
    spark
    .sql("select competitorId, competitorName, firstName, lastName from csv_open_2023 limit 10")
    .show()
)

+------------+-----------------+---------+-----------+
|competitorId|   competitorName|firstName|   lastName|
+------------+-----------------+---------+-----------+
|      469656|    Jeffrey Adler|  Jeffrey|      Adler|
|      300638|   Tola Morakinyo|     Tola|  Morakinyo|
|      676693|   Colten Mertens|   Colten|    Mertens|
|      663689|Tyler Christophel|    Tyler|Christophel|
|     1031875|  Roldan Goldbaum|   Roldan|   Goldbaum|
|      327636| Samuel Cournoyer|   Samuel|  Cournoyer|
|       40955|     Ricky Garard|    Ricky|     Garard|
|      671093|    Dallin Pepper|   Dallin|     Pepper|
|      945268|      Reggie Fasa|   Reggie|       Fasa|
|      642094|     Cam Crockett|      Cam|   Crockett|
+------------+-----------------+---------+-----------+



In [14]:
## Materialise the temp view as an Iceberg table (skipped if the table already exists)
(
    spark
    .sql("CREATE TABLE IF NOT EXISTS nessie.db1.df_open_2023 USING iceberg AS SELECT * FROM csv_open_2023;")
    .show()
)

                                                                                

++
||
++
++



In [15]:
## Read back the first rows of the Iceberg table through the `nessie` catalog
(
    spark
    .sql("SELECT * FROM nessie.db1.df_open_2023 limit 10;")
    .show()
)

+------------+-----------------+---------+-----------+------+------+-------------------+-------------------+--------+------------------+-----------+--------------------+---+------+------+-----------+------------+--------+----+
|competitorId|   competitorName|firstName|   lastName|status|gender|countryOfOriginCode|countryOfOriginName|regionId|        regionName|affiliateId|       affiliateName|age|height|weight|overallRank|overallScore|genderId|year|
+------------+-----------------+---------+-----------+------+------+-------------------+-------------------+--------+------------------+-----------+--------------------+---+------+------+-----------+------------+--------+----+
|      469656|    Jeffrey Adler|  Jeffrey|      Adler|   ACT|     M|                 CA|             Canada|      35|North America East|      18059| CrossFit Wonderland| 29| 69 in|197 lb|          1|         107|       1|2023|
|      300638|   Tola Morakinyo|     Tola|  Morakinyo|   ACT|     M|                 US|    

In [16]:
## Shut down the Spark session started earlier in this notebook
spark.stop()