### Criar Pipeline de Dados utilizando dados do Olist (ecommerce)

## ELT

- Externo: Emulador de geracao de pedido

- Bronze: Trazer os dados de fora para nosso lake

- Silver: Salvar os dados limpos e processados

- Gold: Salvar os dados finais transformados (para logistica e para vendas)

In [1]:
# disponibilziar arquivos parquet utilizados na camada silver (customer e orders detail) dentro da pasta input-data

In [1]:
# Link do Kaggle: https://www.kaggle.com/datasets/olistbr/brazilian-ecommerce?select=olist_order_reviews_dataset.csv

In [1]:
import boto3
import pandas as pd
import pyarrow.parquet as pq

# Assuming you have configured your AWS credentials or roles properly for boto3

def get_latest_parquet_file(bucket_name, folder_prefix):
    s3_client = boto3.client('s3')
    response = s3_client.list_objects_v2(Bucket=bucket_name, Prefix=folder_prefix)

    if 'Contents' in response:
        latest_file = max(response['Contents'], key=lambda x: x['LastModified'])['Key']
        return latest_file
    else:
        raise ValueError(f"No Parquet files found in {bucket_name}/{folder_prefix}")

def load_parquet_as_dataframe(bucket_name, file_key):
    s3_path = f's3://{bucket_name}/{file_key}'
    table = pq.read_table(s3_path)
    return table.to_pandas()

In [2]:
# Set your S3 bucket name and folder prefix
bucket_name = 'dnc-lucas-tutorial-class'
folder_prefix = 'ecommerce/gold/finance'

latest_parquet_file = get_latest_parquet_file(bucket_name, folder_prefix)
df = load_parquet_as_dataframe(bucket_name, latest_parquet_file)

# Now you have the latest Parquet file loaded as a pandas DataFrame in 'df'
df.sort_values("payment_value", ascending=False)

Unnamed: 0,customer_id,payment_value
4335,a9dc96b027d1252bbac0a9b72d837fc6,7117.75
804,1fc56719b52f82c03caddc5faf531fbb,4912.05
3380,85f0e92957e9fb9c5f72ba5378f492a0,4899.55
5573,d96619b47e477e6bcc377bb89f2ddaee,4859.80
699,1afc82cd60e303ef09b4ef9837c9505c,4513.32
...,...,...
2289,5a484fb97bcfd090eb27cf27f23c3f0f,14.29
433,10a79ef2783cae3d8d678e85fde235ac,14.29
2830,6fa8e1a64acacedee7747c7e67715bca,13.89
1434,38bb17584178abe8b36778530f75d810,13.38


In [3]:
# Set your S3 bucket name and folder prefix
bucket_name = 'dnc-lucas-tutorial-class'
folder_prefix = 'ecommerce/gold/logistic'

latest_parquet_file = get_latest_parquet_file(bucket_name, folder_prefix)
df = load_parquet_as_dataframe(bucket_name, latest_parquet_file)

# Now you have the latest Parquet file loaded as a pandas DataFrame in 'df'
df.sort_values("order_id", ascending=False)

Unnamed: 0,state_city,order_id
1163,SP-sao paulo,1424
675,RJ-rio de janeiro,581
226,MG-belo horizonte,243
114,DF-brasilia,173
555,PR-curitiba,143
...,...,...
478,PB-catole do rocha,1
479,PB-esperanca,1
480,PB-guarabira,1
482,PB-mae d'agua,1


In [None]:
# Exemplo de CRONs: Todos os dias as 23horas

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# Define o intervalo de tempo entre as execuções
schedule_interval = "0 23 * * *"

# Define a DAG e sua descrição
dag = DAG(
    'dag_nome',
    description='DAG para execução diária às 23 horas',
    start_date=datetime(2022, 1, 1),  # Data de início da DAG
    schedule_interval=schedule_interval,
    catchup=False  # Não retroceder para datas ausentes
)

# Definir tarefas da DAG
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

# Define a sequência de execução das tarefas
start >> end

In [None]:
# Toda segunda-feira as 9horas

# Define o intervalo de tempo entre as execuções
schedule_interval = "0 9 * * 1"

# Define a DAG e sua descrição
dag = DAG(
    'dag_nome',
    description='DAG para execução toda segunda-feira às 9 horas',
    start_date=datetime(2022, 1, 1),  # Data de início da DAG
    schedule_interval=schedule_interval,
    catchup=False  # Não retroceder para datas ausentes
)

# Definir tarefas da DAG
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

# Define a sequência de execução das tarefas
start >> end