In [None]:
import warnings
warnings.filterwarnings("ignore")

import requests
import pandas as pd
import numpy as np
import schedule
import time
from datetime import datetime
import os
from pathlib import Path

class ETLPipeline:
    def __init__(self, output_dir="data_output"):
        self.output_dir = output_dir
        Path(output_dir).mkdir(parents=True, exist_ok=True)

    def obter_produto(self):
        url = "https://fakestoreapi.com/products/"
        response = requests.get(url)

        if response.status_code == 200:
            return response.json()
        else:
            print(f"Erro ao obter produtos: {response.status_code}")
            return None

    def obter_carrinhos(self):
        url = "https://fakestoreapi.com/carts/"
        response = requests.get(url)

        if response.status_code == 200:
            return response.json()
        else:
            print(f"Erro ao obter carrinhos: {response.status_code}")
            return None

    def transform_produtos(self, registros=20):
        df_fim = pd.DataFrame()

        for _ in range(registros):
            resultado = self.obter_produto()
            if resultado:
                df = pd.DataFrame(resultado)
                df[['rate', 'count']] = df['rating'].apply(pd.Series)
                df.drop(columns='rating', inplace=True)
                df_fim = pd.concat([df_fim, df], ignore_index=True)

        if df_fim.empty:
            raise Exception("Não foi possível obter dados de produtos")

        # Transformações
        df_fim['rate'] = df_fim['rate'].sample(frac=1).reset_index(drop=True)
        df_fim['count'] = df_fim['rate'].sample(frac=1).reset_index(drop=True)
        df_fim['vendidos'] = [np.random.uniform(0, 5000) for _ in range(df_fim.shape[0])]
        df_fim['total_faturado'] = df_fim['vendidos'] * df_fim['price']
        df_fim['faturado_total(%)_base'] = (df_fim['total_faturado'] / df_fim['total_faturado'].sum()) * 100
        df_fim['rate_peso'] = (df_fim['rate'] * df_fim['count'])
        df_fim['ranking_avaliação_base'] = df_fim['rate_peso'] / df_fim['count'].sum()
        df_fim['loja'] = np.random.choice(['Loja 1', 'Loja 2', 'Loja 3'], size=df_fim.shape[0])
        df_fim['fator_mult_loja'] = np.select(
            [df_fim['loja'] == 'Loja 1', df_fim['loja'] == 'Loja 2', df_fim['loja'] == 'Loja 3'],
            [1.3, 0.95, 0.9],
            default=np.nan
        )
        df_fim['latitudes'] = [np.random.uniform(-90, 90) for _ in range(df_fim.shape[0])]
        df_fim['longitudes'] = [np.random.uniform(-180, 180) for _ in range(df_fim.shape[0])]

        # Correção na geração das datas
        datas = pd.date_range("2023-09-01", '2023-09-30')
        df_fim['Data'] = pd.Series(np.random.choice(datas, size=df_fim.shape[0])).dt.strftime('%Y/%m/%d')

        df_fim.rename(columns={'id': 'productId'}, inplace=True)
        return df_fim

    def transform_carrinhos(self):
        carrinhos_data = self.obter_carrinhos()
        if not carrinhos_data:
            raise Exception("Não foi possível obter dados de carrinhos")

        carrinhos = pd.DataFrame(carrinhos_data)
        carrinhos = carrinhos.explode('products')
        carrinhos = pd.concat(
            [carrinhos, carrinhos['products'].apply(pd.Series)],
            axis=1
        ).drop(['products', '__v'], axis=1)

        # Correção no tratamento das datas
        carrinhos['Data'] = pd.to_datetime(carrinhos['date'])
        carrinhos['Data'] = carrinhos['Data'].apply(lambda x: x.replace(year=2023))
        carrinhos['Data'] = carrinhos['Data'].apply(
            lambda x: x.replace(month=9) if x.month == 3 else x
        )
        carrinhos['Data'] = carrinhos['Data'].dt.strftime('%Y/%m/%d')

        carrinhos.drop(columns='date', inplace=True)
        carrinhos['loja'] = np.random.choice(
            ['Loja 1', 'Loja 2', 'Loja 3'],
            size=carrinhos.shape[0]
        )
        carrinhos = carrinhos.rename(columns={'id': 'id_carrinho'})
        return carrinhos

    def run_etl(self):
        try:
            print(f"Iniciando processo ETL - {datetime.now()}")

            # Extract e Transform
            df_produtos = self.transform_produtos()
            df_carrinhos = self.transform_carrinhos()

            # Merge final
            dados_combinados = pd.merge(
                df_produtos,
                df_carrinhos,
                on=['productId', 'loja', 'Data'],
                how='left'
            )

            # Load
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            output_file = os.path.join(self.output_dir, f"dados_etl_{timestamp}.parquet")

            dados_combinados.to_parquet(
                output_file,
                compression='snappy',
                index=False
            )

            print(f"Dados salvos com sucesso em {output_file}")

        except Exception as e:
            print(f"Erro durante o processo ETL: {str(e)}")

def main():
    # Inicializar pipeline
    pipeline = ETLPipeline()

    # Agendar execução a cada 2 horas
    schedule.every(2).hours.do(pipeline.run_etl)

    # Executar imediatamente pela primeira vez
    pipeline.run_etl()

    while True:
        schedule.run_pending()
        time.sleep(60)

if __name__ == "__main__":
    main()

Iniciando processo ETL - 2024-10-23 22:10:31.013229
Dados salvos com sucesso em data_output/dados_etl_20241023_221036.parquet


Collecting schedule
  Downloading schedule-1.2.2-py3-none-any.whl.metadata (3.8 kB)
Downloading schedule-1.2.2-py3-none-any.whl (12 kB)
Installing collected packages: schedule
Successfully installed schedule-1.2.2
