# Fundamentos do Apache Spark

Este notebook introduz os conceitos fundamentais do Apache Spark, demonstrando como criar e operar em RDDs (Resilient Distributed Datasets) e DataFrames.

## Tópicos Abordados

1. Configuração do ambiente Spark
2. Manipulação de RDDs
3. Introdução a SparkSQL e DataFrames
4. Operações de transformação e ação
5. Conceitos de particionamento e persistência
6. Visualização do plano de execução

## 1. Configuração e Inicialização da Sessão Spark

O SparkSession é o ponto de entrada para a programação Spark com as APIs Dataset e DataFrame. Em um ambiente PySpark, a sessão geralmente já está disponível como variável `spark`.

In [None]:
# Verificamos se a sessão spark está disponível
try:
    spark
except NameError:
    # Se não estiver, criamos uma nova
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder \
        .appName("SparkFundamentals") \
        .master("local[*]") \
        .config("spark.sql.shuffle.partitions", "4") \
        .config("spark.executor.memory", "1g") \
        .getOrCreate()

# Obtemos o SparkContext a partir da sessão
sc = spark.sparkContext

# Exibimos informações da aplicação
print(f"Versão do Spark: {spark.version}")
print(f"Aplicação: {sc.appName}")
print(f"Master: {sc.master}")

## 2. RDDs (Resilient Distributed Datasets)

RDDs são a estrutura de dados fundamental do Spark. Eles representam uma coleção distribuída imutável de objetos que podem ser processados em paralelo.

In [None]:
# Criando um RDD a partir de uma coleção
numbers = range(1, 1001)
numbers_rdd = sc.parallelize(numbers, numSlices=4)  # 4 partições

# Verificando o número de partições
print(f"Número de partições: {numbers_rdd.getNumPartitions()}")

# Exibindo os primeiros 5 elementos
print(f"Primeiros 5 elementos: {numbers_rdd.take(5)}")

### 2.1 Operações de Transformação em RDDs

Transformações criam um novo RDD a partir de um existente. Elas são lazy (preguiçosas), ou seja, só são executadas quando uma ação é solicitada.

In [None]:
# Transformação: map
squared_rdd = numbers_rdd.map(lambda x: x * x)

# Transformação: filter
even_squares_rdd = squared_rdd.filter(lambda x: x % 2 == 0)

# Neste ponto, nenhuma computação foi realizada ainda devido à avaliação lazy

### 2.2 Operações de Ação em RDDs

Ações retornam valores para o driver ou escrevem dados em um sistema de armazenamento. Elas acionam a execução das transformações.

In [None]:
# Ação: count
count = even_squares_rdd.count()
print(f"Número de quadrados pares: {count}")

# Ação: take (retorna os primeiros n elementos)
first_10 = even_squares_rdd.take(10)
print(f"Primeiros 10 quadrados pares: {first_10}")

# Ação: collect (traz todos os elementos para o driver - cuidado com grandes conjuntos de dados!)
# Usar apenas para conjuntos pequenos ou amostragem
sample = even_squares_rdd.sample(False, 0.01).collect()
print(f"Amostra de {len(sample)} elementos: {sample[:5]}...")

### 2.3 Operações mais complexas

In [None]:
# Criando um RDD com pares (chave, valor)
pairs_rdd = numbers_rdd.map(lambda x: (x % 10, x))  # chave é o resto da divisão por 10

# GroupByKey - agrupa valores pela chave
grouped = pairs_rdd.groupByKey()
result = grouped.map(lambda x: (x[0], list(x[1]))).collect()
print("Agrupamento por chave (primeiros 3):")
for i in range(min(3, len(result))):
    key, values = result[i]
    print(f"  Chave {key}: {values[:5]}...")

# ReduceByKey - mais eficiente que groupByKey para agregações
sums = pairs_rdd.reduceByKey(lambda a, b: a + b)
print("\nSoma dos valores por chave:")
for key, value in sorted(sums.collect()):
    print(f"  Chave {key}: {value}")

### 2.4 Persistência (caching)

A persistência permite armazenar RDDs na memória ou no disco para reutilização em várias ações.

In [None]:
from pyspark import StorageLevel

# Criamos um RDD que será usado várias vezes
expensive_rdd = numbers_rdd.map(lambda x: (x, x**3))

# Persistimos o RDD na memória
expensive_rdd.persist(StorageLevel.MEMORY_AND_DISK)

# Primeira ação (executará a transformação)
import time
start = time.time()
count1 = expensive_rdd.count()
end = time.time()
print(f"Primeira contagem: {count1} (tempo: {end-start:.4f}s)")

# Segunda ação (usará a versão em cache)
start = time.time()
count2 = expensive_rdd.count()
end = time.time()
print(f"Segunda contagem: {count2} (tempo: {end-start:.4f}s)")

# Não esqueça de liberar a memória quando não precisar mais do RDD em cache
expensive_rdd.unpersist()

## 3. DataFrames e SparkSQL

DataFrames são conjuntos de dados distribuídos organizados em colunas nomeadas, similares a tabelas em bancos de dados relacionais.

In [None]:
# Criando um DataFrame a partir de uma lista de dados
from pyspark.sql import Row

# Dados de exemplo: vendas de produtos
sales_data = [
    Row(date="2025-01-15", product="Laptop", category="Electronics", price=1200, quantity=5),
    Row(date="2025-01-15", product="Mouse", category="Electronics", price=25, quantity=30),
    Row(date="2025-01-16", product="Monitor", category="Electronics", price=350, quantity=10),
    Row(date="2025-01-16", product="Desk Chair", category="Furniture", price=175, quantity=8),
    Row(date="2025-01-17", product="Coffee Maker", category="Appliances", price=80, quantity=12),
    Row(date="2025-01-17", product="Sofa", category="Furniture", price=950, quantity=2),
    Row(date="2025-01-18", product="Blender", category="Appliances", price=70, quantity=15),
    Row(date="2025-01-18", product="Headphones", category="Electronics", price=120, quantity=20),
    Row(date="2025-01-19", product="Dining Table", category="Furniture", price=600, quantity=3),
    Row(date="2025-01-19", product="Smartphone", category="Electronics", price=800, quantity=10)
]

# Criar DataFrame
sales_df = spark.createDataFrame(sales_data)

# Exibir o schema (estrutura) do DataFrame
sales_df.printSchema()

# Mostrar os primeiros registros
sales_df.show(5)

### 3.1 Operações básicas com DataFrames

In [None]:
# Seleção de colunas
sales_df.select("date", "product", "price").show(5)

# Filtragem
sales_df.filter(sales_df.category == "Electronics").show()

# Filtragem com expressão SQL
sales_df.filter("price > 500").show()

# Ordenação
sales_df.orderBy(sales_df.price.desc()).show(5)

# Adicionando colunas calculadas
from pyspark.sql.functions import col, lit

sales_df.withColumn("total_value", col("price") * col("quantity")) \
        .withColumn("tax", col("price") * col("quantity") * lit(0.1)) \
        .select("product", "quantity", "price", "total_value", "tax") \
        .show(5)

# Funções de agregação
from pyspark.sql.functions import sum, avg, count, max, min

sales_df.groupBy("category") \
        .agg(count("product").alias("product_count"), 
             sum("price").alias("total_price"), 
             avg("price").alias("avg_price")) \
        .show()

### 3.2 Usando SQL com DataFrames

O Spark permite executar consultas SQL em DataFrames registrados como tabelas temporárias.

In [None]:
# Registrar DataFrame como uma tabela temporária
sales_df.createOrReplaceTempView("sales")

# Executar uma consulta SQL
result = spark.sql("""
    SELECT 
        category, 
        COUNT(*) as product_count,
        SUM(price * quantity) as total_revenue,
        AVG(price) as avg_price
    FROM sales
    GROUP BY category
    ORDER BY total_revenue DESC
""")

result.show()

# Consulta mais complexa
daily_sales = spark.sql("""
    SELECT 
        date,
        SUM(price * quantity) as daily_revenue,
        COUNT(DISTINCT category) as categories_sold,
        MAX(price) as most_expensive_item
    FROM sales
    GROUP BY date
    ORDER BY date
""")

daily_sales.show()

### 3.3 Leitura e Escrita de Dados

O Spark suporta vários formatos de dados e fontes.

In [None]:
# Escrever o DataFrame em diferentes formatos

# CSV
sales_df.write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("/home/jovyan/data/sales_csv")

# Parquet (formato colunar otimizado)
sales_df.write \
    .mode("overwrite") \
    .parquet("/home/jovyan/data/sales_parquet")

# JSON
sales_df.write \
    .mode("overwrite") \
    .json("/home/jovyan/data/sales_json")

# Ler dados de volta
parquet_df = spark.read.parquet("/home/jovyan/data/sales_parquet")
parquet_df.show(5)

# Ler CSV com opções
csv_df = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/home/jovyan/data/sales_csv")
    
csv_df.printSchema()

## 4. Otimização e Plano de Execução

O otimizador do Spark (Catalyst) cria um plano de execução otimizado para suas operações.

In [None]:
# Criar uma consulta complexa
complex_query = sales_df \
    .filter(col("price") > 100) \
    .join(
        sales_df.groupBy("category").agg(avg("price").alias("avg_category_price")),
        on="category"
    ) \
    .filter(col("price") > col("avg_category_price")) \
    .select("date", "product", "category", "price", "avg_category_price")

# Exibir o plano de execução lógico
print("\nPlano Lógico:")
complex_query.explain()

# Exibir o plano físico
print("\nPlano Físico Otimizado:")
complex_query.explain("extended")

# Executar a consulta
complex_query.show()

## 5. Particionamento e Performance

O particionamento adequado de dados é crucial para a performance em aplicações Spark.

In [None]:
# Criar um DataFrame maior para demonstrar particionamento
from pyspark.sql.functions import explode, sequence, to_date

# Gerar datas para os últimos 30 dias
date_df = spark.sql("""
    SELECT explode(sequence(to_date('2025-01-01'), to_date('2025-01-30'), interval 1 day)) as date
""")

# Verificar o número padrão de partições
print(f"Número de partições do SQL Shuffle: {spark.conf.get('spark.sql.shuffle.partitions')}")

# Alterar o número de partições para um valor menor para este exemplo
spark.conf.set("spark.sql.shuffle.partitions", "4")

# Exemplo de operação com shuffling
large_agg = sales_df \
    .crossJoin(date_df) \
    .groupBy("date", "category") \
    .count() \
    .orderBy("date", "category")

# Verificar o plano de execução
large_agg.explain()

# Executar com o número especificado de partições
large_agg.show(10)

### 5.1 Broadcast Join

Os broadcast joins podem melhorar significativamente a performance quando uma das tabelas é pequena o suficiente para caber em memória.

In [None]:
# Criar uma tabela de dimensão pequena para categorias
category_data = [
    Row(category="Electronics", tax_rate=0.08, department="Tech"),
    Row(category="Furniture", tax_rate=0.09, department="Home"),
    Row(category="Appliances", tax_rate=0.07, department="Home")
]
category_df = spark.createDataFrame(category_data)

# Join normal
print("\nJoin normal:")
start = time.time()
normal_join = sales_df.join(category_df, on="category")
normal_join.explain()
normal_join_count = normal_join.count()
end = time.time()
print(f"Normal join tempo: {end-start:.4f}s, resultado: {normal_join_count} linhas")

# Broadcast join (explícito)
from pyspark.sql.functions import broadcast

print("\nBroadcast join explícito:")
start = time.time()
broadcast_join = sales_df.join(broadcast(category_df), on="category")
broadcast_join.explain()
broadcast_join_count = broadcast_join.count()
end = time.time()
print(f"Broadcast join tempo: {end-start:.4f}s, resultado: {broadcast_join_count} linhas")

# Mostrar alguns resultados do join
broadcast_join.select("product", "category", "price", "department", "tax_rate").show(5)

## 6. Conclusão

Neste notebook, exploramos os conceitos fundamentais do Apache Spark:

1. Criação e manipulação de RDDs
2. Trabalho com DataFrames e SparkSQL
3. Leitura e escrita de dados em diferentes formatos
4. Otimização de consultas e planos de execução
5. Técnicas de particionamento e performance

Nos próximos notebooks, exploraremos tópicos mais avançados como:
- Streaming estruturado
- Machine Learning com Spark MLlib
- Processamento de dados em grafos com GraphX
- Integração com fontes de dados externas

Para mais informações, consulte a [documentação oficial do Apache Spark](https://spark.apache.org/docs/latest/).