# 2. Parquet to Iceberg

This notebook demonstrates how to read a Parquet dataset from the `grupo-2` bucket in MinIO and save it to another bucket in the Apache Iceberg table format. The workflow uses `dlt` and the Iceberg libraries, and the data is managed as a table through the Nessie catalog for efficient querying and versioning. Note that this requires access to MinIO and an installed Iceberg library. The notebook:

* Uses the MinIO API on port 9000, with credentials read from `.dlt/secrets.toml`.
* Reads Parquet files from a specified bucket (e.g., s3://grupo-2/grupo_2_parquet/df_data).

In [1]:
%pip install pandas pyarrow fsspec dlt[filesystem] s3fs adlfs pyiceberg[s3fs,sql-sqlite] toml

Collecting pandas
  Downloading pandas-2.3.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (91 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m91.2/91.2 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting pyarrow
  Downloading pyarrow-21.0.0-cp311-cp311-manylinux_2_28_x86_64.whl.metadata (3.3 kB)
Collecting fsspec
  Downloading fsspec-2025.9.0-py3-none-any.whl.metadata (10 kB)
Collecting s3fs
  Downloading s3fs-2025.9.0-py3-none-any.whl.metadata (1.4 kB)
Collecting adlfs
  Downloading adlfs-2025.8.0-py3-none-any.whl.metadata (7.7 kB)
Collecting toml
  Downloading toml-0.10.2-py2.py3-none-any.whl.metadata (7.1 kB)
Collecting dlt[filesystem]
  Downloading dlt-1.15.0-py3-none-any.whl.metadata (12 kB)
Collecting pyiceberg[s3fs,sql-sqlite]
  Downloading pyiceberg-0.9.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.5 kB)
Collecting numpy>=1.23.2 (from pandas)
  Downloading numpy-2.3.2-cp311-cp311-manylinux_2_27_x86

In [None]:
# NOTE(review): on a fresh kernel this cell runs before `catalog` is created
# further down in the notebook, and `pre_2020` is not defined in any visible cell.
# Load the "demo.pre_2020" table from the catalog.
pre_2020_table = catalog.load_table("demo.pre_2020")
# Merge `pre_2020` into the loaded table, matching rows on Id; "*" is passed
# for both the update and the insert column sets.
# NOTE(review): confirm that the installed pyiceberg Table provides merge_into.
pre_2020_table.merge_into(
    source_table=pre_2020,
    merge_condition="target.Id = source.Id",
    update={"*"},
    insert={"*"}
)

In [None]:
# NOTE(review): on a fresh kernel `table` and `df` are not defined yet at this point.
# Later cells bind `table` to the result of dataset.to_table() and `df` to
# table.to_pandas(), which does not match the Iceberg-table usage here — confirm intent.
# Only the last expression of the cell is displayed.
table.append(df)  # Append the Arrow table to the Iceberg table (writes the underlying Parquet)
len(table.scan().to_arrow())  # Scan the table, convert to Arrow, count rows
arrow_table = table.scan().to_arrow()  # Scan everything
arrow_table.to_pandas()  # Convert everything to pandas
arrow_table.to_pandas().head()  # First rows only

In [2]:
# General utilities
import os
import toml
import logging
from typing import Optional

# Data manipulation
import pandas as pd

# dlt: Reading from filesystem
import dlt
from dlt.sources.filesystem import filesystem, read_parquet

# PyArrow: Reading and Conversion
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.dataset as ds
import pyarrow.fs as fs

# PyIceberg
from pyiceberg.catalog import load_catalog
from pyiceberg.table import Table
from pyiceberg.schema import Schema, NestedField
from pyiceberg.types import (
    BooleanType, IntegerType, LongType, FloatType, DoubleType,
    StringType, TimestampType, DateType
)

In [3]:
# Root logging configuration: INFO level, "time - level - message" layout.
LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)

# Named logger used by the following cells.
logger = logging.getLogger("parquet_to_iceberg")

In [None]:
# Create the dlt pipeline that writes to the "filesystem" destination.
# Any failure is logged and then re-raised so the cell still stops on error.
try:
    pipeline = dlt.pipeline(pipeline_name="sources", destination="filesystem")
    logger.info(f"Pipeline configured successfully with name: {pipeline.pipeline_name}")
except Exception as exc:
    logger.error(f"Error configuring pipeline: {str(exc)}")
    raise


In [None]:
# Chain the dlt filesystem source with read_parquet using the | operator.
# NOTE(review): filesystem() is called without arguments — presumably the bucket URL,
# file glob and credentials come from the dlt config/secrets files; confirm.
filesystem_source = filesystem() | read_parquet()

In [None]:
# Run the pipeline on the filesystem | read_parquet source and show the load info.
# The source is bound to `filesystem_source`; `filesystem_pipe` was never defined.
info = pipeline.run(filesystem_source)
print(info)

In [None]:
# Print the normalize info recorded in the pipeline's last trace.
print(pipeline.last_trace.last_normalize_info)

In [4]:
# Load the dlt secrets file.
config = toml.load("/home/jovyan/work/.dlt/secrets.toml")

# Pull out the credentials table stored under sources -> credentials.
creds = config["sources"]["credentials"]

# Export the credentials as AWS_* environment variables.
for env_name, cred_field in (
    ("AWS_ACCESS_KEY_ID", "aws_access_key_id"),
    ("AWS_SECRET_ACCESS_KEY", "aws_secret_access_key"),
):
    os.environ[env_name] = creds[cred_field]
# The endpoint is optional; an empty string is exported when it is absent.
os.environ["AWS_ENDPOINT_URL"] = creds.get("endpoint_url", "")

In [6]:
# Open the Parquet dataset. When running locally, point this at the directory
# where dlt saved the Parquet files instead (adjust the path as needed).
dataset = ds.dataset("s3://grupo-2/grupo_2_parquet/df_data", format="parquet")

# Read the dataset into an Arrow table.
table = dataset.to_table()

In [7]:
# Convert the Arrow table to a pandas DataFrame.
df = table.to_pandas()
# DataFrame.info() prints its report itself and returns None, so wrapping it
# in print() only adds a stray "None" line to the output.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3475226 entries, 0 to 3475225
Data columns (total 20 columns):
 #   Column                 Dtype         
---  ------                 -----         
 0   vendor_id              int32         
 1   tpep_pickup_datetime   datetime64[us]
 2   tpep_dropoff_datetime  datetime64[us]
 3   passenger_count        float64       
 4   trip_distance          float64       
 5   ratecode_id            float64       
 6   store_and_fwd_flag     object        
 7   pu_location_id         int32         
 8   do_location_id         int32         
 9   payment_type           int64         
 10  fare_amount            float64       
 11  extra                  float64       
 12  mta_tax                float64       
 13  tip_amount             float64       
 14  tolls_amount           float64       
 15  improvement_surcharge  float64       
 16  total_amount           float64       
 17  congestion_surcharge   float64       
 18  airport_fee           

In [8]:
# Show the schema of the Arrow table.
print(table.schema)

vendor_id: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: double
trip_distance: double
ratecode_id: double
store_and_fwd_flag: string
pu_location_id: int32
do_location_id: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
cbd_congestion_fee: double


In [9]:
# Map every column name to the string form of its Arrow type.
schema_dict = dict(zip(table.schema.names, (str(arrow_type) for arrow_type in table.schema.types)))
print(schema_dict)

{'vendor_id': 'int32', 'tpep_pickup_datetime': 'timestamp[us]', 'tpep_dropoff_datetime': 'timestamp[us]', 'passenger_count': 'double', 'trip_distance': 'double', 'ratecode_id': 'double', 'store_and_fwd_flag': 'string', 'pu_location_id': 'int32', 'do_location_id': 'int32', 'payment_type': 'int64', 'fare_amount': 'double', 'extra': 'double', 'mta_tax': 'double', 'tip_amount': 'double', 'tolls_amount': 'double', 'improvement_surcharge': 'double', 'total_amount': 'double', 'congestion_surcharge': 'double', 'airport_fee': 'double', 'cbd_congestion_fee': 'double'}


In [10]:
def infer_iceberg_schema_from_arrow_schema(arrow_schema: pa.Schema) -> Schema:
    """Build a PyIceberg ``Schema`` that mirrors a PyArrow schema.

    Field IDs are assigned sequentially starting at 1, in column order, and a
    field is marked ``required`` exactly when the Arrow field is non-nullable.

    Type mapping:
        bool -> boolean; int8/int16/int32 -> int; int64 -> long;
        float32 -> float; float64 -> double; string -> string;
        timestamp without tz -> timestamp; timestamp with tz -> timestamptz;
        date32/date64 -> date; anything else -> string (a warning is logged).

    Args:
        arrow_schema: Arrow schema to translate.

    Returns:
        An Iceberg schema with one top-level field per Arrow field.
    """
    # Local import: the notebook's import cell does not bring in TimestamptzType.
    from pyiceberg.types import TimestamptzType

    def _iceberg_type_for(field: pa.Field):
        """Return the Iceberg type matching one Arrow field's type."""
        arrow_type = field.type

        if pa.types.is_boolean(arrow_type):
            return BooleanType()
        # Iceberg has no 8/16-bit integers; they fit losslessly in a 32-bit int.
        if (
            pa.types.is_int8(arrow_type)
            or pa.types.is_int16(arrow_type)
            or pa.types.is_int32(arrow_type)
        ):
            return IntegerType()
        if pa.types.is_int64(arrow_type):
            return LongType()
        if pa.types.is_float32(arrow_type):
            return FloatType()
        if pa.types.is_float64(arrow_type):
            return DoubleType()
        if pa.types.is_string(arrow_type):
            return StringType()
        if pa.types.is_timestamp(arrow_type):
            # A tz-aware Arrow timestamp must keep its zone semantics.
            return TimestamptzType() if arrow_type.tz is not None else TimestampType()
        if pa.types.is_date(arrow_type):
            return DateType()

        # Generic fallback, kept from the original behaviour but no longer silent.
        logging.getLogger("parquet_to_iceberg").warning(
            "Column %r has unmapped Arrow type %s; falling back to Iceberg string",
            field.name,
            arrow_type,
        )
        return StringType()

    fields = [
        NestedField(field_id, field.name, _iceberg_type_for(field), required=not field.nullable)
        for field_id, field in enumerate(arrow_schema, start=1)
    ]
    return Schema(*fields)

In [11]:
# Derive the Iceberg schema from the Arrow table's schema and print it.
iceberg_schema = infer_iceberg_schema_from_arrow_schema(table.schema)
print(iceberg_schema)

table {
  1: vendor_id: optional int
  2: tpep_pickup_datetime: optional timestamp
  3: tpep_dropoff_datetime: optional timestamp
  4: passenger_count: optional double
  5: trip_distance: optional double
  6: ratecode_id: optional double
  7: store_and_fwd_flag: optional string
  8: pu_location_id: optional int
  9: do_location_id: optional int
  10: payment_type: optional long
  11: fare_amount: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
  20: cbd_congestion_fee: optional double
}


In [12]:
# Connect to the catalog named "nessie" through its Iceberg REST endpoint.
catalog_properties = {
    "uri": "http://nessie:19120/iceberg/",
    "type": "rest",
}
catalog = load_catalog("nessie", **catalog_properties)

# List the namespaces currently known to the catalog.
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

Namespaces: []


In [13]:
# Create the "proyecto" namespace only when it is missing, so that re-running
# the notebook does not fail once the namespace already exists.
catalog.create_namespace_if_not_exists("proyecto")

In [14]:
# List the namespaces again to confirm that "proyecto" is now present.
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

Namespaces: [('proyecto',)]


In [25]:
# Create the Iceberg table "proyecto.grupo_2" with the inferred schema.
# NOTE(review): the recorded run failed with a BadRequestError from the catalog
# ("Invalid secret URI, must be in the form 'urn:nessie-secret:<provider>:<secret-name>'").
# This looks like a Nessie-side object-storage/secret configuration issue rather
# than a problem with this call — confirm against the Nessie server configuration.
catalog.create_table("proyecto.grupo_2", schema=iceberg_schema)

BadRequestError: IllegalArgumentException: Location for ICEBERG_TABLE 'proyecto.grupo_2' cannot be associated with any configured object storage location: Invalid secret URI, must be in the form 'urn:nessie-secret:<provider>:<secret-name>'

BadRequestError: IllegalArgumentException: Location for ICEBERG_TABLE 'proyecto.grupo_2_iceberg' cannot be associated with any configured object storage location: Invalid secret URI, must be in the form 'urn:nessie-secret:<provider>:<secret-name>'

In [None]:
# Load the Iceberg table and replace its contents with the Parquet data.
# The identifier matches the one used by create_table ("proyecto.grupo_2").
# overwrite() receives the Arrow table read from the Parquet dataset (`table`),
# not the Iceberg table object itself.
grupo_2_table = catalog.load_table("proyecto.grupo_2")
grupo_2_table.overwrite(table)

In [None]:
# Merge `pre_2020` into `pre_2020_table`, matching rows on Id; "*" is passed
# for both the update and the insert column sets.
# NOTE(review): `pre_2020` is not defined in any visible cell, and
# `pre_2020_table` is only bound in a cell that loads "demo.pre_2020".
# Also confirm that the installed pyiceberg Table provides merge_into.
pre_2020_table.merge_into(
    source_table=pre_2020,
    merge_condition="target.Id = source.Id",
    update={"*"},
    insert={"*"}
)