In [None]:
# Install the notebook's third-party dependencies into the kernel's environment.
# NOTE(review): versions are unpinned, so a re-run may pick up different releases;
# duckdb is installed here but not used by any of the visible cells — confirm it is needed.
%pip install yfinance pandas python-dateutil pyarrow duckdb

In [None]:
# Ingestion

import yfinance as yf
import pandas as pd
from datetime import datetime
from dateutil.relativedelta import relativedelta
import os

def download_financial_data(tickers=None, months=6,
                            output_path="/opt/airflow/data/raw/br_raw.parquet"):
    """Download recent price history from Yahoo Finance and save it as Parquet.

    Called with no arguments, this fetches the last six months for the five
    default Brazilian tickers and writes them to the default raw-data path.

    Args:
        tickers: Sequence of Yahoo Finance symbols to download. ``None``
            selects the five default ``.SA`` tickers.
        months: Length of the look-back window in calendar months, ending at
            the moment of the call.
        output_path: Destination Parquet file. Its parent directory is created
            when it does not exist.

    Returns:
        The path of the Parquet file that was written.

    Raises:
        ValueError: If the download produced no rows.
    """
    if tickers is None:
        tickers = ['PETR4.SA', 'BBAS3.SA', 'ELET6.SA', 'SBSP3.SA', 'VALE3.SA']
    tickers = list(tickers)

    end = datetime.now()
    start = end - relativedelta(months=months)

    # group_by='ticker' is kept as-is: the annotation step stacks column level 0
    # and labels it "Ticker", so it presumably relies on this layout.
    df = yf.download(tickers, start=start, end=end, group_by='ticker', threads=False)

    # NOTE(review): yf.download is understood to hand back an empty frame rather
    # than raise when nothing could be fetched — confirm for the installed version.
    # Failing here keeps an empty file from flowing into the later steps.
    if df.empty:
        raise ValueError(
            f"Erro: nenhum dado retornado pelo Yahoo Finance para {tickers}"
        )

    # A bare file name has no parent directory to create.
    out_dir = os.path.dirname(output_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    df.to_parquet(output_path)
    return output_path

In [None]:
# Annotation

import pandas as pd
import os

def annotate_financial_data(raw_path):
    """Reshape the raw price file to long format and tag each row.

    Reads the Parquet file at ``raw_path``, moves the first column level into
    a ``Ticker`` column, adds ``Empresa`` (company name) and ``Setor`` (sector)
    columns looked up from the ticker, and writes the result to the processed
    data directory.

    Args:
        raw_path: Path of the Parquet file to read.

    Returns:
        The path of the annotated Parquet file that was written.
    """
    company_by_ticker = {
        'PETR4.SA': 'Petrobras',
        'BBAS3.SA': 'Banco do Brasil',
        'ELET6.SA': 'Eletrobras',
        'SBSP3.SA': 'Empresa de Saneamento de SP',
        'VALE3.SA': 'Vale',
    }
    sector_by_ticker = {
        'PETR4.SA': 'Energia',
        'BBAS3.SA': 'Financeiro',
        'ELET6.SA': 'Energia',
        'SBSP3.SA': 'Saneamento',
        'VALE3.SA': 'Mineração',
    }

    wide_prices = pd.read_parquet(raw_path)

    # Flatten the column multi-index: level 0 becomes an index level, then a
    # regular column. When that level is unnamed, reset_index labels it
    # "level_1", which is renamed to "Ticker".
    long_prices = (
        wide_prices
        .stack(level=0)
        .reset_index()
        .rename(columns={"level_1": "Ticker"})
    )

    # Tickers missing from the lookup tables map to NaN.
    ticker_codes = long_prices['Ticker']
    long_prices['Empresa'] = ticker_codes.map(company_by_ticker)
    long_prices['Setor'] = ticker_codes.map(sector_by_ticker)

    os.makedirs("/opt/airflow/data/processed", exist_ok=True)
    annotated_path = "/opt/airflow/data/processed/br_annotated.parquet"
    long_prices.to_parquet(annotated_path)
    return annotated_path

In [None]:
# Validation

import pandas as pd

def validate_data(annotated_path):
    """Check that the annotated price file holds usable closing prices.

    Args:
        annotated_path: Path of a Parquet file that has a ``Close`` column.

    Returns:
        True when every check passes.

    Raises:
        AssertionError: If the file has no rows, or ``Close`` contains nulls
            or values that are not strictly positive.
        KeyError: If the file has rows but no ``Close`` column.
    """
    df = pd.read_parquet(annotated_path)

    # The checks raise explicitly instead of using `assert` statements, which
    # Python drops when run with -O / PYTHONOPTIMIZE; AssertionError is kept so
    # callers see the same exception type as before.

    # `.all()` is True on an empty Series, so an empty file must be rejected
    # up front or it would pass every check below.
    if df.empty:
        raise AssertionError("Erro: nenhum dado para validar")

    close = df["Close"]
    if not close.notnull().all():
        raise AssertionError("Erro: coluna Close contém nulos")
    # Zero is rejected as well as negatives, hence "greater than zero".
    if not (close > 0).all():
        raise AssertionError("Erro: preços devem ser maiores que zero")

    return True