In [1]:
import logging
import os
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

In [2]:
# Connection settings for the MinIO (S3-compatible) server used throughout the notebook.
# The values below are only local-development fallbacks: anything already exported in the
# environment (e.g. before launching Jupyter) is kept instead of being overwritten, so real
# credentials never have to be written into the notebook itself.
MINIO_ENV_DEFAULTS = {
    'AWS_REGION': 'us-east-1',
    'AWS_ACCESS_KEY_ID': 'minioadmin',
    'AWS_SECRET_ACCESS_KEY': 'minioadmin',
    'ENDPOINT': 'http://127.0.0.1:54793',
}
for env_key, env_default in MINIO_ENV_DEFAULTS.items():
    os.environ.setdefault(env_key, env_default)

In [3]:
# Root logging: INFO and above, each record prefixed with time, logger name and level.
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

# Named logger for this notebook's job.
logger = logging.getLogger("MinIOSparkJob")

In [4]:
# Start (or reuse) a basic Spark session for this notebook.
# NOTE(review): this session starts before the Iceberg SparkConf below is built. Static
# settings such as spark.sql.extensions cannot be applied to an already-running session,
# so a later builder.config(conf=...).getOrCreate() will not pick them up by itself.
spark = (
    SparkSession.builder
    .appName("Read from MinIO")
    .getOrCreate()
)

In [5]:
# Spark settings for an Iceberg catalog named "demo":
# - a Hadoop-type catalog whose tables live under s3a://warehouse/;
# - data files are accessed through Iceberg's S3FileIO pointed at MinIO.
# The MinIO address comes from the ENDPOINT variable set earlier (one source of truth),
# falling back to the same local default.
MINIO_ENDPOINT = os.environ.get("ENDPOINT") or "http://127.0.0.1:54793"

conf = (
    SparkConf()
    # Iceberg's Spark SQL extensions; a static setting, only honoured when the session starts.
    .set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .set("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog")
    .set("spark.sql.catalog.demo.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    .set("spark.sql.catalog.demo.warehouse", "s3a://warehouse/")  # Location of the Iceberg tables
    # NOTE(review): MinIO usually also needs "spark.sql.catalog.demo.s3.path-style-access" = "true"
    # for S3FileIO; confirm whether it is required with this endpoint.
    .set("spark.sql.catalog.demo.s3.endpoint", MINIO_ENDPOINT)
    .set("spark.sql.defaultCatalog", "demo")  # Name of the Iceberg catalog
    .set("spark.sql.catalogImplementation", "in-memory")
    .set("spark.sql.catalog.demo.type", "hadoop")  # Iceberg catalog type
)

In [6]:
# Build the Iceberg-enabled session from `conf`.
# getOrCreate() hands back an already-running session and only applies *runtime* SQL
# configs to it. Static ones (spark.sql.extensions, spark.sql.catalogImplementation) would
# be silently ignored. So if a session is already active without the Iceberg extensions
# (e.g. one started by an earlier cell), stop it so a fresh session is built from `conf`.
# Re-running this cell once the extensions are in place reuses the existing session.
existing_session = SparkSession.getActiveSession()
if existing_session is not None and (
    existing_session.conf.get("spark.sql.extensions", None)
    != conf.get("spark.sql.extensions")
):
    existing_session.stop()

spark = (
    SparkSession.builder
    .config(conf=conf)
    .appName("Read from MinIO")  # after config(conf=...) so it is not overridden by conf
    .getOrCreate()
)

In [7]:
def load_config(
    spark_context: "SparkContext",
    access_key: "str | None" = None,
    secret_key: "str | None" = None,
    endpoint: "str | None" = None,
) -> None:
    """Configure the Hadoop S3A filesystem of ``spark_context`` to reach MinIO.

    Any setting not passed explicitly is read from the environment variable set earlier
    in the notebook, falling back to the local development default:

    - ``access_key``: ``AWS_ACCESS_KEY_ID``, else ``"minioadmin"``.
    - ``secret_key``: ``AWS_SECRET_ACCESS_KEY``, else ``"minioadmin"``.
    - ``endpoint``: ``ENDPOINT``, else ``"http://127.0.0.1:54793"``.

    Args:
        spark_context: SparkContext whose Hadoop configuration is modified in place.
        access_key: S3A access key.
        secret_key: S3A secret key.
        endpoint: S3A endpoint URL. SSL is enabled only for ``https://`` URLs.

    Returns:
        None. The configuration is mutated as a side effect.
    """
    if access_key is None:
        access_key = os.environ.get("AWS_ACCESS_KEY_ID") or "minioadmin"
    if secret_key is None:
        secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY") or "minioadmin"
    if endpoint is None:
        endpoint = os.environ.get("ENDPOINT") or "http://127.0.0.1:54793"

    ssl_enabled = "true" if endpoint.lower().startswith("https://") else "false"

    # Look the Java Hadoop Configuration up once rather than once per setting.
    # NOTE: `_jsc` is PySpark's private handle to the underlying JavaSparkContext.
    hadoop_conf = spark_context._jsc.hadoopConfiguration()
    settings = {
        "fs.s3a.access.key": access_key,
        "fs.s3a.secret.key": secret_key,
        "fs.s3a.endpoint": endpoint,
        "fs.s3a.connection.ssl.enabled": ssl_enabled,
        # Path-style addressing (endpoint/bucket/key) instead of virtual-host (bucket.endpoint).
        "fs.s3a.path.style.access": "true",
        "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
        "fs.s3a.fast.upload": "true",
    }
    for key, value in settings.items():
        hadoop_conf.set(key, value)

In [8]:
# Point the session's Hadoop S3A filesystem (used for s3a:// paths below) at MinIO.
load_config(spark_context=spark.sparkContext)

In [13]:
# Read the raw CSV from the MinIO "datalake" bucket; its first row holds the column names.
# NOTE(review): neither a schema nor inferSchema is given, so every column is loaded as a
# string (and the Iceberg table written from it inherits string columns).
csv_reader = spark.read.option("header", "true")
df = csv_reader.csv("s3a://datalake/abc.csv")

In [14]:
# Preview the loaded rows (Spark's defaults spelled out: first 20 rows, long values truncated).
df.show(n=20, truncate=True)

+---+----+-----+
| id|name|class|
+---+----+-----+
|  1|   2|    3|
|  2|   a|    c|
|  3|   b|    f|
+---+----+-----+



In [15]:
# Append the CSV rows to the Iceberg table demo.test.
df.write.saveAsTable("demo.test", format="iceberg", mode="append")

In [12]:
# Read demo.test back from the Iceberg catalog and print it.
test_rows = spark.sql("select * from demo.test")
test_rows.show()

+---+----+-----+
| id|name|class|
+---+----+-----+
|  1|   2|    3|
+---+----+-----+



In [28]:
# Add one row to the Iceberg table demo.a.
# NOTE(review): demo.a is never created in this notebook; it must already exist in the
# warehouse, so this cell fails on a fresh catalog. TODO add the CREATE TABLE for demo.a.
insert_row_sql = "INSERT INTO demo.a VALUES (1, 'test', 5)"
spark.sql(insert_row_sql)

DataFrame[]

In [31]:
# Show the current contents of demo.a.
# NOTE(review): the recorded output already reflects the UPDATE cell below (it executed first,
# see the execution counts); run top-to-bottom, this cell shows name = 'test' instead.
a_rows = spark.sql("select * from demo.a")
a_rows.show()

+---+------+-----+
| id|  name|class|
+---+------+-----+
|  1|update|    5|
|  1|     2|    3|
+---+------+-----+



In [30]:
# Row-level UPDATE on the Iceberg table: rename every 'test' row of demo.a to 'update'.
update_query = (
    "\n"
    "    UPDATE demo.a\n"
    "    SET name = 'update'\n"
    "    WHERE name = 'test'\n"
)
spark.sql(update_query)

DataFrame[]

In [46]:
# Time travel: show demo.a as of the snapshot just before its current one, i.e. the state
# before the UPDATE above.
# The snapshot id is looked up in the table's history instead of being hard-coded: ids are
# generated per commit, so a copied id only exists in the warehouse it was copied from.
latest_commit = spark.sql(
    "SELECT snapshot_id, parent_id FROM demo.a.history "
    "WHERE is_current_ancestor ORDER BY made_current_at DESC LIMIT 1"
).first()
if latest_commit is None or latest_commit["parent_id"] is None:
    raise ValueError("demo.a has no snapshot before its current one to time-travel to")
previous_snapshot_id = latest_commit["parent_id"]
spark.sql(f"SELECT * FROM demo.a VERSION AS OF {previous_snapshot_id}").show()

+---+----+-----+
| id|name|class|
+---+----+-----+
|  1|test|    5|
|  1|   2|    3|
+---+----+-----+



In [48]:
# Iceberg "history" metadata table of demo.a: one row per snapshot that became current,
# with its parent snapshot id.
history_rows = spark.sql("SELECT * FROM demo.a.history")
history_rows.show()

+--------------------+-------------------+-------------------+-------------------+
|     made_current_at|        snapshot_id|          parent_id|is_current_ancestor|
+--------------------+-------------------+-------------------+-------------------+
|2024-05-09 10:55:...|3255105007450558558|               NULL|               true|
|2024-05-09 10:56:...|1334291243484622656|3255105007450558558|               true|
|2024-05-09 10:58:...|8116700340912039566|1334291243484622656|               true|
+--------------------+-------------------+-------------------+-------------------+



In [16]:
# Row-level DELETE: remove the rows of demo.test whose name is 'a', then show what remains.
delete_query = (
    "\n"
    "    DELETE FROM demo.test\n"
    "    WHERE name = 'a'\n"
)
spark.sql(delete_query)

remaining_rows = spark.sql("select * from demo.test")
remaining_rows.show()

+---+----+-----+
| id|name|class|
+---+----+-----+
|  1|   2|    3|
|  3|   b|    f|
+---+----+-----+



In [18]:
# Schema evolution: rename column `class` of demo.test to `class_name`.
# NOTE(review): not re-runnable; a second run would fail because `class` no longer exists.
rename_column = (
    "\n"
    "    ALTER TABLE demo.test RENAME COLUMN class TO class_name;\n"
)
spark.sql(rename_column)

DataFrame[]

In [None]:
# Drop demo.test. IF EXISTS keeps the cell re-runnable: a plain DROP TABLE raises once the
# table is already gone (e.g. on a second run).
# NOTE(review): the next cell still reads from demo.test, so it must run before this one.
drop_query = """
    DROP TABLE IF EXISTS demo.test
"""
spark.sql(drop_query)

In [None]:
# Copy every row of demo.test into demo.a (INSERT ... SELECT matches columns by position).
# NOTE(review): the previous cell drops demo.test, so top-to-bottom this fails with a
# table-not-found error; move this cell above the DROP TABLE cell.
copy_rows_sql = "INSERT INTO demo.a select * from demo.test"
spark.sql(copy_rows_sql)