In [0]:
import os
from datetime import datetime

from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("Gold").getOrCreate()

# Init variables
today = datetime.today().strftime('%Y%m%d')
storage_account_name = "desafioabinbev"
container_name_silver = "abinbev-silver"
container_name_gold = "abinbev-gold"

# SECURITY: the storage account key must never be committed with the notebook.
# It is read from the AZURE_STORAGE_KEY environment variable instead; any key
# that was previously hardcoded here should be treated as leaked and rotated.
# NOTE(review): on Databricks a secret scope (dbutils.secrets.get) may be the
# preferred source - confirm with the workspace owner.
account_key = os.environ.get("AZURE_STORAGE_KEY")
if not account_key:
    raise RuntimeError(
        "AZURE_STORAGE_KEY is not set; it must hold the access key for "
        f"storage account '{storage_account_name}'"
    )

# Register the key so Spark can authenticate against the account's blob endpoint.
spark.conf.set(f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net", account_key)


In [0]:
# Spark settings meant to keep unwanted auxiliary files out of the output.
# NOTE(review): presumably these suppress commit metadata, Parquet summary
# files and job success markers respectively - confirm against the cluster's
# Spark/Hadoop versions.
_unwanted_file_settings = (
    ("spark.sql.sources.commitProtocolClass", "org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol"),
    ("parquet.enable.summary-metadata", "false"),
    ("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"),
)
for _setting_name, _setting_value in _unwanted_file_settings:
    spark.conf.set(_setting_name, _setting_value)

In [0]:
from datetime import datetime
from pyspark.sql import DataFrame

def build_file_path(storage_account_name, container_name, blob_name):
    """Build the wasbs:// URI of a blob.

    Args:
        storage_account_name: Name of the storage account hosting the container.
        container_name: Container that holds the blob.
        blob_name: Path of the blob inside the container.

    Returns:
        A string of the form
        ``wasbs://<container>@<account>.blob.core.windows.net/<blob>``.
    """
    account_host = f"{storage_account_name}.blob.core.windows.net"
    return f"wasbs://{container_name}@{account_host}/{blob_name}"


In [0]:
from pyspark.sql.utils import AnalysisException

def read_parquet_to_df(file_path, base_path):
    """Read Parquet data into a DataFrame, rejecting unreadable or empty input.

    Args:
        file_path: Location of the Parquet data to load.
        base_path: Value passed to the reader as its ``basePath`` option.

    Returns:
        The loaded DataFrame, or ``None`` when Spark raises an
        ``AnalysisException`` while reading or when the data has no rows.
        In both cases a message is printed instead of an exception escaping.
    """
    try:
        df = spark.read.option("basePath", base_path).parquet(file_path)
        # head(1) fetches at most one row, which is enough to detect emptiness
        # and avoids the DataFrame-to-RDD conversion that df.rdd.isEmpty() needs.
        # The reader never returns None, so no separate None check is required.
        if not df.head(1):
            raise ValueError("DataFrame is empty")
        return df
    except AnalysisException as e:
        print(f"Error reading Parquet file: {e}")
        return None
    except ValueError as e:
        print(f"Data quality check failed: {e}")
        return None


In [0]:
def aggregate_breweries(df):
    """Count rows per (country, state, brewery_type) combination.

    Args:
        df: DataFrame exposing ``country``, ``state`` and ``brewery_type`` columns.

    Returns:
        A DataFrame with the three grouping columns plus a ``quantity`` column
        holding the number of rows in each group.
    """
    grouping_columns = ("country", "state", "brewery_type")
    rows_per_group = df.groupBy(*grouping_columns).count()
    return rows_per_group.withColumnRenamed("count", "quantity")

In [0]:
def insert_aggregated_df(aggregated_df, table_name):
    """Save a DataFrame as a Delta table, replacing any existing contents.

    Args:
        aggregated_df: DataFrame to persist.
        table_name: Name of the destination table.

    Returns:
        None. The outcome is reported through a printed message; a failed
        write is caught and printed rather than raised.
    """
    try:
        delta_writer = aggregated_df.write.format("delta").mode("overwrite")
        delta_writer.saveAsTable(table_name)
        print(f"Data successfully saved to {table_name}")
    except Exception as err:
        print(f"Error saving dataframe to table: {err}")

In [0]:
from pyspark.sql.functions import col
def validate_data(df):
    """Check that the ``id`` column of ``df`` contains no nulls.

    Args:
        df: DataFrame exposing an ``id`` column.

    Returns:
        True when no row has a null ``id``; False otherwise, after printing
        the failure message.
    """
    try:
        null_id_rows = df.filter(col("id").isNull()).count()
        if null_id_rows > 0:
            raise ValueError("Data quality check failed: null values found in the ID column")
    except ValueError as err:
        print(err)
        return False
    return True

In [0]:

# Locate today's folder in the silver container and load it.
today = datetime.today().strftime('%Y%m%d')
file_path_silver = build_file_path(storage_account_name, container_name_silver, today)
# An empty blob name yields the container root, which is passed as the
# reader's basePath; built with the same helper so both URIs stay in sync.
base_path_silver = build_file_path(storage_account_name, container_name_silver, "")

df = read_parquet_to_df(file_path_silver, base_path=base_path_silver)

# read_parquet_to_df signals unreadable or empty input by returning None.
# Stop here with a clear message rather than letting a later cell fail with
# an AttributeError on None.
if df is None:
    raise RuntimeError(f"No usable silver data found at {file_path_silver}")


In [0]:
# Destination in the gold container, under a folder named after today's date.
output_path_gold = build_file_path(
    storage_account_name,
    container_name_gold,
    today,
)

def write_df_to_container(df, output_path):
    """Write a DataFrame to ``output_path`` as Parquet, overwriting existing data.

    Args:
        df: DataFrame to write.
        output_path: Destination path for the Parquet output.

    Returns:
        None. The outcome is reported through a printed message; a failed
        write is caught and printed rather than raised.
    """
    try:
        # .parquet() always writes in Parquet format, so a preceding
        # .format("delta") would be overridden and only contradict the
        # success message below; it is intentionally omitted.
        df.write.mode("overwrite").parquet(output_path)
        print("Data successfully written to the container in parquet format.")
    except Exception as e:
        print(f"Error writing dataframe to the container {e}")

In [0]:
# Destination table for the aggregated brewery counts.
table_name = "desafio_abinbev.gold.brewery_aggregated"

# Aggregate the loaded data, then persist the result to the table.
aggregated_df = aggregate_breweries(df)
insert_aggregated_df(aggregated_df=aggregated_df, table_name=table_name)

Data successfully saved to desafio_abinbev.gold.brewery_aggregated


In [0]:
# Also write the aggregate to the gold container path.
write_df_to_container(df=aggregated_df, output_path=output_path_gold)

Data successfully written to the container in parquet format.


In [0]:
# Report how many aggregated rows were produced, then render the result.
aggregated_row_count = aggregated_df.count()
print("Number of records in brewery_aggregated:", aggregated_row_count)

display(aggregated_df)


Number of records in brewery_aggregated: 26


country,state,brewery_type,quantity
United States,North Carolina,brewpub,1
United States,Wisconsin,micro,2
United States,Illinois,micro,1
United States,Massachusetts,micro,1
United States,Oregon,brewpub,2
United States,North Carolina,micro,1
United States,Indiana,micro,3
United States,Oregon,large,4
United States,South Carolina,brewpub,1
United States,Idaho,large,1
