# Part 2 — Polars Lazy Pipeline (Didactic Version)

This notebook demonstrates the Polars features that typically make it faster and more expressive than pandas:

| Feature | Description |
|---------|-------------|
| **Lazy Evaluation** | Polars builds a query plan and optimizes before executing |
| **Expression API** | Composable expressions (`pl.col`, `pl.when`, `pl.lit`) compiled to fast Rust code |
| **Parallel Execution** | Automatic parallelization across CPU cores; the Rust engine is not limited by Python's GIL |
| **Method Chaining** | Fluent, readable pipelines that never mutate the original data |

> **Key difference from Part 1:** Here we use `.lazy()` to defer computation, and `.collect()` to trigger the optimized plan.

In [None]:
import time
import tracemalloc
import yaml
import polars as pl
from datetime import date
from pathlib import Path

from utils import get_db_engine, load_sql_file

# Start performance tracking
tracemalloc.start()
start_time = time.time()

# Configuration
BASE_DIR = Path.cwd()
CREDS_PATH = Path("C:/Users/roger.lloret/Documents/creds/creds_dmbi.yml")
SQL_DIR = BASE_DIR / "sql"
TABLE_NAME = "gen_kpi_ft"

## Helper Functions

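The key difference here: `read_from_database_pl` returns a **LazyFrame** (via `.lazy()`), not a DataFrame. The SQL query itself runs eagerly and its result is loaded into memory; what is deferred is every transformation applied afterwards, which Polars records in a query plan.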

In [None]:
def read_from_database_pl(engine, query: str) -> pl.LazyFrame:
    """
    Reads from MySQL and returns a **LazyFrame**.

    Why LazyFrame?
    - The SQL query itself runs eagerly here and its result is held in memory.
    - Subsequent filters, selects, and joins are deferred and optimized together.
    """
    # hide_password=False so the URI carries the real password, not a masked one
    uri = engine.url.render_as_string(hide_password=False)
    # .lazy() wraps the already-loaded DataFrame in a LazyFrame for optimization
    return pl.read_database_uri(query, uri=uri).lazy()


def write_to_database_pl(engine, df: pl.DataFrame, table_name: str, if_exists: str = "append"):
    """Writes a Polars DataFrame to MySQL (converts to pandas for to_sql)."""
    with engine.begin() as connection:
        df.to_pandas().to_sql(table_name, con=connection, if_exists=if_exists, index=False)

## Reusable Polars Expressions

Expressions are **first-class objects** in Polars. You define them once and reuse them across different pipelines — like SQL column definitions.

In [None]:
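# Expression: classify invoice amounts into categories
# is_between(50, 100) is inclusive on both ends, so 50 and 100 land in the middle band.
# Null amounts match none of the conditions and fall through to "Invalid Input".
CATEGORY_EXPR = (
    pl.when(pl.col("total_import_euros") < 50)
      .then(pl.lit("Less than 50"))
      .when(pl.col("total_import_euros").is_between(50, 100))
      .then(pl.lit("Between 50 and 100"))
      .when(pl.col("total_import_euros") > 100)
      .then(pl.lit("Greater than 100"))
      .otherwise(pl.lit("Invalid Input"))
      .alias("category")
)

# Expression: cast contract_id to Int64 for safe joins
# strict=False turns values that cannot be converted into null instead of raising
CAST_CONTRACT_ID = pl.col("contract_id").cast(pl.Int64, strict=False)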

print("Expressions defined. They are NOT executed yet — just stored as objects.")

## 1. Load Credentials

In [None]:
with open(CREDS_PATH, "r") as f:
    creds = yaml.safe_load(f)

db_engine = get_db_engine(creds["data_warehouse"])
print("Database engine created.")

## 2. Extract — Returns LazyFrames (transforms are deferred)

Notice the function returns `pl.LazyFrame`, not `pl.DataFrame`. The SQL query runs, but all subsequent operations are deferred.

In [None]:
invoices_sql = load_sql_file(SQL_DIR / "invoices_main.sql")
contracts_sql = load_sql_file(SQL_DIR / "contracts_main.sql")

invoices_lf = read_from_database_pl(db_engine, invoices_sql)
contracts_lf = read_from_database_pl(db_engine, contracts_sql)

print(f"Type of invoices_lf: {type(invoices_lf)}")
print(f"Type of contracts_lf: {type(contracts_lf)}")
print("Data loaded as LazyFrames — no transforms executed yet.")

## 3. Transform — Build the Lazy Query Plan

**Polars Power #1:** Multiple transforms in one `.with_columns()` — Polars can execute them in parallel.

**Polars Power #2:** Chained lazy operations — `unique()` + `with_columns()` are recorded in the plan, not executed yet.

**Polars Power #3:** Lazy join: the optimizer can push filters and column selections below the join (predicate and projection pushdown), so less data has to be joined.

In [None]:
# Multiple transforms in one .with_columns() — reusable expressions!
invoices_lf = invoices_lf.with_columns(
    CAST_CONTRACT_ID,     # cast contract_id
    CATEGORY_EXPR,        # classify invoices
)

# Chained lazy operations
contracts_lf = (
    contracts_lf
    .unique()                                  # deduplicate
    .with_columns(CAST_CONTRACT_ID)            # cast contract_id
)

# Lazy join
merged_lf = invoices_lf.join(contracts_lf, on="contract_id", how="left")

print("Query plan built. Still no computation has been triggered.")
print(f"Type of merged_lf: {type(merged_lf)}")

## 4. Create KPIs — Still Lazy, Still Optimizable

In [None]:
# KPI 1: Total amount by category & client type
kpi_category_lf = (
    merged_lf
    .group_by("category", "client_type_description")
    .agg(pl.col("total_import_euros").sum().alias("kpi_value"))
    .with_columns(
        (
            pl.lit("Total amount in euros of the customers with invoices ")
            + pl.col("category")
            + pl.lit(" euros and ")
            + pl.col("client_type_description")
        ).alias("kpi_name")
    )
    .select("kpi_name", "kpi_value")
)

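# KPI 2: Invoice count by document type
# count() counts non-null amounts; the cast to Float64 matches the dtype of
# KPI 1's kpi_value so the two frames can be concatenated later.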
kpi_doctype_lf = (
    merged_lf
    .group_by("document_type_description")
    .agg(pl.col("total_import_euros").count().cast(pl.Float64).alias("kpi_value"))
    .with_columns(
        (
            pl.lit("Number of invoices of invoice type ")
            + pl.col("document_type_description")
        ).alias("kpi_name")
    )
    .select("kpi_name", "kpi_value")
)

print("KPI query plans built. Still lazy — nothing computed.")

## 5. Collect — Execute the Optimized Plan

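**Polars Power #4:** `.collect()` triggers execution. The engine sees the full pipeline behind each LazyFrame and optimizes it before running. The two KPI plans are collected separately here, so each call runs its own copy of the shared upstream steps.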

In [None]:
kpi_category_df = kpi_category_lf.collect()
kpi_doctype_df = kpi_doctype_lf.collect()

print("KPI 1 — Amount by Category:")
print(kpi_category_df)
print("\nKPI 2 — Document Type Count:")
print(kpi_doctype_df)

## 6. Final Assembly & Load to Database

In [None]:
output_df = (
    pl.concat([kpi_category_df, kpi_doctype_df])
    .with_columns(pl.lit(date.today()).alias("kpi_date"))
    .select("kpi_date", "kpi_name", "kpi_value")
)

# Quick preview
print("--- Output Preview ---")
print(output_df)
print(f"\nSchema: {output_df.schema}")
print(f"Rows:   {output_df.height}")

# Write to database
write_to_database_pl(db_engine, output_df, TABLE_NAME)
print("\nETL Process completed successfully.")

## Performance Metrics
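
A caveat when reading the memory figure: `tracemalloc` only traces allocations made through Python's own allocator, so buffers that Polars allocates natively in Rust are not counted. The standalone sketch below shows what it does measure, using plain Python objects. Run it separately from the notebook, since it starts and stops its own trace.

```python
import tracemalloc

tracemalloc.start()

# Roughly 1 MB of Python-allocated bytes objects, which tracemalloc can see.
data = [bytes(1024) for _ in range(1000)]

current, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()

print(f"Peak traced memory: {peak / 1024:.0f} KiB")
```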

In [None]:
elapsed = time.time() - start_time
_, peak_memory = tracemalloc.get_traced_memory()
tracemalloc.stop()
print(f"Polars (lazy) pipeline executed in {elapsed:.2f} seconds.")
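# tracemalloc only traces allocations made through Python's allocator;
# memory that Polars allocates natively in Rust is not included in this figure.
print(f"Peak Python-traced memory: {peak_memory / (1024 * 1024):.2f} MB")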