# Capítulo 6: Agregações Hash em Paralelo

## Introdução

Agregações como `GROUP BY` são operações fundamentais em SQL. O DuckDB usa **hash tables vetorizadas** e técnicas de **scatter/gather** para maximizar o throughput, processando múltiplas chaves simultaneamente.

### Objetivos:
1. Entender hash aggregation tradicional vs vetorizada
2. Implementar técnicas de scatter/gather
3. Otimizar para diferentes cardinalidades
4. Entender particionamento para paralelismo
5. Comparar performance com Pandas

In [None]:
!pip install duckdb pandas numpy matplotlib -q

In [None]:
import duckdb
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
from collections import defaultdict

plt.rcParams['figure.figsize'] = (12, 6)

## 6.1 Hash Aggregation: Tradicional vs Vetorizada

### Abordagem Tradicional (Volcano)
```
Para cada linha:
  1. Calcular hash da chave        ← 1 hash por vez
  2. Lookup na hash table          ← Cache miss provável
  3. Atualizar agregado            ← Acesso aleatório

Problemas: Muitos acessos aleatórios à memória!
```

### Abordagem Vetorizada (DuckDB)
```
Para cada chunk de 2048 linhas:
  1. Calcular hashes de TODAS as chaves (SIMD)
  2. Agrupar por bucket (scatter)
  3. Processar cada bucket em sequência
  4. Agregar resultados (gather)

Vantagens:
  - Hash de múltiplas chaves em paralelo
  - Melhor uso de cache (dados agrupados)
  - Prefetching possível
```

In [None]:
# Implementação tradicional (linha por linha)
def hash_aggregate_traditional(keys: np.ndarray, values: np.ndarray):
    """GROUP BY tradicional - uma linha por vez"""
    aggregates = defaultdict(lambda: {'sum': 0.0, 'count': 0})
    
    for i in range(len(keys)):
        key = keys[i]
        val = values[i]
        aggregates[key]['sum'] += val
        aggregates[key]['count'] += 1
    
    return dict(aggregates)

# Teste
n = 100_000
num_groups = 100
keys = np.random.randint(0, num_groups, n)
values = np.random.randn(n)

start = time.perf_counter()
result = hash_aggregate_traditional(keys, values)
trad_time = time.perf_counter() - start

print(f"Agregação tradicional: {trad_time*1000:.2f} ms")
print(f"Grupos: {len(result)}")
print(f"Exemplo grupo 0: sum={result[0]['sum']:.2f}, count={result[0]['count']}")

In [None]:
# Implementação vetorizada usando NumPy
def hash_aggregate_vectorized(keys: np.ndarray, values: np.ndarray):
    """GROUP BY vetorizado usando operações NumPy"""
    # Encontrar chaves únicas e mapear para índices
    unique_keys, inverse_indices = np.unique(keys, return_inverse=True)
    
    # Agregar usando bincount (altamente otimizado, usa SIMD internamente)
    sums = np.bincount(inverse_indices, weights=values)
    counts = np.bincount(inverse_indices)
    
    return {
        'keys': unique_keys,
        'sums': sums,
        'counts': counts,
        'means': sums / counts
    }

start = time.perf_counter()
result_vec = hash_aggregate_vectorized(keys, values)
vec_time = time.perf_counter() - start

print(f"Agregação vetorizada: {vec_time*1000:.4f} ms")
print(f"Speedup: {trad_time/vec_time:.0f}x")
print(f"\nExemplo grupo 0: sum={result_vec['sums'][0]:.2f}, count={result_vec['counts'][0]}")

## 6.2 Scatter/Gather: Como Funciona

```
SCATTER (Espalhar):
  Dados:     [A, B, C, A, B, A]    (valores)
  Hashes:    [0, 1, 2, 0, 1, 0]    (buckets)
  
  Bucket 0:  [A, A, A]  ← Todos os 'A' juntos
  Bucket 1:  [B, B]
  Bucket 2:  [C]

GATHER (Coletar):
  Agrega cada bucket separadamente
  Bucket 0: SUM = 3*A
  Bucket 1: SUM = 2*B
  Bucket 2: SUM = 1*C

Benefício: Dados no mesmo bucket ficam próximos → Cache hits!
```

In [None]:
# Demonstração visual de Scatter/Gather
def scatter_gather_demo(keys, values):
    """Demonstra o processo de scatter/gather"""
    print("=== SCATTER ===")
    print(f"Chaves:  {keys}")
    print(f"Valores: {values}")
    
    # Scatter: agrupar por chave
    buckets = defaultdict(list)
    for k, v in zip(keys, values):
        buckets[k].append(v)
    
    print("\nBuckets após scatter:")
    for k in sorted(buckets.keys()):
        print(f"  Bucket {k}: {buckets[k]}")
    
    # Gather: agregar cada bucket
    print("\n=== GATHER ===")
    results = {}
    for k, vals in buckets.items():
        results[k] = {'sum': sum(vals), 'count': len(vals)}
        print(f"  Grupo {k}: SUM={sum(vals):.1f}, COUNT={len(vals)}")
    
    return results

# Exemplo pequeno
demo_keys = np.array([0, 1, 2, 0, 1, 0, 2, 1])
demo_values = np.array([10, 20, 30, 15, 25, 12, 35, 22])
_ = scatter_gather_demo(demo_keys, demo_values)

In [None]:
# Implementação de agregação com scatter/gather explícito
def hash_aggregate_scatter_gather(keys: np.ndarray, values: np.ndarray, num_partitions: int = 16):
    """
    Agregação usando scatter/gather com partições.
    Simula como DuckDB particiona para paralelismo.
    """
    n = len(keys)
    
    # Fase 1: Calcular hashes para particionamento
    partition_ids = keys % num_partitions
    
    # Fase 2: Scatter - separar por partição
    partitions = []
    for p in range(num_partitions):
        mask = partition_ids == p
        partitions.append({
            'keys': keys[mask],
            'values': values[mask]
        })
    
    # Fase 3: Agregar cada partição (pode ser paralelo!)
    partial_results = []
    for p in partitions:
        if len(p['keys']) > 0:
            unique, inverse = np.unique(p['keys'], return_inverse=True)
            sums = np.bincount(inverse, weights=p['values'])
            counts = np.bincount(inverse)
            partial_results.append({
                'keys': unique,
                'sums': sums,
                'counts': counts
            })
    
    # Fase 4: Gather - combinar resultados
    final_sums = defaultdict(float)
    final_counts = defaultdict(int)
    
    for pr in partial_results:
        for i, key in enumerate(pr['keys']):
            final_sums[key] += pr['sums'][i]
            final_counts[key] += pr['counts'][i]
    
    return dict(final_sums), dict(final_counts)

# Benchmark
start = time.perf_counter()
sums, counts = hash_aggregate_scatter_gather(keys, values)
sg_time = time.perf_counter() - start

print(f"Scatter/Gather: {sg_time*1000:.2f} ms")
print(f"Grupos: {len(sums)}")

## 6.3 Impacto da Cardinalidade

A **cardinalidade** (número de grupos distintos) afeta drasticamente a performance:

```
Baixa cardinalidade (poucos grupos):
  - Hash table pequena → cabe no cache L1
  - Muitas colisões → mais processamento por bucket
  - Muito rápido!

Alta cardinalidade (muitos grupos):
  - Hash table grande → cache misses
  - Poucas colisões → menos trabalho por bucket
  - Mais lento devido a memória
```

In [None]:
# Benchmark: Impacto da cardinalidade
n = 1_000_000
cardinalities = [10, 50, 100, 500, 1000, 5000, 10000, 50000, 100000]

results_card = []
for card in cardinalities:
    keys = np.random.randint(0, card, n)
    values = np.random.randn(n)
    
    # Vetorizado
    times = []
    for _ in range(5):
        start = time.perf_counter()
        _ = hash_aggregate_vectorized(keys, values)
        times.append(time.perf_counter() - start)
    
    avg_time = np.mean(times) * 1000
    rows_per_ms = n / avg_time / 1000
    
    results_card.append({
        'cardinality': card,
        'time_ms': avg_time,
        'rows_per_ms': rows_per_ms
    })
    print(f"Cardinalidade {card:>6}: {avg_time:6.2f} ms ({rows_per_ms:.0f}K rows/ms)")

In [None]:
# Visualização
df_card = pd.DataFrame(results_card)

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

# Tempo vs Cardinalidade
ax1.plot(df_card['cardinality'], df_card['time_ms'], marker='o', linewidth=2)
ax1.set_xlabel('Número de Grupos (Cardinalidade)')
ax1.set_ylabel('Tempo (ms)')
ax1.set_title('Tempo de Agregação vs Cardinalidade')
ax1.set_xscale('log')
ax1.grid(True, alpha=0.3)

# Throughput vs Cardinalidade
ax2.bar(range(len(cardinalities)), df_card['rows_per_ms'], color='steelblue')
ax2.set_xticks(range(len(cardinalities)))
ax2.set_xticklabels([str(c) for c in cardinalities], rotation=45)
ax2.set_xlabel('Cardinalidade')
ax2.set_ylabel('Throughput (K rows/ms)')
ax2.set_title('Throughput vs Cardinalidade')
ax2.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 6.4 Agregação por Chunks

Para datasets grandes, processar em chunks mantém dados no cache.

In [None]:
def hash_aggregate_chunked(keys: np.ndarray, values: np.ndarray, chunk_size: int = 2048):
    """
    Agregação processando em chunks de tamanho fixo.
    Simula como DuckDB processa DataChunks.
    """
    n = len(keys)
    partial_sums = defaultdict(float)
    partial_counts = defaultdict(int)
    
    for start in range(0, n, chunk_size):
        end = min(start + chunk_size, n)
        chunk_keys = keys[start:end]
        chunk_values = values[start:end]
        
        # Agregar chunk (cabe no cache L1!)
        unique, inverse = np.unique(chunk_keys, return_inverse=True)
        sums = np.bincount(inverse, weights=chunk_values)
        counts = np.bincount(inverse)
        
        # Merge com resultados parciais
        for i, key in enumerate(unique):
            partial_sums[key] += sums[i]
            partial_counts[key] += counts[i]
    
    return dict(partial_sums), dict(partial_counts)

# Benchmark com diferentes chunk sizes
n = 5_000_000
keys = np.random.randint(0, 1000, n)
values = np.random.randn(n)

chunk_sizes = [512, 1024, 2048, 4096, 8192, 16384]
chunk_results = []

for cs in chunk_sizes:
    times = []
    for _ in range(3):
        start = time.perf_counter()
        _ = hash_aggregate_chunked(keys, values, cs)
        times.append(time.perf_counter() - start)
    
    avg_time = np.mean(times) * 1000
    chunk_results.append({'chunk_size': cs, 'time_ms': avg_time})
    print(f"Chunk size {cs:>6}: {avg_time:6.2f} ms")

## 6.5 DuckDB: Agregações em Ação

In [None]:
con = duckdb.connect(':memory:')

# Criar tabela de vendas simulada
con.execute("""
    CREATE TABLE vendas AS
    SELECT 
        (random() * 1000)::INTEGER AS produto_id,
        (random() * 100)::INTEGER AS loja_id,
        (random() * 10)::INTEGER AS regiao_id,
        random() * 1000 AS valor,
        (random() * 10)::INTEGER AS quantidade,
        DATE '2024-01-01' + (random() * 365)::INTEGER AS data_venda
    FROM generate_series(1, 10000000)
""")

print("Tabela criada com 10M linhas")
con.execute("SELECT * FROM vendas LIMIT 3").df()

In [None]:
# Benchmark de diferentes tipos de agregação
queries = {
    'COUNT simples': 'SELECT COUNT(*) FROM vendas',
    'SUM simples': 'SELECT SUM(valor) FROM vendas',
    'Múltiplas agregações': 'SELECT SUM(valor), AVG(valor), MIN(valor), MAX(valor), COUNT(*) FROM vendas',
    'GROUP BY baixa card (10)': 'SELECT regiao_id, SUM(valor) FROM vendas GROUP BY regiao_id',
    'GROUP BY média card (100)': 'SELECT loja_id, SUM(valor) FROM vendas GROUP BY loja_id',
    'GROUP BY alta card (1000)': 'SELECT produto_id, SUM(valor) FROM vendas GROUP BY produto_id',
    'GROUP BY composto': 'SELECT produto_id, loja_id, SUM(valor) FROM vendas GROUP BY produto_id, loja_id',
    'GROUP BY com filtro': 'SELECT loja_id, SUM(valor) FROM vendas WHERE valor > 500 GROUP BY loja_id',
    'GROUP BY com HAVING': 'SELECT loja_id, SUM(valor) as total FROM vendas GROUP BY loja_id HAVING SUM(valor) > 50000000',
}

print("Performance de Agregações DuckDB:\n")
agg_results = []
for name, query in queries.items():
    times = []
    for _ in range(5):
        start = time.perf_counter()
        result = con.execute(query).fetchall()
        times.append(time.perf_counter() - start)
    
    avg_time = np.mean(times) * 1000
    agg_results.append({'query': name, 'time_ms': avg_time})
    print(f"{name:35} {avg_time:8.2f} ms")

In [None]:
# Visualização
df_agg = pd.DataFrame(agg_results)

plt.figure(figsize=(12, 6))
bars = plt.barh(df_agg['query'], df_agg['time_ms'], color='steelblue')
plt.xlabel('Tempo (ms)')
plt.title('Performance de Diferentes Agregações no DuckDB (10M linhas)')
plt.grid(True, alpha=0.3, axis='x')

for bar, val in zip(bars, df_agg['time_ms']):
    plt.text(val + 1, bar.get_y() + bar.get_height()/2, f'{val:.1f}ms', va='center')

plt.tight_layout()
plt.show()

In [None]:
# Ver plano de execução
print("=== EXPLAIN ANALYZE ===")
plan = con.execute("""
    EXPLAIN ANALYZE
    SELECT produto_id, loja_id, SUM(valor), COUNT(*)
    FROM vendas
    GROUP BY produto_id, loja_id
""").df()
print(plan.to_string())

## 6.6 Comparação: DuckDB vs Pandas

In [None]:
# Criar DataFrame Pandas equivalente
df_pandas = con.execute("SELECT produto_id, loja_id, valor FROM vendas").df()
print(f"DataFrame: {len(df_pandas):,} linhas")

# Benchmark: GROUP BY com múltiplas agregações
# DuckDB
times_duck = []
for _ in range(5):
    start = time.perf_counter()
    con.execute("""
        SELECT loja_id, SUM(valor), AVG(valor), COUNT(*)
        FROM vendas GROUP BY loja_id
    """).fetchall()
    times_duck.append(time.perf_counter() - start)

# Pandas
times_pandas = []
for _ in range(5):
    start = time.perf_counter()
    df_pandas.groupby('loja_id')['valor'].agg(['sum', 'mean', 'count'])
    times_pandas.append(time.perf_counter() - start)

print(f"\nGROUP BY loja_id com SUM, AVG, COUNT:")
print(f"DuckDB: {np.mean(times_duck)*1000:.2f} ms")
print(f"Pandas: {np.mean(times_pandas)*1000:.2f} ms")
print(f"Speedup: {np.mean(times_pandas)/np.mean(times_duck):.1f}x")

In [None]:
# Benchmark: GROUP BY composto
# DuckDB
times_duck2 = []
for _ in range(5):
    start = time.perf_counter()
    con.execute("""
        SELECT produto_id, loja_id, SUM(valor)
        FROM vendas GROUP BY produto_id, loja_id
    """).fetchall()
    times_duck2.append(time.perf_counter() - start)

# Pandas
times_pandas2 = []
for _ in range(5):
    start = time.perf_counter()
    df_pandas.groupby(['produto_id', 'loja_id'])['valor'].sum()
    times_pandas2.append(time.perf_counter() - start)

print(f"\nGROUP BY produto_id, loja_id com SUM:")
print(f"DuckDB: {np.mean(times_duck2)*1000:.2f} ms")
print(f"Pandas: {np.mean(times_pandas2)*1000:.2f} ms")
print(f"Speedup: {np.mean(times_pandas2)/np.mean(times_duck2):.1f}x")

## 6.7 Técnicas Avançadas

### Pre-Aggregation
```
Antes de fazer hash join/merge, agregar localmente:

Thread 1: chunk1 → partial_agg1 (100K → 1K grupos)
Thread 2: chunk2 → partial_agg2 (100K → 1K grupos)
Thread 3: chunk3 → partial_agg3 (100K → 1K grupos)

Merge final: 3K entradas em vez de 300K!
```

### Partitioned Aggregation
```
Particionar por hash da chave:

Partition 0: keys onde hash % 4 == 0
Partition 1: keys onde hash % 4 == 1
...

Cada partição pode ser processada em paralelo sem locks!
```

In [None]:
# Demonstração de pre-aggregation
def pre_aggregate_chunks(keys, values, chunk_size=100000):
    """Simula pre-aggregation antes do merge final"""
    n = len(keys)
    partial_results = []
    
    # Fase 1: Agregar cada chunk localmente
    for start in range(0, n, chunk_size):
        end = min(start + chunk_size, n)
        chunk_keys = keys[start:end]
        chunk_values = values[start:end]
        
        unique, inverse = np.unique(chunk_keys, return_inverse=True)
        sums = np.bincount(inverse, weights=chunk_values)
        counts = np.bincount(inverse)
        
        partial_results.append({
            'keys': unique,
            'sums': sums,
            'counts': counts
        })
    
    # Estatísticas de redução
    total_partial_rows = sum(len(pr['keys']) for pr in partial_results)
    print(f"Linhas originais: {n:,}")
    print(f"Linhas após pre-aggregation: {total_partial_rows:,}")
    print(f"Redução: {n/total_partial_rows:.1f}x")
    
    return partial_results

# Teste
n = 1_000_000
keys = np.random.randint(0, 1000, n)
values = np.random.randn(n)

_ = pre_aggregate_chunks(keys, values)

## 6.8 Resumo

| Técnica | Descrição | Benefício |
|---------|-----------|----------|
| **Hash vetorizado** | Calcula hashes de múltiplas chaves via SIMD | CPU eficiente |
| **Scatter/Gather** | Agrupa dados por bucket antes de agregar | Cache friendly |
| **Pre-aggregation** | Agrega localmente antes do merge | Reduz dados |
| **Particionamento** | Divide por hash para paralelismo sem locks | Escalabilidade |
| **Chunked processing** | Processa 2048 linhas por vez | Cabe no L1 |

### Pontos Chave:

1. **Cardinalidade importa**: Poucos grupos = mais rápido
2. **Chunks de 2048**: Hash table parcial cabe no cache
3. **Particionamento**: Permite paralelismo sem contenção
4. **Pre-aggregation**: Reduz volume antes do merge
5. **SIMD para hash**: Múltiplas chaves em paralelo

In [None]:
con.close()