In [10]:
import os
import requests
import pyspark.sql.functions as F

from pyspark.sql import SparkSession
from tqdm.auto import tqdm
from pyspark.sql.types import StructType, StructField, StringType, IntegerType 

In [11]:
from minio import Minio
from minio.error import S3Error

# CONSTANTES

In [12]:
minio_endpoint = 'minio:9000'
minio_user = os.environ['MINIO_ROOT_USER']
minio_password = os.environ['MINIO_ROOT_PASSWORD']
buckets = ['bronze', 'silver', 'gold']

# CRIAR BUCKETS

In [13]:
minio_client = Minio(
    minio_endpoint,
    access_key=minio_user,
    secret_key=minio_password,
    secure=False,
)

for bucket in buckets:
    if not minio_client.bucket_exists(bucket):
        minio_client.make_bucket(bucket)
        print(f'Make bucket {bucket}!')
    else:
        print(f'Bucket {bucket} already exists!')

Bucket bronze already exists!
Bucket silver already exists!
Bucket gold already exists!


# INICIAR SESSÃO SPARK

In [None]:
spark = (
    SparkSession
        .builder
        .master("spark://spark:7077")
        .appName('MinIO')
        .config('spark.hadoop.fs.s3a.endpoint', minio_endpoint)
        .config('spark.hadoop.fs.s3a.access.key', minio_user)
        .config('spark.hadoop.fs.s3a.secret.key', minio_password)
        .config('spark.hadoop.fs.s3a.path.style.access', 'true')
        .config('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false')
        .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
        .config('spark.jars.packages', 'com.amazonaws:aws-java-sdk-bundle:1.12.262,org.apache.hadoop:hadoop-aws:3.3.4')
        .getOrCreate()
)

In [None]:
import json

for index, row in grouped.iterrows():
    if index >= 3:  # Limite a 3 iterações
        break
        
    order_id = row['order_id']
    item = row['items']
    
    json_data = {
        'order_id': order_id,
        'payment_type': df_pay.loc[df_pay['order_id'] == order_id, 'payment_type'].values[0],
        'payment_value': float(df_pay.loc[df_pay['order_id'] == order_id, 'payment_value'].values[0]),
        'seller_id': df_pay.loc[df_pay['order_id'] == order_id, 'seller_id'].values[0],
        'items': item
    }
    
    with open(f'C:\\Users\\RodrigoPintoMesquita\\Documents\\GitHub\\Faculdade\\Notebooks Trabalhos 2025H1\\PB\\data\\land\\{order_id}.json', 'w') as json_file:
        json.dump(json_data, json_file, indent=4)

In [None]:
%pip install prefect

In [30]:
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
from prefect import task, flow

In [86]:
PATH_LAND = r'notebooks/land'
PATH_BRONZE = r'C:/caminho/para/sua/camada/bronze'


# GRAVANDO NA CAMADA BRONZE

In [107]:
import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType

files = os.listdir(folder)

import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType

file = 'categories.csv'

categories_schema = StructType([
    StructField('product_id', StringType(), True),
    StructField('product_category_name', StringType(), True)
])

df = spark.read.csv(file, schema=categories_schema, header=True)
df.coalesce(1).write.mode('overwrite').parquet(f's3a://bronze/categories.parquet')

                                                                                

In [105]:
import os
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, ArrayType

folder = 'land/'
files = os.listdir(folder)

item_schema = StructType([
    StructField("order_item_id", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("seller_id", StringType(), True),
    StructField("price", FloatType(), True),
    StructField("freight_value", FloatType(), True)
])

orders_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("payment_type", StringType(), True),
    StructField("payment_value", FloatType(), True),
    StructField("items", ArrayType(item_schema), True) 
])


for file in files:
    name = file.replace(".json", "")
    df = spark.read.json(f'{folder}{file}', schema=orders_schema)
    df.coalesce(1).write.mode('overwrite').parquet(f's3a://bronze/vendas/{name}.parquet')

25/05/31 16:09:31 WARN DataSource: All paths were ignored:
  file:/work/notebooks/land/.ipynb_checkpoints
                                                                                

In [111]:
df = spark.read.parquet('s3a://bronze/vendas/00143d0f86d6fbd9f9b38ab440ac16f5.parquet')
df

+--------+------------+-------------+-----+
|order_id|payment_type|payment_value|items|
+--------+------------+-------------+-----+
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         NULL| NULL|
|    NULL|        NULL|         

                                                                                

# GRAVANDO NA CAMADA PRATA

In [108]:
folder = 'land/'
files = os.listdir(folder)

df_categories = spark.read.parquet('s3a://bronze/categories.parquet', schema= orders_schema)

for index, file in enumerate(files):
    if index >= 3:  # Limite a 3 iterações
        break

    df = spark.read.parquet(f'{folder}{file}', schema= categories_schema)

    #extrair o dia mês e ano da coluna order_purchase_timestamp
    df = df.withColumn('year', F.year(F.to_timestamp(df['shipping_limit_date'], 'yyyy-MM-dd HH:mm:ss')))
    df = df.withColumn('month', F.month(F.to_timestamp(df['shipping_limit_date'], 'yyyy-MM-dd HH:mm:ss')))
    df = df.withColumn('day', F.dayofmonth(F.to_timestamp(df['shipping_limit_date'], 'yyyy-MM-dd HH:mm:ss')))
    df = df.withColumn('hour', F.hour(F.to_timestamp(df['shipping_limit_date'], 'yyyy-MM-dd HH:mm:ss')))

    #calulando o valor total por item do pedido
    df['total_item_value'] = df['price'] + df['freight_value']

    #join com a tabela de produtos para trazer o nome do produto
    df = df.join(df_categories, on='product_id', how='left')
    
    #mantendo somente as colunas necessárias
    silver_df = merged_df[['order_id', 'payment_sequential', 'payment_type', 'payment_installments', 'payment_value', 
                            'order_item_id', 'product_id', 'seller_id', 'shipping_limit_date', 
                            'price', 'freight_value', 'total_item_value']]
    
df

25/05/31 16:10:18 WARN DataSource: All paths were ignored:
  file:/work/notebooks/land/.ipynb_checkpoints


AnalysisException: [UNABLE_TO_INFER_SCHEMA] Unable to infer schema for Parquet. It must be specified manually.

In [None]:
def etl_to_gold(silver_df):
    # Agregação de dados: calcular total por order_id
    gold_df = silver_df.groupby('order_id').agg(
        total_value=('total_item_value', 'sum'),
        total_items=('order_item_id', 'count'),
        payment_type=('payment_type', 'first'),  # Presumindo que todos os pagamentos são do mesmo tipo
        order_date=('shipping_limit_date', 'first')  # Presumindo que a data de envio é a data do pedido
    ).reset_index()

    return gold_df

# Executar ETL para a camada ouro
gold_data = etl_to_gold(silver_data)