# Otimização do Mix de Culturas — Versão 5 (Organizada)
Este notebook reorganiza o código: funções reutilizáveis, logging centralizado para `otimizacao_mix_culturas_v5_organized.txt`, e pontos claros para executar o dashboard separadamente.

In [None]:
# 0. Imports e configuração básica
import os
from datetime import datetime
import json
import pandas as pd
import numpy as np
import pulp
from pulp import LpProblem, LpVariable, LpMaximize, lpSum, value, LpStatus
from omegaconf import DictConfig, OmegaConf


# --- ADICIONE ESTA LINHA ---
# Define o caminho do arquivo de log principal (usa caminho absoluto na pasta do notebook)
NOTEBOOK_DIR = os.path.dirname(os.path.abspath("/home/vinicius/Downloads/estudo/po/PL/PL/otimizacao_mix_culturas_v5_organized.ipynb"))
MASTER_LOG = os.path.join(NOTEBOOK_DIR, 'otimizacao_mix_culturas_v5_organized.txt')
# Garante que o arquivo exista e seja gravável (tenta criar/tocar o arquivo)
try:
    open(MASTER_LOG, 'a', encoding='utf-8').close()
except Exception as e:
    print(f'Falha ao criar/abrir arquivo de log principal: {e}')
# ---------------------------


def print_and_log(message, logfile=MASTER_LOG):
    ts = datetime.now().isoformat(sep=' ', timespec='seconds')
    line = f'[{ts}] {message}'
    print(line)
    try:
        with open(logfile, 'a', encoding='utf-8') as f:
            f.write(line + '\n')
    except Exception as e:
        print(f'Falha ao gravar log: {e}')

In [None]:
# 1. Função para gerar dados mock (reutilizável)
def generate_mock_data(n_dados=1500, seed=42, verbose=True):
    print_and_log(f'GERAR DADOS: n_dados={n_dados}, seed={seed}')
    np.random.seed(int(seed))
    df = pd.DataFrame({'id_talhao': range(1, n_dados + 1), 'cultura': np.random.choice(['Soja_Resistente','Soja_Produtiva','Milho_Safrinha'], n_dados)})

    culturas_params = {
        'Soja_Resistente': {'prod': (3.5,0.5),'custo':(1800,150),'agua':(450,50),'k':(80,10),'p':(70,8),'horas':(10,1.5)},
        'Soja_Produtiva': {'prod': (4.8,0.6),'custo':(2500,200),'agua':(600,60),'k':(100,12),'p':(90,10),'horas':(12,1.8)},
        'Milho_Safrinha': {'prod': (5.5,0.7),'custo':(2800,250),'agua':(700,70),'k':(120,15),'p':(100,12),'horas':(15,2.0)},
    }

    def gen(params, size):
        return {
            'produtividade_ton_ha': np.random.normal(loc=params['prod'][0], scale=params['prod'][1], size=size),
            'custo_ha': np.random.normal(loc=params['custo'][0], scale=params['custo'][1], size=size),
            'uso_agua_m3_ha': np.random.normal(loc=params['agua'][0], scale=params['agua'][1], size=size),
            'demanda_k_kg_ha': np.random.normal(loc=params['k'][0], scale=params['k'][1], size=size),
            'demanda_p_kg_ha': np.random.normal(loc=params['p'][0], scale=params['p'][1], size=size),
            'horas_maquina_ha': np.random.normal(loc=params['horas'][0], scale=params['horas'][1], size=size),
        }
    
    for cultura, params in culturas_params.items():
        mask = df['cultura'] == cultura
        data = gen(params, mask.sum())
        for k, vals in data.items():
            df.loc[mask, k] = vals
    
    if verbose:
        dist = df['cultura'].value_counts().to_dict()
        print_and_log('Distribuição por cultura: ' + json.dumps(dist, ensure_ascii=False))
        print_and_log('Dados mock gerados com sucesso. Amostra das primeiras linhas:')
        print_and_log(df.head(5).to_json(orient='records', force_ascii=False))
        print_and_log('Estatísticas resumidas:')
        print_and_log(df.describe().to_json(force_ascii=False))
    return df

In [None]:
# 2. Função que configura o modelo de otimização (recebe parâmetros agregados e recursos)
def setup_model_from_params(params_df, resources, percentual_minimo_por_cultura=0.15, percentual_maximo_soja_produtiva=0.60):
    # params_df: DataFrame com index = culturas e colunas medias por ha
    print_and_log('SETUP MODELO: iniciando criação do modelo')
    modelo = LpProblem(name='Otimizacao_Mix_Culturas_Risco', sense=LpMaximize)
    # Variáveis de decisão
    x_sr = LpVariable('Hectares_Soja_Resistente', lowBound=0, cat='Continuous')
    x_sp = LpVariable('Hectares_Soja_Produtiva', lowBound=0, cat='Continuous')
    x_ms = LpVariable('Hectares_Milho_Safrinha', lowBound=0, cat='Continuous')

    # Extrair parâmetros de forma robusta (usando .at para evitar KeyErrors)
    def val(cultura, col):
        try:
            return float(params_df.at[cultura, col])
        except Exception:
            return 0.0

    l_sr = val('Soja_Resistente','lucro_ha')
    l_sp = val('Soja_Produtiva','lucro_ha')
    l_ms = val('Milho_Safrinha','lucro_ha')
    c_sr = val('Soja_Resistente','custo_ha')
    c_sp = val('Soja_Produtiva','custo_ha')
    c_ms = val('Milho_Safrinha','custo_ha')
    a_sr = val('Soja_Resistente','uso_agua_m3_ha')
    a_sp = val('Soja_Produtiva','uso_agua_m3_ha')
    a_ms = val('Milho_Safrinha','uso_agua_m3_ha')
    k_sr = val('Soja_Resistente','demanda_k_kg_ha')
    k_sp = val('Soja_Produtiva','demanda_k_kg_ha')
    k_ms = val('Milho_Safrinha','demanda_k_kg_ha')
    p_sr = val('Soja_Resistente','demanda_p_kg_ha')
    p_sp = val('Soja_Produtiva','demanda_p_kg_ha')
    p_ms = val('Milho_Safrinha','demanda_p_kg_ha')
    h_sr = val('Soja_Resistente','horas_maquina_ha')
    h_sp = val('Soja_Produtiva','horas_maquina_ha')
    h_ms = val('Milho_Safrinha','horas_maquina_ha')
    prod_sr = val('Soja_Resistente','produtividade_ton_ha')
    prod_sp = val('Soja_Produtiva','produtividade_ton_ha')
    prod_ms = val('Milho_Safrinha','produtividade_ton_ha')

    # Gestão de risco: mínimos e máximos baseados em AREA_NAO_COMPACTADA_HA e parâmetros de política
    area_nc = float(resources.get('AREA_NAO_COMPACTADA_HA', 0))
    area_minima_ha = int(area_nc * float(percentual_minimo_por_cultura))
    area_maxima_sp = int(area_nc * float(percentual_maximo_soja_produtiva))
    print_and_log(f'REGRAS: min_por_cultura={percentual_minimo_por_cultura}, max_sp={percentual_maximo_soja_produtiva}, area_nc={area_nc}')

    modelo += (x_sr >= area_minima_ha, 'Minimo_Soja_Resistente')
    modelo += (x_sp >= area_minima_ha, 'Minimo_Soja_Produtiva')
    modelo += (x_ms >= area_minima_ha, 'Minimo_Milho_Safrinha')
    modelo += (x_sp <= area_maxima_sp, 'Risco_Maximo_Soja_Produtiva')

    # Objetivo
    modelo += lpSum([l_sr * x_sr, l_sp * x_sp, l_ms * x_ms]), 'Lucro_Total'

    # Restrições (área, orçamento, água, nutriente, horas, silo)
    modelo += (x_sr + x_sp + x_ms <= resources.get('AREA_TOTAL_DISPONIVEL_HA', 0), 'Restricao_Area_Total')
    modelo += (x_sr + x_sp + x_ms <= resources.get('AREA_NAO_COMPACTADA_HA', 0), 'Restricao_Area_Nao_Compactada')
    modelo += (c_sr * x_sr + c_sp * x_sp + c_ms * x_ms <= resources.get('ORCAMENTO_TOTAL_DISPONIVEL', 0), 'Restricao_Orcamento')
    modelo += (a_sr * x_sr + a_sp * x_sp + a_ms * x_ms <= resources.get('AGUA_TOTAL_DISPONIVEL_M3', 0), 'Restricao_Agua')
    modelo += (k_sr * x_sr + k_sp * x_sp + k_ms * x_ms <= resources.get('POTASSIO_DISPONIVEL_KG', 0), 'Restricao_Potassio')
    modelo += (p_sr * x_sr + p_sp * x_sp + p_ms * x_ms <= resources.get('FOSFORO_DISPONIVEL_KG', 0), 'Restricao_Fosforo')
    modelo += (h_sr * x_sr + h_sp * x_sp + h_ms * x_ms <= resources.get('HORAS_MAQUINA_DISPONIVEIS', 0), 'Restricao_Horas_Maquina')
    modelo += (prod_sr * x_sr + prod_sp * x_sp + prod_ms * x_ms <= resources.get('CAPACIDADE_SILO_TON', 0), 'Restricao_Armazenagem')

    print_and_log('SETUP MODELO: finalizado')
    return modelo, (x_sr, x_sp, x_ms)

In [None]:
# 3. Função que executa a otimização a partir de um DictConfig/Dict-like com 'params' e 'resources'
def run_optimization(cfg, logmaster=MASTER_LOG, verbose=True):
    # cfg: DictConfig ou dict com keys 'params' (contendo preco, n_dados, seed) e 'resources'
    params_in = cfg.get('params', {}) if isinstance(cfg, dict) else cfg.params
    resources = cfg.get('resources', {}) if isinstance(cfg, dict) else cfg.resources

    # Log de entrada de parâmetros
    print_and_log('RUN: parâmetros (params_in) => ' + json.dumps(OmegaConf.to_container(params_in, resolve=True) if isinstance(params_in, DictConfig) else params_in, ensure_ascii=False))
    print_and_log('RUN: recursos (resources) => ' + json.dumps(OmegaConf.to_container(resources, resolve=True) if isinstance(resources, DictConfig) else resources, ensure_ascii=False))

    df = generate_mock_data(int(params_in.get('n_dados',1500)), int(params_in.get('seed',42)), verbose=verbose)
    params = df.groupby('cultura').mean()
    # calcular lucro por ha por cultura usando preços do cfg
    preco_soja = float(params_in.get('preco_soja', 2200))
    preco_milho = float(params_in.get('preco_milho', 1300))
    perc_min = float(params_in.get('percentual_minimo_por_cultura', 0.15))
    perc_max_sp = float(params_in.get('percentual_maximo_soja_produtiva', 0.60))

    params['lucro_ha'] = 0.0
    params.at['Soja_Resistente','lucro_ha'] = params.at['Soja_Resistente','produtividade_ton_ha'] * preco_soja - params.at['Soja_Resistente','custo_ha']
    params.at['Soja_Produtiva','lucro_ha'] = params.at['Soja_Produtiva','produtividade_ton_ha'] * preco_soja - params.at['Soja_Produtiva','custo_ha']
    params.at['Milho_Safrinha','lucro_ha'] = params.at['Milho_Safrinha','produtividade_ton_ha'] * preco_milho - params.at['Milho_Safrinha','custo_ha']

    if verbose:
        print_and_log('PARAMS agregados por cultura:')
        print_and_log(params.to_json(force_ascii=False))

    modelo, variables = setup_model_from_params(params, resources, percentual_minimo_por_cultura=perc_min, percentual_maximo_soja_produtiva=perc_max_sp)
    print_and_log('Iniciando solução do modelo...', logfile=logmaster)
    try:
        solve_status = modelo.solve()
        status = LpStatus[modelo.status]
    except Exception as e:
        print_and_log(f'Falha ao executar solver: {e}', logfile=logmaster)
        raise

    print_and_log(f'Status do solver (raw): {solve_status}', logfile=logmaster)
    print_and_log(f'Status da otimização: {status}', logfile=logmaster)

    plantio = {v.name: v.varValue for v in variables} if status == 'Optimal' else None
    shadow = {name: getattr(con, 'pi', None) for name, con in modelo.constraints.items()}
    print_and_log('Plantio (resultado):', logfile=logmaster)
    print_and_log(str(plantio), logfile=logmaster)
    print_and_log('Preços sombra:', logfile=logmaster)
    print_and_log(str(shadow), logfile=logmaster)

    results = {'status': status, 'lucro': value(modelo.objective) if status == 'Optimal' else None, 'plantio': plantio, 'shadow_prices': shadow}
    return results

In [None]:
# 4. Execução segura de múltiplos cenários (usa arquivos em config/cenario)
def load_and_merge_scenarios(scenario_dir='config/cenario', base_name='base'):
    base_path = os.path.join(scenario_dir, f'{base_name}.yaml')
    print_and_log(f'Carregando cenários de {scenario_dir} (base={base_name})')
    
    # Garante que a configuração base seja um DictConfig vazio se o arquivo não existir
    base_cfg = OmegaConf.load(base_path) if os.path.exists(base_path) else OmegaConf.create({})
    
    scenarios = {base_name: base_cfg}
    
    # --- LÓGICA SUGERIDA PARA CARREGAR OUTROS CENÁRIOS ---
    try:
        for filename in os.listdir(scenario_dir):
            # Ignora o arquivo base que já foi carregado e arquivos que não são .yaml
            if filename.endswith('.yaml') and not filename.startswith(base_name):
                scenario_name = filename.replace('.yaml', '')
                scenario_path = os.path.join(scenario_dir, filename)
                
                # Carrega o cenário específico
                scenario_cfg = OmegaConf.load(scenario_path)
                
                # Mescla o cenário com a base (valores do cenário específico sobrescrevem a base)
                merged_cfg = OmegaConf.merge(base_cfg, scenario_cfg)
                scenarios[scenario_name] = merged_cfg
    except FileNotFoundError:
        print_and_log(f"Diretório de cenários não encontrado: {scenario_dir}")
    except Exception as e:
        print_and_log(f"Ocorreu um erro ao carregar cenários: {e}")
    # --------------------------------------------------------
    print_and_log('Cenários carregados: ' + ', '.join(scenarios.keys()))
    
    return scenarios # Retornar os cenários carregados

In [None]:
# 5. Função utilitária para rodar todos os cenários do diretório e resumir resultados

def run_all_scenarios(scenario_dir='config/cenario', base_name='base', save_csv='resumo_cenarios.csv', verbose=True):
    scenarios = load_and_merge_scenarios(scenario_dir=scenario_dir, base_name=base_name)
    rows = []
    for name, cfg in scenarios.items():
        print_and_log(f'Executando cenário: {name}')
        try:
            res = run_optimization(cfg, verbose=verbose)
            rows.append({
                'cenario': name,
                'status': res.get('status'),
                'lucro': res.get('lucro'),
                'plantio': json.dumps(res.get('plantio', {}), ensure_ascii=False)
            })
        except Exception as e:
            print_and_log(f'Falha no cenário {name}: {e}')
            rows.append({'cenario': name, 'status': 'Error', 'lucro': None, 'plantio': '{}'})
    df = pd.DataFrame(rows)
    try:
        df.to_csv(save_csv, index=False)
        print_and_log(f'Resumo salvo em {save_csv}')
    except Exception as e:
        print_and_log(f'Falha ao salvar resumo {save_csv}: {e}')
    return df

In [None]:
# 6. Rodar todos os cenários e exibir tabela resumo
summary_df = run_all_scenarios(scenario_dir='config/cenario', base_name='base', save_csv='resumo_cenarios.csv', verbose=False)
summary_df

In [None]:
# --- Smoke test de logging: grava e mostra as últimas linhas do log principal
print_and_log('SMOKE TEST: iniciando verificação do arquivo de log principal')
# Append a specific identifiable line
print_and_log('SMOKE TEST: linha de verificação')
# Agora lemos as últimas linhas do arquivo de log e mostramos
try:
    with open(MASTER_LOG, 'r', encoding='utf-8') as f:
        lines = f.readlines()
    tail = lines[-20:] if len(lines) > 0 else []
    print('\n--- Últimas linhas do log ---')
    for ln in tail:
        print(ln.rstrip())
except Exception as e:
    print(f'Falha ao ler o arquivo de log: {e}')