# BootCamp Spark - Mini DataLake

In [None]:
%%bash
# Install a headless Java 8 runtime for Spark.
# Pass -y explicitly instead of relying on -qq implying it: apt-get(8) warns
# against using -qq for real (non-dry-run) actions. Stdout is discarded to keep
# the cell output short; errors still reach stderr.
apt-get update --fix-missing > /dev/null
apt-get install -y -q openjdk-8-jdk-headless > /dev/null

# Python dependencies, pinned so pyspark and delta-spark stay a matching pair.
# NOTE(review): on Colab pip may report a conflict with the preinstalled
# dataproc-spark-connect (wants pyspark~=3.5); this notebook does not appear to
# use that package.
pip install -q pyspark==3.4.1 delta-spark==2.4.0 findspark pyarrow fastparquet

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
Hit:2 https://cli.github.com/packages stable InRelease
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Get:8 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [2,008 kB]
Get:9 http://archive.ubuntu.com/ubuntu jammy-backports InRelease [127 kB]
Get:10 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease [18.1 kB]
Hit:11 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:12 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:13 https://r2u.stat.illinois.edu/ubuntu jammy/main amd64 Pa

W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dataproc-spark-connect 0.8.3 requires pyspark[connect]~=3.5.1, but you have pyspark 3.4.1 which is incompatible.


In [None]:
import logging
import os
import sys
from datetime import datetime, date
from decimal import Decimal

import findspark

# Point Spark at the JDK installed by the setup cell, and make both the driver
# and the worker processes run this kernel's own Python interpreter.
os.environ.update({
    'JAVA_HOME': '/usr/lib/jvm/java-8-openjdk-amd64',
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
})

In [None]:
# Make the Spark installation importable before importing pyspark.
findspark.init()

# PySpark and Delta imports.
# `sum` and `max` are imported under aliases so they do not shadow Python's
# built-in sum()/max() for the rest of the notebook. `F` gives access to any
# other Spark SQL function without adding more imports.
from pyspark.sql import SparkSession, Row
from pyspark.sql import functions as F
from pyspark.sql.functions import col, count, current_timestamp, lit, when
from pyspark.sql.functions import max as spark_max, sum as spark_sum
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType, TimestampType
from delta import DeltaTable, configure_spark_with_delta_pip

In [None]:
# Build a local SparkSession with the Delta Lake SQL extension and catalog.
# NOTE(review): retentionDurationCheck is disabled, presumably so VACUUM can use
# very short retention windows in this sandbox; do not copy it to production.
spark_settings = {
    "spark.driver.memory": "4g",
    "spark.executor.memory": "4g",
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
    "spark.sql.warehouse.dir": "/tmp/spark-warehouse",
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    "spark.databricks.delta.retentionDurationCheck.enabled": "false",
    "spark.databricks.delta.vacuum.parallelDelete.enabled": "true",
}

builder = SparkSession.builder.appName("DataLake_Delta_Parquet").master("local[*]")
for setting, value in spark_settings.items():
    builder = builder.config(setting, value)

# Let delta-spark wrap the builder before the session is created.
spark = configure_spark_with_delta_pip(builder).getOrCreate()


In [None]:
# Create the bronze/silver/gold layer folders under base_path (no-op if present).
base_path = "datalake"
dirs = ["bronze", "silver", "gold"]
for layer in dirs:
    layer_path = os.path.join(base_path, layer)
    os.makedirs(layer_path, exist_ok=True)

In [None]:
# Remote source of the bronze Parquet file and its local destination in the
# bronze layer; the local file keeps the remote file name.
parquet_url = "https://github.com/wuldson-franco/breweries_case/raw/main/analises/bs_bronze.parquet"
parquet_file_name = parquet_url.rsplit("/", 1)[-1]  # "bs_bronze.parquet"
parquet_path = os.path.join(base_path, "bronze", parquet_file_name)

In [None]:
# Download the bronze Parquet file once; later runs reuse the local copy.
if not os.path.exists(parquet_path):
    import requests
    response = requests.get(parquet_url, timeout=60)
    # Fail loudly on HTTP errors. Otherwise an error page would be saved as the
    # "parquet" file and, because of the existence check above, never replaced.
    response.raise_for_status()
    tmp_path = parquet_path + ".part"
    with open(tmp_path, "wb") as f:
        f.write(response.content)
    # Move into place only after a complete write, so an interrupted run does
    # not leave a truncated file at parquet_path.
    os.replace(tmp_path, parquet_path)

In [None]:
# Read the bronze file from the path the download cell wrote to, instead of
# repeating the literal path (stays correct if base_path changes).
df = spark.read.parquet(parquet_path)

In [None]:
# Preview the first 20 rows; long string values are truncated for display.
df.show(n=20, truncate=True)

+--------------------+--------------------+------------+--------------------+---------+---------+--------------+--------------+-----------+-------------+------------------+-----------------+------------+--------------------+-------------+--------------------+
|                  id|                name|brewery_type|           address_1|address_2|address_3|          city|state_province|postal_code|      country|         longitude|         latitude|       phone|         website_url|        state|              street|
+--------------------+--------------------+------------+--------------------+---------+---------+--------------+--------------+-----------+-------------+------------------+-----------------+------------+--------------------+-------------+--------------------+
|5128df48-79fc-4f0...|    (405) Brewing Co|       micro|      1716 Topeka St|     null|     null|        Norman|      Oklahoma| 73069-8224|United States|      -97.46818222|      35.25738891|  4058160490|http://www.405bre

In [None]:
# Exiba o schema das colunas com "printSchema".

In [None]:
# Faça um select da coluna de país (country) para verificarmos quais países constam na nossa base.

In [None]:
# Descubra quais cervejarias estão com os endereços faltantes

In [None]:
# Busque na internet quais são esses endereços e inclua-os nos campos faltantes.
# Você pode utilizar when(...).otherwise(...) para preencher esses valores.

In [None]:
# Save the transformed data to the Silver layer.
# The previous placeholders failed: `bronze_df` is never defined in this notebook
# and "parquet ou delta" is not a valid data source format.
# NOTE(review): `df` is the bronze DataFrame loaded above; replace it with the
# transformed DataFrame once the cleaning cells are filled in.
silver_format = "delta"  # or "parquet"
silver_path = os.path.join(base_path, "silver", "breweries")
df.write.format(silver_format).mode("overwrite").save(silver_path)

In [None]:
# Respondendo à pergunta de negócio: "Quais tipos de cervejaria por cidade, província e país?"
# Sim ela está em python, converta para spark!

def data_aggregation(df):
    """Count breweries per brewery type and location.

    Answers "which brewery types exist per city, state/province and country?".

    Args:
        df: A Spark DataFrame (such as the notebook's ``df``) or a pandas
            DataFrame containing the columns ``brewery_type``, ``city``,
            ``state_province`` and ``country``.

    Returns:
        A DataFrame of the same library with those four columns plus
        ``brewery_count``, with one row per distinct combination.

    Raises:
        Whatever the underlying aggregation raises (e.g. a missing column);
        the error is logged and re-raised unchanged.
    """
    group_cols = ["brewery_type", "city", "state_province", "country"]
    try:
        if type(df).__module__.startswith("pyspark.sql"):
            # Spark path: GroupedData has no .size(); count() yields a column
            # named "count". Unlike pandas' default, null keys form their own group.
            return (
                df.groupBy(*group_cols)
                .count()
                .withColumnRenamed("count", "brewery_count")
            )
        # pandas path (original behaviour): group size, then flatten the index.
        return df.groupby(group_cols).size().reset_index(name="brewery_count")
    except Exception as e:
        # `logging` must be imported at module level; before, this handler
        # raised NameError and masked the real error.
        logging.error(f"Erro ao realizar a agregação de dados: {e}")
        raise

In [None]:
# Salve a agregação feita anteriormente na camada gold.

In [None]:
# OPCIONAL
# Crie um gráfico em spark ou python com os dados do arquivo .parquet ou delta.