In [3]:
# BIBLIOTECAS
# =====================
import os
import re
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd

In [4]:
#Utils
# -------------------------
# Utils de I/O e diretórios
# -------------------------

def ensure_dirs(paths):
    """
    Garante que todas as pastas em 'paths' existem.
    paths pode ser lista de strings ou uma única string.
    """
    if isinstance(paths, str):
        paths = [paths]
    for p in paths:
        os.makedirs(p, exist_ok=True)


def to_float_smart(x):
    """
    Converte strings tipo '1.234,56' (BR) ou '1,234.56' (US) e variantes em float.
    Trata negativos e milhares. Retorna NaN se não der.
    """
    import re
    if pd.isna(x):
        return np.nan
    if isinstance(x, (int, float, np.number)):
        return float(x)

    s = str(x).strip()
    if s in {"", "-", "--", "nan", "NaN", "None", "NULL"}:
        return np.nan

    # mantém apenas dígitos, sinais e separadores
    s = re.sub(r"[^0-9\-\.,]", "", s)

    has_dot   = "." in s
    has_comma = "," in s

    try:
        if has_dot and has_comma:
            # decide pelo separador mais à direita
            if s.rfind(",") > s.rfind("."):
                # BR: 1.234,56 -> 1234.56
                s = s.replace(".", "").replace(",", ".")
            else:
                # US: 1,234.56 -> 1234.56
                s = s.replace(",", "")
            return float(s)

        if has_comma and not has_dot:
            # BR decimal: 1234,56 -> 1234.56
            return float(s.replace(",", "."))

        if has_dot and not has_comma:
            # Pode ser decimal (um ponto) ou milhares (vários pontos)
            if s.count(".") == 1:
                return float(s)  # 1234.56
            else:
                # 109.641.290.194 -> 109641290194
                return float(s.replace(".", ""))

        # Só dígitos e talvez sinal
        return float(s)
    except Exception:
        return np.nan

#def _parse_br_friendly_date_series(series: pd.Series) -> pd.Series:    
#    """
#    Converte a coluna de datas de publicação para datetime,
#    assumindo SEMPRE padrão brasileiro (DD/MM/AAAA) quando houver ambiguidade.
#
#    Regras:
#    - Se estiver no padrão ISO 'YYYY-MM-DD', usamos isso direto (não é ambíguo).
#    - Se estiver no padrão brasileiro 'DD/MM/YYYY', interpretamos como dia/mês/ano.
#    - Se vier em qualquer outro formato, tentamos parse com dayfirst=True.
#    - No final, retornamos datetime normalizado (sem hora).
#    """
#
#    s = series.astype(str).str.strip()
#
#    # 1) tenta ISO claro: 2024-03-31
#    iso_mask = s.str.match(r"^\d{4}-\d{2}-\d{2}$")
#    out_iso = pd.to_datetime(
#        s.where(iso_mask),
#        format="%Y-%m-%d",
#        errors="coerce"
#    )
#
#    # 2) tenta BR claro: 31/03/2024
#    br_mask = s.str.match(r"^\d{2}/\d{2}/\d{4}$")
#    out_br = pd.to_datetime(
#        s.where(br_mask),
#        format="%d/%m/%Y",
#        dayfirst=True,
#        errors="coerce"
#    )
#
#    # 3) começa com ISO e preenche lacunas com BR
#    out = out_iso.fillna(out_br)
#
#    # 4) fallback genérico:
#    #    qualquer coisa que sobrou a gente interpreta assumindo padrão brasileiro (dayfirst=True)
#    still_nat = out.isna()
#    if still_nat.any():
#        out_fallback = pd.to_datetime(
#            s[still_nat],
#            errors="coerce",
#            dayfirst=True   # <- força semântica brasileira
#        )
#        out.loc[still_nat] = out_fallback
#
#    # 5) normaliza para "apenas a data" (zera hora)
#    out = out.dt.normalize()
#
#    return out



In [5]:
#DataPrepFund

class DataPrepFund:
    def __init__(self,
                 prices_dir: str = "dataset/prices_processed",
                 fund_dir: str = "dataset/fundamental",
                 out_fund_dir: str = "dataset/fund_processed",
                 out_final_dir: str = "dataset/final"):
        self.prices_dir = prices_dir
        self.fund_dir = fund_dir
        self.out_fund_dir = out_fund_dir
        self.out_final_dir = out_final_dir
        ensure_dirs([prices_dir, fund_dir, out_fund_dir, out_final_dir])

    @staticmethod
    def _ticker_from_price_filename(fname: str) -> str:
        # "AZUL4.SA.csv" -> "AZUL4"
        base = os.path.basename(fname)
        if base.endswith(".csv"):
            base = base[:-4]
        return base.replace(".SA", "")

    @staticmethod
    def _fund_path_for_ticker(fund_dir: str, ticker: str) -> Optional[str]:
        # procura "<TICKER>_fUND.csv" (case-insensitive)
        for fn in os.listdir(fund_dir):
            if fn.lower() == f"{ticker.lower()}_fund.csv" or fn.lower() == f"{ticker.lower()}_fundamental.csv" or fn.lower() == f"{ticker.lower()}_fund.csv":
                return os.path.join(fund_dir, fn)
            if fn.lower().startswith(ticker.lower()) and "fund" in fn.lower():
                return os.path.join(fund_dir, fn)
        return None

    def process_one(self, price_csv_path: str,
                    indicator_factor: float = 0.1,
                    save_intermediate_prices: bool = True,
                    attach_fundamentals_asof: bool = True,
                    only_events: bool = False) -> Optional[pd.DataFrame]:
        """
        Processa um ticker:
          - preço -> retornos/STD/indicadores
          - evento -> flag 0/1 (Data ∈ Data_Publicacao)
          - (opcional) merge_asof com fundamentos publicados
          - salva CSV final
        """
        
        # --- fundamentos (se houver)
        fund_path = self._fund_path_for_pricefile(self.fund_dir, price_csv_path)
        df_price_feat = pd.read_csv(self.prices_dir, sep=None, engine="python", dtype=str)
        
        if fund_path is None:
            # sem fundamentos -> apenas marca evento=0 e finaliza
            df_final = df_price_feat.copy()
            df_final["event"] = 0
        else:
            dff_raw = pd.read_csv(fund_path, sep=None, engine="python", dtype=str)
            fund = FundamentalProcessing(dff_raw, tkr)
            pub_dates = set(pd.to_datetime(fund.get_publication_dates(), errors="coerce").dropna().values)
            df_final = df_price_feat.copy()
            # flag de evento
            df_final["event"] = df_final["Data"].isin(pub_dates).astype(int)

            # (opcional) merge_asof colando fundamentos até a próxima publicação
            if attach_fundamentals_asof:
                f_asof = fund.features_for_asof_merge()
                if not f_asof.empty:
                    df_final = df_final.sort_values("Data")
                    f_asof = f_asof.sort_values("Data_Publicacao")
                    df_final = pd.merge_asof(
                        df_final,
                        f_asof,
                        left_on="Data",
                        right_on="Data_Publicacao",
                        direction="backward"
                    )
                    # remove coluna-âncora para não poluir
                    if "Data_Publicacao" in df_final.columns:
                        df_final.drop(columns=["Data_Publicacao"], inplace=True)

            if only_events:
                df_final = df_final[df_final["event"] == 1].copy()

        # salva final por ticker
        out_final_path = os.path.join(self.out_final_dir, f"{tkr}.final.csv")
        df_final.to_csv(out_final_path, index=False)
        return df_final

    def process_all(self,
                    indicator_factor: float = 0.1,
                    save_intermediate_prices: bool = True,
                    attach_fundamentals_asof: bool = True,
                    only_events: bool = False) -> pd.DataFrame:
        """Processa todos os arquivos em dataset/prices e devolve consolidado."""
        all_final = []
        for fn in os.listdir(self.prices_dir):
            if not fn.lower().endswith(".csv"):
                continue
            try:
                path = os.path.join(self.prices_dir, fn)
                df_final = self.process_one(
                    path,
                    indicator_factor=indicator_factor,
                    save_intermediate_prices=save_intermediate_prices,
                    attach_fundamentals_asof=attach_fundamentals_asof,
                    only_events=only_events
                )
                if df_final is not None and not df_final.empty:
                    all_final.append(df_final.assign(Ticker=self._ticker_from_price_filename(fn)))
            except Exception as ex:
                print(f"Erro no ticker de {fn}: {ex}")
                continue

        if not all_final:
            return pd.DataFrame()

        df_all = pd.concat(all_final, ignore_index=True)
        df_all = df_all.sort_values(["Ticker", "Data"]).reset_index(drop=True)
        # salva consolidado
        df_all.to_csv(os.path.join(self.out_final_dir, "final_dataprep.csv"), index=False)
        return df_all


In [6]:
class FundamentalProcessing:
    """
    Limpa e estrutura o DataFrame de fundamentos de um único ticker
    e gera também métricas derivadas (QoQ, YoY, EPS surprise).
    """

    def __init__(self, df_fund: pd.DataFrame, ticker: str):
        self.ticker = ticker
        self.df = df_fund.copy()

        # 1. Padronizar datas principais
        for col in ["Data_Publicacao", "Data_Demonstracao", "Data_Analise"]:
            if col in self.df.columns:
                self.df[col] = pd.to_datetime(self.df[col], dayfirst=True, errors="coerce", format="%Y-%m-%d")
        #        self.df[col] = _parse_br_friendly_date_series(self.df[col])

        # 2. Converter colunas numéricas relevantes
        num_cols = [
            "RL","LL","EBITDA","Preco_Abertura","Preco_Fechamento","LPA","ROA","ROE","MEB",
            "CRESC_RL_12M","CRESC_LL_12M","CRESC_EBITDA_12M","CAPEX","FCO","FCF",
            "Divida_Liquida","PL","Divida_Bruta","AT","DVA_Despesas_Fin",
            "PC","PNC","Outros_PC","LUB"
        ]
        for c in num_cols:
            if c in self.df.columns:
                self.df[c] = self.df[c].apply(to_float_smart)

        # 3. Resolver duplicadas (RL_dup*, EBITDA_dup etc.)
        if set(["RL","RL_dup1","RL_dup2"]).issubset(self.df.columns):
            self.df["RL"] = (
                self.df[["RL","RL_dup1","RL_dup2"]]
                .bfill(axis=1)
                .iloc[:,0]
            )
            self.df.drop(columns=["RL_dup1","RL_dup2"], inplace=True, errors="ignore")

        if set(["EBITDA","EBITDA_dup"]).issubset(self.df.columns):
            self.df["EBITDA"] = (
                self.df[["EBITDA","EBITDA_dup"]]
                .bfill(axis=1)
                .iloc[:,0]
            )
            self.df.drop(columns=["EBITDA_dup"], inplace=True, errors="ignore")

        # 4. Definir QuarterEnd (fim do trimestre contábil)
        if "Data_Demonstracao" in self.df.columns:
            self.df["QuarterEnd"] = self.df["Data_Demonstracao"]
        else:
            self.df["QuarterEnd"] = pd.NaT

        if self.df["QuarterEnd"].isna().all() and "Data_Analise" in self.df.columns:
            self.df["QuarterEnd"] = self.df["Data_Analise"]

        # 5. Ordenar e deduplicar por QuarterEnd
        sort_cols = []
        if "QuarterEnd" in self.df.columns:
            sort_cols.append("QuarterEnd")
        if "Data_Publicacao" in self.df.columns:
            sort_cols.append("Data_Publicacao")

        if sort_cols:
            self.df = (
                self.df.sort_values(sort_cols)
                       .drop_duplicates(["QuarterEnd"])
                       .reset_index(drop=True)
            )

        # 6. Forçar coluna Ticker logo no início
        self.df.insert(0, "Ticker", self.ticker)

    def get_publication_dates(self) -> pd.Series:
        """
        Retorna as datas de publicação em string DD/MM/AAAA,
        para ser usada na marcação de 'event' nos preços.
        """
        if "Data_Publicacao" not in self.df.columns:
            return pd.Series([], dtype="object")

        pub = self.df["Data_Publicacao"].dropna()
        pub = pd.to_datetime(pub, errors="coerce", dayfirst=True).dropna()
        pub_str = pub.dt.strftime("%d/%m/%Y")
        return pub_str.reset_index(drop=True)

    def build_qoq_yoy_and_eps(self) -> pd.DataFrame:
        """
        Cria tabela enriquecida com:
        - métricas contábeis numéricas
        - variação QoQ e YoY
        - EPS surprise
        Retorna uma tabela alinhada em Data_Publicacao / QuarterEnd.
        """
        df = self.df.copy()

        if "Data_Publicacao" not in df.columns:
            return pd.DataFrame()

        out = df[["Ticker", "Data_Publicacao", "QuarterEnd"]].copy()

        # pega colunas numéricas
        num_cols = [
            c for c in df.columns
            if pd.api.types.is_numeric_dtype(df[c])
        ]

        for c in num_cols:
            out[c] = df[c].values
            out[f"{c}_Q_Change"] = df[c].diff(1).values        # variação QoQ
            out[f"{c}_Y_Change"] = (df[c] - df[c].shift(4)).values  # variação YoY (~mesmo tri ano passado)

        # EPS Surprise
        if "EPS_Consensus" in df.columns and "LPA" in df.columns:
            eps = df["LPA"] - df["EPS_Consensus"]
        elif "LPA" in df.columns:
            eps = df["LPA"].diff(1)
        else:
            eps = pd.Series([np.nan] * len(df))

        out["EPS_EarningsSurprise"] = eps
        out["EPS_Earnings_Surprise_Backward_Diff"] = eps - eps.shift(1)
        out["EPS_Earnings_Surprise_Backward_Ave_Diff"] = eps - eps.shift(3).rolling(3).mean()

        # ordenar e limpar
        out = (
            out.dropna(subset=["Data_Publicacao"])
               .sort_values("Data_Publicacao")
               .reset_index(drop=True)
        )

        # renomear Data_Publicacao -> AnnounceDate (mais típico no seu pipeline)
        out = out.rename(columns={"Data_Publicacao": "AnnounceDate"})

        return out

    def build_final_single_table(self) -> pd.DataFrame:
        """
        Junta as colunas limpas (self.df) com as métricas derivadas
        (QoQ/YoY/EPS surprise) em UMA tabela final por ticker.
        A junção é feita por ['Ticker','QuarterEnd','AnnounceDate' (~Data_Publicacao)].
        """
        base_df = self.df.copy()

        # garantir que as chaves existam
        if "Data_Publicacao" not in base_df.columns:
            base_df["Data_Publicacao"] = pd.NaT
        if "QuarterEnd" not in base_df.columns:
            base_df["QuarterEnd"] = pd.NaT

        # tabela derivada
        metrics_df = self.build_qoq_yoy_and_eps()

        if metrics_df.empty:
            # se não conseguiu gerar métricas (ex falta Data_Publicacao),
            # simplesmente retorna base_df como final
            final_df = base_df.copy()
            # manter sem poluir com colunas que não existem
        else:
            # vamos juntar o base_df com metrics_df
            # base_df tem Data_Publicacao; metrics_df tem AnnounceDate.
            tmp_base = base_df.copy()
            tmp_base = tmp_base.rename(columns={"Data_Publicacao": "AnnounceDate"})

            # chave de merge: Ticker + QuarterEnd + AnnounceDate
            final_df = pd.merge(
                tmp_base,
                metrics_df,
                on=["Ticker", "QuarterEnd", "AnnounceDate"],
                how="left",
                suffixes=("", "_MET")
            )

        # reordena por QuarterEnd/AnnounceDate
        sort_cols = []
        if "QuarterEnd" in final_df.columns:
            sort_cols.append("QuarterEnd")
        if "AnnounceDate" in final_df.columns:
            sort_cols.append("AnnounceDate")

        if sort_cols:
            final_df = (
                final_df.sort_values(sort_cols)
                        .reset_index(drop=True)
            )

        return final_df


# ------------------------
# utilitário pra mapear arquivo -> ticker base
# ------------------------

def _ticker_from_fund_filename(fname: str) -> str:
    """
    "ABEV3.SA.csv" -> "ABEV3"
    """
    base = os.path.basename(fname)
    if base.endswith(".csv"):
        base = base[:-4]
    return base.replace(".SA", "")


In [7]:
class DataPrepFund:
    """
    Lê todos os arquivos em dataset/fundamental/,
    processa cada um,
    gera UMA única tabela final por ticker (fundamentos limpos + métricas QoQ/YoY/EPS),
    salva essa tabela única em dataset/fund_processed/<TICKER>.SA.csv,
    e retorna um concat com todos.
    """

    def __init__(self,
                 fund_dir: str = "dataset/fundamental",
                 out_fund_dir: str = "dataset/fund_processed"):
        self.fund_dir = fund_dir
        self.out_fund_dir = out_fund_dir
        ensure_dirs([fund_dir, out_fund_dir])

    def process_one(self, fund_csv_path: str) -> Optional[pd.DataFrame]:
        """
        Processa um único arquivo bruto de fundamentos e salva o resultado final único.
        """
        try:
            raw = pd.read_csv(fund_csv_path, sep=None, engine="python", dtype=str)

            tkr = _ticker_from_fund_filename(fund_csv_path)

            fp = FundamentalProcessing(raw, tkr)

            final_df = fp.build_final_single_table()

            # salva APENAS um arquivo final por ticker
            out_path = os.path.join(self.out_fund_dir, f"{tkr}.SA.csv")
            final_df.to_csv(out_path, index=False)

            return final_df

        except Exception as ex:
            print(f"Erro ao processar {fund_csv_path}: {ex}")
            return None

    def process_all(self) -> pd.DataFrame:
        """
        Processa todos os CSVs em self.fund_dir e devolve concat completo.
        """
        all_final = []

        for fn in os.listdir(self.fund_dir):
            if not fn.lower().endswith(".csv"):
                continue

            full_path = os.path.join(self.fund_dir, fn)

            df_final = self.process_one(full_path)

            if df_final is not None and not df_final.empty:
                all_final.append(df_final)

        if not all_final:
            return pd.DataFrame()

        df_all = pd.concat(all_final, ignore_index=True)
        return df_all

In [8]:
if __name__ == "__main__":
    ensure_dirs([
        "dataset/fundamental",
        "dataset/fund_processed",
    ])

    fund_pipeline = DataPrepFund(
        fund_dir="dataset/fundamental",
        out_fund_dir="dataset/fund_processed"
    )

    df_all_fund = fund_pipeline.process_all()

    print("Fundamentals tratados salvos (1 arquivo por ticker) em dataset/fund_processed/")

Fundamentals tratados salvos (1 arquivo por ticker) em dataset/fund_processed/
