# SQLAlchemy ORM and Columnar Data Formats

Demonstrates the SQLAlchemy 2.0 ORM with relationships and complex queries,
plus high-performance data I/O with columnar PyArrow/Parquet and hierarchical HDF5.

**Libraries:**
- [SQLAlchemy 2.0](https://docs.sqlalchemy.org/en/20/) — Python SQL toolkit and ORM with modern type hints
- [PyArrow](https://arrow.apache.org/docs/python/) — Apache Arrow columnar in-memory format
- [Parquet](https://arrow.apache.org/docs/python/parquet.html) — Columnar storage with predicate pushdown
- [h5py](https://docs.h5py.org/) — HDF5 hierarchical scientific data format

In [None]:
import os
import time
import tempfile
import datetime
from typing import Optional

import numpy as np
import pandas as pd

from sqlalchemy import (
    create_engine, select, func, case, desc,
    Integer, String, Float, DateTime, Boolean, ForeignKey,
)
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship, Session

import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.compute as pc
import h5py

# A single seeded generator is shared by every cell so the random data is reproducible.
rng = np.random.default_rng(42)
print("All imports OK")

## SQLAlchemy 2.0 ORM: Define Models

SQLAlchemy 2.0 uses `Mapped` type annotations — models are type-safe and IDE-friendly.

In [None]:
class Base(DeclarativeBase):
    """Declarative base class; its metadata collects the tables of every model below."""

class Department(Base):
    """A department; the parent side of the one-to-many link to Employee."""

    __tablename__ = 'department'

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(100), unique=True)
    # The only optional field on this model.
    location: Mapped[Optional[str]] = mapped_column(String(100))

    # Employees are owned by their department: the cascade removes them
    # together with the department or when detached from this collection.
    employees: Mapped[list['Employee']] = relationship(
        cascade='all, delete-orphan',
        back_populates='department',
    )

class Employee(Base):
    """An employee who belongs to one department and may hold project assignments."""

    __tablename__ = 'employee'

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    first_name: Mapped[str] = mapped_column(String(60))
    last_name: Mapped[str] = mapped_column(String(60))
    email: Mapped[str] = mapped_column(String(120), unique=True)
    salary: Mapped[float] = mapped_column(Float)
    # The column type is DateTime, so the attribute carries datetime.datetime
    # values; the annotation now says so instead of claiming datetime.date.
    hire_date: Mapped[datetime.datetime] = mapped_column(DateTime)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)

    # Many-to-one link back to the owning department.
    department_id: Mapped[int] = mapped_column(ForeignKey('department.id'))
    department: Mapped['Department'] = relationship(back_populates='employees')

    # Despite the name, this collection holds ProjectAssignment rows (the
    # association objects), not Project instances.
    projects: Mapped[list['ProjectAssignment']] = relationship(
        back_populates='employee',
        cascade='all, delete-orphan',
    )

class Project(Base):
    """A budgeted project that employees are attached to through ProjectAssignment."""

    __tablename__ = 'project'

    id: Mapped[int] = mapped_column(Integer, primary_key=True)
    name: Mapped[str] = mapped_column(String(120))
    budget: Mapped[float] = mapped_column(Float)
    # Free-form status string; new rows default to 'active'.
    status: Mapped[str] = mapped_column(String(20), default='active')

    # Assignments are owned by the project and are deleted along with it.
    assignments: Mapped[list['ProjectAssignment']] = relationship(
        cascade='all, delete-orphan',
        back_populates='project',
    )

class ProjectAssignment(Base):
    """Association object linking one Employee to one Project, with role and hours."""

    __tablename__ = 'project_assignment'

    id: Mapped[int] = mapped_column(Integer, primary_key=True)

    # The two foreign keys that form the many-to-many link.
    employee_id: Mapped[int] = mapped_column(ForeignKey('employee.id'))
    project_id: Mapped[int] = mapped_column(ForeignKey('project.id'))

    # Extra data carried by the link itself.
    role: Mapped[str] = mapped_column(String(60))
    hours_allocated: Mapped[float] = mapped_column(Float, default=0.0)

    # Navigation back to both ends of the association.
    employee: Mapped['Employee'] = relationship(back_populates='projects')
    project: Mapped['Project'] = relationship(back_populates='assignments')

# Confirmation for the notebook reader that the four mapped classes were defined.
print("Models: Department, Employee, Project, ProjectAssignment")

In [None]:
# Create an in-memory SQLite database, build the schema from the mapped
# classes, and fill it with reproducible random demo data.
engine = create_engine('sqlite:///:memory:', echo=False)
Base.metadata.create_all(engine)

dept_data = [('Engineering','San Francisco'),('Marketing','New York'),
             ('Finance','Chicago'),('Operations','Austin'),('Research','Boston')]
first_names = ['Alice','Bob','Carol','David','Eve','Frank','Grace','Henry',
               'Iris','Jack','Kate','Leo','Mia','Noah','Olivia','Paul','Quinn','Rachel']
last_names = ['Smith','Jones','Brown','Williams','Taylor','Davies','Evans','Wilson']
project_names = ['Project Apollo','Market Expansion','Cost Reduction','Digital Transformation',
                 'Customer Portal','Data Platform','Mobile App','AI Initiative']

with Session(engine) as session:
    departments = [Department(name=dept_name, location=city) for dept_name, city in dept_data]
    session.add_all(departments)
    session.flush()  # flush before dept.id is read below

    employees = []
    base_date = datetime.date(2018, 1, 1)
    # Mean salary per department; individual salaries are drawn around it.
    salary_bases = {'Engineering': 110_000, 'Marketing': 85_000, 'Finance': 95_000,
                    'Operations': 80_000, 'Research': 105_000}
    for i in range(50):
        dept = rng.choice(departments)
        base = salary_bases[dept.name]
        # rng.choice returns NumPy string scalars; convert to plain str so the
        # ORM attributes hold native Python values. The order of the rng calls
        # is kept exactly as before so the generated data does not change.
        emp = Employee(
            first_name=str(rng.choice(first_names)),
            last_name=str(rng.choice(last_names)),
            email=f"emp{i:03d}@co.com",  # index-based, so always unique
            salary=float(rng.normal(base, base * 0.12)),
            hire_date=datetime.datetime.combine(
                base_date + datetime.timedelta(days=int(rng.integers(0, 365*6))), datetime.time.min
            ),
            is_active=bool(rng.random() > 0.1),  # roughly 90% active
            department_id=dept.id,
        )
        employees.append(emp)
    session.add_all(employees)
    session.flush()  # flush before emp.id is read below

    # 'active' appears three times in the choice list, so it is drawn ~75% of the time.
    projects = [Project(name=project_name, budget=float(rng.uniform(50_000, 500_000)),
                        status=str(rng.choice(['active','completed','active','active'])))
                for project_name in project_names]
    session.add_all(projects)
    session.flush()  # flush before project.id is read below

    for project in projects:
        # 3 to 8 distinct employees per project (replace=False prevents duplicates).
        n_assigned = int(rng.integers(3, 9))
        assigned = rng.choice(employees, size=min(n_assigned, len(employees)), replace=False)
        for emp in assigned:
            session.add(ProjectAssignment(
                employee_id=emp.id, project_id=project.id,
                role=str(rng.choice(['Lead','Developer','Analyst','Designer','Manager'])),
                hours_allocated=float(rng.uniform(40, 400)),
            ))
    session.commit()

# Report what was actually inserted rather than hard-coded numbers.
# len() only touches the Python lists, so it is safe after the session is closed.
print(f"Inserted: {len(departments)} departments, {len(employees)} employees, {len(projects)} projects")

## Complex ORM Queries

In [None]:
# Headcount, average salary and top salary per department, counting only
# active employees and sorted by average salary (highest first).
headcount_col = func.count(Employee.id).label('headcount')
avg_salary_col = func.round(func.avg(Employee.salary), 0).label('avg_salary')
max_salary_col = func.round(func.max(Employee.salary), 0).label('max_salary')

stmt = (
    select(Department.name, Department.location, headcount_col, avg_salary_col, max_salary_col)
    .join(Employee)
    # `== True` is intentional: it builds a SQL comparison, not a Python bool.
    .where(Employee.is_active == True)
    .group_by(Department.id)
    .order_by(desc('avg_salary'))
)

with Session(engine) as session:
    rows = session.execute(stmt).all()

dept_df = pd.DataFrame(
    rows,
    columns=['Department', 'Location', 'Headcount', 'Avg Salary', 'Max Salary'],
)
dept_df

In [None]:
# Top 2 earners per department using a window function.
# rank() gives tied salaries the same rank, so a department can return more
# than two rows when salaries tie.
salary_rank_col = func.rank().over(
    partition_by=Department.id,
    order_by=desc(Employee.salary),
).label('salary_rank')

subq = (
    select(
        Employee.id,
        Employee.first_name,
        Employee.last_name,
        Employee.salary,
        Department.name.label('dept_name'),
        salary_rank_col,
    )
    .join(Department)
    .where(Employee.is_active == True)
    .subquery()
)
# The rank filter is applied in an outer query around the ranked subquery.
top2_stmt = select(subq).where(subq.c.salary_rank <= 2).order_by(subq.c.dept_name)

with Session(engine) as session:
    top2 = session.execute(top2_stmt).all()

top2_df = pd.DataFrame(top2, columns=['id','first','last','salary','dept','rank'])
top2_df['name'] = top2_df['first'] + ' ' + top2_df['last']
top2_df[['dept','rank','name','salary']]

In [None]:
# Salary bands using a CASE expression.
# The conditions are listed from the highest threshold down.
band_rules = [
    (Employee.salary >= 120_000, 'Senior (≥$120k)'),
    (Employee.salary >= 90_000, 'Mid ($90k–$120k)'),
]
salary_band = case(*band_rules, else_='Junior (<$90k)')

stmt = (
    select(
        salary_band.label('band'),
        func.count(Employee.id).label('count'),
        func.round(func.avg(Employee.salary), 0).label('avg'),
    )
    .group_by(salary_band)
    .order_by(desc('avg'))
)

with Session(engine) as session:
    rows = session.execute(stmt).all()

pd.DataFrame(rows, columns=['Band', 'Count', 'Avg Salary'])

## PyArrow: Schema-First Columnar Data

PyArrow tables are typed, immutable column stores. The `compute` module provides
vectorized operations that work directly on Arrow memory — no pandas needed.

In [None]:
# Build a synthetic card-transaction table directly as Arrow arrays.
n_rows = 200_000
currencies = ['USD', 'EUR', 'GBP', 'JPY']
categories = ['Electronics', 'Groceries', 'Travel', 'Dining', 'Healthcare', 'Entertainment']

# Log-normal amounts; about 2% of rows are flagged as fraud and scaled up 3-20x.
amounts = rng.lognormal(mean=3.5, sigma=1.2, size=n_rows)
is_fraud = rng.random(n_rows) < 0.02
amounts[is_fraud] *= rng.uniform(3, 20, is_fraud.sum())

# Timestamp bounds as epoch milliseconds (pd.Timestamp.value is in nanoseconds).
start_ms = pd.Timestamp('2024-01-01').value // 1_000_000
end_ms = pd.Timestamp('2024-12-31').value // 1_000_000

# The random columns are drawn in this exact order so the generator stream,
# and therefore the data, matches the column order of the table below.
customer_ids = rng.integers(1, 10_001, n_rows, dtype=np.int32)
currency_values = rng.choice(currencies, n_rows)
category_values = rng.choice(categories, n_rows)
timestamp_values = rng.integers(start_ms, end_ms, n_rows).astype('int64')

table = pa.table({
    'transaction_id': pa.array(np.arange(n_rows, dtype=np.int64)),
    'customer_id': pa.array(customer_ids),
    'amount': pa.array(amounts),
    # Low-cardinality string columns are stored dictionary-encoded.
    'currency': pa.array(currency_values).dictionary_encode(),
    'category': pa.array(category_values).dictionary_encode(),
    'timestamp': pa.array(timestamp_values, type=pa.timestamp('ms')),
    'is_fraud': pa.array(is_fraud),
})

total_amount = pc.sum(table['amount']).as_py()
fraud_count = pc.sum(table['is_fraud']).as_py()
max_amount = pc.max(table['amount']).as_py()

print(f"Table: {table.num_rows:,} rows × {table.num_columns} columns")
print("\nArrow compute (no pandas):")
print(f"  Total amount : ${total_amount:,.2f}")
print(f"  Fraud rate   : {fraud_count / n_rows:.2%}")
print(f"  Max amount   : ${max_amount:.2f}")

## Parquet: Row-Group Write and Predicate Pushdown

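Predicate pushdown lets a Parquet reader use the min/max statistics stored for each row group
to skip row groups that can't contain matching rows, dramatically reducing I/O for filtered reads.
It pays off most when the filtered column is sorted or clustered; for values scattered across
every row group, the filter is still applied during the scan but no row group can be skipped.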

In [None]:
# Write the Arrow table to Parquet, read it back with a pushed-down filter,
# then compare CSV / Parquet / Feather round-trips on a 100k-row sample.
# All files live in a temporary directory that is removed when the block exits.
with tempfile.TemporaryDirectory() as tmpdir:
    parquet_path = os.path.join(tmpdir, 'transactions.parquet')

    # Write: 200k rows in row groups of 50k, with per-row-group statistics.
    t0 = time.perf_counter()
    pq.write_table(table, parquet_path, compression='snappy', row_group_size=50_000, write_statistics=True)
    write_ms = (time.perf_counter() - t0) * 1000

    size_mb = os.path.getsize(parquet_path) / 1e6
    meta = pq.read_metadata(parquet_path)

    print(f"Write: {write_ms:.0f} ms  |  Size: {size_mb:.2f} MB  |  Row groups: {meta.num_row_groups}")

    # Filtered read: only three columns are decoded and the is_fraud filter is
    # applied inside the reader.
    # NOTE: fraud rows are scattered at random, so every row group's statistics
    # contain both True and False and no row group can actually be skipped.
    # Skipping needs a sorted or clustered filter column.
    t0 = time.perf_counter()
    fraud_table = pq.read_table(parquet_path,
                                 columns=['transaction_id', 'amount', 'category'],
                                 filters=[('is_fraud', '=', True)])
    read_ms = (time.perf_counter() - t0) * 1000

    print(f"\nPredicate pushdown (is_fraud=True):")
    print(f"  Rows returned: {fraud_table.num_rows:,} ({fraud_table.num_rows/n_rows:.2%})")
    # The old message claimed row groups were skipped, which is not true for this data.
    print(f"  Read time    : {read_ms:.0f} ms  (filter + column projection applied during the scan)")

    # Performance comparison: CSV vs Parquet vs Feather
    sample = table.slice(0, 100_000).to_pandas()
    fmt_results = []
    for fmt, write_fn, read_fn, ext in [
        ('CSV',     lambda path: sample.to_csv(path, index=False),  pd.read_csv,     'csv'),
        ('Parquet', lambda path: sample.to_parquet(path),           pd.read_parquet, 'parquet'),
        ('Feather', lambda path: sample.to_feather(path),           pd.read_feather, 'feather'),
    ]:
        bench_path = os.path.join(tmpdir, f'bench.{ext}')

        t0 = time.perf_counter()
        write_fn(bench_path)
        write_elapsed_ms = (time.perf_counter() - t0) * 1000

        t0 = time.perf_counter()
        read_fn(bench_path)
        read_elapsed_ms = (time.perf_counter() - t0) * 1000

        fmt_results.append({
            'Format': fmt,
            'Write (ms)': write_elapsed_ms,
            'Read (ms)': read_elapsed_ms,
            'Size (KB)': os.path.getsize(bench_path) / 1024,
        })

pd.DataFrame(fmt_results).round(1)

## HDF5: Hierarchical Numerical Storage

HDF5 stores large numerical arrays in a file-system-like hierarchy with metadata attributes.
It supports chunked storage and gzip compression for efficient access.

In [None]:
# Simulate geometric Brownian motion price paths, store them with summary
# statistics in an HDF5 file, then read the statistics and a slice back.
n_paths, n_steps = 500, 252
mu, sigma = 0.08, 0.20
dt = 1 / n_steps
W = rng.standard_normal((n_paths, n_steps))
# Exact GBM log-increments, accumulated along the time axis; S0 = 100.
paths = 100.0 * np.exp(np.cumsum((mu - 0.5*sigma**2)*dt + sigma*np.sqrt(dt)*W, axis=1))
final_prices = paths[:, -1]

# A temporary directory guarantees the file is removed even if a step below
# raises. The previous NamedTemporaryFile(delete=False) + os.unlink pairing
# leaked the file on any error.
with tempfile.TemporaryDirectory() as tmpdir:
    hdf5_path = os.path.join(tmpdir, 'gbm_simulation.h5')

    with h5py.File(hdf5_path, 'w') as f:
        f.attrs['description'] = 'Monte Carlo GBM simulation'

        sim = f.create_group('simulation')
        ds = sim.create_dataset('paths', data=paths, compression='gzip', compression_opts=4, chunks=True)
        ds.attrs['shape'] = f'{n_paths} paths × {n_steps} steps'
        # Column k of `paths` is the price after k+1 steps, i.e. at t = (k+1)*dt,
        # so the axis runs from dt to 1 rather than from 0.
        sim.create_dataset('time_axis', data=np.arange(1, n_steps + 1) * dt)

        stats_grp = f.create_group('statistics')
        stats_grp.create_dataset('final_prices', data=final_prices)
        stats_grp.attrs['mean_final'] = float(final_prices.mean())
        stats_grp.attrs['std_final'] = float(final_prices.std())
        # Stored as price levels: the 5th and 1st percentiles of the final price.
        stats_grp.attrs['var_95'] = float(np.percentile(final_prices, 5))
        stats_grp.attrs['var_99'] = float(np.percentile(final_prices, 1))

    size_mb = os.path.getsize(hdf5_path) / 1e6

    with h5py.File(hdf5_path, 'r') as f:
        s = f['statistics']
        print(f"Monte Carlo GBM: μ={mu}, σ={sigma}, S₀=100")
        print(f"File size       : {size_mb:.2f} MB")
        print(f"Mean final price: ${s.attrs['mean_final']:.2f}")
        print(f"Std final price : ${s.attrs['std_final']:.2f}")
        print(f"VaR 95%         : ${s.attrs['var_95']:.2f}")
        print(f"VaR 99%         : ${s.attrs['var_99']:.2f}")

        # Demonstrate partial read (only 10 of 500 paths)
        partial = f['simulation/paths'][:10, :]
        print(f"\nSlice read shape: {partial.shape}  ({partial.nbytes/1024:.1f} KB)")

In [None]:
import matplotlib.pyplot as plt

# Plot a sample of the simulated paths next to the distribution of final prices.
# Column k of `paths` is the price after k+1 steps, so the time axis runs from
# 1/n_steps to 1 (the start price of 100 is shown by the dashed reference line).
t_sim = np.arange(1, n_steps + 1) / n_steps
fig, axes = plt.subplots(1, 2, figsize=(13, 5))
path_ax, hist_ax = axes

# Sample paths: a 2-D y array draws one line per column, so a single call
# replaces the 50-iteration loop.
path_ax.plot(t_sim, paths[:50].T, linewidth=0.4, alpha=0.4, color='steelblue')
path_ax.plot(t_sim, paths.mean(axis=0), linewidth=2.5, color='red', label='Mean path')
path_ax.axhline(y=100, color='black', linestyle='--', linewidth=0.8, alpha=0.5)
path_ax.set_xlabel("Time (years)")
path_ax.set_ylabel("Price")
path_ax.set_title(f"Monte Carlo GBM — {n_paths} paths")
path_ax.legend()

# Final price distribution with the 5th / 1st percentile and mean markers.
pct_5, pct_1 = np.percentile(final_prices, [5, 1])
hist_ax.hist(final_prices, bins=40, color='steelblue', alpha=0.75, density=True, edgecolor='white')
hist_ax.axvline(x=pct_5, color='red', linestyle='--', linewidth=2, label='VaR 95%')
hist_ax.axvline(x=pct_1, color='darkred', linestyle='--', linewidth=2, label='VaR 99%')
hist_ax.axvline(x=final_prices.mean(), color='green', linestyle='-', linewidth=2, label='Mean')
hist_ax.set_xlabel("Final Price")
hist_ax.set_ylabel("Density")
hist_ax.set_title("Distribution of Final Prices")
hist_ax.legend()

plt.tight_layout()
plt.show()

---
## Summary

1. **SQLAlchemy 2.0** — `Mapped` type annotations make models IDE-friendly; relationships handle JOIN complexity; window functions in SQL work via SQLAlchemy's `over()`
2. **PyArrow** — Zero-copy columnar in-memory format; `pyarrow.compute` (imported here as `pc`) functions are vectorized and often faster than pandas for simple aggregations
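3. **Parquet** — Columnar storage whose row-group statistics let readers skip irrelevant row groups (predicate pushdown); Snappy-compressed files are typically much smaller than the equivalent CSV (often 60–80%, depending on the data — see the benchmark above)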
4. **HDF5** — Chunked, compressed hierarchical storage for large numerical arrays; supports partial reads without loading full dataset

**Format selection guide:**
| Format | Best for |
|--------|----------|
| CSV | Human-readable exchange, small datasets |
| Parquet | Analytics workloads, columnar queries, data lakes |
| Feather | Fast in-process pandas exchange |
| HDF5 | Hierarchical scientific data, large numerical arrays |

**Resources:**
- [SQLAlchemy 2.0 ORM Guide](https://docs.sqlalchemy.org/en/20/orm/quickstart.html)
- [PyArrow Cookbook](https://arrow.apache.org/cookbook/py/)
- [h5py Quick Start](https://docs.h5py.org/en/stable/quick.html)