In [1]:
import os, subprocess
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType
)
from pyspark.sql.functions import col, when, to_date, to_timestamp, coalesce
from pyspark.sql import functions as F
import great_expectations as gx

import numpy as np
import json

JAVA_HOME = subprocess.check_output(
    ["brew", "--prefix", "openjdk@17"], text=True
).strip()

os.environ["JAVA_HOME"] = JAVA_HOME
os.environ["PATH"] = JAVA_HOME + "/bin:" + os.environ["PATH"]


warehouse_path = "file:///Users/user/Documents/Mackenzie Data Enginner/projeto_data_lake/warehouse"

spark = (
    SparkSession.builder
        .appName("EcommercePipeline")
        # carrega o runtime do Iceberg compatível com Spark 4.0 / Scala 2.13
        .config(
            "spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0"
        )
        # ativa as extensões do Iceberg
        .config(
            "spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        )
        # define o catálogo Iceberg "my_catalog"
        .config("spark.sql.catalog.my_catalog", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.my_catalog.type", "hadoop")
        .config("spark.sql.catalog.my_catalog.warehouse", warehouse_path)
        # se quiser que o catálogo padrão seja o my_catalog:
        # .config("spark.sql.default.catalog", "my_catalog")
        .getOrCreate()
)



schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("customer_name", StringType(), True),
    StructField("customer_email", StringType(), True),
    StructField("product_name", StringType(), True),
    StructField("category", StringType(), True),
    StructField("quantity", StringType(), True),
    StructField("price", StringType(), True),
    StructField("discount", StringType(), True),
    StructField("total_price", StringType(), True),
    StructField("payment_method", StringType(), True),
    StructField("order_date", StringType(), True),
    StructField("delivery_date", StringType(), True),
    StructField("shipping_address", StringType(), True),
    StructField("status", StringType(), True),
    StructField("is_gift", StringType(), True),
    StructField("gift_message", StringType(), True)
])

#Ler a camada RAW
df_raw = (
    spark.read
        .schema(schema)
        .option("multiLine", "true")
        .option("mode", "PERMISSIVE")
        .json("ecommerce_data_20251115_124452.json")
)

df_raw.show(5, truncate=False)

# Transformação 1 — Limpeza de valores inválidos"

df_clean = (
    df_raw
    .replace("invalid_value", None)
    .replace("", None)
)

df_casted = (
    df_clean
    .withColumn("quantity", col("quantity").cast("int"))
    .withColumn("price", col("price").cast("double"))
    .withColumn("discount", col("discount").cast("double"))
    .withColumn("total_price", col("total_price").cast("double"))
    .withColumn("is_gift", col("is_gift").cast("boolean"))
    .withColumn("order_date", to_timestamp("order_date", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("delivery_date", to_timestamp("delivery_date", "yyyy-MM-dd HH:mm:ss"))
)

df_casted.show(5, truncate=False)

print("Contagem antes da remoção de nulos essenciais:", df_casted.count())
#Transformação 3 — Remover registros inválidos essenciais
df_valid = df_casted.dropna(subset=["order_id", "customer_name", "price"])
print("Contagem após a remoção de nulos essenciais:", df_valid.count())
#Transformação 4 — Desduplicação por order_id
df_dedup = df_valid.dropDuplicates(["order_id"])
print("Contagem após desduplicação:", df_dedup.count())

df_dedup = (
    df_dedup
    .withColumn("order_year", F.year("order_date"))
    .withColumn("order_month", F.month("order_date"))
)

df_dedup.printSchema()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/27 21:49:33 WARN Utils: Your hostname, USERs-MacBook-Pro.local, resolves to a loopback address: 127.0.0.1; using 192.168.106.80 instead (on interface en0)
25/11/27 21:49:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/opt/homebrew/Cellar/apache-spark/4.0.1_1/libexec/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /Users/user/.ivy2.5.2/cache
The jars for the packages stored in: /Users/user/.ivy2.5.2/jars
org.apache.iceberg#iceberg-spark-runtime-4.0_2.13 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6c70153f-42f8-4447-bfd2-f9865d62c8dd;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.0 in central
:: resolution report :: resolve 73ms :: artifacts dl 2ms
	:: modules in use:
	org.apache.iceberg#iceberg-spark-runtime-4.0_2.13

+------------------------------------+-------------------+-------------------------+------------+-----------+-------------+------+--------+-----------+--------------+-------------------+-------------------+------------------------------------------------------+---------+-------+------------------------------------------+
|order_id                            |customer_name      |customer_email           |product_name|category   |quantity     |price |discount|total_price|payment_method|order_date         |delivery_date      |shipping_address                                      |status   |is_gift|gift_message                              |
+------------------------------------+-------------------+-------------------------+------------+-----------+-------------+------+--------+-----------+--------------+-------------------+-------------------+------------------------------------------------------+---------+-------+------------------------------------------+
|bf818b39-37ee-4db8-8e7e-3d0150

                                                                                

+------------------------------------+-------------------+-------------------------+------------+-----------+--------+------+--------+-----------+--------------+-------------------+-------------------+------------------------------------------------------+---------+-------+------------------------------------------+
|order_id                            |customer_name      |customer_email           |product_name|category   |quantity|price |discount|total_price|payment_method|order_date         |delivery_date      |shipping_address                                      |status   |is_gift|gift_message                              |
+------------------------------------+-------------------+-------------------------+------------+-----------+--------+------+--------+-----------+--------------+-------------------+-------------------+------------------------------------------------------+---------+-------+------------------------------------------+
|bf818b39-37ee-4db8-8e7e-3d0150da7489|Eileen D

                                                                                

Contagem após a remoção de nulos essenciais: 19240


[Stage 8:>                                                          (0 + 1) / 1]

Contagem após desduplicação: 9620
root
 |-- order_id: string (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- category: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)
 |-- discount: double (nullable = true)
 |-- total_price: double (nullable = true)
 |-- payment_method: string (nullable = true)
 |-- order_date: timestamp (nullable = true)
 |-- delivery_date: timestamp (nullable = true)
 |-- shipping_address: string (nullable = true)
 |-- status: string (nullable = true)
 |-- is_gift: boolean (nullable = true)
 |-- gift_message: string (nullable = true)
 |-- order_year: integer (nullable = true)
 |-- order_month: integer (nullable = true)



                                                                                

In [2]:
df_dedup.show(5, truncate=False)

25/11/27 21:49:56 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 14:>                                                         (0 + 1) / 1]

+------------------------------------+---------------+-------------------------+------------+-----------+--------+------+--------+-----------+--------------+-------------------+-------------------+--------------------------------------------------+---------+-------+---------------------------------------------------------+----------+-----------+
|order_id                            |customer_name  |customer_email           |product_name|category   |quantity|price |discount|total_price|payment_method|order_date         |delivery_date      |shipping_address                                  |status   |is_gift|gift_message                                             |order_year|order_month|
+------------------------------------+---------------+-------------------------+------------+-----------+--------+------+--------+-----------+--------------+-------------------+-------------------+--------------------------------------------------+---------+-------+--------------------------------------

                                                                                

In [3]:
#Escrever na camada Curated

caminho_parquet = "/Users/user/Documents/Mackenzie Data Enginner/projeto_data_lake/curated"
df_dedup_wr = df_dedup.coalesce(1)


(
    df_dedup_wr.write
    .mode("overwrite")
    .partitionBy("order_year", "order_month") 
    .option("compression", "snappy")
    .parquet(caminho_parquet)
)

print(f"\n--- 1. Escrita em Parquet concluída ---")
print(f"Dados salvos e particionados em: {caminho_parquet}")

[Stage 19:>                                                         (0 + 1) / 1]


--- 1. Escrita em Parquet concluída ---
Dados salvos e particionados em: /Users/user/Documents/Mackenzie Data Enginner/projeto_data_lake/curated


                                                                                

In [4]:
spark.sql("CREATE NAMESPACE IF NOT EXISTS my_catalog.dq_db")

spark.sql("""
CREATE TABLE IF NOT EXISTS my_catalog.dq_db.orders (
  order_id         STRING,
  customer_name    STRING,
  customer_email   STRING,
  product_name     STRING,
  category         STRING,
  quantity         INT,
  price            DOUBLE,
  discount         DOUBLE,
  total_price      DOUBLE,
  payment_method   STRING,
  order_date       TIMESTAMP,
  delivery_date    TIMESTAMP,
  shipping_address STRING,
  status           STRING,
  is_gift          BOOLEAN,
  gift_message     STRING,
  order_year       INT,
  order_month      INT
)
USING iceberg
PARTITIONED BY (order_year, order_month)
""")


DataFrame[]

In [5]:
df_dedup.createOrReplaceTempView("stg_orders")

In [6]:
spark.sql("""select * from stg_orders limit 5""").show(truncate=False)

[Stage 20:>                                                         (0 + 1) / 1]

+------------------------------------+---------------+-------------------------+------------+-----------+--------+------+--------+-----------+--------------+-------------------+-------------------+--------------------------------------------------+---------+-------+---------------------------------------------------------+----------+-----------+
|order_id                            |customer_name  |customer_email           |product_name|category   |quantity|price |discount|total_price|payment_method|order_date         |delivery_date      |shipping_address                                  |status   |is_gift|gift_message                                             |order_year|order_month|
+------------------------------------+---------------+-------------------------+------------+-----------+--------+------+--------+-----------+--------------+-------------------+-------------------+--------------------------------------------------+---------+-------+--------------------------------------

                                                                                

In [13]:
spark.sql("""
SELECT *
FROM my_catalog.dq_db.orders.snapshots
""").show(truncate=False)

+------------+-----------+---------+---------+-------------+-------+
|committed_at|snapshot_id|parent_id|operation|manifest_list|summary|
+------------+-----------+---------+---------+-------------+-------+
+------------+-----------+---------+---------+-------------+-------+



25/11/27 22:14:39 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 225843 ms exceeds timeout 120000 ms
25/11/27 22:14:39 WARN SparkContext: Killing executors is not supported by current scheduler.
25/11/27 22:14:42 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:53)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:342)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:132)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [None]:
df_dedup.createOrReplaceTempView("stg_orders")

spark.sql("""
MERGE INTO my_catalog.dq_db.orders AS tgt
USING stg_orders AS src
ON tgt.order_id = src.order_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

In [None]:

df_dq = (
    df_dedup
    .fillna({"discount": 0.0, "quantity": 0})
    .withColumn("quantity", F.col("quantity").cast("int"))
    .withColumn("price", F.col("price").cast("double"))
    .withColumn("total_price", F.col("total_price").cast("double"))
    .withColumn("_total_expected", F.col("price") * F.col("quantity") - F.col("discount"))
    .withColumn("_total_match", F.abs(F.col("total_price") - F.col("_total_expected")) <= 1e-2)
    .withColumn(
        "_delivery_after_order",
        (F.col("order_date").isNull()) |
        (F.col("delivery_date").isNull()) |
        (F.col("delivery_date") >= F.col("order_date"))
    )
    .withColumn(
        "_gift_message_ok",
        (
            ~F.coalesce(F.col("is_gift"), F.lit(False)) &
            (F.col("gift_message").isNull() | (F.trim(F.col("gift_message")) == ""))
        )
        |
        (
            F.coalesce(F.col("is_gift"), F.lit(False)) &
            F.col("gift_message").isNotNull() &
            (F.trim(F.col("gift_message")) != "")
        )
    )
)

# 1) contexto
context = gx.get_context()

# 2) datasource Spark
data_source = context.data_sources.add_spark(name="orders_spark")

# 3) asset baseado em DataFrame
data_asset = data_source.add_dataframe_asset(name="orders_asset")

# 4) batch definition que pega o DF inteiro
batch_def = data_asset.add_batch_definition_whole_dataframe(
    name="orders_batch_def"
)

# 5) conecta seu Spark DataFrame df_dq via batch_parameters
batch = batch_def.get_batch(batch_parameters={"dataframe": df_dq})

# 6) cria o Validator a partir do batch
v = context.get_validator(batch=batch)

# 6) Expectations (as que você já tinha)
v.expect_column_values_to_not_be_null("order_id")
v.expect_column_values_to_be_unique("order_id")

v.expect_column_values_to_not_be_null("customer_name", mostly=0.99)
v.expect_column_values_to_match_regex(
    "customer_email",
    r"^[\w\.-]+@[\w\.-]+\.\w+$",
    mostly=0.98,
)

v.expect_column_values_to_not_be_null("product_name", mostly=0.99)
v.expect_column_values_to_not_be_null("category", mostly=0.95)

v.expect_column_values_to_not_be_null("quantity")
v.expect_column_values_to_be_between("quantity", min_value=1, max_value=10000, mostly=0.999)
v.expect_column_values_to_be_of_type("quantity", "IntegerType")

v.expect_column_values_to_not_be_null("price")
v.expect_column_values_to_be_between("price", min_value=0.0, max_value=None, mostly=0.999)
v.expect_column_values_to_be_between("discount", min_value=0.0, max_value=None, mostly=0.999)
v.expect_column_values_to_be_between("total_price", min_value=0.0, max_value=None, mostly=0.999)

v.expect_column_values_to_be_in_set(
    "payment_method",
    ["Credit Card", "Debit Card", "Boleto", "paypal", "pix", "cash"],
    mostly=0.95,
)
v.expect_column_values_to_be_in_set(
    "status",
    ["pending", "processing", "shipped", "delivered", "cancelled", "returned"],
    mostly=0.98,
)

v.expect_column_values_to_not_be_null("order_date", mostly=0.99)
v.expect_column_values_to_not_be_null("delivery_date", mostly=0.9)
v.expect_column_values_to_be_of_type("order_date", "TimestampType")
v.expect_column_values_to_be_of_type("delivery_date", "TimestampType")

v.expect_column_values_to_not_be_null("shipping_address", mostly=0.98)
v.expect_column_values_to_be_of_type("is_gift", "BooleanType")

v.expect_column_values_to_be_in_set("_total_match", [True], mostly=0.995)
v.expect_column_values_to_be_in_set("_delivery_after_order", [True], mostly=0.995)
v.expect_column_values_to_be_in_set("_gift_message_ok", [True], mostly=0.999)

# 7) Rodar validação
result = v.validate(result_format="SUMMARY")
print(result)


25/11/27 17:16:16 WARN CacheManager: Asked to cache already cached data.
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00,  9.09it/s] 
Calculating Metrics: 100%|██████████| 10/10 [00:01<00:00,  9.14it/s]
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 12.29it/s]  
Calculating Metrics: 100%|██████████| 11/11 [00:00<00:00, 13.98it/s] 
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00,  9.76it/s]  
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 12.95it/s]  
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00, 10.38it/s]  
Calculating Metrics: 100%|██████████| 11/11 [00:00<00:00, 16.32it/s] 
Calculating Metrics: 100%|██████████| 1/1 [00:00<00:00, 840.21it/s] 
Calculating Metrics: 100%|██████████| 8/8 [00:00<00:00,  9.83it/s]  
Calculating Metrics: 100%|██████████| 11/11 [00:01<00:00, 10.10it/s] 
Calculating Metrics: 100%|██████████| 11/11 [00:00<00:00, 12.86it/s] 
Calculating Metrics: 100%|██████████| 11/11 [00:00<00:00, 17.17it/s] 
Calculating Metrics: 100%|

{
  "success": false,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "batch_id": "orders_spark-orders_asset",
          "column": "order_id"
        },
        "meta": {},
        "severity": "critical"
      },
      "result": {
        "element_count": 9623,
        "unexpected_count": 0,
        "unexpected_percent": 0.0,
        "partial_unexpected_list": [],
        "partial_unexpected_counts": []
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_traceback": null,
        "exception_message": null
      }
    },
    {
      "success": true,
      "expectation_config": {
        "type": "expect_column_values_to_be_unique",
        "kwargs": {
          "batch_id": "orders_spark-orders_asset",
          "column": "order_id"
        },
        "meta": {},
        "severity": "critical"
      },
      "result": {
     




In [None]:
import pandas as pd

rows = []

for r in result["results"]:
    exp = r["expectation_config"]
    kwargs = exp.get("kwargs", {})
    res = r.get("result", {})

    col = kwargs.get("column")  # nem toda expectation tem coluna
    unexpected_percent = res.get("unexpected_percent", 0.0)  # pode não existir para algumas
    element_count = res.get("element_count", 0)
    unexpected_count = res.get("unexpected_count", 0)

    rows.append({
        "column": col,
        "expectation": exp.get("expectation_type"),
        "success": r.get("success", False),
        "mostly": kwargs.get("mostly"),
        "element_count": element_count,
        "unexpected_count": unexpected_count,
        "unexpected_percent": unexpected_percent,
        "success_percent": 100.0 - unexpected_percent if unexpected_percent is not None else None,
    })

df_dq_summary = pd.DataFrame(rows)

# opcional: ordenar, por exemplo, mostrando primeiro as expectations que falharam
df_dq_summary = df_dq_summary.sort_values(
    by=["success", "unexpected_percent"],
    ascending=[True, False]
)

print(df_dq_summary.to_string(index=False))

               column expectation  success  mostly  element_count  unexpected_count  unexpected_percent  success_percent
               status        None    False   0.980           9623              9504          100.000000         0.000000
     _gift_message_ok        None    False   0.999           9623              4812           50.005196        49.994804
       payment_method        None    False   0.950           9623              4746           49.931615        50.068385
_delivery_after_order        None    False   0.995           9623              4546           47.240985        52.759015
         _total_match        None    False   0.995           9623              1147           12.088954        87.911046
             quantity        None    False   0.999           9623              1036           10.765873        89.234127
        customer_name        None    False   0.990           9623               123            1.278188        98.721812
           order_date        Non