# Pipeline de Churn Prediction

Este notebook simula a transição da camada **Silver** para a **Gold**, aplicando engenharia de atributos (Feature Engineering) e treinando um modelo de Machine Learning para prever o cancelamento de clientes (Churn). Foi otimizado para a infraestrutura do Databricks Free Edition.

## 1. Instalação e Importação de Bibliotecas
O Databricks já possui o PySpark nativamente, mas vamos importar as funções necessárias para o nosso pipeline de ML.

In [0]:
from pyspark.sql.functions import col, datediff, current_date, to_date
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

## 2. Ingestão de Dados (Camada Silver)
Nesta etapa, carregamos os dados já limpos e estruturados da nossa tabela (ou arquivo CSV) na camada Silver.

In [0]:
# Lendo o CSV mapeado no Unity Catalog / Volumes (Camada Silver)
file_path = "/Volumes/workspace/voc/churn/churn_silver_2025.csv"

df_silver = spark.read.csv(file_path, header=True, inferSchema=True)
display(df_silver.limit(5))

## 3. Tratamento e Feature Engineering (Silver -> Gold)
Criei novas variáveis (features) que ajudarão o algoritmo a encontrar padrões.
* **taxa_uso_valor**: Relação entre os logs de uso e a mensalidade paga.
* **dias_desde_assinatura**: O tempo de vida (tenure) do cliente na base.

In [0]:
df_gold_prep = df_silver.withColumn(
    "taxa_uso_valor", 
    col("total_logs_app_30d") / (col("valor_mensalidade") + 0.01) # Evita divisão por zero
).withColumn(
    "dias_desde_assinatura", 
    datediff(current_date(), to_date(col("data_assinatura")))
).na.drop() # Tratamento básico de eventuais nulos

display(df_gold_prep.limit(5))

## 4. Tratamento manual (Bypass de StringIndexer)

In [0]:

from pyspark.sql.functions import col, when

# Em vez de StringIndexer, usamos mapeamento direto via Spark SQL (Não bloqueado)
# Isso transforma as categorias em números manualmente
categorias = [row[0] for row in df_gold_prep.select("categoria_principal_voc").distinct().collect()]
df_gold_indexed = df_gold_prep
for i, cat in enumerate(categorias):
    df_gold_indexed = df_gold_indexed.withColumn(
        "categoria_indexada", 
        when(col("categoria_principal_voc") == cat, float(i)).otherwise(col("categoria_indexada") if "categoria_indexada" in df_gold_indexed.columns else 0.0)
    )

## 5. Modelagem (Usando Scikit-Learn para evitar o bloqueio do Py4J)

In [0]:
# Em clusters Shared, o RandomForest do pyspark.ml costuma ser bloqueado.

import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score, accuracy_score

# Convertendo para Pandas apenas para o treinamento do modelo
pdf = df_gold_indexed.select(
    "valor_mensalidade", "total_logs_app_30d", "tickets_suporte_abertos", 
    "score_sentimento_voc", "taxa_uso_valor", "dias_desde_assinatura", 
    "categoria_indexada", "churn", "id_cliente"
).toPandas()

# Definindo Features e Target
X = pdf.drop(['churn', 'id_cliente'], axis=1)
y = pdf['churn']

# Split simples
from sklearn.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Treinando o Random Forest (Sklearn não sofre restrição de 'whitelist' do Py4J)
rf_model = RandomForestClassifier(n_estimators=100, max_depth=5, random_state=42)
rf_model.fit(X_train, y_train)

## 6. Avaliação do Modelo e saída da tabela Gold
Testamos a qualidade da nossa IA em dados não vistos previamente utilizando duas métricas de classificação robustas.

In [0]:

y_pred = rf_model.predict(X_test)
y_prob = rf_model.predict_proba(X_test)[:, 1]

acc = accuracy_score(y_test, y_pred)
auc = roc_auc_score(y_test, y_prob)

print(f" Modelo Treinado no Cluster Shared!")
print(f" Acurácia: {acc:.4f} | AUC-ROC: {auc:.4f}")

# Criando o DataFrame Final (Gold) de volta para Spark
pdf_results = pd.DataFrame({
    'id_cliente': pdf.loc[X_test.index, 'id_cliente'],
    'probabilidade_churn': y_prob,
    'previsao_final': y_pred
})

churn_predictions_gold = spark.createDataFrame(pdf_results)
display(churn_predictions_gold)

### Conclusão
O pipeline foi executado com sucesso! 
Os dados brutos (Silver) foram higienizados e transformados matematicamente (Feature Eng). Após serem indexados e vetorizados, passaram por um classificador de Floresta Aleatória que identificou o padrão de evasão. Agora possuímos um Dataframe na camada Gold enriquecido com a probabilidade de evasão, pronto para ser plugado em painéis ou fluxos de CRM de retenção.