In [1]:
import uuid 
from pyspark.sql import SparkSession

# Spark session for the bronze "vendas" ingestion, instrumented with the
# OpenLineage Spark listener so lineage events are sent over HTTP.
# NOTE(review): the lineage endpoint ("http://api:5000") and namespace are
# hard-coded; "api" is presumably a docker-compose service name - confirm
# before reusing this notebook in another environment.
# NOTE(review): getOrCreate() returns an already-running session if one exists
# in the kernel, in which case static settings such as spark.extraListeners
# are not re-applied - restart the kernel after changing them.

# One fresh parent run id per session; kept in a name so it can be printed or
# correlated with the lineage backend after the session starts.
OPENLINEAGE_PARENT_RUN_ID = str(uuid.uuid4())

spark = (
    SparkSession.builder
    .appName("BronzeVendas")
    .config("spark.extraListeners", "io.openlineage.spark.agent.OpenLineageSparkListener")
    .config("spark.openlineage.columnLineage.datasetLineageEnabled", "true")
    .config("spark.openlineage.transport.type", "http")
    .config("spark.openlineage.transport.url", "http://api:5000")
    .config("spark.openlineage.namespace", "spark_integration")
    .config("spark.openlineage.parentJobName", "customer-job")
    .config("spark.openlineage.parentRunId", OPENLINEAGE_PARENT_RUN_ID)
    .getOrCreate()
)

25/06/19 18:11:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/06/19 18:11:43 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, TimestampType
from pyspark.sql.functions import when
from datetime import datetime, timedelta
import random

# Dados fictícios
def generate_fake_data(n=20):
    names = ["João", "Maria", "Carlos", "Ana", "Lucas"]
    foods = ["Pizza", "Sushi", "Hamburguer", "Salada", "Lasanha"]
    data = []

    for i in range(1, n + 1):
        name = random.choice(names)
        food = random.choice(foods)
        price = round(random.uniform(15.0, 80.0), 2)
        time = datetime.now() - timedelta(minutes=random.randint(0, 1440))
        data.append((i, name, food, price, time))
    return data

# Explicit schema for the synthetic orders, in the same field order as the
# tuples produced by generate_fake_data.
# NOTE(review): FloatType is single precision, so prices are stored as float32
# (e.g. 79.4 rather than an exact 2-decimal amount). DecimalType would suit
# money better, but changing it would alter the existing Delta table's schema,
# so it is intentionally left unchanged here.
schema = StructType([
    StructField("order_id", IntegerType(), False),
    StructField("customer_name", StringType(), True),
    StructField("food_item", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("order_time", TimestampType(), True),
])

# Build the Spark DataFrame from 20 generated orders.
data = generate_fake_data(20)
df = spark.createDataFrame(data, schema)

# Single source of truth for the customer_name -> id_customer mapping.
# A name missing from this dict would otherwise silently become NULL.
CUSTOMER_IDS = {"João": 1, "Maria": 2, "Carlos": 3, "Ana": 4, "Lucas": 5}

# Fold the mapping into one CASE WHEN expression (branches in dict order).
customer_id_expr = None
for name, customer_id in CUSTOMER_IDS.items():
    condition = df.customer_name == name
    customer_id_expr = (
        when(condition, customer_id)
        if customer_id_expr is None
        else customer_id_expr.when(condition, customer_id)
    )
df = df.withColumn("id_customer", customer_id_expr)

# Fail loudly rather than persisting rows whose customer could not be mapped.
unmapped_count = df.filter(df.id_customer.isNull()).count()
assert unmapped_count == 0, f"{unmapped_count} order(s) have a customer_name missing from CUSTOMER_IDS"

# Keep only the numeric customer id; the raw name is dropped from the output.
df_final = df.drop("customer_name")

# truncate=False so order_time is shown in full instead of "2025-06-19 15:13:...".
df_final.show(truncate=False)


                                                                                

+--------+----------+-----+--------------------+-----------+
|order_id| food_item|price|          order_time|id_customer|
+--------+----------+-----+--------------------+-----------+
|       1|     Sushi|32.05|2025-06-19 15:13:...|          4|
|       2|   Lasanha|34.07|2025-06-19 04:55:...|          1|
|       3|    Salada|28.15|2025-06-18 19:33:...|          2|
|       4|Hamburguer| 79.4|2025-06-18 20:45:...|          2|
|       5|     Pizza|43.74|2025-06-19 15:40:...|          1|
|       6|     Pizza|76.93|2025-06-18 23:20:...|          4|
|       7|     Sushi|63.18|2025-06-19 07:16:...|          5|
|       8|     Pizza|17.07|2025-06-19 02:37:...|          1|
|       9|    Salada| 25.8|2025-06-19 02:09:...|          2|
|      10|   Lasanha|65.81|2025-06-19 13:43:...|          3|
|      11|    Salada|33.66|2025-06-19 04:37:...|          5|
|      12|    Salada|53.45|2025-06-18 21:25:...|          3|
|      13|     Sushi| 36.1|2025-06-18 22:17:...|          4|
|      14|     Pizza|46.

In [3]:
# Target location of the bronze "vendas" Delta table: bucket "bronze" on MinIO,
# reached through Hadoop's S3A connector (s3a:// scheme).
delta_path = "s3a://bronze/vendas"

# Overwrite mode replaces whatever data is already stored at delta_path.
# NOTE(review): Delta rejects an overwrite whose schema differs from the
# existing table unless the "overwriteSchema" option is set - keep the schema
# stable or add that option deliberately.
bronze_writer = df_final.write.format("delta").mode("overwrite")
bronze_writer.save(delta_path)

25/06/19 18:11:51 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [4]:
# Stop the Spark session now that the Delta write has finished; re-run the
# first cell to create a new one.
spark.stop()