**Objetivo**:

  Dominar a leitura e escrita de dados em diferentes formatos (CSV, JSON, Parquet, Delta Lake) usando Spark SQL e PySpark.
  Realizar transformações básicas de dados (seleção, filtragem, agregação) com ambas as linguagens.
  Compreender o conceito de Autoloader para ingestão incremental.

Conceitos Chave:
* `DataFrame` : A estrutura de dados fundamental no Spark para trabalhar com dados estruturados.

* `Spark SQL`: A API do Spark para interagir com dados usando SQL.

* `PySpark`: A API do Spark para interagir com dados usando Python.

* `Formatos de Dados`:
    
    CSV (Comma Separated Values): Formato de texto simples, comum para exportação/importação.
    
    JSON (JavaScript Object Notation): Formato de texto para dados semi-estruturados.
    
    Parquet: Formato de armazenamento colunar otimizado para performance e compressão em sistemas distribuídos.

* `Delta Lake`: Camada de armazenamento que traz confiabilidade (ACID), performance e governança para Data Lakes (construído sobre Parquet).

* `Modos de Escrita`: overwrite, append, ignore, errorIfExists (ou errorifexists).

* `Autoloader`: Uma funcionalidade do Databricks para ingestão incremental e eficiente de dados de arquivos que chegam em armazenamento de objetos.

## # Caminhos para os datasets públicos

In [0]:
%fs ls /databricks-datasets/

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from pyspark.sql.functions import col

## # Definindo o schema manualmente para o dataset de carros

In [0]:
schema_cars = StructType([
    StructField("mpg", DoubleType(), True),
    StructField("cylinders", IntegerType(), True),
    StructField("displacement", DoubleType(), True),
    StructField("horsepower", IntegerType(), True),
    StructField("weight", IntegerType(), True),
    StructField("acceleration", DoubleType(), True),
    StructField("model year", IntegerType(), True),
    StructField("origin", IntegerType(), True),
    StructField("car name", StringType(), True)
])

## # Dados de exemplo (apenas algumas linhas para simular)

In [0]:
data_cars = [
    (18.0, 8, 307.0, 130, 3504, 12.0, 70, 1, "chevrolet chevelle malibu"),
    (15.0, 8, 350.0, 165, 3693, 11.5, 70, 1, "buick skylark 320"),
    (18.0, 8, 318.0, 150, 3433, 11.0, 70, 1, "plymouth satellite"),
    (16.0, 8, 304.0, 150, 3433, 12.0, 70, 1, "amc rebel sst"),
    (17.0, 8, 302.0, 140, 3449, 10.5, 70, 1, "ford torino"),
    (20.0, 4, 97.0, 88, 2130, 14.5, 70, 2, "volkswagen 1131 deluxe sedan"),
    (21.0, 6, 199.0, 90, 2648, 15.0, 70, 1, "amc gremlin"),
    (26.0, 4, 97.0, 46, 1835, 20.5, 70, 2, "volkswagen model 111"),
    (22.0, 4, 140.0, 96, 2408, 19.5, 70, 1, "plymouth duster"),
    (19.0, 6, 232.0, 100, 2634, 13.0, 70, 1, "amc hornet")
]

df_cars = spark.createDataFrame(data_cars, schema=schema_cars)

print("Schema do DataFrame de Carros (em memória):")
df_cars.printSchema()
print("\nPrimeiras 10 linhas do DataFrame de Carros (em memória):")
df_cars.display()

In [0]:
df_cars = spark.createDataFrame(data_cars, schema=schema_cars)

In [0]:
df_cars = df_cars.withColumnRenamed("model year", "model_year") \
             .withColumnRenamed("car name", "car_name")

In [0]:
print("Schema do DataFrame de Carros (em memória, colunas renomeadas):")
df_cars.printSchema()
print("\nPrimeiras 10 linhas do DataFrame de Carros (em memória):")
df_cars.display()

## DataFrame de exemplo para simular o people.json

In [0]:
schema_people = StructType([
    StructField("age", IntegerType(), True),
    StructField("name", StringType(), True)
])

In [0]:
data_people = [
    (30, "Andy"),
    (35, "Berta"),
    (40, "Casey")
]

df_people = spark.createDataFrame(data_people, schema=schema_people)

print("Schema do DataFrame de Pessoas (em memória):")
df_people.printSchema()
print("\nPrimeiras 3 linhas do DataFrame de Pessoas (em memória):")
df_people.display()

## Transformações Básicas com PySpark:

### Use o df_cars para estas transformações.

* `Selecionar colunas específicas`

In [0]:
df_selected = df_cars.select(col("model year"), col("horsepower"), col("mpg"))
print("DataFrame com colunas selecionadas:")
df_selected.display()

* `Filtrar dados (ex: carros com mais de 150 cavalos)`

In [0]:
df_filtered = df_cars.filter(col("horsepower") > 150)
print("DataFrame filtrado (horsepower > 150):")
df_filtered.display()

## Agrupar e agregar (ex: média de MPG por ano do modelo)

In [0]:
from pyspark.sql.functions import avg

In [0]:
df_grouped = df_cars.groupBy(col("model year")).agg(avg(col("mpg")).alias("avg_mpg"))
print("DataFrame agrupado (média de MPG por ano do modelo):")
df_grouped.display()

## Escrita de Dados (Para Views Temporárias/Tabelas Temporárias Delta):

### Criar uma View Temporária a partir de um DataFrame

In [0]:
df_grouped.createOrReplaceTempView("avg_mpg_by_year_temp")
print("View temporária 'avg_mpg_by_year_temp' criada.")

In [0]:
# Consultar a view temporária usando Spark SQL (na mesma célula ou em uma nova com %sql)
spark.sql("SELECT * FROM avg_mpg_by_year_temp ORDER BY `model year` DESC").display()

## Criar uma Tabela Delta Temporária (simulando escrita Delta)

In [0]:
# Isso permite praticar a sintaxe Delta, mas os dados não persistem após o cluster terminar.
df_filtered.write.format("delta").mode("overwrite").saveAsTable("high_horsepower_cars_delta_temp")
print("Tabela Delta temporária 'high_horsepower_cars_delta_temp' criada.")

In [0]:
# Consultar a tabela Delta temporária
spark.sql("SELECT * FROM high_horsepower_cars_delta_temp").display()

In [0]:
%sql
DESCRIBE  HISTORY high_horsepower_cars_delta_temp

### 