
# LAB (Google Colab): Hash Join from Scratch to PostgreSQL

This notebook is designed to give you an in-depth understanding of:

1. What a **Hash Join** is and why it is efficient for equality joins.
2. How it works internally (the **Build** and **Probe** phases, collisions, partitioning, memory).
3. How it fits into a real engine such as **PostgreSQL**: when the *planner* chooses it, how to inspect it with `EXPLAIN (ANALYZE, BUFFERS)`,
   and how indexes, cardinalities, and `work_mem` influence it.

Run the cells in order. The Colab environment is ephemeral: if the runtime is reset, you will have to reinstall PostgreSQL.


## 0) Objectives checklist

By the end of the LAB you should be able to:

- Explain **Build** and **Probe** in your own words.
- Implement a simple Hash Join in Python and reason about its complexity.
- Understand why PostgreSQL chooses a Hash Join over a Nested Loop or a Merge Join.
- Interpret a real plan containing `Hash Join` and `Hash Cond`.
- Relate performance to **size**, **selectivity**, **memory**, and **statistics**.


In [1]:
# Imports and configuration
import random
import time
import pandas as pd
from collections import defaultdict

random.seed(42)
pd.set_option("display.max_rows", 10)
pd.set_option("display.max_columns", 30)


## 1) Concept: Hash Join (the idea)

A **Hash Join** is typically used when the join predicate is an **equality** (for example `o.customer_id = c.id`).

The idea is:

- **Build**: construct an in-memory hash table from the smaller relation (for example `customers`).
- **Probe**: scan the larger relation (`orders`) and look up each row's key in the hash table to find matches.

Under ideal conditions (the hash table fits in memory and keys are well distributed), the expected cost is approximately **O(n + m)**, where n and m are the row counts of the two relations, plus the size of the output.
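
To make the O(n + m) claim tangible, the sketch below counts elementary steps instead of measuring time. The function names and the synthetic key lists are illustrative assumptions, not part of the notebook's later cells.

```python
def count_nested_loop_comparisons(left_keys, right_keys):
    """Count the key comparisons made by a naive nested loop."""
    comparisons = 0
    matches = 0
    for lk in left_keys:
        for rk in right_keys:
            comparisons += 1
            if lk == rk:
                matches += 1
    return comparisons, matches


def count_hash_join_steps(probe_keys, build_keys):
    """Count hash-table operations: one insert per build key, one lookup per probe key."""
    table = {}
    inserts = 0
    for bk in build_keys:
        table.setdefault(bk, []).append(bk)
        inserts += 1
    lookups = 0
    matches = 0
    for pk in probe_keys:
        lookups += 1
        matches += len(table.get(pk, []))
    return inserts + lookups, matches


build_keys = list(range(1, 101))                   # 100 "customers"
probe_keys = [(i % 100) + 1 for i in range(1000)]  # 1000 "orders"

nl_ops, nl_matches = count_nested_loop_comparisons(probe_keys, build_keys)
hj_ops, hj_matches = count_hash_join_steps(probe_keys, build_keys)
print(nl_ops, hj_ops, nl_matches == hj_matches)  # 100000 1100 True
```

Both strategies find the same matches, but the nested loop performs n·m comparisons while the hash join performs n + m table operations.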


## 2) Minimal implementation in Python (Build + Probe)

We will implement an in-memory Hash Join:
- Build: `hash[key] -> list of rows` (to support duplicates)
- Probe: scan the large table and look up matches


In [2]:
def hash_join(left_rows, right_rows, left_key, right_key):
    """Simple Hash Join:
    - Build on right_rows (typically the smaller table)
    - Probe with left_rows (typically the larger table)
    Each row is a dict.
    """
    # BUILD: map each key to the list of rows that carry it
    h = defaultdict(list)
    for r in right_rows:
        h[r[right_key]].append(r)

    # PROBE: .get() looks up without inserting empty lists for missing keys
    out = []
    for l in left_rows:
        for r in h.get(l[left_key], []):
            out.append((l, r))
    return out

customers = [
    {"id": 1, "name": "Ana"},
    {"id": 2, "name": "Luis"},
    {"id": 3, "name": "Marta"},
]

orders = [
    {"id": 10, "customer_id": 1, "amount": 100},
    {"id": 11, "customer_id": 1, "amount": 50},
    {"id": 12, "customer_id": 3, "amount": 200},
]

pairs = hash_join(orders, customers, "customer_id", "id")
pd.DataFrame([{"order_id": l["id"], "customer": r["name"], "amount": l["amount"]} for (l, r) in pairs])


| | order_id | customer | amount |
|---|---|---|---|
| 0 | 10 | Ana | 100 |
| 1 | 11 | Ana | 50 |
| 2 | 12 | Marta | 200 |


### Key observation

- In Build, `customers` is indexed by `id`.
- In Probe, each `order.customer_id` performs an average O(1) lookup.
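
Since the build side determines how large the hash table gets, it pays to build on the smaller input. The sketch below is a hypothetical variant (`hash_join_auto` and the `demo_*` data are illustrative names) that picks the build side by row count and still returns `(left, right)` pairs. A real planner works from estimated sizes rather than exact counts.

```python
from collections import defaultdict

def hash_join_auto(left_rows, right_rows, left_key, right_key):
    """Build on whichever input has fewer rows; always return (left, right) pairs."""
    swap = len(left_rows) < len(right_rows)
    if swap:
        build_rows, build_key, probe_rows, probe_key = left_rows, left_key, right_rows, right_key
    else:
        build_rows, build_key, probe_rows, probe_key = right_rows, right_key, left_rows, left_key

    table = defaultdict(list)
    for b in build_rows:
        table[b[build_key]].append(b)

    out = []
    for p in probe_rows:
        for b in table.get(p[probe_key], []):
            # Restore the caller's (left, right) orientation
            out.append((b, p) if swap else (p, b))
    return out, len(table)


demo_customers = [{"id": 1, "name": "Ana"}, {"id": 3, "name": "Marta"}]
demo_orders = [
    {"id": 10, "customer_id": 1},
    {"id": 11, "customer_id": 1},
    {"id": 12, "customer_id": 3},
    {"id": 13, "customer_id": 3},
]

pairs_a, size_a = hash_join_auto(demo_orders, demo_customers, "customer_id", "id")
pairs_b, size_b = hash_join_auto(demo_customers, demo_orders, "id", "customer_id")
print(size_a, size_b, len(pairs_a), len(pairs_b))  # 2 2 4 4
```

Whichever order the arguments arrive in, the hash table holds only the two customer keys.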


## 3) Collisions: why equality is checked even with a hash

A real engine hashes the key to pick a bucket, but it **always** verifies actual key equality,
because different keys can collide in the same bucket.

We will simulate fixed buckets: `bucket = hash(key) % num_buckets`.
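
With that bucket formula, distinct keys can share a bucket. The sketch below shows what would go wrong without the equality check. It assumes CPython, where small non-negative integers hash to themselves, so ids 1 and 5 both land in bucket 1 when there are 4 buckets. The customer "Eva" and the `demo_*` names are made up for the illustration.

```python
NUM_BUCKETS = 4

def bucket_of(key, num_buckets=NUM_BUCKETS):
    return hash(key) % num_buckets

# Build side: two customers whose ids collide modulo 4 (assumes CPython int hashing).
demo_build_rows = [{"id": 1, "name": "Ana"}, {"id": 5, "name": "Eva"}]
demo_buckets = [[] for _ in range(NUM_BUCKETS)]
for row in demo_build_rows:
    demo_buckets[bucket_of(row["id"])].append(row)

probe_key = 5
candidates = demo_buckets[bucket_of(probe_key)]

# Without the equality check, every row in the bucket would look like a match.
unchecked = [r["name"] for r in candidates]
checked = [r["name"] for r in candidates if r["id"] == probe_key]

print(unchecked)  # ['Ana', 'Eva']
print(checked)    # ['Eva']
```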


In [3]:
def build_buckets(rows, key, num_buckets=8):
    buckets = [[] for _ in range(num_buckets)]
    for r in rows:
        hv = hash(r[key])
        b = hv % num_buckets
        buckets[b].append((hv, r))
    return buckets

def probe_buckets(buckets, rows, left_key, right_key, num_buckets=8):
    out = []
    for l in rows:
        hv = hash(l[left_key])
        b = hv % num_buckets
        for hv2, r in buckets[b]:
            # Compare stored hashes first (cheap), then confirm real key equality
            if hv2 == hv and l[left_key] == r[right_key]:
                out.append((l, r))
    return out

buckets = build_buckets(customers, "id", num_buckets=4)
pairs2 = probe_buckets(buckets, orders, "customer_id", "id", num_buckets=4)
pd.DataFrame([{"customer": r["name"], "amount": l["amount"]} for (l, r) in pairs2])


| | customer | amount |
|---|---|---|
| 0 | Ana | 100 |
| 1 | Ana | 50 |
| 2 | Marta | 200 |


## 4) Cost comparison: Nested Loop vs Hash Join

A naive nested loop compares every row with every row (**O(n·m)**). A Hash Join approaches **O(n + m)**.

We will take a simple measurement to see the orders of magnitude. (This is not a scientific benchmark.)


In [5]:
def nested_loop_join(left_rows, right_rows, left_key, right_key):
    out = []
    for l in left_rows:
        for r in right_rows:
            if l[left_key] == r[right_key]:
                out.append((l, r))
    return out

def make_data(n_customers=5000, n_orders=20000, skew=True):
    customers = [{"id": i, "name": f"c{i}"} for i in range(1, n_customers+1)]
    orders = []
    for oid in range(1, n_orders+1):
        if skew:
            cid = int((random.random() ** 3) * (n_customers-1)) + 1
        else:
            cid = random.randint(1, n_customers)
        orders.append({"id": oid, "customer_id": cid, "amount": random.randint(1, 500)})
    return customers, orders

def time_one(fn, *args):
    # perf_counter is a monotonic, high-resolution clock intended for timing
    t0 = time.perf_counter()
    fn(*args)
    return time.perf_counter() - t0

cust_s, ord_s = make_data(5000, 20000, skew=True)

t_hash = time_one(hash_join, ord_s, cust_s, "customer_id", "id")
t_nl   = time_one(nested_loop_join, ord_s, cust_s, "customer_id", "id")

pd.DataFrame([
    {"algorithm": "hash_join", "seconds": t_hash},
    {"algorithm": "nested_loop_join", "seconds": t_nl},
]).sort_values("seconds")


| | algorithm | seconds |
|---|---|---|
| 0 | hash_join | 0.013942 |
| 1 | nested_loop_join | 5.653623 |


## 5) Moving to PostgreSQL: seeing a real Hash Join with EXPLAIN ANALYZE

In Colab we will install PostgreSQL locally and then:

- Create tables
- Load data
- Run `ANALYZE`
- Run `EXPLAIN (ANALYZE, BUFFERS)` to observe a `Hash Join`

This lets you connect the concept to a real engine.


In [6]:
# Install PostgreSQL (Colab / Debian-based)
!apt-get update -qq
!apt-get install -y postgresql postgresql-contrib -qq
!service postgresql start



## 6) Create the LAB user and database

We will use:
- user: `lab`
- password: `lab`
- database: `labdb`


In [7]:
# Plain statements avoid $$ dollar-quoting, which the shell expands to its PID inside double quotes.
# CREATE DATABASE also cannot run inside a DO block, so existence is checked first to keep the cell re-runnable.
!sudo -u postgres psql -tAc "SELECT 1 FROM pg_roles WHERE rolname = 'lab'" | grep -q 1 || sudo -u postgres psql -c "CREATE ROLE lab LOGIN PASSWORD 'lab' SUPERUSER;"
!sudo -u postgres psql -tAc "SELECT 1 FROM pg_database WHERE datname = 'labdb'" | grep -q 1 || sudo -u postgres psql -c "CREATE DATABASE labdb OWNER lab;"


## 7) Connect from Python with psycopg2


In [9]:
!pip -q install psycopg2-binary
import psycopg2
from psycopg2.extras import execute_values

conn = psycopg2.connect(host="localhost", dbname="labdb", user="lab", password="lab")
conn.autocommit = True
cur = conn.cursor()


## 8) Create tables and load data

Scenario:
- `customers` is relatively small
- `orders` is large

Afterwards we run `ANALYZE` so the planner has statistics.
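
The loader below assigns `customer_id` with a cubed uniform random number, which concentrates orders on low ids. The sketch here quantifies that skew for id 1, both analytically and with a small simulation. It uses its own `random.Random(0)` generator (an arbitrary seed chosen for the illustration) so it does not disturb the notebook's global random state, and the names `DEMO_N_CUSTOMERS` and `skewed_customer_id` are hypothetical.

```python
import random

DEMO_N_CUSTOMERS = 100_000

def skewed_customer_id(u, n_customers=DEMO_N_CUSTOMERS):
    """Same formula as the loader: cubing u in [0, 1) pushes values toward low ids."""
    return int((u ** 3) * (n_customers - 1)) + 1

# Analytic share of orders that land on customer id 1:
# id == 1  <=>  u**3 * (n - 1) < 1  <=>  u < (1 / (n - 1)) ** (1/3)
share_id_1 = (1 / (DEMO_N_CUSTOMERS - 1)) ** (1 / 3)

# Empirical check with a local generator, independent of the notebook's seed.
rng = random.Random(0)
sample = [skewed_customer_id(rng.random()) for _ in range(200_000)]
observed = sum(1 for cid in sample if cid == 1) / len(sample)

print(f"analytic share for id=1: {share_id_1:.4f}")
print(f"observed share for id=1: {observed:.4f}")
```

A share of roughly 2% of 800,000 orders is about 17,000 rows for customer 1, which is worth keeping in mind when reading the row counts in the plan for `c.id = 1` in section 10.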


In [10]:
cur.execute("DROP TABLE IF EXISTS orders;")
cur.execute("DROP TABLE IF EXISTS customers;")

cur.execute("""
CREATE TABLE customers (
  id   int PRIMARY KEY,
  name text NOT NULL
);
""")

cur.execute("""
CREATE TABLE orders (
  id          bigserial PRIMARY KEY,
  customer_id int NOT NULL,
  amount      int NOT NULL
);
""")

N_CUSTOMERS = 100_000
N_ORDERS    = 800_000

customers_rows = [(i, f"c{i}") for i in range(1, N_CUSTOMERS+1)]
execute_values(cur, "INSERT INTO customers (id, name) VALUES %s", customers_rows, page_size=10_000)

orders_rows = []
for _ in range(N_ORDERS):
    cid = int((random.random() ** 3) * (N_CUSTOMERS-1)) + 1  # skew toward low ids
    amt = random.randint(1, 500)
    orders_rows.append((cid, amt))

execute_values(cur, "INSERT INTO orders (customer_id, amount) VALUES %s", orders_rows, page_size=10_000)

cur.execute("ANALYZE;")
print("Loaded:", N_CUSTOMERS, "customers,", N_ORDERS, "orders")


Loaded: 100000 customers, 800000 orders


## 9) Reading the plan: Hash Join

In the output, look for:
- `Hash Join`
- `Hash Cond: (o.customer_id = c.id)`
- `Hash` (build) on `customers`
- `Seq Scan` on `orders` (common for joins that touch a large share of the table)


In [11]:
def explain(query: str) -> str:
    cur.execute("EXPLAIN (ANALYZE, BUFFERS, VERBOSE, FORMAT TEXT) " + query)
    return "\n".join(r[0] for r in cur.fetchall())

q = """
SELECT c.name, o.amount
FROM orders o
JOIN customers c
  ON o.customer_id = c.id
WHERE o.amount >= 450;
"""

plan = explain(q)
print(plan[:2200])


Hash Join  (cost=3280.00..18978.31 rows=84685 width=10) (actual time=47.448..207.967 rows=81801 loops=1)
  Output: c.name, o.amount
  Inner Unique: true
  Hash Cond: (o.customer_id = c.id)
  Buffers: shared hit=4866, temp read=322 written=322
  ->  Seq Scan on public.orders o  (cost=0.00..14325.00 rows=84685 width=8) (actual time=0.019..101.588 rows=81801 loops=1)
        Output: o.amount, o.customer_id
        Filter: (o.amount >= 450)
        Rows Removed by Filter: 718199
        Buffers: shared hit=4325
  ->  Hash  (cost=1541.00..1541.00 rows=100000 width=10) (actual time=47.252..47.255 rows=100000 loops=1)
        Output: c.name, c.id
        Buckets: 131072  Batches: 2  Memory Usage: 3179kB
        Buffers: shared hit=541, temp written=195
        ->  Seq Scan on public.customers c  (cost=0.00..1541.00 rows=100000 width=10) (actual time=0.006..17.989 rows=100000 loops=1)
              Output: c.name, c.id
              Buffers: shared hit=541
Planning:
  Buffers: shared hit=67
Pl

## 10) Compare with Nested Loop (index + selective query)

If the join or filter is highly selective, PostgreSQL may prefer a Nested Loop driven by index lookups.


In [12]:
cur.execute("CREATE INDEX IF NOT EXISTS idx_orders_customer_id ON orders(customer_id);")
cur.execute("ANALYZE orders;")

q_selective = """
SELECT c.name, o.amount
FROM customers c
JOIN orders o
  ON o.customer_id = c.id
WHERE c.id = 1;
"""

plan2 = explain(q_selective)
print(plan2[:2200])


Nested Loop  (cost=215.33..4927.54 rows=16853 width=10) (actual time=2.848..20.340 rows=17329 loops=1)
  Output: c.name, o.amount
  Buffers: shared hit=4245 read=17
  ->  Index Scan using customers_pkey on public.customers c  (cost=0.29..8.31 rows=1 width=10) (actual time=0.009..0.015 rows=1 loops=1)
        Output: c.id, c.name
        Index Cond: (c.id = 1)
        Buffers: shared hit=3
  ->  Bitmap Heap Scan on public.orders o  (cost=215.04..4750.70 rows=16853 width=8) (actual time=2.830..15.967 rows=17329 loops=1)
        Output: o.id, o.customer_id, o.amount
        Recheck Cond: (o.customer_id = 1)
        Heap Blocks: exact=4242
        Buffers: shared hit=4242 read=17
        ->  Bitmap Index Scan on idx_orders_customer_id  (cost=0.00..210.82 rows=16853 width=0) (actual time=1.888..1.888 rows=17329 loops=1)
              Index Cond: (o.customer_id = 1)
              Buffers: shared read=17
Planning:
  Buffers: shared hit=16 read=1
Planning Time: 0.368 ms
Execution Time: 21.886 

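## 11) `work_mem` and Hash Join: effect on spilling and batches

When the hash table does not fit in the memory budget derived from `work_mem`, PostgreSQL splits the join into several batches and spills rows to temporary files. This shows up in the plan as `Batches` greater than 1 and as `temp read`/`written` buffers.

We lower and then raise `work_mem` to observe the changes.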
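
To make "batches" concrete, here is a simplified Grace-style partitioned hash join in Python. It is a teaching sketch, not PostgreSQL's actual executor (which is more sophisticated): both inputs are split by `hash(key) % num_batches`, with in-memory lists standing in for temporary files, and each pair of partitions is joined on its own. The function name and the `demo_*` data are illustrative assumptions.

```python
from collections import defaultdict

def partitioned_hash_join(left_rows, right_rows, left_key, right_key, num_batches=4):
    """Grace-style sketch: split both inputs by hash, then join batch by batch."""
    left_parts = [[] for _ in range(num_batches)]
    right_parts = [[] for _ in range(num_batches)]
    for l in left_rows:
        left_parts[hash(l[left_key]) % num_batches].append(l)
    for r in right_rows:
        right_parts[hash(r[right_key]) % num_batches].append(r)

    out = []
    peak_build_rows = 0  # largest build side that must be held in memory at once
    for lp, rp in zip(left_parts, right_parts):
        table = defaultdict(list)
        for r in rp:
            table[r[right_key]].append(r)
        peak_build_rows = max(peak_build_rows, len(rp))
        for l in lp:
            for r in table.get(l[left_key], []):
                out.append((l, r))
    return out, peak_build_rows


demo_customers = [{"id": i, "name": f"c{i}"} for i in range(1, 9)]
demo_orders = [{"id": 100 + i, "customer_id": (i % 8) + 1} for i in range(16)]

single, peak_single = partitioned_hash_join(demo_orders, demo_customers, "customer_id", "id", num_batches=1)
split, peak_split = partitioned_hash_join(demo_orders, demo_customers, "customer_id", "id", num_batches=4)

same_rows = sorted((l["id"], r["id"]) for l, r in single) == sorted((l["id"], r["id"]) for l, r in split)
print(len(single), same_rows, peak_single, peak_split)
```

Matching keys always hash to the same partition, so the result is the same as the single-batch join while each in-memory table is smaller. The price is the extra pass that writes and re-reads the partitions.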


In [13]:
q_wide = """
SELECT c.name, sum(o.amount) AS total_amount
FROM orders o
JOIN customers c
  ON o.customer_id = c.id
GROUP BY c.name
ORDER BY total_amount DESC
LIMIT 10;
"""

cur.execute("SET work_mem = '4MB';")
plan_low = explain(q_wide)

cur.execute("SET work_mem = '256MB';")
plan_high = explain(q_wide)

print("=== work_mem=4MB ===")
print(plan_low[:1600])
print("\n=== work_mem=256MB ===")
print(plan_high[:1600])


=== work_mem=4MB ===
Limit  (cost=77318.49..77318.51 rows=10 width=14) (actual time=1761.254..1767.273 rows=10 loops=1)
  Output: c.name, (sum(o.amount))
  Buffers: shared hit=6058, temp read=4124 written=5942
  ->  Sort  (cost=77318.49..77568.49 rows=100000 width=14) (actual time=1761.252..1767.269 rows=10 loops=1)
        Output: c.name, (sum(o.amount))
        Sort Key: (sum(o.amount)) DESC
        Sort Method: top-N heapsort  Memory: 25kB
        Buffers: shared hit=6058, temp read=4124 written=5942
        ->  Finalize GroupAggregate  (cost=49822.56..75157.52 rows=100000 width=14) (actual time=1521.726..1745.038 rows=97765 loops=1)
              Output: c.name, sum(o.amount)
              Group Key: c.name
              Buffers: shared hit=6058, temp read=4124 written=5942
              ->  Gather Merge  (cost=49822.56..73157.52 rows=200000 width=14) (actual time=1521.704..1677.174 rows=230157 loops=1)
                    Output: c.name, (PARTIAL sum(o.amount))
                   

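## 12) Forcing/disabling joins (for learning only)

You can turn off individual join operators in the planner to see which alternative it picks. The switches act as strong discouragement rather than a hard ban: the planner may still use a disabled operator if no other plan is possible.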

- `SET enable_hashjoin = off;`
- `SET enable_nestloop = off;`
- `SET enable_mergejoin = off;`
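
Because these switches stay in effect for the rest of the session, it is easy to forget a `RESET`. One possible pattern is a small context manager that restores the setting even when the body raises. `planner_setting` and `RecordingCursor` are hypothetical names; the stand-in cursor only records SQL so the sketch runs without a database, and in the notebook you would pass the real `cur`. The parameter name and value are interpolated directly into SQL, so they should only ever be trusted constants.

```python
from contextlib import contextmanager

@contextmanager
def planner_setting(cur, name, value):
    """Temporarily SET a session parameter, and RESET it even if the body raises."""
    cur.execute(f"SET {name} = {value};")
    try:
        yield
    finally:
        cur.execute(f"RESET {name};")


class RecordingCursor:
    """Hypothetical stand-in for a DB cursor; it only records the SQL it receives."""
    def __init__(self):
        self.statements = []

    def execute(self, sql):
        self.statements.append(sql)


demo_cur = RecordingCursor()
with planner_setting(demo_cur, "enable_hashjoin", "off"):
    demo_cur.execute("EXPLAIN SELECT 1;")

print(demo_cur.statements)
# ['SET enable_hashjoin = off;', 'EXPLAIN SELECT 1;', 'RESET enable_hashjoin;']
```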


In [14]:
cur.execute("SET enable_hashjoin = off;")
alt_plan = explain(q)
print(alt_plan[:2200])

cur.execute("RESET enable_hashjoin;")


Merge Join  (cost=20865.04..25449.03 rows=80107 width=10) (actual time=91.520..150.749 rows=81801 loops=1)
  Output: c.name, o.amount
  Merge Cond: (c.id = o.customer_id)
  Buffers: shared hit=5141
  ->  Index Scan using customers_pkey on public.customers c  (cost=0.29..3148.29 rows=100000 width=10) (actual time=0.011..18.020 rows=100000 loops=1)
        Output: c.id, c.name
        Buffers: shared hit=816
  ->  Sort  (cost=20849.57..21049.84 rows=80107 width=8) (actual time=91.496..104.919 rows=81801 loops=1)
        Output: o.amount, o.customer_id
        Sort Key: o.customer_id
        Sort Method: quicksort  Memory: 6907kB
        Buffers: shared hit=4325
        ->  Seq Scan on public.orders o  (cost=0.00..14325.00 rows=80107 width=8) (actual time=0.016..67.302 rows=81801 loops=1)
              Output: o.amount, o.customer_id
              Filter: (o.amount >= 450)
              Rows Removed by Filter: 718199
              Buffers: shared hit=4325
Planning:
  Buffers: shared hit=1
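
With hash joins disabled, the plan above falls back to a `Merge Join`: `orders` is sorted on `customer_id` and merged with `customers` read in `id` order through its primary-key index. As a rough illustration of that mechanism, here is a minimal sort-merge join in Python. It is a sketch with hypothetical names (`merge_join`, `demo_*`) and it sorts both inputs itself, whereas the plan avoids one of the sorts by using the index.

```python
def merge_join(left_rows, right_rows, left_key, right_key):
    """Sort both inputs on the join key, then advance two cursors in step."""
    left = sorted(left_rows, key=lambda row: row[left_key])
    right = sorted(right_rows, key=lambda row: row[right_key])
    out = []
    i = j = 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][left_key], right[j][right_key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of right rows sharing this key
            j_end = j
            while j_end < len(right) and right[j_end][right_key] == lk:
                j_end += 1
            # Pair every left row in the matching run with that right run
            while i < len(left) and left[i][left_key] == lk:
                for r in right[j:j_end]:
                    out.append((left[i], r))
                i += 1
            j = j_end
    return out


demo_customers = [{"id": 1, "name": "Ana"}, {"id": 2, "name": "Luis"}, {"id": 3, "name": "Marta"}]
demo_orders = [
    {"id": 10, "customer_id": 1},
    {"id": 11, "customer_id": 1},
    {"id": 12, "customer_id": 3},
]

merged = merge_join(demo_orders, demo_customers, "customer_id", "id")
print([(l["id"], r["name"]) for l, r in merged])
# [(10, 'Ana'), (11, 'Ana'), (12, 'Marta')]
```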

## 13) Wrap-up

You have seen:
- Hash Join in Python (the essential mechanism)
- Collisions and buckets
- Comparison with nested loop
- A real Hash Join in PostgreSQL with `EXPLAIN (ANALYZE, BUFFERS)`
- The effect of indexes, selectivity, and `work_mem`

Suggested exercises:
1. Change `N_CUSTOMERS` and `N_ORDERS` and observe how the plan changes.
2. Drop the index and compare again.
3. Try more and less selective filters on `orders.amount`.
4. Adjust `work_mem` and look for changes in spilling and batches.
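
For exercise 4, a small helper can pull the hash statistics out of the plan text so that runs are easier to compare. The sketch below is a hypothetical helper (`hash_node_stats`) based on the line format seen in this notebook's own plan output. Other PostgreSQL versions or plans may print the line differently (for example with "originally" annotations), in which case the pattern would need adjusting.

```python
import re

# Pattern assumed from the plan output shown in section 9
HASH_STATS = re.compile(r"Buckets: (\d+)\s+Batches: (\d+)\s+Memory Usage: (\d+)kB")

def hash_node_stats(plan_text):
    """Return (buckets, batches, memory_kb) for each Hash node found in plan_text."""
    return [tuple(int(g) for g in m.groups()) for m in HASH_STATS.finditer(plan_text)]


sample_plan = """
  ->  Hash  (cost=1541.00..1541.00 rows=100000 width=10) (actual time=47.252..47.255 rows=100000 loops=1)
        Buckets: 131072  Batches: 2  Memory Usage: 3179kB
"""
print(hash_node_stats(sample_plan))  # [(131072, 2, 3179)]
```

In the notebook, you could call `hash_node_stats(explain(q))` under different `work_mem` values and compare the `Batches` figure.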
