In [None]:
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
import mlflow

# --- 1. Inicialização da Sessão Spark ---
# Configura a conexão com o cluster Spark, MinIO e habilita o Delta Lake.
print("--- 1/6: Inicializando a Sessão Spark ---")
spark = (
    SparkSession.builder.appName("AnaliseChurnComMLflow")
    .master("spark://spark-master:7077")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000")
    .config("spark.hadoop.fs.s3a.access.key", "admin")
    .config("spark.hadoop.fs.s3a.secret.key", "mysecretpassword")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)
print("Sessão Spark iniciada com sucesso!")

# --- 2. Geração de Dados Genéricos ---
# Criamos um DataFrame Pandas com dados fictícios de clientes.
print("\n--- 2/6: Gerando dados de clientes (churn) ---")
dados_ficticios = {
    'cliente_id': range(1, 101),
    'idade': [_ for _ in range(20, 70)] * 2,
    'tempo_de_uso_meses': [int(i/2) + 1 for i in range(100)],
    'valor_mensal': [50 + (i % 20) * 5 for i in range(100)],
    # Clientes com mais tempo de uso e valor mais alto têm menos chance de cancelar
    'churn': [1 if i < 20 or (i % 10 == 0 and i > 50) else 0 for i in range(100)]
}
pandas_df = pd.DataFrame(dados_ficticios)
spark_df = spark.createDataFrame(pandas_df)
print("Dados gerados. Amostra:")
spark_df.show(5)

# --- 3. Salvando a Tabela no MinIO (Data Lake) ---
# O bucket é 'bases' e a tabela se chamará 'dados_clientes'
print("\n--- 3/6: Salvando dados como tabela Delta no MinIO ---")
delta_path = "s3a://bases/dados_clientes"
spark_df.write.format("delta").mode("overwrite").save(delta_path)
print(f"Tabela salva com sucesso em: {delta_path}")

# --- 4. Carregando a Tabela do MinIO ---
# Confirmamos que os dados podem ser lidos de volta.
print("\n--- 4/6: Lendo dados da tabela Delta ---")
df_lido = spark.read.format("delta").load(delta_path)
print(f"Total de {df_lido.count()} registros lidos do MinIO.")

# --- 5. Análise e Treinamento com MLflow ---
print("\n--- 5/6: Treinando modelo de Machine Learning e registrando no MLflow ---")

# Conecta ao servidor MLflow que está rodando no Docker
mlflow.set_tracking_uri("http://mlflow:5000")

# Define um nome para o nosso "projeto" de experimentos
experiment_name = "Previsao_Churn_Clientes"
mlflow.set_experiment(experiment_name)

# Converte de volta para Pandas para usar com Scikit-learn
df_treino = df_lido.toPandas()

# Separa os dados em features (X) e alvo (y)
features = ['idade', 'tempo_de_uso_meses', 'valor_mensal']
target = 'churn'
X_train, X_test, y_train, y_test = train_test_split(df_treino[features], df_treino[target], test_size=0.3, random_state=42)

# Inicia uma "run" no MLflow para registrar tudo
with mlflow.start_run() as run:
    print(f"Iniciando run do MLflow: {run.info.run_id}")
    
    # Parâmetros do modelo
    C_param = 1.0
    solver_param = 'liblinear'
    
    # Cria e treina o modelo de Regressão Logística
    model = LogisticRegression(C=C_param, solver=solver_param)
    model.fit(X_train, y_train)
    
    # Faz previsões e calcula a acurácia
    predictions = model.predict(X_test)
    accuracy = accuracy_score(y_test, predictions)
    
    # Registra tudo no MLflow
    mlflow.log_param("regularization_strength_C", C_param)
    mlflow.log_param("solver", solver_param)
    mlflow.log_metric("accuracy", accuracy)
    
    # Salva o modelo treinado como um artefato no MLflow (que por sua vez salva no MinIO)
    mlflow.sklearn.log_model(model, "modelo_regressao_logistica")
    
    print(f"Acurácia do modelo: {accuracy:.2f}")
    print("Parâmetros, métricas e modelo registrados no MLflow!")

# --- 6. Finalização ---
print("\n--- 6/6: Encerrando a sessão Spark ---")
spark.stop()
print("Processo concluído!")