In [1]:
import pyspark
from pyspark.sql.types import *
from pyspark.sql.functions import *

In [2]:
builder = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.0") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.local.type", "hadoop") \
    .config("spark.sql.catalog.local.warehouse", "warehouse") \
    .config("spark.sql.defaultCatalog", "local")

In [None]:
spark = builder.getOrCreate()
# It might take briefly to start the Spark process.
# Wait for some output below to let the Spark fully started.

In [None]:
spark

In [5]:
schema = StructType([
    StructField("vendor_id", LongType(), True),
    StructField("trip_id", LongType(), True),
    StructField("trip_distance", FloatType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("store_and_fwd_flag", StringType(), True)
])

In [6]:
schema

StructType([StructField('vendor_id', LongType(), True), StructField('trip_id', LongType(), True), StructField('trip_distance', FloatType(), True), StructField('fare_amount', DoubleType(), True), StructField('store_and_fwd_flag', StringType(), True)])

In [7]:
df = spark.createDataFrame([], schema)

In [8]:
df

DataFrame[vendor_id: bigint, trip_id: bigint, trip_distance: float, fare_amount: double, store_and_fwd_flag: string]

In [None]:
# create Iceberg Table called 'taxis' in 'local' Catalog of 'nycdb' database
df.writeTo("local.nycdb.taxis").create()

In [10]:
schema = spark.table("local.nycdb.taxis").schema

In [11]:
schema

StructType([StructField('vendor_id', LongType(), True), StructField('trip_id', LongType(), True), StructField('trip_distance', FloatType(), True), StructField('fare_amount', DoubleType(), True), StructField('store_and_fwd_flag', StringType(), True)])

In [12]:
data = [
    (1, 1000371, 1.8, 15.32, "N"),
    (2, 1000372, 2.5, 22.15, "N"),
    (2, 1000373, 0.9, 9.01, "N"),
    (1, 1000374, 8.4, 42.13, "Y")
]

In [13]:
data

[(1, 1000371, 1.8, 15.32, 'N'),
 (2, 1000372, 2.5, 22.15, 'N'),
 (2, 1000373, 0.9, 9.01, 'N'),
 (1, 1000374, 8.4, 42.13, 'Y')]

In [14]:
df = spark.createDataFrame(data, schema)

In [15]:
df

DataFrame[vendor_id: bigint, trip_id: bigint, trip_distance: float, fare_amount: double, store_and_fwd_flag: string]

In [16]:
df.writeTo("local.nycdb.taxis").append()

In [17]:
df = spark.table("local.nycdb.taxis")

In [18]:
df

DataFrame[vendor_id: bigint, trip_id: bigint, trip_distance: float, fare_amount: double, store_and_fwd_flag: string]

In [19]:
df.printSchema()

root
 |-- vendor_id: long (nullable = true)
 |-- trip_id: long (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)



In [20]:
df.count()

4

In [21]:
df.show()

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
|        1|1000371|          1.8|      15.32|                 N|
|        2|1000372|          2.5|      22.15|                 N|
|        2|1000373|          0.9|       9.01|                 N|
|        1|1000374|          8.4|      42.13|                 Y|
+---------+-------+-------------+-----------+------------------+



In [22]:
spark.sql("SELECT * FROM local.nycdb.taxis ORDER BY vendor_id DESC").show()

+---------+-------+-------------+-----------+------------------+
|vendor_id|trip_id|trip_distance|fare_amount|store_and_fwd_flag|
+---------+-------+-------------+-----------+------------------+
|        2|1000372|          2.5|      22.15|                 N|
|        2|1000373|          0.9|       9.01|                 N|
|        1|1000371|          1.8|      15.32|                 N|
|        1|1000374|          8.4|      42.13|                 Y|
+---------+-------+-------------+-----------+------------------+



In [23]:
spark.sql("CREATE TABLE local.demodb.srctbl (id bigint, data string) USING iceberg")

DataFrame[]

In [24]:
spark.sql("INSERT INTO local.demodb.srctbl VALUES (1, 'a'), (2, 'b'), (3, 'c');")

DataFrame[]

In [25]:
spark.sql("SELECT * FROM local.demodb.srctbl;").show()

+---+----+
| id|data|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
+---+----+



In [26]:
# we can go back (time travel) to check previous snapshots of the table
spark.sql("SELECT * FROM local.demodb.srctbl.snapshots").show()

+--------------------+-------------------+---------+---------+--------------------+--------------------+
|        committed_at|        snapshot_id|parent_id|operation|       manifest_list|             summary|
+--------------------+-------------------+---------+---------+--------------------+--------------------+
|2024-08-03 17:56:...|9180568718528832331|     NULL|   append|warehouse/demodb/...|{spark.app.id -> ...|
+--------------------+-------------------+---------+---------+--------------------+--------------------+



In [27]:
!ls warehouse

[1m[36mdemodb[m[m [1m[36mmydb[m[m   [1m[36mnycdb[m[m
