In [None]:
import pandas as pd
from sqlalchemy import create_engine, text
import warnings
from typing import Optional

warnings.filterwarnings('ignore')


class ETLControleEstoque:
    """ETL para processamento de dados de controle de estoque."""
    
    def __init__(self):
        self.engine_origem = create_engine(
            'mysql+pymysql://erpj-ws:erpj-ws-homologacao@localhost:3309/autogeral'
        )
        self.engine_destino = create_engine(
            'mysql+pymysql://erpj-ws:erpj-ws-homologacao@10.50.1.252:3306/autogeral'
        )
    
    def _criar_tabela_destino(self):
        """Cria a tabela ETL_CONTROLE_ESTOQUE se não existir."""
        sql = """
        CREATE TABLE IF NOT EXISTS ETL_CONTROLE_ESTOQUE (
            ID INT AUTO_INCREMENT PRIMARY KEY,
            LOJA_ORIGEM INT NOT NULL,
            LOJA_DESTINO INT NOT NULL,
            CODIGO_X BIGINT UNSIGNED NOT NULL,
            CODIGO_SEQUENCIA CHAR(1) NOT NULL,
            QUANTIDADE DECIMAL(9,2) NOT NULL,
            DESCRICAO VARCHAR(255) NOT NULL,
            DATA_DESTINO VARCHAR(10) NOT NULL,
            ROMANEIO INT NOT NULL,
            DATA_PROCESSAMENTO TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            INDEX idx_loja_origem (LOJA_ORIGEM),
            INDEX idx_loja_destino (LOJA_DESTINO),
            INDEX idx_codigo_x (CODIGO_X),
            INDEX idx_data_destino (DATA_DESTINO),
            INDEX idx_romaneio (ROMANEIO)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3
        """
        
        with self.engine_destino.connect() as conn:
            conn.execute(text(sql))
            conn.commit()
    
    def _inserir_dados(self, df: pd.DataFrame):
        """Insere dados na tabela de destino."""
        if df.empty:
            return
        
        df.to_sql(
            name='ETL_CONTROLE_ESTOQUE',
            con=self.engine_destino,
            if_exists='append',
            index=False,
            method='multi',
            chunksize=1000
        )
    
    def _limpar_tabela(self):
        """Remove todos os registros da tabela."""
        with self.engine_destino.connect() as conn:
            conn.execute(text("DELETE FROM ETL_CONTROLE_ESTOQUE"))
            conn.commit()
    
    def _contar_registros(self) -> int:
        """Retorna o total de registros na tabela."""
        with self.engine_destino.connect() as conn:
            result = conn.execute(text("SELECT COUNT(*) FROM ETL_CONTROLE_ESTOQUE"))
            return result.fetchone()[0]
    
    def _processar_lote(self, offset: int, tamanho_lote: int, situacao: str, 
                       data_inicio: str, data_fim: str) -> pd.DataFrame:
        """Processa um lote de dados."""
        sql = f"""
        SELECT 
            r.LOJA as LOJA_ORIGEM,
            r.CADASTRO_CODIGO as LOJA_DESTINO,
            ri.CODIGO_X,
            ri.CODIGO_SEQUENCIA,
            SUM(ri.QUANTIDADE) as QUANTIDADE,
            ri.DESCRICAO,
            r.ROMANEIO,
            r.CADASTRO
        FROM romaneios_dbf r
        INNER JOIN romaneios_itens_dbf ri ON r.LOJA = ri.LOJA AND r.ROMANEIO = ri.ROMANEIO
        WHERE 
            r.OPERACAO_CODIGO = 4
            AND r.SITUACAO IN ('FECHADO', 'EM_ABERTO')
            AND r.COMPRA_PEDIDO_LOJA IS NULL
            AND r.COMPRA_PEDIDO_CODIGO IS NULL
            AND r.ORIGEM_TIPO IS NULL
            AND r.CADASTRO BETWEEN '{data_inicio}' AND '{data_fim}'
        GROUP BY
            r.LOJA,
            r.CADASTRO_CODIGO,
            ri.CODIGO_X,
            ri.CODIGO_SEQUENCIA,
            ri.DESCRICAO,
            r.ROMANEIO,
            r.CADASTRO
        ORDER BY r.CADASTRO DESC
        LIMIT {tamanho_lote} OFFSET {offset}
        """
        
        df = pd.read_sql_query(sql, self.engine_origem)
        
        if not df.empty:
            df['CADASTRO'] = pd.to_datetime(df['CADASTRO'])
            df['DATA_DESTINO'] = (df['CADASTRO']
                                 .dt.to_period('M')
                                 .dt.start_time
                                 .dt.strftime('%Y-%m-%d'))
            
            df = df[['LOJA_ORIGEM', 'LOJA_DESTINO', 'CODIGO_X', 'CODIGO_SEQUENCIA',
                    'QUANTIDADE', 'DESCRICAO', 'DATA_DESTINO', 'ROMANEIO']]
        
        return df
    
    def executar_etl(self, situacao: str = 'TODOS', 
                    data_inicio: str = '2024-01-01',
                    data_fim: str = '2024-12-31',
                    tamanho_lote: int = 10000,
                    limpar_antes: bool = False) -> int:
        """
        Executa o processo ETL em lotes.
        
        Args:
            situacao: Situação dos romaneios ( FECHADO e EM_ABERTO)
            data_inicio: Data inicial do filtro
            data_fim: Data final do filtro
            tamanho_lote: Número de registros por lote
            limpar_antes: Se deve limpar a tabela antes da inserção
            
        Returns:
            Número total de registros processados
        """
        self._criar_tabela_destino()
        
        if limpar_antes:
            self._limpar_tabela()
        
        offset = 0
        total_processado = 0
        
        while True:
            df_lote = self._processar_lote(offset, tamanho_lote, situacao, 
                                         data_inicio, data_fim)
            
            if df_lote.empty:
                break
            
            self._inserir_dados(df_lote)
            total_processado += len(df_lote)
            
            offset += tamanho_lote
            
            if len(df_lote) < tamanho_lote:
                break
        
        return total_processado


def main():
    """Executa o ETL com parâmetros padrão."""
    etl = ETLControleEstoque()
    
    resultado = etl.executar_etl(
        data_inicio='2025-04-01',
        data_fim='2025-06-16',
        tamanho_lote=1000,
        limpar_antes=True
    )
    
    print(f"Processados {resultado} registros")
    return resultado


if __name__ == "__main__":
    main()

🚀 Iniciando processo ETL por LOTES...
📅 Período: 2025-04-01 a 2025-06-16
📊 Situação: FECHADO
📦 Tamanho do lote: 5000
--------------------------------------------------
✅ Tabela ETL_CONTROLE_ESTOQUE criada/verificada com sucesso!
🗑️ Tabela ETL_CONTROLE_ESTOQUE limpa: 0 registros removidos
\n📦 Processando lote 1 (offset: 0)...
💾 Inserindo dados na tabela ETL_CONTROLE_ESTOQUE...
✅ 5000 registros inseridos com sucesso!
✅ Lote 1 processado: 5000 registros
📊 Total processado até agora: 5000
\n📦 Processando lote 2 (offset: 5000)...
💾 Inserindo dados na tabela ETL_CONTROLE_ESTOQUE...
✅ 5000 registros inseridos com sucesso!
✅ Lote 2 processado: 5000 registros
📊 Total processado até agora: 10000
\n📦 Processando lote 3 (offset: 10000)...
💾 Inserindo dados na tabela ETL_CONTROLE_ESTOQUE...
✅ 5000 registros inseridos com sucesso!
✅ Lote 3 processado: 5000 registros
📊 Total processado até agora: 15000
\n📦 Processando lote 4 (offset: 15000)...
💾 Inserindo dados na tabela ETL_CONTROLE_ESTOQUE...
✅ 500