![iceberg-logo](https://www.apache.org/logos/res/iceberg/iceberg.png)

### [Docker, Spark, and Iceberg: The Fastest Way to Try Iceberg!](https://tabular.io/blog/docker-spark-and-iceberg/)

In [None]:
from pyiceberg import __version__

__version__

## Load NYC Taxi/Limousine Trip Data

For this notebook, we will use the New York City Taxi and Limousine Commission Trip Record Data that's available on the AWS Open Data Registry. It contains records of trips taken by taxis and for-hire vehicles in New York City. We'll save this into an Iceberg table called `taxis`.

To be able to rerun the notebook several times, let's drop the table if it exists to start fresh.

In [None]:
%%sql

CREATE DATABASE IF NOT EXISTS nyc;

In [None]:
%%sql
DROP TABLE IF EXISTS nyc.taxis;

In [None]:
%%sql
CREATE TABLE IF NOT EXISTS nyc.taxis (
    VendorID              bigint,
    tpep_pickup_datetime  timestamp,
    tpep_dropoff_datetime timestamp,
    passenger_count       double,
    trip_distance         double,
    RatecodeID            double,
    store_and_fwd_flag    string,
    PULocationID          bigint,
    DOLocationID          bigint,
    payment_type          bigint,
    fare_amount           double,
    extra                 double,
    mta_tax               double,
    tip_amount            double,
    tolls_amount          double,
    improvement_surcharge double,
    total_amount          double,
    congestion_surcharge  double,
    airport_fee           double
)
USING iceberg
PARTITIONED BY (days(tpep_pickup_datetime))
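
As a rough illustration of what `days(tpep_pickup_datetime)` does: Iceberg's day transform maps each timestamp to the number of whole days since 1970-01-01, so all trips picked up on the same calendar day share one partition value. Below is a minimal pure-Python sketch of that mapping. The helper name `day_partition` is made up for illustration and is not part of Spark or PyIceberg, and the timestamps are assumed to be in UTC.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def day_partition(ts: datetime) -> int:
    """Hypothetical helper: whole days between the Unix epoch and `ts` (UTC)."""
    return (ts - EPOCH).days


morning = datetime(2022, 1, 1, 0, 5, tzinfo=timezone.utc)
evening = datetime(2022, 1, 1, 23, 55, tzinfo=timezone.utc)
next_day = datetime(2022, 1, 2, 0, 0, tzinfo=timezone.utc)

# The two pickups on 2022-01-01 share a value; the one on 2022-01-02 does not.
print(day_partition(morning), day_partition(evening), day_partition(next_day))
```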

In [None]:
%%sql

TRUNCATE TABLE nyc.taxis

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Jupyter").getOrCreate()

for filename in [
    "yellow_tripdata_2022-04.parquet",
    "yellow_tripdata_2022-03.parquet",
    "yellow_tripdata_2022-02.parquet",
    "yellow_tripdata_2022-01.parquet",
    "yellow_tripdata_2021-12.parquet",
]:
    df = spark.read.parquet(f"/home/iceberg/data/{filename}")
    df.write.mode("append").saveAsTable("nyc.taxis")

## Load data into a PyArrow table

We'll fetch the table using the REST catalog that comes with the setup, scan it into a PyArrow table, and convert the result to a pandas DataFrame.

In [None]:
from pyiceberg.catalog import load_catalog
from pyiceberg.expressions import GreaterThanOrEqual

catalog = load_catalog('default')

In [None]:
tbl = catalog.load_table('nyc.taxis')

sc = tbl.scan(row_filter=GreaterThanOrEqual("tpep_pickup_datetime", "2022-01-01T00:00:00.000000+00:00"))
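
If only a few columns are needed, the scan can also be narrowed by column. The sketch below assumes the same `default` catalog and `nyc.taxis` table as this notebook and uses the `selected_fields` argument of `scan`. The function name `load_recent_fares` is hypothetical, and the imports sit inside it so that it can be defined even where no catalog is reachable.

```python
def load_recent_fares(catalog_name: str = "default"):
    """Sketch: scan only two columns of nyc.taxis for trips from 2022 onwards."""
    # Imported lazily so the function can be defined without a running catalog.
    from pyiceberg.catalog import load_catalog
    from pyiceberg.expressions import GreaterThanOrEqual

    tbl = load_catalog(catalog_name).load_table("nyc.taxis")
    scan = tbl.scan(
        row_filter=GreaterThanOrEqual(
            "tpep_pickup_datetime", "2022-01-01T00:00:00.000000+00:00"
        ),
        # Only these columns are projected into the result.
        selected_fields=("fare_amount", "tip_amount"),
    )
    # Returns a PyArrow table with the two selected columns.
    return scan.to_arrow()
```

Calling `load_recent_fares().to_pandas()` inside this environment would then give a two-column DataFrame, assuming the catalog is up and the table has been loaded as above.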

In [None]:
df = sc.to_arrow().to_pandas()

In [None]:
len(df)

In [None]:
df.info()

In [None]:
df

In [None]:
df.hist(column='fare_amount')

In [None]:
import numpy as np
from scipy import stats

stats.zscore(df['fare_amount'])

# Keep only fares within 3 standard deviations of the mean
df = df[(np.abs(stats.zscore(df['fare_amount'])) < 3)]
# Keep only strictly positive fares (drops zero and negative amounts)
df = df[df['fare_amount'] > 0]
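
To see what these two filters do on a small scale, here is a sketch that uses only the standard library. `scipy.stats.zscore` uses the population standard deviation by default (`ddof=0`), hence `statistics.pstdev`. The sample fares are made up.

```python
import statistics

# Made-up fares: 18 ordinary trips, a refund, a zero fare, and one extreme value.
fares = [12.5] * 18 + [-3.0, 0.0, 900.0]

mean = statistics.fmean(fares)
stddev = statistics.pstdev(fares)  # population stddev, like zscore's default ddof=0

# Step 1: keep fares within 3 standard deviations of the mean.
within_3_sigma = [f for f in fares if abs((f - mean) / stddev) < 3]
# Step 2: keep strictly positive fares.
cleaned = [f for f in within_3_sigma if f > 0]

print(len(fares), len(within_3_sigma), len(cleaned))
```

The z-score step alone keeps the negative and zero fares, because they sit close to the mean, which is why the second filter is needed.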

In [None]:
df.hist(column='fare_amount')

# DuckDB

Use DuckDB to query the `df` DataFrame directly.

In [None]:
%load_ext sql
%config SqlMagic.autopandas = True
%config SqlMagic.feedback = False
%config SqlMagic.displaycon = False
%sql duckdb:///:memory:

In [None]:
%sql SELECT * FROM df LIMIT 20

In [None]:
%%sql --save tip-amount --no-execute

SELECT tip_amount
FROM df

In [None]:
%sqlplot histogram --table df --column tip_amount --bins 22 --with tip-amount


In [None]:
%%sql --save tip-amount-filtered --no-execute

WITH tip_amount_stddev AS (
    SELECT STDDEV_POP(tip_amount) AS tip_amount_stddev
    FROM df
)

SELECT tip_amount
FROM df, tip_amount_stddev
WHERE tip_amount > 0
  AND tip_amount < tip_amount_stddev * 3

In [None]:
%sqlplot histogram --table tip-amount-filtered --column tip_amount --bins 50 --with tip-amount-filtered
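
Note that the `tip-amount-filtered` query bounds `tip_amount` by three standard deviations measured from zero, not from the mean, unlike the z-score filter used for fares earlier. A small standard-library sketch of the same predicate, on made-up tips:

```python
import statistics

# Made-up tips: some riders tip nothing, most tip a few dollars, one tips $100.
tips = [0.0] * 5 + [1.0, 2.0, 3.0] * 5 + [100.0]

# Counterpart of STDDEV_POP(tip_amount) in the CTE.
tip_stddev = statistics.pstdev(tips)

# Counterpart of: WHERE tip_amount > 0 AND tip_amount < tip_amount_stddev * 3
filtered = [t for t in tips if 0 < t < tip_stddev * 3]

print(len(tips), len(filtered))
```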


# Iceberg ❤️ PyArrow and DuckDB

This notebook shows how easily you can load Iceberg data through PyArrow and query it with DuckDB. Iceberg lets you pull out just the slice of data that your analysis needs, which cuts the time you spend waiting for the data and keeps memory free of data that you're not going to use.
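
As a rough illustration of why a filtered scan is cheap: because `nyc.taxis` is partitioned by pickup day, a filter on `tpep_pickup_datetime` lets whole partitions be skipped before any data file is opened. The sketch below mimics that idea in plain Python. The partition list and file counts are invented, and real Iceberg scan planning works on table metadata rather than a dictionary.

```python
from datetime import date

# Invented example: day partition -> number of data files in it.
files_per_day = {
    date(2021, 12, 30): 4,
    date(2021, 12, 31): 5,
    date(2022, 1, 1): 3,
    date(2022, 1, 2): 6,
}

# Lower bound taken from the notebook's row filter (>= 2022-01-01).
lower_bound = date(2022, 1, 1)

# Partitions entirely before the bound cannot contain matching rows.
kept = {day: n for day, n in files_per_day.items() if day >= lower_bound}
skipped_files = sum(n for day, n in files_per_day.items() if day < lower_bound)

print(sorted(kept), skipped_files)
```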