# Getting Started with PyIceberg + Polars

This notebook demonstrates how to query Iceberg tables managed by Nessie using PyIceberg and analyze the data with Polars.

## Setup

First, let's import the required libraries and configure our connection to the Nessie catalog.

In [None]:
import os
import polars as pl
from pyiceberg.catalog import load_catalog

# Connection settings. Every value can be overridden through an environment
# variable; the fallbacks below are the in-cluster service addresses.
# NOTE(review): the S3 credential fallbacks look like MinIO's default
# development credentials — set S3_ACCESS_KEY_ID / S3_SECRET_ACCESS_KEY for
# anything other than a local/dev deployment.
NESSIE_URI = os.getenv(
    "NESSIE_URI", "http://nessie.agartha-catalog.svc.cluster.local:19120/api/v2"
)
S3_ENDPOINT = os.getenv(
    "S3_ENDPOINT", "http://minio.agartha-storage.svc.cluster.local:9000"
)
S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY_ID", "minioadmin")
S3_SECRET_KEY = os.getenv("S3_SECRET_ACCESS_KEY", "minioadmin")
S3_WAREHOUSE = os.getenv("S3_WAREHOUSE", "s3://agartha-warehouse")

# Echo the endpoints (not the credentials) so the reader can confirm the target.
print(f"Nessie URI: {NESSIE_URI}")
print(f"S3 Endpoint: {S3_ENDPOINT}")

## Connect to Nessie Catalog

In [None]:
def _iceberg_rest_uri(nessie_uri: str) -> str:
    """Return the Iceberg REST endpoint to use for a Nessie server URI.

    PyIceberg's ``rest`` catalog speaks the Iceberg REST protocol, which Nessie
    serves under ``/iceberg``. The native Nessie API (``/api/v1``, ``/api/v2``)
    is a different protocol, so a URI ending in one of those suffixes is
    rewritten to ``<server>/iceberg``. Any other URI is returned unchanged.
    """
    base = nessie_uri.rstrip("/")
    for native_suffix in ("/api/v2", "/api/v1"):
        if base.endswith(native_suffix):
            return base[: -len(native_suffix)] + "/iceberg"
    return nessie_uri


NESSIE_ICEBERG_URI = _iceberg_rest_uri(NESSIE_URI)

# The property names contain dots/dashes, so they are passed via a dict.
# The s3.* settings configure the FileIO that reads table data from MinIO/S3.
catalog = load_catalog(
    "nessie",
    **{
        "type": "rest",
        "uri": NESSIE_ICEBERG_URI,
        "s3.endpoint": S3_ENDPOINT,
        "s3.access-key-id": S3_ACCESS_KEY,
        "s3.secret-access-key": S3_SECRET_KEY,
        "warehouse": S3_WAREHOUSE,
    },
)

print(f"Iceberg REST URI: {NESSIE_ICEBERG_URI}")
print("Connected to Nessie catalog!")

## List Available Namespaces and Tables

In [None]:
# Show every top-level namespace in the catalog. `namespaces` is kept for the
# next cell, which lists the tables inside each one.
namespaces = catalog.list_namespaces()
print("Available namespaces:")
for namespace in namespaces:
    print("  - " + str(namespace))

In [None]:
# List the tables in each namespace, skipping namespaces that are empty.
# The loop variables are deliberately NOT named `table`/`tables`: a later cell
# binds `table` to a loaded Iceberg table, and if that load fails a leftover
# identifier tuple from this loop would otherwise be scanned instead.
for ns in namespaces:
    namespace_tables = catalog.list_tables(ns)
    if namespace_tables:
        print(f"\nTables in {ns}:")
        for table_id in namespace_tables:
            print(f"  - {table_id}")

## Load an Iceberg Table into Polars

In [None]:
# Load the GitHub repositories table (adjust the identifier as needed) and
# show its Iceberg schema.
# `table` is reset first so that a failed load leaves None behind rather than
# a stale value from an earlier cell.
table = None
try:
    table = catalog.load_table("raw.github_repositories")
    print(f"Table: {table.name()}")
    print("\nSchema:")
    print(table.schema())
except Exception as e:
    print(f"Could not load table: {e}")
    print("Make sure the table exists by running the Dagster pipeline first.")

In [None]:
# Scan the whole table into Arrow, then wrap it as a Polars DataFrame.
# The preview goes through display(): an expression inside a try block is not
# the cell's last top-level expression, so Jupyter would not render it.
try:
    arrow_table = table.scan().to_arrow()
    df = pl.from_arrow(arrow_table)
    print(f"Loaded {len(df)} rows")
    display(df.head(10))  # display() is a builtin inside IPython kernels
except Exception as e:
    print(f"Error loading data: {e}")

## Analyze Data with Polars

In [None]:
# Example: per-language repository count plus total stars and forks, sorted
# by total stars (largest first).
# pl.len() counts rows per group; it replaces the deprecated argument-less
# pl.count().
try:
    language_stats = (
        df.group_by("language")
        .agg(
            pl.len().alias("repo_count"),
            pl.col("stargazers_count").sum().alias("total_stars"),
            pl.col("forks_count").sum().alias("total_forks"),
        )
        .sort("total_stars", descending=True)
    )
    # Explicit display(): a bare expression inside try would not be rendered.
    display(language_stats)
except Exception as e:
    print(f"Error: {e}")

## Query with Column Selection and Filtering

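PyIceberg can push column projections (`selected_fields`) and row filters (`row_filter`) down into the table scan, so only the needed columns are read and data files that cannot match the filter are skipped.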

In [None]:
# Select specific columns and filter rows inside the Iceberg scan itself:
# - selected_fields -> projection pushdown (only these columns are read)
# - row_filter      -> predicate pushdown (PyIceberg can prune data files
#                      that cannot match, and returns only matching rows)
# This avoids loading every row into Polars just to discard most of them.
MIN_STARS = 100  # popularity threshold used by the row filter

try:
    scan = table.scan(
        row_filter=f"stargazers_count > {MIN_STARS}",
        selected_fields=("name", "language", "stargazers_count", "forks_count"),
    )
    popular_repos = pl.from_arrow(scan.to_arrow())
    print(f"Repos with > {MIN_STARS} stars: {len(popular_repos)}")
    # Explicit display(): a bare expression inside try would not be rendered.
    display(popular_repos.head(10))
except Exception as e:
    print(f"Error: {e}")

## Alternative: Query via Trino

You can also query tables through Trino for more complex SQL queries.

In [None]:
from trino.dbapi import connect

TRINO_HOST = os.environ.get("TRINO_HOST", "trino.agartha-processing-trino.svc.cluster.local")
TRINO_PORT = int(os.environ.get("TRINO_PORT", "8080"))

# Open a DB-API connection to Trino (catalog "agartha", schema "raw") and list
# its tables. `conn`/`cursor` stay open because the next cell reuses `cursor`.
try:
    conn = connect(
        host=TRINO_HOST,
        port=TRINO_PORT,
        user="jupyter",
        catalog="agartha",
        schema="raw",
    )

    cursor = conn.cursor()
    cursor.execute("SHOW TABLES")
    # Named trino_tables so the catalog listing's `tables` is not overwritten.
    trino_tables = cursor.fetchall()
    print("Tables available via Trino:")
    for row in trino_tables:
        print(f"  - {row[0]}")
except Exception as e:
    print(f"Could not connect to Trino: {e}")

In [None]:
# Query via Trino and load the result into Polars: top 10 languages by
# total stars.
try:
    cursor.execute("""
        SELECT language, COUNT(*) as repo_count, SUM(stargazers_count) as total_stars
        FROM raw.github_repositories
        GROUP BY language
        ORDER BY total_stars DESC
        LIMIT 10
    """)

    rows = cursor.fetchall()
    columns = [desc[0] for desc in cursor.description]

    # orient="row": each fetched row is one record. Without it Polars infers
    # the orientation, and when the row count equals the column count (here,
    # exactly 3 languages) it falls back to column orientation and transposes
    # the result.
    df_trino = pl.DataFrame(rows, schema=columns, orient="row")
    # Explicit display(): a bare expression inside try would not be rendered.
    display(df_trino)
except Exception as e:
    print(f"Error: {e}")