# **Silver Layer Notebook**

This notebook will extract the raw datasets from the lakehouse, transform them through data cleaning, and load the clean tables in the lakehouse.

In [1]:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType
import unidecode

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 3, Finished, Available, Finished)

#### _Function to check for Nulls and Duplicates_

In [2]:
# Checking for Nulls and Duplicates
def null_dups_table(df):
    print("Checking for Nulls and Duplicates")
    
    total_rows = df.count()
    print("Total rows:", total_rows)

    # Check for null values
    null_counts = df.select([
        sum(col(c).isNull().cast("int")).alias(c) for c in df.columns
    ]).collect()[0].asDict()
    
    print("\nMissing values per column:")
    for col_name, cnt in null_counts.items():
        print(f"  {col_name}: {cnt}")
    
    # Check for duplicates
    dups = (df.groupBy(df.columns)
            .agg(count("*").alias("cnt"))
            .filter(col("cnt") > 1))
    dup_count = dups.count()
    
    if dup_count > 0:
        print(f"\nFound {dup_count:,} duplicate rows")
        dups.show(10)
    else:
        print(f"\nNo duplicates found")

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 4, Finished, Available, Finished)

#### _Function to drop duplicates_


In [3]:
# Remove rows that are completely identical across all columns.
def drop_exact_dups(df):
    print("Dropping Duplicates")

    before = df.count()
    df_clean = df.dropDuplicates()
    after = df_clean.count()

    removed = before - after
    print(f"\nRemoved {removed:,} duplicates (kept {after:,} rows).")

    return df_clean

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 5, Finished, Available, Finished)

## <font color="green">Customers Dataset</font>

In [4]:
bronze_path = "Files/Olist_BronzeLayer_Customers/Olist_Customers.csv"
silver_table_name = "Olist_Silver_Customers"

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 6, Finished, Available, Finished)

### Read the CSV

In [5]:
df = spark.read.format("csv").option("header","true").load(bronze_path)
display(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 6f00e831-560e-4b96-9ccc-9cdd30b933c9)

### Data Cleaning

In [6]:
# Checking for Nulls and Duplicates
null_dups_table(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 8, Finished, Available, Finished)

Checking for Nulls and Duplicates
Total rows: 99441

Missing values per column:
  customer_id: 0
  customer_unique_id: 0
  customer_zip_code_prefix: 0
  customer_city: 0
  customer_state: 0

No duplicates found


In [7]:
# Check the data types
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 9, Finished, Available, Finished)

[('customer_id', 'string'),
 ('customer_unique_id', 'string'),
 ('customer_zip_code_prefix', 'string'),
 ('customer_city', 'string'),
 ('customer_state', 'string')]

Only `customer_zip_code_prefix` needs to be fixed

In [8]:
# Correct the data types
df = (df.withColumn("customer_zip_code_prefix", col("customer_zip_code_prefix").cast("int")))
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 10, Finished, Available, Finished)

[('customer_id', 'string'),
 ('customer_unique_id', 'string'),
 ('customer_zip_code_prefix', 'int'),
 ('customer_city', 'string'),
 ('customer_state', 'string')]

### Write to Lakehouse as Silver managed table

In [9]:
# Write the table
df.write.format("delta").mode("overwrite").saveAsTable(silver_table_name)
print(f"Saved {silver_table_name}")

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 11, Finished, Available, Finished)

Saved Olist_Silver_Customers


## <font color="green">Geolocation Dataset</font>

In [85]:
bronze_path = "Files/Olist_BronzeLayer_Geolocation/Olist_Geolocation.csv"
silver_table_name = "Olist_Silver_Geolocation"

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 95, Finished, Available, Finished)

### Read the CSV

In [86]:
df = spark.read.format("csv").option("header","true").load(bronze_path)
display(df)

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 96, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 114e548d-5b82-42e1-a3ca-4d54f8192536)

### Data Cleaning

#### Remove Accents

In [87]:
# UDF to remove accents
def remove_accents(s):
    if s is None:
        return None
    return unidecode.unidecode(s)

remove_accents_udf = udf(remove_accents, StringType())

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 97, Finished, Available, Finished)

In [88]:
# Apply transformations

"""

The original column will be renamed as "geolocation_city_raw"
The normalized column will receive the name "geolocation_city"

"""

df = (
    df
    .withColumnRenamed("geolocation_city", "geolocation_city_raw")
    .withColumn("geolocation_city", lower(trim(col("geolocation_city_raw"))))
    .withColumn("geolocation_city", regexp_replace("geolocation_city", "\s+", " "))
    .withColumn("geolocation_city", remove_accents_udf("geolocation_city"))
)

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 98, Finished, Available, Finished)

In [89]:
(
    df
    .select("geolocation_zip_code_prefix", "geolocation_city_raw", "geolocation_city")
    .orderBy("geolocation_city")
    .show(20, truncate=False)
)

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 99, Finished, Available, Finished)

+---------------------------+--------------------+-------------------+
|geolocation_zip_code_prefix|geolocation_city_raw|geolocation_city   |
+---------------------------+--------------------+-------------------+
|25970                      |´teresopolis        |'teresopolis       |
|81470                      |* cidade            |* cidade           |
|28930                      |...arraial do cabo  |...arraial do cabo |
|87365                      |4º centenario       |4o centenario      |
|87365                      |4º centenario       |4o centenario      |
|87365                      |4o. centenario      |4o. centenario     |
|75345                      |abadia de goias     |abadia de goias    |
|75345                      |abadia de goias     |abadia de goias    |
|75345                      |abadia de goias     |abadia de goias    |
|75345                      |abadia de goias     |abadia de goias    |
|75345                      |abadia de goias     |abadia de goias    |
|75345

There are some messy city names, like the first six in the table above.

**Decision**
- **<font color="red">Leave it as is and add a data quality flag, identifying when there's a non-alphabetic character.</font>**

#### Flag Typos

In [90]:
# Add flag for non-alphabetic characters
df = (
    df
    .withColumn(
        "flag_nonalpha_city",
        when(col("geolocation_city").rlike("[^a-z ]"), lit(True)).otherwise(lit(False))
    )
)

# Summary counts
total_flagged = df.filter(col("flag_nonalpha_city") == True).count()
unique_flagged = (
    df.filter(col("flag_nonalpha_city") == True)
    .select("geolocation_city").distinct().count()
)

print(f"Total rows flagged: {total_flagged:,}")
print(f"Unique city names flagged: {unique_flagged:,}")

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 100, Finished, Available, Finished)

Total rows flagged: 3,910
Unique city names flagged: 110


In [91]:
# Preview first 20 possibly problematic names
(
    df
    .filter(col("flag_nonalpha_city") == True)
    .select("geolocation_zip_code_prefix", "geolocation_city", "flag_nonalpha_city")
    .distinct()
    .orderBy("geolocation_city")
    .show(20, truncate=False)
)

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 101, Finished, Available, Finished)

+---------------------------+--------------------------------------+------------------+
|geolocation_zip_code_prefix|geolocation_city                      |flag_nonalpha_city|
+---------------------------+--------------------------------------+------------------+
|25970                      |'teresopolis                          |true              |
|81470                      |* cidade                              |true              |
|28930                      |...arraial do cabo                    |true              |
|87365                      |4o centenario                         |true              |
|87365                      |4o. centenario                        |true              |
|76954                      |alta floresta d'oeste                 |true              |
|76930                      |alvorada d'oeste                      |true              |
|35698                      |antunes (igaratinga)                  |true              |
|15735                      |apa

#### Nulls, Duplicates, and Data Types

In [92]:
# Checking for Nulls and Duplicates
null_dups_table(df)

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 102, Finished, Available, Finished)

Checking for Nulls and Duplicates
Total rows: 1000163

Missing values per column:
  geolocation_zip_code_prefix: 0
  geolocation_lat: 0
  geolocation_lng: 0
  geolocation_city_raw: 0
  geolocation_state: 0
  geolocation_city: 0
  flag_nonalpha_city: 0

Found 128,174 duplicate rows
+---------------------------+-------------------+-------------------+--------------------+-----------------+----------------+------------------+---+
|geolocation_zip_code_prefix|    geolocation_lat|    geolocation_lng|geolocation_city_raw|geolocation_state|geolocation_city|flag_nonalpha_city|cnt|
+---------------------------+-------------------+-------------------+--------------------+-----------------+----------------+------------------+---+
|                      01150| -23.53205041993188| -46.66025965598129|           sao paulo|               SP|       sao paulo|             false|  3|
|                      01139| -23.52198725618896| -46.66381768373821|           sao paulo|               SP|       sao pau

In [93]:
display(df.groupBy(df.columns).agg(count("*").alias("cnt")).filter(col("cnt") > 1))

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 103, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, b4a51744-17a5-41e6-b332-74cacb594ddb)

We can remove the completely identical rows

In [94]:
df = drop_exact_dups(df)

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 104, Finished, Available, Finished)

Dropping Duplicates

Removed 261,831 duplicates (kept 738,332 rows).


In [95]:
# Check the data types
df.dtypes

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 105, Finished, Available, Finished)

[('geolocation_zip_code_prefix', 'string'),
 ('geolocation_lat', 'string'),
 ('geolocation_lng', 'string'),
 ('geolocation_city_raw', 'string'),
 ('geolocation_state', 'string'),
 ('geolocation_city', 'string'),
 ('flag_nonalpha_city', 'boolean')]

In [96]:
# Correct the data types
df = (
    df
    .withColumn("geolocation_zip_code_prefix", col("geolocation_zip_code_prefix").cast("int"))
    .withColumn("geolocation_lat", col("geolocation_lat").cast("double"))
    .withColumn("geolocation_lng", col("geolocation_lng").cast("double"))
)
df.dtypes

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 106, Finished, Available, Finished)

[('geolocation_zip_code_prefix', 'int'),
 ('geolocation_lat', 'double'),
 ('geolocation_lng', 'double'),
 ('geolocation_city_raw', 'string'),
 ('geolocation_state', 'string'),
 ('geolocation_city', 'string'),
 ('flag_nonalpha_city', 'boolean')]

### Write to Lakehouse as Silver managed table

In [98]:
# Write the table
df.write.format("delta").mode("overwrite").saveAsTable(silver_table_name)
print(f"Saved {silver_table_name}")

StatementMeta(, 3b904aed-acf0-410a-97c6-693afe60c86b, 108, Finished, Available, Finished)

Saved Olist_Silver_Geolocation


## <font color="green">Order Items Dataset</font>

In [18]:
bronze_path = "Files/Olist_BronzeLayer_OrderItems/Olist_OrderItems.csv"
silver_table_name = "Olist_Silver_OrderItems"

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 20, Finished, Available, Finished)

### Read the CSV

In [19]:
df = spark.read.format("csv").option("header","true").load(bronze_path)
display(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 21, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1ad9ec40-fa1b-4a61-a824-c333c7f89fc4)

### Data Cleaning

In [20]:
# Checking for Nulls and Duplicates
null_dups_table(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 22, Finished, Available, Finished)

Checking for Nulls and Duplicates
Total rows: 112650

Missing values per column:
  order_id: 0
  order_item_id: 0
  product_id: 0
  seller_id: 0
  shipping_limit_date: 0
  price: 0
  freight_value: 0

No duplicates found


In [21]:
# Check the data types
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 23, Finished, Available, Finished)

[('order_id', 'string'),
 ('order_item_id', 'string'),
 ('product_id', 'string'),
 ('seller_id', 'string'),
 ('shipping_limit_date', 'string'),
 ('price', 'string'),
 ('freight_value', 'string')]

In [22]:
# Correct the data types
df = (
    df
    .withColumn("order_item_id", col("order_item_id").cast("int"))
    .withColumn("shipping_limit_date", col("shipping_limit_date").cast("timestamp"))
    .withColumn("price", col("price").cast("decimal(10,2)"))
    .withColumn("freight_value", col("freight_value").cast("decimal(10,2)"))
)
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 24, Finished, Available, Finished)

[('order_id', 'string'),
 ('order_item_id', 'int'),
 ('product_id', 'string'),
 ('seller_id', 'string'),
 ('shipping_limit_date', 'timestamp'),
 ('price', 'decimal(10,2)'),
 ('freight_value', 'decimal(10,2)')]

### Write to Lakehouse as Silver managed table

In [23]:
# Write the table
df.write.format("delta").mode("overwrite").saveAsTable(silver_table_name)
print(f"Saved {silver_table_name}")

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 25, Finished, Available, Finished)

Saved Olist_Silver_OrderItems


## <font color="green">Order Payments Dataset</font>

In [24]:
bronze_path = "Files/Olist_BronzeLayer_OrderPayments/Olist_Payments.csv"
silver_table_name = "Olist_Silver_OrderPayments"

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 26, Finished, Available, Finished)

### Read the CSV

In [25]:
df = spark.read.format("csv").option("header","true").load(bronze_path)
display(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 27, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, a3f999a4-2888-46af-ae4b-44742e10601f)

### Data Cleaning

In [26]:
# Checking for Nulls and Duplicates
null_dups_table(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 28, Finished, Available, Finished)

Checking for Nulls and Duplicates
Total rows: 103886

Missing values per column:
  order_id: 0
  payment_sequential: 0
  payment_type: 0
  payment_installments: 0
  payment_value: 0

No duplicates found


In [27]:
# Check the data types
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 29, Finished, Available, Finished)

[('order_id', 'string'),
 ('payment_sequential', 'string'),
 ('payment_type', 'string'),
 ('payment_installments', 'string'),
 ('payment_value', 'string')]

In [28]:
# Correct the data types
df = (
    df
    .withColumn("payment_sequential", col("payment_sequential").cast("int"))
    .withColumn("payment_installments", col("payment_installments").cast("int"))
    .withColumn("payment_value", col("payment_value").cast("decimal(10,2)"))
)
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 30, Finished, Available, Finished)

[('order_id', 'string'),
 ('payment_sequential', 'int'),
 ('payment_type', 'string'),
 ('payment_installments', 'int'),
 ('payment_value', 'decimal(10,2)')]

### Write to Lakehouse as Silver managed table

In [29]:
# Write the table
df.write.format("delta").mode("overwrite").saveAsTable(silver_table_name)
print(f"Saved {silver_table_name}")

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 31, Finished, Available, Finished)

Saved Olist_Silver_OrderPayments


## <font color="green">Order Reviews Dataset</font>

In [134]:
bronze_path = "Files/Olist_BronzeLayer_OrderReviews/Olist_Reviews.csv"
silver_table_name = "Olist_Silver_OrderReviews"

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 136, Finished, Available, Finished)

### Read the CSV

In [135]:
# Special configuration to prevent loading corruptions as the text fields contain commas.
df = (spark.read
       .option("header", "true")
       .option("quote", '"')
       .option("escape", '"')
       .option("multiLine", "true")   # if reviews include line breaks
       .option("mode", "PERMISSIVE")  # avoids hard failures on bad lines
       .csv(bronze_path))

display(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 137, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 4a049611-006f-4666-b00e-4b0c1a02dc00)

### Data Cleaning

In [136]:
# Checking for Nulls and Duplicates
null_dups_table(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 138, Finished, Available, Finished)

Checking for Nulls and Duplicates
Total rows: 99224

Missing values per column:
  review_id: 0
  order_id: 0
  review_score: 0
  review_comment_title: 87656
  review_comment_message: 58247
  review_creation_date: 0
  review_answer_timestamp: 0

No duplicates found


# There are 814 duplicate review_ids.

alguns review id tem varios order id, mas alguns destes order id nao existem na stg_order_items

1) Pick a keep-rule (business logic)

Keep the latest version of each review_id:
	•	order by review_creation_date (desc)
	•	tie‑break by review_answer_timestamp (desc)
	•	(optional tiebreaker) order_id (asc)

2) De‑dupe in your Silver notebook (PySpark)

from pyspark.sql import functions as F, Window as W

# df_reviews = your Bronze→Silver dataframe (current reviews)
w = W.partitionBy("review_id").orderBy(
    F.col("review_creation_date").desc(),
    F.col("review_answer_timestamp").desc(),
    F.col("order_id").asc()
)

df_reviews_dedup = (
    df_reviews
    .withColumn("rn", F.row_number().over(w))
    .withColumn("is_dup", F.when(F.col("rn") > 1, F.lit(1)).otherwise(F.lit(0)))
)

# Keep one row per review_id
df_reviews_clean = df_reviews_dedup.filter(F.col("rn") == 1).drop("rn")

# (Optional) Persist a duplicates audit table
df_reviews_dups = df_reviews_dedup.filter("is_dup = 1")

# Write back to Silver (adjust paths/names to your Lakehouse layout)
df_reviews_clean.write.mode("overwrite").format("delta").saveAsTable("silver_order_reviews")   # your canonical Silver table
df_reviews_dups.write.mode("overwrite").format("delta").saveAsTable("silver_order_reviews_dups_audit")

3) QA (before/after)

total = df_reviews.count()
dups  = df_reviews_dedup.filter("is_dup = 1").count()
after = df_reviews_clean.count()
print(f"Total: {total:,} | Duplicates: {dups:,} | After de-dup: {after:,}")
# Should satisfy: after == total - dups

4) Feed staging from the cleaned Silver

Point your Copy activity to silver_order_reviews (the cleaned one), so stg.stg_order_reviews no longer carries dup IDs.


The missing values mean that the review doesn't contain a title or a message, which is plausible.

**Decision**
- **<font color="red">Leave it as is and add a data quality flag.</font>**

In [140]:
# Check the data types
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 142, Finished, Available, Finished)

[('review_id', 'string'),
 ('order_id', 'string'),
 ('review_score', 'string'),
 ('review_comment_title', 'string'),
 ('review_comment_message', 'string'),
 ('review_creation_date', 'string'),
 ('review_answer_timestamp', 'string')]

In [141]:
# Correct the data types
df = (
    df
    .withColumn("review_score", col("review_score").cast("int"))
    .withColumn("review_creation_date", col("review_creation_date").cast("timestamp"))
    .withColumn("review_answer_timestamp", col("review_answer_timestamp").cast("timestamp"))
)
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 143, Finished, Available, Finished)

[('review_id', 'string'),
 ('order_id', 'string'),
 ('review_score', 'int'),
 ('review_comment_title', 'string'),
 ('review_comment_message', 'string'),
 ('review_creation_date', 'timestamp'),
 ('review_answer_timestamp', 'timestamp')]

### Data Quality Flag for Missing Values

In [142]:
# Registering the flag columns
df_flagged = (
    df
    
    # Global missing flag: 1 if any important field is null
    .withColumn(
        "flag_missing_value",
        when(
            col("review_comment_title").isNull() |
            col("review_comment_message").isNull(), True)
        .otherwise(False)
    )
    
    # Create a note column for each variable that contains nulls.
    .withColumn(
        "flag_note_review_info",
        when(
            col("review_comment_title").isNull() |
            col("review_comment_message").isNull(), "plausible")
        .otherwise("valid")
    )
)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 144, Finished, Available, Finished)

### Write to Lakehouse as Silver managed table

In [144]:
# Write the table
df_flagged.write.format("delta").mode("overwrite").saveAsTable(silver_table_name)
print(f"Saved {silver_table_name}")

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 146, Finished, Available, Finished)

Saved Olist_Silver_OrderReviews


## <font color="green">Orders Dataset</font>

In [30]:
bronze_path = "Files/Olist_BronzeLayer_Orders/Olist_Orders.csv"
silver_table_name = "Olist_Silver_Orders"

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 32, Finished, Available, Finished)

### Read the CSV

In [31]:
df = spark.read.format("csv").option("header","true").load(bronze_path)
display(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 33, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, c9e2d117-0635-4875-8206-575ccc377a86)

### Data Cleaning

In [32]:
# Checking for Nulls and Duplicates
null_dups_table(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 34, Finished, Available, Finished)

Checking for Nulls and Duplicates
Total rows: 99441

Missing values per column:
  order_id: 0
  customer_id: 0
  order_status: 0
  order_purchase_timestamp: 0
  order_approved_at: 160
  order_delivered_carrier_date: 1783
  order_delivered_customer_date: 2965
  order_estimated_delivery_date: 0

No duplicates found


There are many missing values. Let's address each of them.

#### order_approved_at

In [33]:
# Count rows by status, separating null vs non-null
df.groupBy("order_status").agg(
    count(when(col("order_approved_at").isNull(), 1)).alias("null_count"),
    count(when(col("order_approved_at").isNull(), 1)).alias("non_null_count")
).show()

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 35, Finished, Available, Finished)

+------------+----------+--------------+
|order_status|null_count|non_null_count|
+------------+----------+--------------+
|     shipped|         0|             0|
|    canceled|       141|           141|
|    invoiced|         0|             0|
|     created|         5|             5|
|   delivered|        14|            14|
| unavailable|         0|             0|
|  processing|         0|             0|
|    approved|         0|             0|
+------------+----------+--------------+



- 141 are cancelled - plausible (never approved).
- 5 are created - plausible (order created but never approved).
- 14 are delivered - **anomaly** (delivered with no approval timestamp).

**Decision**
- **<font color="red">Leave it as is and add a data quality flag.</font>**

#### order_delivered_carrier_date and order_delivered_customer_date

In [34]:
# Count rows by status, separating null vs non-null
df.groupBy("order_status").agg(
    count(when(col("order_delivered_carrier_date").isNull(), 1)).alias("missing_carrier"),
    count(when(col("order_delivered_customer_date").isNull(), 1)).alias("missing_customer"),
    count(when(col("order_delivered_carrier_date").isNotNull() & col("order_delivered_customer_date").isNull(), 1)).alias("in_delivery")
).show()

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 36, Finished, Available, Finished)

+------------+---------------+----------------+-----------+
|order_status|missing_carrier|missing_customer|in_delivery|
+------------+---------------+----------------+-----------+
|     shipped|              0|            1107|       1107|
|    canceled|            550|             619|         69|
|    invoiced|            314|             314|          0|
|     created|              5|               5|          0|
|   delivered|              2|               8|          7|
| unavailable|            609|             609|          0|
|  processing|            301|             301|          0|
|    approved|              2|               2|          0|
+------------+---------------+----------------+-----------+



- Shipped
  - 1107 orders were shipped but not delivered to the customer – plausible
- Canceled
  - 550 orders were cancelled and not delivered to the carrier – plausible
  - 619 orders were cancelled and not delivered to the customer
    - 550 were not delivered to the carrier – plausible
    - 69 were cancelled during the delivery process - plausible
- Invoiced
  - 314 orders were invoiced and not yet delivered to the carrier- plausible
- Created
  - 5 orders were created and not yet delivered to the carrier- plausible
- Delivered
  - 1 order was delivered but not registered as delivered to the carrier and the customer – **anomaly**
  - 1 order was delivered and registered as delivered to the customer, but not to the carrier - **anomaly**
  - 7 orders were delivered but not registered as delivered to the customer – **anomaly**
- Unavailable
  - 609 orders are unavailable - plausible
- Processing
  - 609 orders are being processed - plausible
- Approved
  - 2 orders were approved and not yet delivered to the carrier- plausible

**Decision**
- **<font color="red">Leave it as is and add a data quality flag.</font>**





#### Correcting Data Types

In [35]:
# Check the data types
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 37, Finished, Available, Finished)

[('order_id', 'string'),
 ('customer_id', 'string'),
 ('order_status', 'string'),
 ('order_purchase_timestamp', 'string'),
 ('order_approved_at', 'string'),
 ('order_delivered_carrier_date', 'string'),
 ('order_delivered_customer_date', 'string'),
 ('order_estimated_delivery_date', 'string')]

In [36]:
# Correct the data types
df = (
    df
    .withColumn("order_purchase_timestamp", col("order_purchase_timestamp").cast("timestamp"))
    .withColumn("order_approved_at", col("order_approved_at").cast("timestamp"))
    .withColumn("order_delivered_carrier_date", col("order_delivered_carrier_date").cast("timestamp"))
    .withColumn("order_delivered_customer_date", col("order_delivered_customer_date").cast("timestamp"))
    .withColumn("order_estimated_delivery_date", col("order_estimated_delivery_date").cast("timestamp"))
)
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 38, Finished, Available, Finished)

[('order_id', 'string'),
 ('customer_id', 'string'),
 ('order_status', 'string'),
 ('order_purchase_timestamp', 'timestamp'),
 ('order_approved_at', 'timestamp'),
 ('order_delivered_carrier_date', 'timestamp'),
 ('order_delivered_customer_date', 'timestamp'),
 ('order_estimated_delivery_date', 'timestamp')]

#### Date Sanity Checks

To validate temporal logic between events

##### 1. order_purchase_timestamp ≤ order_approved_at

In [37]:
insane_purchase_approval = df.filter(col("order_purchase_timestamp") > col("order_approved_at")).count()
print(f"Found {insane_purchase_approval:,} payments approved before the purchase")

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 39, Finished, Available, Finished)

Found 0 payments approved before the purchase


##### 2. order_approved_at ≤ order_delivered_carrier_date ≤ order_delivered_customer_date

In [38]:
insane_approval_carrier = df.filter(col("order_approved_at") >= col("order_delivered_carrier_date")).count()
print(f"Found {insane_approval_carrier:,} orders delivered to the carrier before the payment approval")

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 40, Finished, Available, Finished)

Found 1,359 orders delivered to the carrier before the payment approval


In [39]:
insane_carrier_customer = df.filter(col("order_delivered_carrier_date") > col("order_delivered_customer_date")).count()
print(f"Found {insane_carrier_customer:,} orders delivered to the customer before being delivered to the carrier")

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 41, Finished, Available, Finished)

Found 23 orders delivered to the customer before being delivered to the carrier


Check #2 has true temporal **anomalies**.

**Decision**
- **<font color="red">Leave it as is and add a data quality flag.</font>**

### Data Quality Flag for Missing Values and Date Insanity

# WHERE order_status = 'canceled' and flag_missing_value != 1;

In [40]:
# Registering the flag columns
df_flagged = (
    df
    
    # Global missing flag: 1 if any important field is null
    .withColumn(
        "flag_missing_value",
        when(
            col("order_approved_at").isNull() |
            col("order_delivered_carrier_date").isNull() |
            col("order_delivered_customer_date").isNull(), True
        ).otherwise(False)
    )
    
    # Creating a note column for each variable that contains nulls.
    # It will inform the data cleaning conclusion about them: plausible, anomaly, or valid value.
    
    # order_approved_at
    .withColumn(
        "flag_note_order_approved_at",
        when( (col("order_approved_at").isNull()) & (col("order_status").isin("canceled","created")), "plausible")
        .when( (col("order_approved_at").isNull()) & (col("order_status") == "delivered"), "anomaly")
        .when(  col("order_approved_at").isNull(), "plausible")
        .otherwise("valid")
    )
    # order_delivered_carrier_date
    .withColumn(
        "flag_note_order_delivered_carrier_date",
        when( (col("order_delivered_carrier_date").isNull()) & (col("order_status").isin("canceled","created","processing","invoiced","unavailable","approved")), "plausible")
        .when( (col("order_delivered_carrier_date").isNull()) & (col("order_status") == "delivered"), "anomaly")
        .when( (col("order_delivered_carrier_date").isNotNull()) & (col("order_delivered_customer_date").isNull()) & (col("order_status") == "delivered"), "anomaly")
        .when( (col("order_delivered_carrier_date").isNull()) & (col("order_status") == "shipped"), "plausible")
        .otherwise("valid")
    )
    # order_delivered_customer_date
    .withColumn(
        "flag_note_order_delivered_customer_date",
        when( (col("order_delivered_customer_date").isNull()) & (col("order_status").isin("canceled","created","processing","invoiced","unavailable","approved","shipped")), "plausible")
        .when( (col("order_delivered_customer_date").isNull()) & (col("order_status") == "delivered"), "anomaly")
        .when( (col("order_delivered_customer_date").isNotNull()) & (col("order_delivered_carrier_date").isNull()) & (col("order_status") == "delivered"), "anomaly")
        .otherwise("valid")
    )


    # Temporal sanity flags (impossible sequences)
    .withColumn(
        "flag_carrier_before_approval",
        when( (col("order_delivered_carrier_date").isNotNull()) & 
              (col("order_approved_at").isNotNull()) &
              (col("order_approved_at") > col("order_delivered_carrier_date")), True
        ).otherwise(False)
    )
    .withColumn(
        "flag_customer_before_carrier",
        when( (col("order_delivered_customer_date").isNotNull()) &
              (col("order_delivered_carrier_date").isNotNull()) &
              (col("order_delivered_carrier_date") > col("order_delivered_customer_date")), True
        ).otherwise(False)
    )
)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 42, Finished, Available, Finished)

#### Write to Lakehouse as Silver managed table

In [41]:
# Write the table
df_flagged.write.format("delta").mode("overwrite").saveAsTable(silver_table_name)
print(f"Saved {silver_table_name}")

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 43, Finished, Available, Finished)

Saved Olist_Silver_Orders


In [42]:
del df_flagged

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 44, Finished, Available, Finished)

## <font color="green">Products Dataset</font>

In [4]:
bronze_path = "Files/Olist_BronzeLayer_Products/Olist_Products.csv"
silver_table_name = "Olist_Silver_Products"

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 6, Finished, Available, Finished)

### Read the CSV

In [5]:
df = spark.read.format("csv").option("header","true").load(bronze_path)
display(df)

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 7, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 2eb20d1f-c956-4a27-a2d7-a3fd1e66a6b1)

### Data Cleaning

#### Product Category Names Translation

The file Olist_ProdCatNameTranslation.csv translates the product_category_name to English.

In [6]:
categ_name_df = spark.read.format("csv").option("header","true").load("Files/Olist_BronzeLayer_ProdCatNameTranslation/Olist_ProdCatNameTranslation.csv")

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 8, Finished, Available, Finished)

In [7]:
# Checking for Nulls and Duplicates
null_dups_table(categ_name_df)

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 9, Finished, Available, Finished)

Checking for Nulls and Duplicates
Total rows: 71

Missing values per column:
  product_category_name: 0
  product_category_name_english: 0

No duplicates found


In [8]:
# Create a new column with the English product_category_name, keeping the original
p = df.alias("p")
c = categ_name_df.alias("c")

# keep every original product column except the original category (we’ll replace it)
rest_cols = [x for x in df.columns if x not in ("product_id", "product_category_name")]

df = (
    p.join(c, col("p.product_category_name") == col("c.product_category_name"), "left")
     .select(
        col("p.product_id"),
        col("p.product_category_name").alias("product_category_name_raw"),
        coalesce(col("c.product_category_name_english"), col("product_category_name_raw")).alias("product_category_name"),
        *[col(f"p.{x}") for x in rest_cols]
     )
)

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 10, Finished, Available, Finished)

#### Missing Values and Duplicates

In [9]:
# Checking for Nulls and Duplicates
null_dups_table(df)

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 11, Finished, Available, Finished)

Checking for Nulls and Duplicates
Total rows: 32951

Missing values per column:
  product_id: 0
  product_category_name_raw: 610
  product_category_name: 610
  product_name_lenght: 610
  product_description_lenght: 610
  product_photos_qty: 610
  product_weight_g: 2
  product_length_cm: 2
  product_height_cm: 2
  product_width_cm: 2

No duplicates found


In [10]:
display(df
    .filter(
        (df.product_weight_g.isNull()) &
        (df.product_length_cm.isNull()) &
        (df.product_height_cm.isNull()) &
        (df.product_width_cm.isNull())
        )
    )

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 12, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1659ec5d-1044-4400-bc5b-4a1c15b8f3a6)

In [11]:
display(df
    .filter(
        (df.product_category_name_raw.isNull()) &
        (df.product_category_name.isNull()) &
        (df.product_name_lenght.isNull()) &
        (df.product_description_lenght.isNull()) &
        (df.product_photos_qty.isNull())
        )
    )

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 13, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 1e5667df-671d-415b-bc7d-544946ebf799)

- There is 1 row entirely with Nulls except for product_id.
- There is 1 row with product category, name, description, and photos information, but not with the product dimensions.
- The remaining rows have the product dimensions but not the other product information.

**Decision**
- **<font color="red">Replace the nulls in product_category_name with "Unknown."</font>**
- **<font color="red">Leave the rest as is and add a data quality flag.</font>**

### Fixing "product_category_name" and Data Quality Flag for Missing Values

In [12]:
df = (
    df
    
    # Fill product_category_name only
    .withColumn(
        "product_category_name",
        coalesce(col("product_category_name"), lit("unknown"))
    )
    
    
    # Flag any important info missing
    .withColumn(
        "flag_missing_product_info",
        when(
            (col("product_category_name") == lit("unknown")) |
            (col("product_name_lenght").isNull()) |
            (col("product_description_lenght").isNull()) |
            (col("product_photos_qty").isNull()) |
            (col("product_weight_g").isNull()) |
            (col("product_length_cm").isNull()) |
            (col("product_height_cm").isNull()) |
            (col("product_width_cm").isNull()),
            True
        ).otherwise(False)
    )

    # Create a note column for each variable that contains nulls.
    .withColumn(
        "flag_note_product_info",
        when(col("flag_missing_product_info"), lit("missing product information"))
        .otherwise(lit("valid"))
    )
)

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 14, Finished, Available, Finished)

#### Correcting Data Types

In [13]:
# Check the data types
df.dtypes

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 15, Finished, Available, Finished)

[('product_id', 'string'),
 ('product_category_name_raw', 'string'),
 ('product_category_name', 'string'),
 ('product_name_lenght', 'string'),
 ('product_description_lenght', 'string'),
 ('product_photos_qty', 'string'),
 ('product_weight_g', 'string'),
 ('product_length_cm', 'string'),
 ('product_height_cm', 'string'),
 ('product_width_cm', 'string'),
 ('flag_missing_product_info', 'boolean'),
 ('flag_note_product_info', 'string')]

In [14]:
# Correct the data types
df = (
    df
    .withColumn("product_name_lenght", col("product_name_lenght").cast("int"))
    .withColumn("product_description_lenght", col("product_description_lenght").cast("int"))
    .withColumn("product_photos_qty", col("product_photos_qty").cast("int"))
    .withColumn("product_weight_g", col("product_weight_g").cast("int"))
    .withColumn("product_length_cm", col("product_length_cm").cast("int"))
    .withColumn("product_height_cm", col("product_height_cm").cast("int"))
    .withColumn("product_width_cm", col("product_width_cm").cast("int"))
)
df.dtypes

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 16, Finished, Available, Finished)

[('product_id', 'string'),
 ('product_category_name_raw', 'string'),
 ('product_category_name', 'string'),
 ('product_name_lenght', 'int'),
 ('product_description_lenght', 'int'),
 ('product_photos_qty', 'int'),
 ('product_weight_g', 'int'),
 ('product_length_cm', 'int'),
 ('product_height_cm', 'int'),
 ('product_width_cm', 'int'),
 ('flag_missing_product_info', 'boolean'),
 ('flag_note_product_info', 'string')]

#### Write to Lakehouse as Silver managed table 

In [15]:
# Write the table
df.write.format("delta").mode("overwrite").saveAsTable(silver_table_name)
print(f"Saved {silver_table_name}")

StatementMeta(, b2b0599c-a974-4f6c-a615-65dcc0cfaa75, 17, Finished, Available, Finished)

Saved Olist_Silver_Products


## <font color="green">Sellers Dataset</font>

In [55]:
bronze_path = "Files/Olist_BronzeLayer_Sellers/Olist_Sellers.csv"
silver_table_name = "Olist_Silver_Sellers"

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 57, Finished, Available, Finished)

### Read the CSV

In [56]:
df = spark.read.format("csv").option("header","true").load(bronze_path)
display(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 58, Finished, Available, Finished)

SynapseWidget(Synapse.DataFrame, 14393158-d491-44be-abfa-d75d8bae5cb5)

### Data Cleaning

In [57]:
# Checking for Nulls and Duplicates
null_dups_table(df)

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 59, Finished, Available, Finished)

Checking for Nulls and Duplicates
Total rows: 3095

Missing values per column:
  seller_id: 0
  seller_zip_code_prefix: 0
  seller_city: 0
  seller_state: 0

No duplicates found


In [58]:
# Check the data types
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 60, Finished, Available, Finished)

[('seller_id', 'string'),
 ('seller_zip_code_prefix', 'string'),
 ('seller_city', 'string'),
 ('seller_state', 'string')]

In [59]:
# Correct the data types
df = (
    df
    .withColumn("seller_zip_code_prefix", col("seller_zip_code_prefix").cast("int"))
)
df.dtypes

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 61, Finished, Available, Finished)

[('seller_id', 'string'),
 ('seller_zip_code_prefix', 'int'),
 ('seller_city', 'string'),
 ('seller_state', 'string')]

### Write to Lakehouse as Silver managed table

In [60]:
# Write the table
df.write.format("delta").mode("overwrite").saveAsTable(silver_table_name)
print(f"Saved {silver_table_name}")

StatementMeta(, 3ab89de6-c4a0-40e7-89fd-44f51e910c6c, 62, Finished, Available, Finished)

Saved Olist_Silver_Sellers
