In [1]:
pip install pyiceberg

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install --upgrade pynessie


Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install --upgrade typing_extensions

Note: you may need to restart the kernel to use updated packages.


In [4]:
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.fs as fs
import pyarrow.dataset as ds

from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema, NestedField
from pyiceberg.types import *


In [5]:
# Definir el catálogo Nessie
catalog = load_catalog(
    "nessie",
    **{
        "uri": "http://rest:8181",
    }
)


In [6]:
print(catalog)

nessie (<class 'pyiceberg.catalog.rest.RestCatalog'>)


In [7]:
# ----------------------------
# Verificar conexión listando los namespaces
# ----------------------------
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

Namespaces: [('project-frameworks',)]


In [8]:
#catalog.create_namespace("project-frameworks")

In [9]:
# Volver a listar para confirmar
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

Namespaces: [('project-frameworks',)]


In [10]:
from pyiceberg.catalog import load_catalog

In [11]:
# ----------------------------
# Conectar a MinIO como S3
# ----------------------------
s3 = fs.S3FileSystem(
    access_key="admin",
    secret_key="password",
    endpoint_override="http://minio:9000",
    region="us-east-1",
)


In [12]:
print(s3.get_file_info(fs.FileSelector("", recursive=False)))


[<FileInfo for 'warehouse': type=FileType.Directory>]


In [13]:
print(s3.get_file_info(fs.FileSelector("warehouse", recursive=True)))


[<FileInfo for 'warehouse/project-frameworks/project/metadata/00000-dc647f4f-4c44-4d67-822b-3372c9f86769.metadata.json': type=FileType.File, size=1940>, <FileInfo for 'warehouse/project-frameworks/project/metadata': type=FileType.Directory>, <FileInfo for 'warehouse/project-frameworks/project': type=FileType.Directory>, <FileInfo for 'warehouse/project-frameworks': type=FileType.Directory>, <FileInfo for 'warehouse/yellow_trip_20250911124659/_dlt_loads/yellow_trip_data__1757551619.2035155.jsonl': type=FileType.File, size=194>, <FileInfo for 'warehouse/yellow_trip_20250911124659/_dlt_loads': type=FileType.Directory>, <FileInfo for 'warehouse/yellow_trip_20250911124659': type=FileType.Directory>, <FileInfo for 'warehouse/yellow_trip_20250911124659/_dlt_pipeline_state/yellow_trip_data_pipeline__1757551619.2035155__53daa9a8b871038afca91189eddb71d85605e281698d749323ad03ee9ff31994.jsonl': type=FileType.File, size=541>, <FileInfo for 'warehouse/yellow_trip_20250911124659/_dlt_pipeline_state':

In [14]:
path = "warehouse/yellow_trip_20250911124659/yellow_trip_data"

In [15]:
dataset = ds.dataset(path, filesystem=s3, format="parquet")

In [16]:
from pyspark.sql import SparkSession

# ----------------------------
# Inicializar Spark con Iceberg + Nessie + MinIO
# ----------------------------
spark = SparkSession.builder \
    .appName("Iceberg + Nessie + MinIO") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .getOrCreate()

# Configuraciones para S3/MinIO
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "admin")
hadoop_conf.set("fs.s3a.secret.key", "password")
hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")
hadoop_conf.set("fs.s3a.path.style.access", "true")
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")


In [17]:

table = dataset.to_table()
table_schema = table.schema

print("Schema Arrow:")
print(table_schema)

Schema Arrow:
vendor_id: int32
tpep_pickup_datetime: timestamp[us, tz=UTC]
tpep_dropoff_datetime: timestamp[us, tz=UTC]
passenger_count: int64
trip_distance: double
ratecode_id: int64
store_and_fwd_flag: large_string
pu_location_id: int32
do_location_id: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
cbd_congestion_fee: double


In [18]:
from pyiceberg.types import (
    LongType, IntegerType, FloatType, DoubleType, StringType, BinaryType,
    TimestampType, TimestamptzType
)

def arrow_to_iceberg(arrow_schema: pa.Schema) -> Schema:
    fields = []
    field_id = 1

    for field in arrow_schema:
        name = field.name
        arrow_type = field.type

        if pa.types.is_int64(arrow_type):
            iceberg_type = LongType()
        elif pa.types.is_int32(arrow_type):
            iceberg_type = IntegerType()
        elif pa.types.is_float32(arrow_type):
            iceberg_type = FloatType()
        elif pa.types.is_float64(arrow_type):
            iceberg_type = DoubleType()
        elif pa.types.is_string(arrow_type) or pa.types.is_large_string(arrow_type):
            iceberg_type = StringType()
        elif pa.types.is_binary(arrow_type) or pa.types.is_large_binary(arrow_type):
            iceberg_type = BinaryType()
        elif pa.types.is_timestamp(arrow_type):
            # Diferenciar timestamp naive vs con zona horaria
            if arrow_type.tz is None:
                iceberg_type = TimestampType()      # naive
            else:
                iceberg_type = TimestamptzType()    # con tz
        else:
            raise ValueError(f"Tipo no soportado: {arrow_type}")

        fields.append(NestedField(field_id, name, iceberg_type, required=not field.nullable))
        field_id += 1

    return Schema(*fields)


In [19]:
# ----------------------------
# Crear la tabla en el catálogo Nessie si no existe
# ----------------------------
if not catalog.table_exists("project-frameworks.project"):
    catalog.create_table("project-frameworks.project", schema=post_schema)
else:
    print("⚠ La tabla project-frameworks.project ya existe, no se creó de nuevo.")

⚠ La tabla project-frameworks.project ya existe, no se creó de nuevo.


In [20]:
import requests

url = "http://nessie:19120"  
try:
    response = requests.get(url)
    print("Status code:", response.status_code)
    print("Contenido:", response.text)
except Exception as e:
    print("Error:", e)


Status code: 200
Contenido: <!doctype html><html lang="en"><head><meta charset="utf-8"/><link href="/favicon.ico" rel="icon"/><meta content="width=device-width,initial-scale=1" name="viewport"/><meta content="#000000" name="theme-color"/><meta content="Nessie Repository Manager" name="description"/><link href="/logo192.png" rel="apple-touch-icon"/><link href="/manifest.json" rel="manifest"/><link crossorigin="anonymous" href="https://maxcdn.bootstrapcdn.com/bootstrap/4.4.1/css/bootstrap.min.css" integrity="sha384-Vkoo8x4CGsO3+Hhxv8T/Q5PaXtkKtu6ug5TOeNV6gBiFeWPGFN9MuhOf23Q9Ifjh" rel="stylesheet"/><link rel="stylesheet" href="https://fonts.googleapis.com/icon?family=Material+Icons"/><title>Nessie</title><script defer="defer" src="/static/js/main.afc60113.js"></script><link href="/static/css/main.3d9a3219.css" rel="stylesheet"></head><body><noscript>You need to enable JavaScript to run this app.</noscript><div id="root"></div></body></html>


In [21]:
import requests

r = requests.get("http://nessie:19120/api/v1/config")
print(r.status_code)
print(r.text)


200
{
  "defaultBranch" : "main",
  "maxSupportedApiVersion" : 2
}


In [23]:
import pyarrow.dataset as ds

dataset = ds.dataset("warehouse/yellow_trip_20250911124659/yellow_trip_data", filesystem=s3)
table = dataset.to_table()
table_schema = table.schema


In [24]:
post_schema = arrow_to_iceberg(table_schema)
print(post_schema)


table {
  1: vendor_id: optional int
  2: tpep_pickup_datetime: optional timestamptz
  3: tpep_dropoff_datetime: optional timestamptz
  4: passenger_count: optional long
  5: trip_distance: optional double
  6: ratecode_id: optional long
  7: store_and_fwd_flag: optional string
  8: pu_location_id: optional int
  9: do_location_id: optional int
  10: payment_type: optional long
  11: fare_amount: optional double
  12: extra: optional double
  13: mta_tax: optional double
  14: tip_amount: optional double
  15: tolls_amount: optional double
  16: improvement_surcharge: optional double
  17: total_amount: optional double
  18: congestion_surcharge: optional double
  19: airport_fee: optional double
  20: cbd_congestion_fee: optional double
}


In [29]:
project = catalog.load_table("project-frameworks.project")
project.append(table) 
print("✅ Datos insertados en Iceberg")

ValueError: Mismatch in fields:
┏━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃    ┃ Table field                                  ┃ Dataframe field                                ┃
┡━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│ ✅ │ 1: vendor_id: optional int                   │ 1: vendor_id: optional int                     │
│ ❌ │ 2: tpep_pickup_datetime: optional timestamp  │ 2: tpep_pickup_datetime: optional timestamptz  │
│ ❌ │ 3: tpep_dropoff_datetime: optional timestamp │ 3: tpep_dropoff_datetime: optional timestamptz │
│ ✅ │ 4: passenger_count: optional long            │ 4: passenger_count: optional long              │
│ ✅ │ 5: trip_distance: optional double            │ 5: trip_distance: optional double              │
│ ✅ │ 6: ratecode_id: optional long                │ 6: ratecode_id: optional long                  │
│ ✅ │ 7: store_and_fwd_flag: optional string       │ 7: store_and_fwd_flag: optional string         │
│ ✅ │ 8: pu_location_id: optional int              │ 8: pu_location_id: optional int                │
│ ✅ │ 9: do_location_id: optional int              │ 9: do_location_id: optional int                │
│ ✅ │ 10: payment_type: optional long              │ 10: payment_type: optional long                │
│ ✅ │ 11: fare_amount: optional double             │ 11: fare_amount: optional double               │
│ ✅ │ 12: extra: optional double                   │ 12: extra: optional double                     │
│ ✅ │ 13: mta_tax: optional double                 │ 13: mta_tax: optional double                   │
│ ✅ │ 14: tip_amount: optional double              │ 14: tip_amount: optional double                │
│ ✅ │ 15: tolls_amount: optional double            │ 15: tolls_amount: optional double              │
│ ✅ │ 16: improvement_surcharge: optional double   │ 16: improvement_surcharge: optional double     │
│ ✅ │ 17: total_amount: optional double            │ 17: total_amount: optional double              │
│ ✅ │ 18: congestion_surcharge: optional double    │ 18: congestion_surcharge: optional double      │
│ ✅ │ 19: airport_fee: optional double             │ 19: airport_fee: optional double               │
│ ✅ │ 20: cbd_congestion_fee: optional double      │ 20: cbd_congestion_fee: optional double        │
└────┴──────────────────────────────────────────────┴────────────────────────────────────────────────┘


In [30]:
import pyarrow.compute as pc

def convert_timestamps_naive(table, cols):
    new_cols = {}
    for name in table.schema.names:
        col = table[name]
        if name in cols:
            # Convertir a timestamp sin tz
            new_cols[name] = pc.cast(col, pa.timestamp("us"))
        else:
            new_cols[name] = col
    return pa.table(new_cols)

# Columnas conflictivas
timestamp_cols = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Convertir
table_fixed = convert_timestamps_naive(table, timestamp_cols)

In [31]:
project.append(table_fixed)
print("✅ Datos insertados en Iceberg")

✅ Datos insertados en Iceberg


In [32]:
print(table_fixed.schema)

vendor_id: int32
tpep_pickup_datetime: timestamp[us]
tpep_dropoff_datetime: timestamp[us]
passenger_count: int64
trip_distance: double
ratecode_id: int64
store_and_fwd_flag: large_string
pu_location_id: int32
do_location_id: int32
payment_type: int64
fare_amount: double
extra: double
mta_tax: double
tip_amount: double
tolls_amount: double
improvement_surcharge: double
total_amount: double
congestion_surcharge: double
airport_fee: double
cbd_congestion_fee: double


In [33]:
# Cargar la tabla desde el catálogo
project = catalog.load_table("project-frameworks.project")

# Leer los datos en Arrow Table
table_arrow = project.scan().to_arrow()

# Mostrar los primeros 5 registros
print(table_arrow.to_pandas().head())

   vendor_id tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  \
0          1  2025-01-01 00:18:38   2025-01-01 00:26:59              1.0   
1          1  2025-01-01 00:32:40   2025-01-01 00:35:13              1.0   
2          1  2025-01-01 00:44:04   2025-01-01 00:46:01              1.0   
3          2  2025-01-01 00:14:27   2025-01-01 00:20:01              3.0   
4          2  2025-01-01 00:21:34   2025-01-01 00:25:06              3.0   

   trip_distance  ratecode_id store_and_fwd_flag  pu_location_id  \
0           1.60          1.0                  N             229   
1           0.50          1.0                  N             236   
2           0.60          1.0                  N             141   
3           0.52          1.0                  N             244   
4           0.66          1.0                  N             244   

   do_location_id  payment_type  fare_amount  extra  mta_tax  tip_amount  \
0             237             1         10.0    3.5      0

In [37]:
from pyiceberg.catalog import load_catalog

catalog = load_catalog(
    "nessie",
    uri="http://nessie:19120/api/v1",  
    default_branch="main"
)

JSONDecodeError: Expecting value: line 1 column 1 (char 0)