### O trecho abaixo traz as informações das external location criadas para interação com o Datalake

In [0]:
bronze_adls = "abfss://bronze-layer@medallionapistorage.dfs.core.windows.net/"
silver_adls = "abfss://silver-layer@medallionapistorage.dfs.core.windows.net/"

### Camada de transformação do dado carregado da API

In [0]:
import datetime 
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnull, when, current_timestamp, date_format, trim, initcap, upper
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Cria uma SparkSession
spark = SparkSession.builder.appName("Open Brewery").getOrCreate()

In [0]:
schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("address_1", StringType(), True),
    StructField("address_2", StringType(), True),
    StructField("address_3", StringType(), True),
    StructField("brewery_type", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("state_province", StringType(), True),
    StructField("street", StringType(), True),
    StructField("country", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("phone", StringType(), True),
    StructField("postal_code", StringType(), True),
    StructField("website_url", StringType(), True),
])

# Cria dataframe spark com os dados persistidos na camada bronze
df_silver = spark.read.schema(schema).json(f'{bronze_adls}open_breweries_raw.json')
#print(df.dtypes)

In [0]:
# Transformação dos dados (limpeza, substituição de nulos e padronização de strings)

#DROP COLUNA POSSIVELMENTE DUPLICADA 
# Drop da coluna state_province (importante validar alteração com área de negócios)
df_silver= df_silver.drop("state_province")

# REMOÇÃO DE DUPLICATAS
df_silver = df_silver.dropDuplicates()

# SUBSTITUIÇÃO DE NULOS
# Substituição de valores nulos por zero quando double e NA quando string
df_silver = (
        df_silver.withColumn('latitude', when(isnull(col('latitude')), 0).otherwise(col('latitude')))
        .withColumn('longitude', when(isnull(col('longitude')), 0).otherwise(col('longitude')))
        .withColumn('address_1', when(isnull(col('address_1')), 'NA').otherwise(col('address_1')))
        .withColumn('address_2', when(isnull(col('address_2')), 'NA').otherwise(col('address_2')))
        .withColumn('address_3', when(isnull(col('address_3')), 'NA').otherwise(col('address_3')))
        .withColumn('phone', when(isnull(col('phone')), 'NA').otherwise(col('phone')))
        .withColumn('website_url', when(isnull(col('website_url')), 'NA').otherwise(col('website_url')))
        .withColumn('street', when(isnull(col('street')), 'NA').otherwise(col('street')))
)

#PADRONIZAÇÃO DOS VALORES STRING
#Ajuste dos valores string para remover espaços identificados nas colunas country e state e ajustar capitalização
df_silver = (
    df_silver
    .withColumn("country",trim(col("country")))
    .withColumn("country", initcap(col("country")))
    .withColumn("state",trim(col("state")))
    .withColumn("state",upper(col("state")))
)


# ADIÇÃO DE COLUNA COM DATETIME DA CARGA
# Adição de coluna dat_load para auditoria dos dados
df_silver = df_silver.withColumn('dat_load', date_format(current_timestamp(), "yyyy-MM-dd HH:mm:ss"))

# Verifica o df após transformações
#display(df_silver)

# SALVA O DF NO FORMATO DELTA NA CAMADA SILVER
# Indica o caminho para persistir os dados
delta_path = f"{silver_adls}open_breweries_delta"

# Salva os dados no formato delta
(
    df_silver
    .write
    .mode("overwrite")
    .format("delta")
    .partitionBy("country", "state")
    .save(delta_path)
)

print('Dados transformados e salvos na camada silver')


In [0]:
# Cria tabela para testes de qualidade de dados com dbt
spark.sql(f""" 
    CREATE TABLE IF NOT EXISTS brewery.default.open_brewery_silver
    USING DELTA
    LOCATION '{delta_path}'
""")

# Verifica a criação da tabela
#spark.sql("SHOW TABLES").show()