In [None]:
import pyspark
from pyspark.sql import SparkSession
import os

## DEFINE SENSITIVE VARIABLES
# Every connection setting is read from the environment so that no credentials
# are hardcoded in the notebook.
_REQUIRED_ENV_VARS = (
    "NESSIE_URI",                 # Nessie server URI
    "NESSIE_WAREHOUSE_LOCATION",  # bucket to write data to
    "MINIO_ACCESS_KEY",           # MinIO credentials
    "MINIO_SECRET_ACCESS_KEY",    # MinIO credentials
    "MINIO_S3_ENDPOINT",          # MinIO S3 endpoint
)

# Fail fast with one message naming every unset variable. Without this check an
# unset variable comes back as None, and the os.environ assignments below would
# reject it with a TypeError that does not say which variable is missing.
_missing_env_vars = [name for name in _REQUIRED_ENV_VARS if os.environ.get(name) is None]
if _missing_env_vars:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_env_vars)
    )

NESSIE_URI = os.environ["NESSIE_URI"]
NESSIE_WAREHOUSE_LOCATION = os.environ["NESSIE_WAREHOUSE_LOCATION"]
MINIO_ACCESS_KEY = os.environ["MINIO_ACCESS_KEY"]
MINIO_SECRET_ACCESS_KEY = os.environ["MINIO_SECRET_ACCESS_KEY"]
MINIO_S3_ENDPOINT = os.environ["MINIO_S3_ENDPOINT"]

# We must set a few explicit environment variables to make spark and pyIceberg happy
os.environ["AWS_ACCESS_KEY_ID"] = MINIO_ACCESS_KEY
os.environ["AWS_SECRET_ACCESS_KEY"] = MINIO_SECRET_ACCESS_KEY
os.environ["AWS_S3_ENDPOINT"] = MINIO_S3_ENDPOINT

# Echo only the non-secret settings; the access key and secret are deliberately not printed.
print(f"MINIO_S3_ENDPOINT={MINIO_S3_ENDPOINT}")
print(f"NESSIE_URI={NESSIE_URI}")
print(f"NESSIE_WAREHOUSE_LOCATION={NESSIE_WAREHOUSE_LOCATION}")

In [None]:
## Spark settings for an Iceberg catalog named "nessie", listed in the order they are applied
spark_settings = [
    # Packages Spark resolves at startup: Iceberg runtime, Nessie extensions, AWS SDK v2
    ('spark.jars.packages', 'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.3,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.5_2.12:0.76.3,software.amazon.awssdk:bundle:2.17.178,software.amazon.awssdk:url-connection-client:2.17.178'),
    # Iceberg and Nessie SQL session extensions
    ('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions'),
    # Catalog "nessie": implementation, server URI, reference "main", no authentication
    ('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog'),
    ('spark.sql.catalog.nessie.uri', NESSIE_URI),
    ('spark.sql.catalog.nessie.ref', 'main'),
    ('spark.sql.catalog.nessie.authentication.type', 'NONE'),
    ('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog'),
    # Storage for the catalog: S3 endpoint, warehouse location and the S3FileIO implementation
    ('spark.sql.catalog.nessie.s3.endpoint', MINIO_S3_ENDPOINT),
    ('spark.sql.catalog.nessie.warehouse', NESSIE_WAREHOUSE_LOCATION),
    ('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO'),
    # Credentials passed through to the Hadoop s3a configuration keys
    ('spark.hadoop.fs.s3a.access.key', MINIO_ACCESS_KEY),
    ('spark.hadoop.fs.s3a.secret.key', MINIO_SECRET_ACCESS_KEY),
]

conf = pyspark.SparkConf()
conf.setAppName('minio_datalake_with_spark_table_create')
for setting_key, setting_value in spark_settings:
    conf.set(setting_key, setting_value)

## Start Spark Session
session_builder = SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()
print("Spark Running")

In [None]:
## Start from a clean slate: remove nessie.subjects if an earlier run left it behind
spark.sql("DROP TABLE IF EXISTS nessie.subjects")

## Create the table, then insert some data; the result of each statement is shown
for sql_statement in (
    "CREATE TABLE nessie.subjects (subject STRING) USING iceberg;",
    "INSERT INTO nessie.subjects VALUES ('Math'), ('Social Studies'), ('Geography')",
):
    spark.sql(sql_statement).show()

In [None]:
## Read every row of nessie.subjects and print the result
(
    spark
    .sql("SELECT * FROM nessie.subjects;")
    .show()
)