In [0]:
# Select the default catalog and schema for this session so that unqualified
# table names resolve against workspace_webcreative1.silver.
spark.sql("USE CATALOG workspace_webcreative1")
spark.sql("USE silver")

In [0]:
# --- Catalog and the bronze / silver / gold schemas ---
CATALOG = "workspace_webcreative1"
BRONZE_SCHEMA, SILVER_SCHEMA, GOLD_SCHEMA = "bronze", "silver", "gold"

# --- Raw storage: abfss:// URIs of the source CSV files ---
ACCOUNT = "stdbxfinalwebcreative01"
FQDN = ACCOUNT + ".dfs.core.windows.net"
RAW_CONTAINER = "raw"

AUTOMOBILE_CSV = "abfss://" + RAW_CONTAINER + "@" + FQDN + "/automobile/Automobile.csv"
ECOMMERCE_CSV = "abfss://" + RAW_CONTAINER + "@" + FQDN + "/ecommerce/ecommerce.csv"

# --- Table names, fully qualified as <catalog>.<schema>.<table> ---
BRONZE_AUTO = ".".join((CATALOG, BRONZE_SCHEMA, "automobile_raw"))
BRONZE_ECOM = ".".join((CATALOG, BRONZE_SCHEMA, "ecommerce_raw"))

SILVER_AUTO = ".".join((CATALOG, SILVER_SCHEMA, "automobile_clean"))
SILVER_ECOM = ".".join((CATALOG, SILVER_SCHEMA, "ecommerce_clean"))

GOLD_AUTO_KPI = ".".join((CATALOG, GOLD_SCHEMA, "automobile_kpis"))
GOLD_ECOM_KPI = ".".join((CATALOG, GOLD_SCHEMA, "ecommerce_kpis"))

In [0]:
from pyspark.sql.functions import col, trim, lower, regexp_replace, when, count, isnan
# Switch the session to the configured catalog (CATALOG comes from the config cell above).
spark.sql(f"USE CATALOG {CATALOG}")

In [0]:
# Build the silver "automobile_clean" table from the bronze table:
# trim string columns, drop rows that are entirely NULL, then de-duplicate.
#
# The cleaning steps are written inline on purpose: the helper functions
# trim_string_cols / drop_all_null_rows / drop_duplicates are only defined in a
# later cell, so calling them here raised NameError on Restart & Run All.
df_auto_b = spark.table(BRONZE_AUTO)
df_auto_s = (
    df_auto_b
    # One projection for all columns: string columns are trimmed, others pass through.
    .select([
        trim(col(f.name)).alias(f.name) if f.dataType.simpleString() == "string" else col(f.name)
        for f in df_auto_b.schema.fields
    ])
    # Drop only the rows in which every column is NULL.
    .dropna(how="all")
    # Remove exact duplicate rows.
    .dropDuplicates()
)

# Replace the table contents and schema on every run so the cell is re-runnable.
(df_auto_s.write.format("delta")
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .saveAsTable(SILVER_AUTO)
)

# Preview what was actually persisted (read back from the table, not the in-memory frame).
display(spark.table(SILVER_AUTO).limit(10))

In [0]:
from pyspark.sql.functions import col, isnan, lit
def dq_report(table_name: str, top_n: int = 10):
    """Summarise missing values for a table.

    Counts the rows of ``table_name`` and, for every column, the rows whose
    value is NULL (or NaN, for ``double``/``float`` columns). All counts are
    computed by one aggregation, so the table is scanned once rather than once
    per column.

    Args:
        table_name: Name of the table to profile, passed to ``spark.table``.
        top_n: Maximum number of columns to report. Defaults to 10; ``None``
            returns every column.

    Returns:
        A tuple ``(total, nulls)``: ``total`` is the row count and ``nulls`` is
        a list of ``(column_name, missing_count)`` pairs sorted by descending
        missing count and truncated to ``top_n`` entries.
    """
    df = spark.table(table_name)

    # Built once, outside the per-column work: only floating-point columns can hold NaN.
    float_cols = {name for name, dtype in df.dtypes if dtype in ("double", "float")}

    def _missing_count(name):
        # count(when(cond, 1)) counts only the rows where cond is true, because
        # when() without otherwise() yields NULL and count() skips NULLs.
        is_missing = col(name).isNull()
        if name in float_cols:
            is_missing = is_missing | isnan(col(name))
        return count(when(is_missing, 1))

    # A global aggregate always returns exactly one row, even for an empty table.
    stats = df.agg(count(lit(1)), *[_missing_count(c) for c in df.columns]).first()

    total = stats[0]
    # Results are read positionally so unusual column names cannot break the lookup.
    nulls = list(zip(df.columns, stats[1:]))
    return total, sorted(nulls, key=lambda x: x[1], reverse=True)[:top_n]

# Profile both silver tables, then print row totals with the most-missing columns.
# NOTE(review): no cell visible here writes SILVER_ECOM (ecommerce_clean) —
# confirm that table is created elsewhere before this cell runs.
auto_total, auto_nulls_top = dq_report(SILVER_AUTO)
ecom_total, ecom_nulls_top = dq_report(SILVER_ECOM)

print(f"AUTO rows: {auto_total} Top nulls: {auto_nulls_top}")
print(f"ECOM rows: {ecom_total} Top nulls: {ecom_nulls_top}")

In [0]:
def normalize_colnames(df):
    """Return ``df`` with every column renamed to a normalised identifier.

    Each name is stripped of surrounding whitespace, lower-cased, and every run
    of characters outside ``[a-z0-9_]`` is replaced by a single underscore
    (for example ``" Engine-Size "`` becomes ``"engine_size"``).

    The previous version built a Spark ``regexp_replace`` Column from the name
    and discarded it, so the DataFrame came back unchanged. Column names are
    plain Python strings, so the substitution is done with ``re`` and applied
    with ``toDF``.

    Args:
        df: DataFrame whose columns should be renamed.

    Returns:
        A DataFrame with the same data and column order, and normalised names.

    Raises:
        ValueError: If two or more columns normalise to the same name, which
            would otherwise produce ambiguous duplicate columns.
    """
    import re  # local import so this cell does not depend on another cell's imports

    new_names = [re.sub(r"[^a-z0-9_]+", "_", c.strip().lower()) for c in df.columns]

    # Detect collisions such as "a b" and "a-b" both mapping to "a_b".
    seen = set()
    clashes = set()
    for name in new_names:
        if name in seen:
            clashes.add(name)
        seen.add(name)
    if clashes:
        raise ValueError(
            f"Column names collide after normalisation: {sorted(clashes)}"
        )

    # toDF renames positionally in one projection.
    return df.toDF(*new_names)

def trim_string_cols(df):
    """Trim both ends of every string column of ``df``.

    Columns whose type's ``simpleString()`` is ``"string"`` are passed through
    Spark's ``trim``; all other columns are left as they are. Column names and
    order are preserved.

    The projection is built once and applied with a single ``select``. Calling
    ``withColumn`` once per column adds one projection per call, which the
    PySpark documentation warns can produce very large query plans.

    Args:
        df: Input DataFrame.

    Returns:
        A new DataFrame with the same columns, string columns trimmed.
    """
    projected = [
        trim(col(f.name)).alias(f.name) if f.dataType.simpleString() == "string" else col(f.name)
        for f in df.schema.fields
    ]
    return df.select(projected)

def drop_all_null_rows(df):
    """Return ``df`` without the rows in which every column is null.

    Rows that have at least one non-null value are kept.
    """
    null_handler = df.na
    return null_handler.drop(how="all")

def drop_duplicates(df):
    """Return ``df`` with exact duplicate rows removed (all columns compared)."""
    deduplicated = df.dropDuplicates()
    return deduplicated

In [0]:
from pyspark.sql.functions import col, when, trim

# Bronze -> silver for the automobile data set.
# Table names come from the config cell instead of being repeated as literals.
# NOTE(review): the target is <catalog>.silver.automobile, which is NOT the
# SILVER_AUTO (automobile_clean) table — confirm both tables are intended.
df_auto_raw = spark.table(BRONZE_AUTO)

# One projection does both clean-up steps for every column:
#   * string columns: a value that is exactly "?" after trimming becomes NULL
#     (other values keep their original, untrimmed text);
#   * all columns: hyphens in the column name are replaced by underscores.
# A single select avoids chaining one withColumn/withColumnRenamed per column.
df_auto = df_auto_raw.select([
    (
        when(trim(col(name)) == "?", None).otherwise(col(name))
        if dtype == "string"
        else col(name)
    ).alias(name.replace("-", "_"))
    for name, dtype in df_auto_raw.dtypes
])

# Remove exact duplicate rows.
df_auto_silver = df_auto.dropDuplicates()

(df_auto_silver.write.mode("overwrite")
 .format("delta")
 .saveAsTable(f"{CATALOG}.{SILVER_SCHEMA}.automobile"))

In [0]:
# Bronze -> silver for the e-commerce data set.
# Table names come from the config cell instead of being repeated as literals.
# NOTE(review): the target is <catalog>.silver.ecommerce, which is NOT the
# SILVER_ECOM (ecommerce_clean) table — confirm both tables are intended.
df_ecom_raw = spark.table(BRONZE_ECOM)

# One projection does both clean-up steps for every column:
#   * string columns: a value that is exactly "?" after trimming becomes NULL
#     (other values keep their original, untrimmed text);
#   * all columns: hyphens and spaces in the column name become underscores.
# A single select avoids chaining one withColumn/withColumnRenamed per column.
df_ecom = df_ecom_raw.select([
    (
        when(trim(col(name)) == "?", None).otherwise(col(name))
        if dtype == "string"
        else col(name)
    ).alias(name.replace("-", "_").replace(" ", "_"))
    for name, dtype in df_ecom_raw.dtypes
])

# Remove exact duplicate rows.
df_ecom_silver = df_ecom.dropDuplicates()

(df_ecom_silver.write.mode("overwrite")
 .format("delta")
 .saveAsTable(f"{CATALOG}.{SILVER_SCHEMA}.ecommerce"))