# Importando bibliotecas e configurando ambiente

In [None]:
# PySpark SQL
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, when, isnan, regexp_replace, round as spark_round
from pyspark.sql.types import StringType, IntegerType, FloatType, DecimalType

# Pacotes para Machine Learning com PySpark
from pyspark.ml.feature import VectorAssembler, StringIndexer   # Para montar features
from pyspark.ml.classification import DecisionTreeClassifier  # Algoritmo de regressão logística
from pyspark.ml.evaluation import MulticlassClassificationEvaluator  # Avaliação de performance
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator   # Validação cruzada e ajuste de parâmetros

#Bibliotecas do Python
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from scipy import stats
import numpy as np

# Criar uma nova sessão do Spark fora do ambiente Databricks
spark = SparkSession.builder \
    .appName("ML Score") \
    .master("local[*]") \
    .getOrCreate()


data = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load("dataset_credito.csv")


df= data

# Funções

In [None]:
def remove_outlier(df, column, rm_out=False):
    # Obtem os quartis com precisão
    summary_stats = df.select(column).summary("25%", "75%").toPandas()
    Q1 = float(summary_stats[summary_stats['summary'] == '25%'][column])
    Q3 = float(summary_stats[summary_stats['summary'] == '75%'][column])
    
    # Intervalo Interquartil
    IQR = Q3 - Q1
    
    # Limites superior e inferior
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    
    # Filtrar sem outliers
    df_no_outliers = df.filter((col(column) >= lower_bound) & (col(column) <= upper_bound))
    
    # Contar número de outliers
    outliers = df.count() - df_no_outliers.count()
    
    # Se rm_out for True, retornar DataFrame sem outliers
    if rm_out:
        return (outliers, df_no_outliers)
    else:
        return outliers
    
    
def numerization(df, column):
    column_indexered = StringIndexer(inputCol=f"{column}", outputCol=f"{column}_indexered")

    df_indexered = column_indexered.fit(df).transform(df)
    
    return df_indexered

# Tratando os dados

In [None]:
df.sample(withReplacement=False, fraction=0.1).limit(5).toPandas()

In [None]:
df.printSchema()

<center> <ins>Tratar e converter dados numéricos para o tipo apropriado<ins/> <center/>

In [None]:
# Converter strings de valores monetários para FloatType
df = df \
    .withColumn(
        "valor_transacoes_12m", 
        regexp_replace(regexp_replace(col("valor_transacoes_12m"), "\\.", ""), ",", ".").cast(FloatType())
    ) \
    .withColumn(
        "limite_credito", 
        regexp_replace(regexp_replace(col("limite_credito"), "\\.", ""), ",", ".").cast(FloatType())
)

In [None]:
df.printSchema()

<center> <ins> Conferindo valores nulos ou NaN <ins/> <center/>

In [None]:
df.select([sum(when(col(c).isNull() | isnan(c), 1).otherwise(0)).alias(c) for c in df.columns]).toPandas()

<center> <ins> Retirando os valores "na" <ins/> <center/>

In [None]:
for x in df.columns:
    print(f"{x}  -  {df.filter(col(x) == "na").count()}")

In [None]:
# Trocando os valores "na" para "Desconhecido"
for column in ["escolaridade", "estado_civil", "salario_anual"]:
    df = df.withColumn(column, when(col(column) == "na", "Desconhecido").otherwise(col(column)))

In [None]:
for x in df.columns:
    print(f"{x}  ->  {df.filter(col(x) == "na").count()}")

# Teste de normalidade

## Teste de Lilliefors (Kolmogorov-Smirnov)

In [None]:
df_KS = remove_outlier(df, "limite_credito", rm_out=True)[1]
df_KS = remove_outlier(df, "valor_transacoes_12m", rm_out=True)[1]

In [None]:
data = df_KS.select("limite_credito", "valor_transacoes_12m").toPandas()

for column in data.columns:
    values = data[column].dropna()
    
    # Parâmetros da distribuição normal teórica
    mu, std = np.mean(values), np.std(values)
    
    # Criar a distribuição normal teórica
    normal_dist = stats.norm(mu, std).cdf(values)
    
    # Aplicar o teste Kolmogorov-Smirnov
    stat, p_value = stats.kstest(values, lambda x: stats.norm.cdf(x, mu, std))

    # Exibir os resultados formatados
    print("###############################")
    print(f"Variável: {column}")
    print(f"Estatística K-S: {stat}")
    print(f"P-valor: {p_value}")
    
    if p_value > 0.05:
        print("Interpretação: A hipótese de normalidade é aceita.")
    else:
        print("Interpretação: A hipótese de normalidade é rejeitada.")
    
    print("###############################\n")
    

# DecisionTree

In [None]:
# Numerizando os valores categóricos
for column in df.columns[4:9]:
    df = numerization(df, column)

# Isolamento dos dados que serão utilizados como vetor
columns = [
     'default',
     'dependentes_indexered',
     'escolaridade_indexered',
     'estado_civil_indexered',
     'salario_anual_indexered',
     'tipo_cartao_indexered',
     'meses_de_relacionamento',
     'qtd_produtos',
     'iteracoes_12m',
     'meses_inativo_12m',
     'limite_credito',
     'valor_transacoes_12m',
     'qtd_transacoes_12m'
]

df_Vector = df.select(columns)

# Criando o vetor das colunas
assembler = VectorAssembler(inputCols=columns, outputCol="vector")

# Aplicando ao df isolado
df_Vector = assembler.transform(df_Vector)
# df_Vector.select("vector", "default").show(truncate=True) <- vizualização

# Separação do conjunto de treino para o de teste
train_data, test_data = df_Vector.randomSplit([0.7, 0.3], seed=42)
# train_data.select("vector", "default").s"how(5, truncate=False)  <- vizualização

# Treinando o modelo
dt = DecisionTreeClassifier(labelCol="default", featuresCol="vector")

dt_model = dt.fit(train_data)

prediction = dt_model.transform(test_data)

prediction.select("vector", "default", "prediction").show()

In [None]:
# Avaliação do Modelo
evalutator = MulticlassClassificationEvaluator(
    labelCol="default", predictionCol="prediction", metricName="accuracy"
)

accuracy = evalutator.evaluate(prediction)
print(f"Acurácia do Modelo: {accuracy}")