**Note:** This is a DDL notebook that creates the `nessie.dev` namespace and its Iceberg tables. Run it only once.

In [1]:
# Setup: make the project's helper package importable and start (or reuse)
# a Spark session built from the shared `seed.iceberg_setup.conf`.
from pyspark.sql import SparkSession

import sys

# Guard the path insertion so that re-running this cell does not keep
# appending duplicate entries to sys.path.
SEED_DIR = "/workspace/seed"
if SEED_DIR not in sys.path:
    sys.path.append(SEED_DIR)

from seed.iceberg_setup import conf

# getOrCreate() returns an already-active session if there is one, so
# re-running this cell does not start a second session.
spark: SparkSession = SparkSession.builder.config(conf=conf).getOrCreate()
print("Spark is up and running!")



:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12 added as a dependency
software.amazon.awssdk#bundle added as a dependency
org.slf4j#slf4j-simple added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a88af8c6-2b44-4dea-90b8-b99093b22c3c;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.5_2.12;1.8.1 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.5_2.12;0.83.1 in central
	found software.amazon.awssdk#bundle;2.29.52 in central
	found org.slf4j#slf4j-simple;2.0.7 in central
	found org.slf4j#slf4j-api;2.0.7 in central
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found org.wildfly.openssl#wildfly-ope

Spark is up and running!


In [2]:
# Create the `dev` namespace in the Nessie catalog (a no-op if it already
# exists), then list the catalog's namespaces to confirm it is present.
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.dev;")

namespaces = spark.sql("SHOW NAMESPACES FROM nessie")
namespaces.show()

# Teardown (uncomment to remove the namespace):
# spark.sql("DROP NAMESPACE IF EXISTS nessie.dev;")

+---------+
|namespace|
+---------+
|      dev|
+---------+



In [3]:
# Smoke-test table: (re)create a one-column Iceberg table in nessie.dev,
# insert a single row, read it back, and list the catalog's tables.
# NOTE(review): 'gc.enabled' = 'true' makes NessieUtil warn that data in other
# Nessie branches/tags and in historical commits may become inaccessible; Nessie
# recommends 'false' together with the nessie-gc tool. It is presumably kept
# 'true' so that the `DROP TABLE ... PURGE` teardown below works (Iceberg refuses
# to purge when GC is disabled) -- confirm before changing.
setup_statements = [
    "CREATE OR REPLACE TABLE nessie.dev.names (name STRING) USING iceberg TBLPROPERTIES ('gc.enabled' = 'true');",
    "INSERT INTO nessie.dev.names VALUES ('vinay');",
]
for statement in setup_statements:
    spark.sql(statement)

for query in ("SELECT * FROM nessie.dev.names;", "SHOW TABLES FROM nessie;"):
    spark.sql(query).show()

# Teardown (uncomment to drop the table and delete its files):
# spark.sql("DROP TABLE nessie.dev.names PURGE;")

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
25/04/25 06:51:37 WARN NessieUtil: The Iceberg property 'gc.enabled' and/or 'write.metadata.delete-after-commit.enabled' is enabled on table 'dev.names' in NessieCatalog. This will likely make data in other Nessie branches and tags and in earlier, historical Nessie commits inaccessible. The recommended setting for those properties is 'false'. Use the 'nessie-gc' tool for Nessie reference-aware garbage collection.
                                                                                

+-----+
| name|
+-----+
|vinay|
+-----+

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|      dev|    names|      false|
+---------+---------+-----------+



In [4]:
# Load the raw flights data from object storage, inspect a few rows and the
# schema, and expose it to SQL as the `raw_flights` temp view.
df = spark.read.parquet("s3a://seed/flights-1m.parquet")
df.show(5)
df.printSchema()
df.createOrReplaceTempView("raw_flights")

# Overall date range and total row count of the raw data.
date_range_sql = """
    SELECT
        MIN(FL_DATE) AS min_date,
        MAX(FL_DATE) AS max_date,
        COUNT(*) AS num_rows
    FROM raw_flights;
"""

# Row counts for the ten earliest flight dates.
rows_per_day_sql = """
    SELECT
        FL_DATE,
        COUNT(*) AS num_rows
    FROM raw_flights
    GROUP BY FL_DATE
    ORDER BY FL_DATE
    LIMIT 10;
"""

for sql_text in (date_range_sql, rows_per_day_sql):
    spark.sql(sql_text).show()

25/04/25 06:51:40 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


+----------+---------+---------+--------+--------+---------+---------+
|   FL_DATE|DEP_DELAY|ARR_DELAY|AIR_TIME|DISTANCE| DEP_TIME| ARR_TIME|
+----------+---------+---------+--------+--------+---------+---------+
|2006-01-01|        5|       19|     350|    2475| 9.083333|12.483334|
|2006-01-02|      167|      216|     343|    2475|11.783334|15.766666|
|2006-01-03|       -7|       -2|     344|    2475| 8.883333|12.133333|
|2006-01-04|       -5|      -13|     331|    2475| 8.916667|    11.95|
|2006-01-05|       -3|      -17|     321|    2475|     8.95|11.883333|
+----------+---------+---------+--------+--------+---------+---------+
only showing top 5 rows

root
 |-- FL_DATE: date (nullable = true)
 |-- DEP_DELAY: short (nullable = true)
 |-- ARR_DELAY: short (nullable = true)
 |-- AIR_TIME: short (nullable = true)
 |-- DISTANCE: short (nullable = true)
 |-- DEP_TIME: float (nullable = true)
 |-- ARR_TIME: float (nullable = true)

+----------+----------+--------+
|  min_date|  max_date|n

In [5]:
# Materialise the `raw_flights` temp view as an Iceberg table in Nessie,
# partitioned by flight date. IF NOT EXISTS makes this a no-op on re-runs.
# NOTE(review): 'gc.enabled' = 'true' makes NessieUtil warn that data in other
# Nessie branches/tags and in historical commits may become inaccessible; it is
# presumably kept so the `DROP TABLE ... PURGE` teardown below works -- confirm.
create_flights_sql = """
    CREATE TABLE IF NOT EXISTS nessie.dev.flights 
    USING iceberg
    PARTITIONED BY (fl_date)
    TBLPROPERTIES ('gc.enabled' = 'true')
    AS
    SELECT
        *
    FROM raw_flights;
"""
spark.sql(create_flights_sql)

# Spot-check a handful of rows from the new table.
flights_preview = spark.sql("SELECT * FROM nessie.dev.flights LIMIT 5;")
flights_preview.show()

# Teardown (uncomment to drop the table and delete its files):
# spark.sql("DROP TABLE nessie.dev.flights PURGE;")

25/04/25 06:51:48 WARN NessieUtil: The Iceberg property 'gc.enabled' and/or 'write.metadata.delete-after-commit.enabled' is enabled on table 'dev.flights' in NessieCatalog. This will likely make data in other Nessie branches and tags and in earlier, historical Nessie commits inaccessible. The recommended setting for those properties is 'false'. Use the 'nessie-gc' tool for Nessie reference-aware garbage collection.


+----------+---------+---------+--------+--------+--------+---------+
|   FL_DATE|DEP_DELAY|ARR_DELAY|AIR_TIME|DISTANCE|DEP_TIME| ARR_TIME|
+----------+---------+---------+--------+--------+--------+---------+
|2006-02-28|       -7|        0|     340|    2475|8.883333|12.166667|
|2006-02-28|        6|      -14|     270|    2475|     9.6|17.516666|
|2006-02-28|       -9|      -13|     338|    2475|   11.85|14.833333|
|2006-02-28|        6|       -9|     274|    2475|    12.6|20.666666|
|2006-02-28|      -12|       -1|     498|    3784|9.883333|14.666667|
+----------+---------+---------+--------+--------+--------+---------+

