# Imports 

In [1]:
import os

In [2]:
import pyspark
from pyspark.sql import SparkSession

# Setup 

In [3]:
# Connection settings for the local Nessie + MinIO stack.
# Each value can be overridden through the environment so credentials do not
# have to live in the notebook; the fallbacks are the local development values.
NESSIE_URI = os.environ.get("NESSIE_URI", "http://nessie:19120/api/v1")
AWS_ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "minio")
AWS_SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "minio123")
AWS_S3_ENDPOINT = os.environ.get("AWS_S3_ENDPOINT", "http://minio:9000")
WAREHOUSE = os.environ.get("WAREHOUSE", "s3a://lake/")

In [4]:
# Spark configuration for an Iceberg catalog named `nessie` that is backed by a
# Nessie server, plus S3A settings for the MinIO endpoint.
conf = (
    pyspark.SparkConf()
        .setAppName('app_name')
        # Arrow-backed toPandas(): the key Spark recognises is
        # `spark.sql.execution.arrow.pyspark.enabled`; the previous
        # `spark.sql.execution.pyarrow.enabled` spelling is not a Spark setting.
        .set("spark.sql.execution.arrow.pyspark.enabled", "true")
        # Iceberg runtime and Nessie SQL extensions, resolved through Ivy at session start.
        .set(
            "spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.4.2,org.projectnessie.nessie-integrations:nessie-spark-extensions-3.2_2.12:0.74.0",
        )
        # Catalog `nessie`: Iceberg SparkCatalog delegating to NessieCatalog on branch `main`.
        .set('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog')
        .set('spark.sql.catalog.nessie.uri', NESSIE_URI)
        .set('spark.sql.catalog.nessie.ref', 'main')
        .set('spark.sql.catalog.nessie.authentication.type', 'NONE')
        .set('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog')
        # NOTE(review): the warehouse is a local directory under the current working
        # directory; the WAREHOUSE constant (s3a://lake/) is not used here — confirm
        # whether that is intentional.
        .set("spark.sql.catalog.nessie.warehouse", "file://" + os.getcwd() + "/spark-warehouse/iceberg")
        # S3A access to MinIO, taken from the setup constants instead of repeated literals.
        .set('spark.hadoop.fs.s3a.access.key', AWS_ACCESS_KEY)
        .set('spark.hadoop.fs.s3a.secret.key', AWS_SECRET_KEY)
        .set('spark.hadoop.fs.s3a.endpoint', AWS_S3_ENDPOINT)
        .set('spark.hadoop.fs.s3a.connection.ssl.enabled', "false")
        .set('spark.hadoop.fs.s3a.path.style.access', "true")
        # Enables the Iceberg SQL extensions and the Nessie branch/merge statements.
        .set(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions",
        )
)

In [5]:
# Build the session from `conf`, reusing an already-running session if there is one.
session_builder = SparkSession.builder.config(conf=conf)
spark = session_builder.getOrCreate()

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.iceberg#iceberg-spark-runtime-3.2_2.12 added as a dependency
org.projectnessie.nessie-integrations#nessie-spark-extensions-3.2_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-14fdafba-931c-4086-9760-4b695d096d52;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-3.2_2.12;1.4.2 in central
	found org.projectnessie.nessie-integrations#nessie-spark-extensions-3.2_2.12;0.74.0 in central
:: resolution report :: resolve 126ms :: artifacts dl 7ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-3.2_2.12;1.4.2 from central in [default]
	org.projectnessie.nessie-integrations#nessie-spark-extensions-3.2_2.12;0.74.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| sear

# Clean Up

In [6]:
# Remove the trips table if an earlier run left it behind.
drop_trips_table_sql = "DROP TABLE IF EXISTS nessie.taxis.trips"
spark.sql(drop_trips_table_sql).toPandas()

In [7]:
# Remove the taxis namespace if it exists, so it can be recreated below.
drop_taxis_namespace_sql = "DROP NAMESPACE IF EXISTS nessie.taxis;"
spark.sql(drop_taxis_namespace_sql).toPandas()

In [8]:
# Remove the batch2 branch if it exists, so it can be created again later in the notebook.
drop_batch2_branch_sql = "DROP BRANCH IF EXISTS batch2 IN nessie"
spark.sql(drop_batch2_branch_sql).toPandas()

Unnamed: 0,status
0,OK


# Query

In [9]:
# Create the namespace that will hold the trips table.
create_taxis_namespace_sql = "CREATE NAMESPACE IF NOT EXISTS nessie.taxis;"
spark.sql(create_taxis_namespace_sql).toPandas()

In [10]:
# Show the references (branches/tags) currently known to the nessie catalog.
list_references_sql = "LIST REFERENCES IN nessie"
spark.sql(list_references_sql).toPandas()

Unnamed: 0,refType,name,hash
0,Branch,main,e3e025593a4b6e5fa426eeab1c482b12f305a490086012...


In [11]:
# Define the Iceberg table that receives the taxi trip rows.
create_trips_table_sql = """
CREATE TABLE IF NOT EXISTS nessie.taxis.trips(
    vendor_id INT,
    pickup_datetime TIMESTAMP,
    dropoff_datetime TIMESTAMP,
    passenger_count INT,
    pickup_location_id INT,
    dropoff_location_id INT,
    fare_amount FLOAT
)
USING iceberg;
"""
spark.sql(create_trips_table_sql).toPandas()

In [12]:
# Expose the January 2019 sample CSV as a temporary view; the first line of the
# file is treated as the header.
january_csv_view_sql = """
CREATE OR REPLACE TEMPORARY VIEW trips_table USING csv
OPTIONS (path "/data/yellow_tripdata_sample_2019_01.csv", header true);
"""
spark.sql(january_csv_view_sql).toPandas()

In [13]:
# Peek at the first rows of the CSV-backed view.
preview_view_sql = "SELECT * FROM trips_table LIMIT 5;"
spark.sql(preview_view_sql).toPandas()

Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_location_id,dropoff_location_id,fare_amount
0,1,2019-01-15 03:36:12,2019-01-15 03:42:19,1,230,48,6.5
1,1,2019-01-25 18:20:32,2019-01-25 18:26:55,1,112,112,6.0
2,1,2019-01-05 06:47:31,2019-01-05 06:52:19,1,107,4,6.0
3,1,2019-01-09 15:08:02,2019-01-09 15:20:17,1,143,158,11.0
4,1,2019-01-25 18:49:51,2019-01-25 18:56:44,1,246,90,6.5


In [14]:
# Copy the view's rows into the Iceberg table, casting every column to the
# type declared in the table definition.
insert_trips_sql = """
INSERT INTO nessie.taxis.trips
SELECT 
    CAST(vendor_id AS INT),
    CAST(pickup_datetime AS TIMESTAMP),
    CAST(dropoff_datetime AS TIMESTAMP),
    CAST(passenger_count AS INT),
    CAST(pickup_location_id AS INT),
    CAST(dropoff_location_id AS INT),
    CAST(fare_amount AS FLOAT)
FROM trips_table;
"""
spark.sql(insert_trips_sql).toPandas()

                                                                                

In [15]:
# Peek at the first rows now stored in the Iceberg table.
preview_table_sql = "SELECT * FROM nessie.taxis.trips LIMIT 5;"
spark.sql(preview_table_sql).toPandas()

  series = series.astype(t, copy=False)
  series = series.astype(t, copy=False)


Unnamed: 0,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_location_id,dropoff_location_id,fare_amount
0,1,2019-01-15 03:36:12,2019-01-15 03:42:19,1,230,48,6.5
1,1,2019-01-25 18:20:32,2019-01-25 18:26:55,1,112,112,6.0
2,1,2019-01-05 06:47:31,2019-01-05 06:52:19,1,107,4,6.0
3,1,2019-01-09 15:08:02,2019-01-09 15:20:17,1,143,158,11.0
4,1,2019-01-25 18:49:51,2019-01-25 18:56:44,1,246,90,6.5


In [18]:
# The temporary CSV view is no longer needed once its rows are in the table.
drop_view_sql = "DROP VIEW IF EXISTS trips_table"
spark.sql(drop_view_sql).toPandas()

Create a new branch (`batch2`) from `main`, ingest a second batch of data there, and then merge it back into `main`.

In [None]:
# Create the batch2 branch starting from main.
create_batch2_branch_sql = "CREATE BRANCH batch2 IN nessie FROM main"
spark.sql(create_batch2_branch_sql).toPandas()

In [None]:
# List the catalog's references again; batch2 should now appear next to main.
list_references_sql = "LIST REFERENCES IN nessie"
spark.sql(list_references_sql).toPandas()

In [None]:
# Point the session at batch2 so the ingest that follows lands on that branch.
use_batch2_sql = "USE REFERENCE batch2 IN nessie"
spark.sql(use_batch2_sql).toPandas()

In [None]:
# Re-point the temporary view at the February 2019 sample CSV (header row included).
february_csv_view_sql = """
CREATE OR REPLACE TEMPORARY VIEW trips_table USING csv
OPTIONS (path "/data/yellow_tripdata_sample_2019_02.csv", header true);
"""
spark.sql(february_csv_view_sql).toPandas()

In [None]:
# Append the view's rows to the trips table, casting each column to the
# table's declared type.
insert_trips_sql = """
INSERT INTO nessie.taxis.trips
SELECT 
    CAST(vendor_id AS INT),
    CAST(pickup_datetime AS TIMESTAMP),
    CAST(dropoff_datetime AS TIMESTAMP),
    CAST(passenger_count AS INT),
    CAST(pickup_location_id AS INT),
    CAST(dropoff_location_id AS INT),
    CAST(fare_amount AS FLOAT)
FROM trips_table;
"""
spark.sql(insert_trips_sql).toPandas()

In [None]:
# Row count of the trips table as resolved through the session's current reference.
count_table_sql = "SELECT COUNT(*) FROM nessie.taxis.trips;"
spark.sql(count_table_sql).toPandas()

In [None]:
# Row count of the CSV-backed view, for comparison with the table count.
count_view_sql = "SELECT COUNT(*) FROM trips_table;"
spark.sql(count_view_sql).toPandas()

In [None]:
# Merge the commits made on batch2 into main.
merge_batch2_sql = "MERGE BRANCH batch2 INTO main IN nessie"
spark.sql(merge_batch2_sql).toPandas()

In [None]:
# The session was switched to batch2 by the earlier USE REFERENCE statement, so
# an unqualified read would still resolve against batch2. Read the table at
# `main` explicitly to confirm the merged rows are visible there.
spark.sql("SELECT COUNT(*) FROM nessie.taxis.`trips@main`").toPandas()