In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
import requests

# Start (or reuse) the Spark session, then apply the session-level settings
spark = SparkSession.builder.appName("BreweryETL").getOrCreate()
for conf_key, conf_value in (
    ("spark.sql.repl.eagerEval.enabled", True),
    ("spark.sql.parquet.int96RebaseModeInRead", "LEGACY"),
):
    spark.conf.set(conf_key, conf_value)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/11 11:27:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Endpoint of the brewery API queried by the extraction step
API_URL = "https://api.openbrewerydb.org/breweries"

def fetch_data(timeout=30):
    """
    Download the brewery list from ``API_URL`` and return the decoded JSON.

    Parameters:
    timeout: seconds to wait for the server before giving up; forwarded to
        ``requests.get``. Without it a stalled connection would block the
        notebook indefinitely.

    Returns:
    The decoded JSON payload, or an empty list when the request fails or the
    response body cannot be decoded. Failures are printed, not raised.
    """
    # NOTE(review): a single request is made with no paging parameters —
    # confirm whether the endpoint returns the full dataset in one response.
    try:
        response = requests.get(API_URL, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a body that is not valid JSON, so decoding
        # failures follow the same best-effort path as network errors.
        print(f"Erro ao acessar a API: {e}")
        return []

In [3]:
def round_columns(df, columns, precision):
    """
    Normalise text-encoded numeric columns and round them.

    For every listed column, commas are replaced by dots, spaces are removed,
    the value is cast to double and then rounded.

    Parameters:
    df: DataFrame to transform
    columns: list of column names to convert and round
    precision: int - number of decimal places to keep

    Returns:
    DataFrame with the listed columns replaced by their rounded double values
    """
    result = df
    for name in columns:
        # Build the whole clean-up as one expression: "," -> ".", drop spaces
        cleaned = F.regexp_replace(F.col(name), ',', '.')
        cleaned = F.regexp_replace(cleaned, ' ', '')
        rounded = F.round(cleaned.cast("double"), precision)
        result = result.withColumn(name, rounded)
    return result

In [4]:
def fill_blank_columns(df, columns):
    """
    Replace null values in the given columns with the literal "Unknown".

    Parameters:
    df: DataFrame to transform
    columns: list of column names whose nulls should be filled

    Returns:
    DataFrame where each listed column keeps its value unless it is null
    """
    placeholder = F.lit("Unknown")
    result = df
    for name in columns:
        current = F.col(name)
        filled = F.when(current.isNull(), placeholder).otherwise(current)
        result = result.withColumn(name, filled)
    return result

In [5]:
def save_bronze(data, path="/home/coder/notebooks/case/data/bronze/breweries_raw.json"):
    """
    Persist the raw API payload as a JSON file (bronze layer).

    Parameters:
    data: records accepted by ``pd.DataFrame`` (e.g. a list of dicts)
    path: destination file; defaults to the notebook's bronze location so
        existing callers are unaffected

    The parent directory is created when missing, so a first run on a clean
    workspace does not fail on the write.
    """
    import os  # local import keeps this cell runnable on its own

    parent = os.path.dirname(path)
    if parent:  # a bare filename has no directory to create
        os.makedirs(parent, exist_ok=True)
    pd.DataFrame(data).to_json(path, orient="records")

In [6]:
# Explicit schema so every load yields the same columns and types.
# All fields are nullable strings; longitude and latitude are converted to
# double later, during the transformation step.
schema = T.StructType([
    T.StructField(field_name, T.StringType(), True)
    for field_name in (
        "id",
        "name",
        "brewery_type",
        "street",
        "city",
        "state",
        "country",
        "longitude",
        "latitude",
    )
])

In [7]:
def transform_data(data):
    """
    Build the silver DataFrame from the raw records.

    The records are loaded with the explicit ``schema``, the coordinate
    columns are converted and rounded to 2 decimal places, and nulls in the
    descriptive text columns are filled by ``fill_blank_columns``.

    Parameters:
    data: raw records to load into Spark

    Returns:
    The transformed Spark DataFrame
    """
    coordinate_columns = ["longitude", "latitude"]
    text_columns = ["name", "brewery_type", "street", "city", "state", "country"]

    raw_df = spark.createDataFrame(data, schema=schema)
    rounded_df = round_columns(df=raw_df, columns=coordinate_columns, precision=2)
    return fill_blank_columns(df=rounded_df, columns=text_columns)

In [10]:
def save_silver(df, path="/home/coder/notebooks/case/data/silver/breweries_partitioned"):
    """
    Write the transformed DataFrame as parquet partitioned by ``state``.

    Parameters:
    df: DataFrame to persist
    path: output directory; defaults to the notebook's silver location so
        existing callers are unaffected

    The write uses mode "overwrite", so any data already at ``path`` is
    replaced.
    """
    df.write.mode("overwrite").partitionBy("state").parquet(path)

In [11]:
def main():
    """
    Run the pipeline end to end: fetch, save bronze, transform, save silver.

    When the fetch yields no records (``fetch_data`` returns an empty list on
    failure) the run stops before writing anything. Otherwise the overwrite
    writes would replace the previous bronze and silver outputs with empty
    data.
    """
    data = fetch_data()
    if not data:
        print("Nenhum dado retornado pela API; bronze e silver não foram alterados.")
        return
    save_bronze(data)
    df_silver = transform_data(data)
    save_silver(df_silver)

if __name__ == "__main__":
    main()

                                                                                

In [12]:
# Load the partitioned silver dataset back from disk for inspection
silver_df = spark.read.parquet(
    "/home/coder/notebooks/case/data/silver/breweries_partitioned/"
)

In [13]:
# Print the column names and types Spark read back from the silver dataset
silver_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- brewery_type: string (nullable = true)
 |-- street: string (nullable = true)
 |-- city: string (nullable = true)
 |-- country: string (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- state: string (nullable = true)



In [20]:
# Bare expression: rendered as a table because eager evaluation is enabled on the session
silver_df

id,name,brewery_type,street,city,country,longitude,latitude,state
6d14b220-8926-452...,10 Barrel Brewing Co,large,62970 18th St,Bend,United States,-121.28,44.09,Oregon
e2e78bd8-80ff-4a6...,10 Barrel Brewing Co,large,1135 NW Galveston...,Bend,United States,-121.33,44.06,Oregon
e432899b-7f58-455...,10 Barrel Brewing Co,large,1411 NW Flanders St,Portland,United States,-122.69,45.53,Oregon
9f1852da-c312-42d...,10 Barrel Brewing...,large,62950 NE 18th St,Bend,United States,-121.28,44.09,Oregon
58293321-14ae-49d...,1188 Brewing Co,brewpub,141 E Main St,John Day,United States,-118.92,44.41,Oregon
936c3d7e-5d54-445...,13 Virtues Brewin...,brewpub,6410 SE Milwaukie...,Portland,United States,-122.65,45.48,Oregon
1988eb86-f0a2-467...,10 Barrel Brewing...,large,2620 Walnut St,Denver,United States,-104.99,39.76,Colorado
4ffda196-dd59-44a...,105 West Brewing Co,micro,1043 Park St,Castle Rock,United States,-104.87,39.38,Colorado
06e9fffb-e820-45c...,12Degree Brewing,brewpub,820 Main St,Louisville,United States,-105.13,39.98,Colorado
4b677b60-fef1-42e...,14er Brewing Company,proprietor,2801 Walnut St,Denver,United States,-104.98,39.76,Colorado
