In [34]:
# Importa√ß√µes necess√°rias
import csv
from collections import Counter, defaultdict
import pandas as pd
import os
from sklearn.metrics.pairwise import cosine_similarity # Para similaridade mais robusta (opcional)
import numpy as np

# --- 1. Fun√ß√µes de Leitura e Prepara√ß√£o de Dados ---

def ler_usuarios_compras(caminho_csv_usuarios):
    """
    L√™ o hist√≥rico de compras dos usu√°rios.
    Assumimos que o CSV 'usuarios_compras_atualizado.csv' n√£o tem uma coluna 'organico'
    por usu√°rio. A prefer√™ncia por org√¢nicos do usu√°rio ser√° tratada como um par√¢metro.
    """
    usuarios_data = {}
    try:
        df_usuarios = pd.read_csv(caminho_csv_usuarios)
        for _, row in df_usuarios.iterrows():
            itens_comprados = set()
            for i in range(1, 6): # item1 a item5
                item_col = f'item{i}'
                if pd.notna(row[item_col]):
                    itens_comprados.add(row[item_col])
            usuarios_data[row["nome"]] = {
                "regiao": row["regiao"],
                "itens": itens_comprados
            }
    except FileNotFoundError:
        print(f"Erro: Arquivo de usu√°rios n√£o encontrado em {caminho_csv_usuarios}")
        return None
    return usuarios_data

def ler_info_produtos_corrigida(caminho_csv_produtos_associacao_simulado):
    """
    L√™ informa√ß√µes dos produtos do arquivo 'associacoes_produtos_simulado.csv'.
    Retorna um dicion√°rio onde a chave √© o nome_produto, agregando informa√ß√µes
    de todas as associa√ß√µes que o oferecem.
    """
    info_produtos_agregada = defaultdict(lambda: {
        "organico_geral": False, # True se qualquer associa√ß√£o o oferece como org√¢nico
        "sazonalidade": [], 
        "avaliacoes_coletadas": [], 
        "associacoes_ofertantes": set(),
        "avaliacao_media_geral": 3.0 # Default
    })
    try:
        df_produtos_assoc = pd.read_csv(caminho_csv_produtos_associacao_simulado)
        
        for _, row in df_produtos_assoc.iterrows():
            nome_produto = str(row["nome_produto"]).strip()
            if not nome_produto:
                continue

            # Atualiza se o produto √© org√¢nico em geral
            if int(row["produto_organico"]) == 1:
                info_produtos_agregada[nome_produto]["organico_geral"] = True
            
            # Agrega sazonalidade (sem duplicatas)
            saz_produto_atual = info_produtos_agregada[nome_produto]["sazonalidade"]
            if int(row["disponivel_primavera"]) == 1 and "primavera" not in saz_produto_atual:
                saz_produto_atual.append("primavera")
            if int(row["disponivel_verao"]) == 1 and "verao" not in saz_produto_atual:
                saz_produto_atual.append("verao")
            if int(row["disponivel_outono"]) == 1 and "outono" not in saz_produto_atual:
                saz_produto_atual.append("outono")
            if int(row["disponivel_inverno"]) == 1 and "inverno" not in saz_produto_atual:
                saz_produto_atual.append("inverno")
            
            if pd.notna(row["avaliacao_produto"]):
                try:
                    info_produtos_agregada[nome_produto]["avaliacoes_coletadas"].append(float(row["avaliacao_produto"]))
                except ValueError:
                    pass # Ignora avalia√ß√µes inv√°lidas

            info_produtos_agregada[nome_produto]["associacoes_ofertantes"].add(str(row["nome_associacao"]))
            
        # Calcula a m√©dia das avalia√ß√µes para cada produto
        for produto, dados in info_produtos_agregada.items():
            if dados["avaliacoes_coletadas"]:
                dados["avaliacao_media_geral"] = np.mean(dados["avaliacoes_coletadas"])
        
        print(f"Informa√ß√µes de {len(info_produtos_agregada)} produtos √∫nicos processadas de '{caminho_csv_produtos_associacao_simulado}'.")
        return info_produtos_agregada
                
    except FileNotFoundError:
        print(f"Erro: Arquivo de produtos das associa√ß√µes n√£o encontrado em {caminho_csv_produtos_associacao_simulado}")
        return None
    except Exception as e:
        print(f"Erro ao ler ou processar o arquivo de produtos das associa√ß√µes: {e}")
        # Imprimir o traceback para mais detalhes do erro durante o desenvolvimento
        import traceback
        traceback.print_exc()
        return None



In [None]:

from collections import Counter, defaultdict
import pandas as pd
import os
import numpy as np

def ler_usuarios_compras(caminho_csv_usuarios):
    """L√™ o hist√≥rico de compras dos usu√°rios. Retorna um dicion√°rio ou None em caso de erro."""
    usuarios_data = {}
    try:
        df_usuarios = pd.read_csv(caminho_csv_usuarios)
        for _, row in df_usuarios.iterrows():
            itens_comprados = set()
            for i in range(1, 6): 
                item_col = f'item{i}'
                if pd.notna(row[item_col]):
                    itens_comprados.add(str(row[item_col]).strip()) 
            usuarios_data[row["nome"]] = {
                "regiao": row["regiao"],
                "itens": itens_comprados
            }
        return usuarios_data
    except FileNotFoundError:
        
        return None
    except Exception:
        
        return None


def ler_info_produtos_corrigida(caminho_csv_produtos_associacao_simulado):
    """
    L√™ informa√ß√µes dos produtos do arquivo 'associacoes_produtos_simulado.csv'.
    Retorna um dicion√°rio (info_produtos_agregada) ou None em caso de erro.
    """
    info_produtos_agregada = defaultdict(lambda: {
        "organico_geral": False,
        "sazonalidade": [],
        "avaliacoes_coletadas": [],
        "associacoes_ofertantes": set(),
        "avaliacao_media_geral": 3.0
    })
    try:
        df_produtos_assoc = pd.read_csv(caminho_csv_produtos_associacao_simulado)
        for _, row in df_produtos_assoc.iterrows():
            nome_produto = str(row["nome_produto"]).strip()
            if not nome_produto:
                continue

            if int(row["produto_organico"]) == 1:
                info_produtos_agregada[nome_produto]["organico_geral"] = True
            
            saz_produto_atual = info_produtos_agregada[nome_produto]["sazonalidade"]
            if int(row["disponivel_primavera"]) == 1 and "primavera" not in saz_produto_atual:
                saz_produto_atual.append("primavera")
            if int(row["disponivel_verao"]) == 1 and "verao" not in saz_produto_atual:
                saz_produto_atual.append("verao")
            if int(row["disponivel_outono"]) == 1 and "outono" not in saz_produto_atual:
                saz_produto_atual.append("outono")
            if int(row["disponivel_inverno"]) == 1 and "inverno" not in saz_produto_atual:
                saz_produto_atual.append("inverno")
            
            if pd.notna(row["avaliacao_produto"]):
                try:
                    info_produtos_agregada[nome_produto]["avaliacoes_coletadas"].append(float(row["avaliacao_produto"]))
                except ValueError:
                    pass 
            info_produtos_agregada[nome_produto]["associacoes_ofertantes"].add(str(row["nome_associacao"]))
            
        for produto, dados in info_produtos_agregada.items():
            if dados["avaliacoes_coletadas"]:
                dados["avaliacao_media_geral"] = np.mean(dados["avaliacoes_coletadas"])
        
        return info_produtos_agregada
                
    except FileNotFoundError:
        return None
    except Exception:
        
        return None


def calcular_similaridade_usuarios_custom(usuario_alvo_data, outros_usuarios_data):
    similaridades = {}
    itens_alvo = usuario_alvo_data["itens"]
    regiao_alvo = usuario_alvo_data["regiao"]

    for nome_outro_usuario, dados_outro_usuario in outros_usuarios_data.items():
        intersecao = len(itens_alvo.intersection(dados_outro_usuario["itens"]))
        bonus_regiao = 0.5 if dados_outro_usuario["regiao"] == regiao_alvo else 0
        score_similaridade = intersecao + bonus_regiao
        similaridades[nome_outro_usuario] = score_similaridade
        
    usuarios_similares_ordenados = sorted(similaridades.items(), key=lambda item: item[1], reverse=True)
    return usuarios_similares_ordenados


def recomendar_itens_knn(
    usuario_alvo_nome,
    todos_usuarios_data,
    info_produtos, 
    k_vizinhos=3,
    estacao_atual=None, 
    usuario_prefere_organicos=False
):
    if not todos_usuarios_data or usuario_alvo_nome not in todos_usuarios_data:
        return [], {} 

    if not info_produtos:
        return [], {} 

    usuario_alvo_data = todos_usuarios_data[usuario_alvo_nome]
    outros_usuarios_data = {nome: dados for nome, dados in todos_usuarios_data.items() if nome != usuario_alvo_nome}

    usuarios_similares = calcular_similaridade_usuarios_custom(usuario_alvo_data, outros_usuarios_data)
    
    if not usuarios_similares:
        return [], {} 
        
    vizinhos = [nome for nome, score in usuarios_similares[:k_vizinhos] if score > 0]

    if not vizinhos:
        return [], {} 

    itens_recomendados_potenciais = Counter()
    itens_ja_comprados_pelo_alvo = usuario_alvo_data["itens"]

    for nome_vizinho in vizinhos:
        itens_vizinho = todos_usuarios_data[nome_vizinho]["itens"]
        for item in itens_vizinho:
            if item not in itens_ja_comprados_pelo_alvo and item in info_produtos:
                produto_info_catalogo = info_produtos[item]
                if estacao_atual and produto_info_catalogo.get("sazonalidade") and \
                   estacao_atual.lower() not in produto_info_catalogo["sazonalidade"]:
                    continue 
                itens_recomendados_potenciais[item] += 1
    
    if not itens_recomendados_potenciais:
        return [], {} # nenhum item novo dos vizinhos ap√≥s filtros

    contagem_global_filtrada = Counter()
    todos_itens_catalogados = set(info_produtos.keys())
    itens_possiveis_para_recomendar = todos_itens_catalogados - itens_ja_comprados_pelo_alvo
    
    for _, dados_usr in todos_usuarios_data.items(): 
        for item_comprado in dados_usr["itens"]:
            if item_comprado in itens_possiveis_para_recomendar and item_comprado in info_produtos:
                 produto_info_catalogo = info_produtos[item_comprado]
                 if estacao_atual and produto_info_catalogo.get("sazonalidade") and \
                    estacao_atual.lower() not in produto_info_catalogo["sazonalidade"]:
                     continue
                 contagem_global_filtrada[item_comprado] +=1
    
    mapa_organicos_reais = {prod: dados.get("organico_geral", False) for prod, dados in info_produtos.items()}

    def pontuar_item(item):
        if item not in info_produtos:
            return -1 

        produto_info_catalogo = info_produtos[item]
        score_vizinhos = itens_recomendados_potenciais.get(item, 0)
        score_popularidade_global = contagem_global_filtrada.get(item, 0)
        
        score_organico = 0
        if mapa_organicos_reais.get(item, False): 
            score_organico = 2 
            if usuario_prefere_organicos:
                score_organico += 1 

        avaliacao_media = produto_info_catalogo.get("avaliacao_media_geral", 3.0)

        pontuacao_final = (
            (5 * score_vizinhos) +              
            (2 * score_popularidade_global) +   
            (3 * score_organico) +              
            (1 * avaliacao_media)               
        )
        return pontuacao_final

    itens_finais_ordenados = sorted(
        list(itens_recomendados_potenciais.keys()), 
        key=lambda item: (-pontuar_item(item), item) 
    )

    return itens_finais_ordenados, mapa_organicos_reais

#  gerar_html_recomendacoes (definida, mas n√£o chamada por padr√£o)
def gerar_html_recomendacoes(
    usuario_nome, 
    recomendacoes_ordenadas, 
    mapa_organicos_reais,
    info_produtos, 
    k_vizinhos, 
    estacao_atual, 
    usuario_prefere_organicos, 
    caminho_saida_html
):
    """Gera um arquivo HTML com a lista de recomenda√ß√µes."""
    itens_html = ""
    if not recomendacoes_ordenadas:
        itens_html = "<p>Nenhuma recomenda√ß√£o encontrada com os crit√©rios atuais.</p>"
    else:
        for item in recomendacoes_ordenadas:
            if item not in info_produtos:
                
                continue

            detalhes_produto = info_produtos[item]
            tag_organico = " <strong>(Org√¢nico ‚úîÔ∏è)</strong>" if mapa_organicos_reais.get(item, False) else ""
            
            sazonalidade_info = ""
            saz_item_lista = detalhes_produto.get("sazonalidade", [])
            if saz_item_lista:
                saz_item_str = ", ".join(saz_item_lista).capitalize()
                sazonalidade_info = f" <span style='font-size:0.8em; color:gray;'> (Sazonalidade: {saz_item_str})</span>"
                if estacao_atual and estacao_atual.lower() in saz_item_lista:
                    sazonalidade_info = f" <span style='font-size:0.8em; color:green;'> (Na esta√ß√£o: {estacao_atual.capitalize()} ‚úîÔ∏è)</span>"
                elif estacao_atual:
                    sazonalidade_info = f" <span style='font-size:0.8em; color:red;'> (Fora da esta√ß√£o: {estacao_atual.capitalize()} ‚ùå)</span>"

            avaliacao_valor = detalhes_produto.get('avaliacao_media_geral', 'N/A')
            if isinstance(avaliacao_valor, (int, float)):
                avaliacao_formatada = f"{avaliacao_valor:.1f}"
            else:
                avaliacao_formatada = str(avaliacao_valor)
            avaliacao_info = f" <span style='font-size:0.8em; color:blue;'> (Avalia√ß√£o M√©dia: {avaliacao_formatada}/5)</span>"
            itens_html += f"<li>{item}{tag_organico}{avaliacao_info}{sazonalidade_info}</li>\n"

    filtros_aplicados = f"k-vizinhos: {k_vizinhos}"
    if estacao_atual:
        filtros_aplicados += f", Esta√ß√£o: {estacao_atual.capitalize()}"
    filtros_aplicados += f", Prefer√™ncia por Org√¢nicos: {'Sim' if usuario_prefere_organicos else 'N√£o'}"
   
    html_content = f"""
    <!DOCTYPE html>
    <html lang="pt-BR">
    <head>
        <meta charset="UTF-8">
        <title>Recomenda√ß√µes para {usuario_nome}</title>
        <style>
            body {{ font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif; margin: 20px; background-color: #f4f7f6; color: #333; }}
            .container {{ background-color: #fff; padding: 20px; border-radius: 8px; box-shadow: 0 0 15px rgba(0,0,0,0.1); }}
            h1 {{ color: #2a8a51; border-bottom: 2px solid #2a8a51; padding-bottom: 10px;}}
            h2 {{ color: #3a5a70; }}
            ul {{ list-style-type: none; padding: 0; }}
            li {{ background-color: #e9f5ee; margin-bottom: 8px; padding: 12px; border-radius: 5px; border-left: 5px solid #2a8a51; transition: transform 0.2s; }}
            li:hover {{ transform: translateX(5px); }}
            strong {{ color: #1a683b; font-weight: bold; }}
            .filters {{ font-size: 0.9em; color: #555; margin-bottom:15px; padding:10px; background-color:#eee; border-radius:5px;}}
        </style>
    </head>
    <body>
        <div class="container">
            <h1>üåø Recomenda√ß√µes Personalizadas para {usuario_nome} üåø</h1>
            <div class="filters">
                <strong>Filtros Aplicados:</strong> {filtros_aplicados}
            </div>
            <h2>Itens Recomendados:</h2>
            <ul>
                {itens_html}
            </ul>
        </div>
    </body>
    </html>
    """
    
    try:
        os.makedirs(os.path.dirname(caminho_saida_html), exist_ok=True)
        with open(caminho_saida_html, "w", encoding="utf-8") as f:
            f.write(html_content)
        
    except Exception:
        
      
        pass 


if __name__ == "__main__":
    print("Executando modo de teste para RecomendacaoItensKNN...") 
    
    CAMINHO_BASE_DADOS_TESTE = os.path.join("..", "data")
    if not os.path.exists(CAMINHO_BASE_DADOS_TESTE): 
        CAMINHO_BASE_DADOS_TESTE = "data"

    ARQUIVO_USUARIOS_TESTE = os.path.join(CAMINHO_BASE_DADOS_TESTE, "usuarios_compras_atualizado.csv")
    ARQUIVO_PRODUTOS_TESTE = os.path.join(CAMINHO_BASE_DADOS_TESTE, "associacoes_produtos_simulado.csv")

    # carregar dados para teste!!!
    test_usuarios_data = ler_usuarios_compras(ARQUIVO_USUARIOS_TESTE)
    test_info_produtos = ler_info_produtos_corrigida(ARQUIVO_PRODUTOS_TESTE)

    if test_usuarios_data and test_info_produtos:
        # Param de etste
        test_usuario_nome = "usuario0001" #  user existente para teste
        test_k = 5
        test_estacao = "primavera"
        test_pref_organicos = True

        if test_usuario_nome in test_usuarios_data:
            recomendacoes, mapa_org = recomendar_itens_knn(
                test_usuario_nome,
                test_usuarios_data,
                test_info_produtos,
                k_vizinhos=test_k,
                estacao_atual=test_estacao,
                usuario_prefere_organicos=test_pref_organicos
            )

            
            print(f"\n--- Teste: Recomenda√ß√µes para {test_usuario_nome} ---")
            if recomendacoes:
                for i, item in enumerate(recomendacoes[:5]): # Mostrar top 5
                    org_text = " (Org√¢nico)" if mapa_org.get(item, False) else ""
                    print(f"{i+1}. {item}{org_text}")
            else:
                print("Nenhuma recomenda√ß√£o de teste gerada.")
        else:
            print(f"Usu√°rio de teste '{test_usuario_nome}' n√£o encontrado.")
    else:
        print("Falha ao carregar dados de teste. Verifique os caminhos e arquivos.")
    print("Fim do modo de teste.")

Executando modo de teste para RecomendacaoItensKNN...

--- Teste: Recomenda√ß√µes para usuario0001 ---
1. Manga (Org√¢nico)
2. Abacate (Org√¢nico)
3. Mam√£o (Org√¢nico)
4. Chuchu (Org√¢nico)
5. Banana (Org√¢nico)
Fim do modo de teste.
