### Notebook Overview:
- In this Databricks notebook I build the ETL layer that reads the data ingested from the API out of the Bronze folder, applies some transformations, runs quality checks (such as finding and dropping duplicates), and finally saves the result into a Silver table.

**Create the databases/schemas (Spark SQL treats DATABASE and SCHEMA as synonyms; each tool names the concept differently)**

In [0]:
spark.sql(""" CREATE DATABASE IF NOT EXISTS silver""")
spark.sql(""" CREATE DATABASE IF NOT EXISTS gold""")

In [0]:
# display(
#     spark.sql("SHOW DATABASES")
# )

In [0]:
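# Explicit imports avoid shadowing Python built-ins such as sum/min/max with Spark functions
from pyspark.sql import DataFrame
from pyspark.sql.functions import coalesce, col, current_timestamp, lit, lower, sha2
from pyspark.sql.types import DoubleType, StringType, StructField, StructType
from delta.tables import DeltaTable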

In [0]:
# Read the Bronze layer (stored as Delta, holding the data ingested from the API)
df_bronze = spark.read.format("delta").load("/mnt/files/bronze/")

In [0]:

# Define the expected schema
expected_schema = StructType([
    StructField("id", StringType(), True),
    StructField("name", StringType(), True),
    StructField("brewery_type", StringType(), True),
    StructField("street", StringType(), True),
    StructField("city", StringType(), True),
    StructField("state", StringType(), True),
    StructField("postal_code", StringType(), True),
    StructField("country", StringType(), True),
    StructField("longitude", StringType(), True),
    StructField("latitude", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("website_url", StringType(), True),
    StructField("updated_at", StringType(), True),
    StructField("created_at", StringType(), True)
])
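`expected_schema` is defined above, but nothing in this notebook enforces it yet. A minimal sketch of a drift check, assuming it is enough to compare column names (not types): in the notebook the two inputs would be `[f.name for f in expected_schema.fields]` and `df_bronze.columns`; plain lists are used here so the sketch runs without a Spark session. The helper `schema_drift` and the sample column lists are hypothetical.

```python
def schema_drift(expected: list[str], actual: list[str]) -> tuple[set[str], set[str]]:
    """Return (missing, unexpected) column names; compares names only, not types."""
    expected_set, actual_set = set(expected), set(actual)
    missing = expected_set - actual_set      # expected but absent from the data
    unexpected = actual_set - expected_set   # present in the data but not expected
    return missing, unexpected


# Plain lists standing in for the expected_schema field names and df_bronze.columns
expected_cols = ["id", "name", "brewery_type", "phone", "website_url"]
bronze_cols = ["id", "name", "brewery_type", "phone", "extra_col"]

missing, unexpected = schema_drift(expected_cols, bronze_cols)
if missing:
    print(f"Missing columns: {sorted(missing)}")
if unexpected:
    print(f"Unexpected columns: {sorted(unexpected)}")
```

A non-empty `missing` set could be turned into an error before any transformation runs, in the same spirit as the quality checks further down.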

In [0]:
# Cast longitude and latitude to DoubleType
df_clean = df_bronze.withColumn("longitude", col("longitude").cast(DoubleType())) \
                    .withColumn("latitude", col("latitude").cast(DoubleType()))

# Drop the rows with a missing id
df_clean = df_clean.na.drop(subset=["id"])

# Standardize text columns to lower case
text_columns = ["name", "brewery_type", "city", "state", "country"]
for col_name in text_columns:
    df_clean = df_clean.withColumn(col_name, lower(col(col_name)))

# Mask sensitive data (phone here; the same applies to a user's CPF, UUID, and so on)
# by replacing each value with its SHA-256 hex digest; null values stay null
def hash_sensitive_data(df: DataFrame, column_name: str) -> DataFrame:
    return df.withColumn(column_name, sha2(col(column_name), 256))

df_clean = hash_sensitive_data(df_clean, "phone")
display(df_clean)
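For reference, `sha2(col, 256)` returns the hex-encoded SHA-256 digest of the value, and a null input stays null, which is why the contact-information check later can still detect a missing phone. The pure-Python sketch below mirrors that for single values. One caveat: an unsalted hash of a phone number can be reversed by hashing every possible number, so a keyed hash (HMAC), or at least a salted `sha2(concat(lit(salt), col("phone")), 256)`, is a common alternative. The helper names and the key below are hypothetical; a real key would live in a secret store, not in the notebook.

```python
import hashlib
import hmac
from typing import Optional


def sha256_hex(value: Optional[str]) -> Optional[str]:
    """Hex SHA-256 of the UTF-8 bytes; None stays None (like Spark's sha2(col, 256))."""
    if value is None:
        return None
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def keyed_hash(value: Optional[str], secret_key: bytes) -> Optional[str]:
    """HMAC-SHA256: without the key, nobody can brute-force the phone-number space."""
    if value is None:
        return None
    return hmac.new(secret_key, value.encode("utf-8"), hashlib.sha256).hexdigest()


# Hypothetical key for illustration only; never hard-code a real one
SECRET_KEY = b"example-secret-key"

print(sha256_hex("5555551234"))
print(keyed_hash("5555551234", SECRET_KEY))
print(sha256_hex(None))  # None, so "no phone" is still detectable after hashing
```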

In [0]:
# Count the duplicated ids before dropping them, so the warning reports real numbers
n_duplicates = df_clean.groupBy("id").count().filter("count > 1").count()
if n_duplicates > 0:
    print(f"Warning: found {n_duplicates} ids with duplicated rows; keeping one row per id.")

# Drop the duplicated rows (Spark keeps one row per id, not necessarily the first)
df_clean = df_clean.dropDuplicates(["id"])

# Validate the 'postal_code' format, tested for the USA as an example
# USA format: xxxxx or xxxxx-xxxx (the second alternative also tolerates a trailing '-')
usa_postal_code_pattern = "^[0-9]{5}(-[0-9]{4})?$|^[0-9]{5}-?$"

# eqNullSafe/coalesce keep the condition from evaluating to null;
# a USA row with a missing postal code counts as invalid
is_invalid_usa_postal = (
    col("country").eqNullSafe("united states")
    & ~coalesce(col("postal_code"), lit("")).rlike(usa_postal_code_pattern)
)
n_invalid_postal = df_clean.filter(is_invalid_usa_postal).count()
if n_invalid_postal > 0:
    print(f"Warning: found {n_invalid_postal} rows with an invalid USA postal code; dropping them.")
df_clean = df_clean.filter(~is_invalid_usa_postal)

# Drop the rows without any contact information (both phone AND website_url are null)
has_no_contact = col("phone").isNull() & col("website_url").isNull()
n_no_contact = df_clean.filter(has_no_contact).count()
if n_no_contact > 0:
    print(f"Warning: found {n_no_contact} rows without contact information; dropping them.")
df_clean = df_clean.filter(~has_no_contact)

# Re-run the checks on the cleaned data: any remaining issue stops the load
data_quality_issues = (
    df_clean.count() - df_clean.select("id").distinct().count()
    + df_clean.filter(is_invalid_usa_postal).count()
    + df_clean.filter(has_no_contact).count()
)
if data_quality_issues > 0:
    raise Exception(f"{data_quality_issues} rows failed the quality checks; the data should not be loaded")
else:
    fl_quality = 1  # quality-check flag, kept in case it is needed in the future
    print("Our data is good to proceed :)")
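The same three rules can be sketched in plain Python on a few hypothetical rows, which is a cheap way to unit-test the postal-code regex and the contact rule without a cluster. The sample rows, the placeholder hashed phones, and the helpers `is_valid_usa_postal` and `clean_rows` are illustrative assumptions. A missing postal code counts as invalid here, and duplicates keep the first row per `id`, whereas Spark's `dropDuplicates` does not guarantee which row survives.

```python
import re
from typing import Optional

# Same pattern as in the notebook: xxxxx or xxxxx-xxxx (a trailing '-' is tolerated)
USA_POSTAL_CODE_PATTERN = re.compile(r"^[0-9]{5}(-[0-9]{4})?$|^[0-9]{5}-?$")


def is_valid_usa_postal(postal_code: Optional[str]) -> bool:
    """True when the code matches the USA pattern; None counts as invalid."""
    return postal_code is not None and USA_POSTAL_CODE_PATTERN.search(postal_code) is not None


def clean_rows(rows: list[dict]) -> list[dict]:
    """Apply the notebook's rules in the same order: dedupe, postal check, contact check."""
    # 1) drop rows without id and keep the first row per id
    deduped: dict[str, dict] = {}
    for row in rows:
        if row["id"] is not None and row["id"] not in deduped:
            deduped[row["id"]] = row
    # 2) drop invalid USA postal codes, 3) drop rows with neither phone nor website
    return [
        row for row in deduped.values()
        if not (row["country"] == "united states" and not is_valid_usa_postal(row["postal_code"]))
        and not (row["phone"] is None and row["website_url"] is None)
    ]


# Hypothetical rows; "h1"/"h2" stand in for hashed phone numbers
rows = [
    {"id": "1", "country": "united states", "postal_code": "44107-4020", "phone": "h1", "website_url": None},
    {"id": "1", "country": "united states", "postal_code": "44107", "phone": "h1", "website_url": None},
    {"id": "2", "country": "united states", "postal_code": "4410", "phone": "h2", "website_url": None},
    {"id": "3", "country": "ireland", "postal_code": "D02 X285", "phone": None, "website_url": "http://x"},
    {"id": "4", "country": "united states", "postal_code": "97702", "phone": None, "website_url": None},
]
print([r["id"] for r in clean_rows(rows)])  # ['1', '3']
```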



**Saving the cleaned data**
- if the table does not exist yet, write it as a new Delta table (overwrite mode);
- otherwise run a MERGE on `id`, so that ids are never repeated.
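The MERGE on `id` behaves like an upsert: rows whose `id` already exists in the table are updated, new ids are inserted, so each `id` ends up appearing once. A pure-Python sketch of that behavior, with a dict keyed by `id` standing in for the Delta table (an assumption for illustration; it ignores partitioning and transactions). The helper `upsert_by_id` is hypothetical. It also shows why the batch must be deduplicated first: in Delta, several source rows matching one existing row make the MERGE fail, and duplicated new ids would all be inserted; this sketch simply rejects duplicated source ids.

```python
def upsert_by_id(table: dict[str, dict], batch: list[dict]) -> dict[str, dict]:
    """Mimic MERGE ... ON t.id = s.id WHEN MATCHED UPDATE * WHEN NOT MATCHED INSERT *."""
    batch_ids = [row["id"] for row in batch]
    if len(batch_ids) != len(set(batch_ids)):
        raise ValueError("Source batch has duplicated ids; deduplicate before merging")
    merged = dict(table)  # leave the input 'table' untouched
    for row in batch:
        merged[row["id"]] = row  # update if the id exists, insert otherwise
    return merged


# Usage: id "1" is updated, id "2" is inserted
silver = {"1": {"id": "1", "name": "old name"}}
batch = [{"id": "1", "name": "new name"}, {"id": "2", "name": "another"}]
silver = upsert_by_id(silver, batch)
print(sorted(silver))  # ['1', '2']
```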

In [0]:
df_clean = df_clean.withColumn("ts_loadtimestamp_silver", current_timestamp())

In [0]:
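delta_table_path = '/mnt/files/silver/'
if DeltaTable.isDeltaTable(spark, delta_table_path):
    # The table already exists: MERGE on 'id' so that ids never repeat
    (DeltaTable.forPath(spark, delta_table_path).alias("t")
        .merge(df_clean.alias("s"), "t.id = s.id")
        .whenMatchedUpdateAll().whenNotMatchedInsertAll()
        .execute())
else:
    # First load: create the Delta table, partitioned by state
    df_clean.write.partitionBy("state").format("delta").mode("overwrite").save(delta_table_path)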

**Overall review of this notebook**:
- this notebook performs some data cleaning: it drops rows with duplicated ids, rows whose postal code does not follow the expected pattern (tested for USA postal codes; the same check can be replicated for other countries), and rows without any contact information (both phone AND website_url missing).
- it masks sensitive data by hashing the phone column, as an example; the same could be done for user name, last name, date of birth, and so on.
- the load is protected by the quality checks: if any of them fails, the notebook raises an error, which is then caught by the Data Factory monitoring.
- finally, partitioning by the load timestamp is the usual approach in production; since that column only exists once it is set during ingestion into the Bronze layer, a similar approach was applied when saving the data from the API. The Silver table itself is partitioned by `state`.

In [0]:
spark.sql('OPTIMIZE budinworkspac.silver.silver_bewery')