# Functions to be stored in a class

1. Normalized Gini Coefficient (gini_normalizado)
We'll convert the gini_normalizado function to work with PySpark DataFrames. Since the Gini coefficient calculation involves sorting and cumulative sums, we'll use window functions in PySpark.

In [1]:
from pyspark.sql import Window
from pyspark.sql.functions import col, sum as spark_sum, desc, asc, row_number

def gini_normalizado(spark_df, actual_col, pred_col):
    """
    Calculates the normalized Gini coefficient using a PySpark DataFrame.

    Mirrors the classic NumPy implementation
    ``gini(actual, pred) / gini(actual, actual)``. In it, ``gini`` does three things:
    - ranks rows by a score (descending, ties broken by original row order);
    - accumulates the actual values along that ranking;
    - compares the resulting Lorenz curve with the diagonal.
    Dividing by the Gini of a perfect ranking (by the actual values themselves)
    is what makes the result "normalized".

    :param spark_df: PySpark DataFrame containing actual and predicted values.
    :param actual_col: Name of the column with actual values.
    :param pred_col: Name of the column with predicted values.
    :return: Normalized Gini coefficient as a float. Returns ``float('nan')``
             when it is undefined: empty DataFrame, actual values summing to
             zero, or a perfect-ranking Gini of zero.
    """
    import math

    from pyspark.sql.functions import count as spark_count, lit, monotonically_increasing_id

    # Tag each row with a unique id so ties in the score are broken by the
    # original row order (the NumPy version uses np.arange for this).
    base = spark_df.select(actual_col, pred_col).withColumn("row_idx", monotonically_increasing_id())

    def _gini(score_col):
        # Unnormalized Gini of actual_col when rows are ranked by score_col (descending).
        window = (
            Window.orderBy(desc(score_col), asc("row_idx"))
            .rowsBetween(Window.unboundedPreceding, Window.currentRow)
        )
        stats = (
            base.withColumn("cumulative_actual", spark_sum(col(actual_col)).over(window))
            .agg(
                spark_count(lit(1)).alias("n"),
                spark_sum(col(actual_col)).alias("total_actual"),
                spark_sum(col("cumulative_actual")).alias("cumulative_sum"),
            )
            .collect()[0]
        )
        n = stats["n"]
        total_actual = stats["total_actual"]
        # Spark returns NULL for sums over zero rows; a zero total makes the
        # Lorenz curve undefined.
        if not n or not total_actual:
            return float("nan")
        # Sum of the Lorenz curve minus the same sum for a random ordering.
        gini_sum = float(stats["cumulative_sum"]) / float(total_actual) - (n + 1) / 2.0
        return gini_sum / n

    gini_model = _gini(pred_col)
    gini_perfect = _gini(actual_col)
    if math.isnan(gini_perfect) or gini_perfect == 0:
        return float("nan")
    return gini_model / gini_perfect


In [None]:
# Example: 'df' must already hold the numeric 'actual' and 'predicted' columns.
gini = gini_normalizado(df, 'actual', 'predicted')
print("Normalized Gini Coefficient:", gini)


2. Generate Metadata (pod_academy_generate_metadata)
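This section generates metadata about the DataFrame columns. The cell below computes the null count and null percentage of each column inline. The `pod_academy_generate_metadata` function (section 3) also reports cardinality, feature type and feature usage.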

In [None]:
# Import Spark's sum under an alias so Python's builtin sum() is not shadowed
# for the rest of the notebook.
from pyspark.sql.functions import col, sum as spark_sum, when
from pyspark.sql import Row

# Total number of rows in the DataFrame (denominator of the null percentage).
total_linhas = df_dm.count()

# Count null values in every column with a single aggregation pass.
valores_nulos = df_dm.select([
    spark_sum(when(col(c).isNull(), 1).otherwise(0)).alias(c) for c in df_dm.columns
])

# Collect the single result row as a {column_name: null_count} dictionary.
nulos_dict = valores_nulos.collect()[0].asDict()

# Build one (column, null count, null percentage) tuple per column.
# Spark's sum over zero rows is NULL, and an empty DataFrame has no meaningful
# percentage, so both cases are reported as 0.
nulos_lista = [
    (
        k,
        v or 0,
        round(((v or 0) / total_linhas) * 100, 2) if total_linhas else 0.0,
    )
    for k, v in nulos_dict.items()
]

# An explicit schema keeps the column order stable and also works for an empty list.
valores_nulos_df = spark.createDataFrame(
    nulos_lista,
    "Coluna string, Valores_Nulos long, Porcentagem_Nulos double",
)

# Show the null count and percentage of every column (one row per source column;
# the row count is already known locally, so no extra Spark job is needed).
valores_nulos_df.show(len(nulos_lista), truncate=False)


In [None]:
# Requires the cell defining pod_academy_generate_metadata (section 3) to have run.
# That function also takes the identifier and target column lists; pass empty
# lists here so every column is reported as an explanatory feature.
metadata_df = pod_academy_generate_metadata(df, [], [])
metadata_df.show(truncate=False)


3. Generate Metadata with Usage (pod_academy_generate_metadata with IDs and Targets)

In [None]:
from pyspark.sql.functions import col, isnan, when, countDistinct

def pod_academy_generate_metadata(spark_df, ids=None, targets=None, orderby='PC_NULOS'):
    """
    Returns a table with descriptive information about a PySpark DataFrame.

    The result has one row per column of ``spark_df`` with these fields:
    - FEATURE: column name.
    - USO_FEATURE: 'ID', 'Target' or 'Explicativa' (IDs take precedence).
    - QT_NULOS: number of null rows; for float/double columns, NaN also counts.
    - PC_NULOS: QT_NULOS as a percentage of all rows, rounded to 2 decimals
      (0.0 for an empty DataFrame).
    - CARDINALIDADE: number of distinct non-null values.
    - TIPO_FEATURE: the Spark data type as a string.

    :param spark_df: PySpark DataFrame to describe.
    :param ids: Collection of identifier column names (default: none).
    :param targets: Collection of target column names (default: none).
    :param orderby: Output column to sort by, in descending order.
    :return: PySpark DataFrame with descriptive information.
    """
    from pyspark.sql.functions import count as spark_count
    from pyspark.sql.types import DoubleType, FloatType

    ids = ids if ids is not None else []
    targets = targets if targets is not None else []
    fields = spark_df.schema.fields
    total_rows = spark_df.count()

    # Count missing values for all columns in ONE aggregation instead of one job
    # per column. isnan is only valid for floating-point columns; applying it
    # to booleans, dates or complex types raises an AnalysisException.
    null_exprs = []
    for i, field in enumerate(fields):
        missing = col(field.name).isNull()
        if isinstance(field.dataType, (FloatType, DoubleType)):
            missing = missing | isnan(col(field.name))
        # Positional aliases avoid clashes with unusual or duplicated column names.
        null_exprs.append(spark_count(when(missing, 1)).alias(f"_null_{i}"))
    null_counts = spark_df.agg(*null_exprs).collect()[0] if null_exprs else []

    summary_list = []
    for i, field in enumerate(fields):
        column = field.name
        uso_feature = 'ID' if column in ids else 'Target' if column in targets else 'Explicativa'
        qt_nulos = null_counts[i]
        pc_nulos = round((qt_nulos / total_rows) * 100, 2) if total_rows else 0.0
        cardinalidade = spark_df.select(countDistinct(col(column))).collect()[0][0]
        tipo_feature = str(field.dataType)
        summary_list.append((column, uso_feature, qt_nulos, pc_nulos, cardinalidade, tipo_feature))

    # Explicit schema: stable types, and works even when summary_list is empty.
    summary_df = spark.createDataFrame(
        summary_list,
        'FEATURE string, USO_FEATURE string, QT_NULOS long, PC_NULOS double, '
        'CARDINALIDADE long, TIPO_FEATURE string',
    )

    # Sort by the requested column, largest first.
    return summary_df.orderBy(col(orderby).desc())


In [None]:
# Declare which columns are identifiers and which are targets, then profile df.
ids = ['id_column']
targets = ['target_column']
summary_df = pod_academy_generate_metadata(df, ids=ids, targets=targets, orderby='PC_NULOS')
summary_df.show(truncate=False)


4. Custom Fill NA (pod_custom_fillna)
This function replaces -1 with null and fills missing values for numerical and categorical columns.

In [None]:
from pyspark.sql.functions import col, when, mean as spark_mean, lit
from pyspark.sql.types import NumericType, StringType

def pod_custom_fillna(spark_df):
    """
    Replaces missing values in the DataFrame.

    - Replaces -1 with null (``DataFrame.replace`` applies this to the columns
      whose type matches the numeric -1).
    - Fills nulls in numerical columns with that column's mean, computed after
      the -1 replacement.
    - Fills nulls in string columns with 'POD_VERIFICAR'.

    :param spark_df: PySpark DataFrame to process.
    :return: Tuple of (processed DataFrame, dict mapping each numerical column
             to its mean; the mean is None for an all-null column).
    """
    # Treat -1 as a missing-value sentinel.
    df = spark_df.replace(-1, None)

    numerical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]
    categorical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]

    # Compute all means in a single aggregation pass instead of one job per column.
    means = {}
    if numerical_cols:
        mean_row = df.agg(*[spark_mean(col(c)) for c in numerical_cols]).first()
        means = dict(zip(numerical_cols, mean_row))

    # Rewrite all numerical columns in ONE select: calling withColumn in a loop
    # grows the logical plan per column, which PySpark warns can be slow.
    # Column order and names are preserved.
    df = df.select([
        when(col(c).isNull(), lit(means[c])).otherwise(col(c)).alias(c) if c in means else col(c)
        for c in df.columns
    ])

    # Fill categorical columns with 'POD_VERIFICAR'.
    df = df.fillna('POD_VERIFICAR', subset=categorical_cols)

    return df, means


In [None]:
# Fill the training data and keep the per-column means for reuse in production.
(df_filled, means) = pod_custom_fillna(df)


5. Custom Fill NA for Production (pod_custom_fillna_prod)
This function fills missing values using provided means (useful in production environments).

In [None]:
from pyspark.sql.functions import col, when, lit
from pyspark.sql.types import NumericType, StringType

def pod_custom_fillna_prod(spark_df, means):
    """
    Fills missing values in the DataFrame using previously computed means.

    - Replaces -1 with null (as ``pod_custom_fillna`` does).
    - Fills nulls in each numerical column with ``means[column]``. Columns that
      are missing from ``means``, or whose mean is None, are left unchanged.
    - Fills nulls in string columns with 'POD_VERIFICAR'.

    :param spark_df: PySpark DataFrame to process.
    :param means: Dictionary of means for numerical columns (None is treated
                  as an empty dictionary).
    :return: Processed DataFrame.
    """
    means = means or {}

    # Treat -1 as a missing-value sentinel.
    df = spark_df.replace(-1, None)

    numerical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, NumericType)]
    categorical_cols = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]

    # Only columns with a known, non-null mean get filled.
    fill_values = {c: means[c] for c in numerical_cols if means.get(c) is not None}

    # Rewrite the affected columns in ONE select rather than chained withColumn
    # calls, keeping column order and names.
    if fill_values:
        df = df.select([
            when(col(c).isNull(), lit(fill_values[c])).otherwise(col(c)).alias(c) if c in fill_values else col(c)
            for c in df.columns
        ])

    # Fill categorical columns with 'POD_VERIFICAR'.
    df = df.fillna('POD_VERIFICAR', subset=categorical_cols)

    return df


In [None]:
# Reuse the means learned by pod_custom_fillna on the training data for new data.
df_prod_filled = pod_custom_fillna_prod(df_new, means=means)


6. Evaluate Model (avaliar_modelo)
Converting the avaliar_modelo function is a bit more complex because PySpark does not directly support plotting with Matplotlib. However, we can perform the calculations in PySpark and collect the results for plotting.

Important Notes:

Since plotting is not natively supported in PySpark, we collect the necessary data to the driver node for plotting.
Be cautious when collecting data; ensure that the datasets are small enough to fit into memory.
The plotting code would remain largely the same as your original function, using the collected data.

In [None]:
from pyspark.sql.functions import col, when
from pyspark.sql.window import Window
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegressionModel
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

def avaliar_modelo(df_train, df_test, features_col, label_col, modelo, nm_modelo):
    """
    Evaluates a binary classifier on training and test data and plots its curves.

    For each split, the label, predicted class and positive-class probability
    are collected to the driver in ONE query. Because they come from the same
    rows, they stay aligned; separate collects do not guarantee the same row
    order. Then:
    - a confusion matrix and the ROC AUC are computed with scikit-learn;
    - ROC and precision-recall curves for both splits are plotted with Matplotlib.

    Assumes ``modelo.transform`` adds Spark ML's default ``prediction`` and
    ``probability`` columns, and that index 1 of the probability vector is the
    positive class (binary problem). TODO: confirm for non-default models.
    All predictions are collected to the driver, so the data must fit in memory.

    :param df_train: PySpark DataFrame for training data.
    :param df_test: PySpark DataFrame for test data.
    :param features_col: Name of the features column (assembled vector). Not
                         read here; kept for interface compatibility.
    :param label_col: Name of the label column.
    :param modelo: Trained PySpark model.
    :param nm_modelo: Name of the model (used in plot titles).
    :return: dict with keys 'train' and 'test'. Each value is a dict holding
             'confusion_matrix' (ndarray) and 'roc_auc' (float; NaN when only one
             class is present).
    """
    from sklearn.metrics import confusion_matrix, roc_curve, precision_recall_curve, roc_auc_score

    def _collect_predictions(spark_df):
        # One collect per split keeps label, prediction and score aligned row by row.
        rows = modelo.transform(spark_df).select(label_col, 'prediction', 'probability').collect()
        y_true = [row[0] for row in rows]
        y_pred = [row[1] for row in rows]
        y_score = [float(row[2][1]) for row in rows]
        return y_true, y_pred, y_score

    results = {}
    fig, (ax_roc, ax_pr) = plt.subplots(1, 2, figsize=(12, 5))

    for split, spark_df in (('train', df_train), ('test', df_test)):
        y_true, y_pred, y_score = _collect_predictions(spark_df)
        metrics = {'confusion_matrix': confusion_matrix(y_true, y_pred)}

        # ROC AUC and the curves are undefined when y_true contains a single class.
        if len(set(y_true)) > 1:
            auc = roc_auc_score(y_true, y_score)
            fpr, tpr, _ = roc_curve(y_true, y_score)
            precision, recall, _ = precision_recall_curve(y_true, y_score)
            ax_roc.plot(fpr, tpr, label=f'{split} (AUC = {auc:.3f})')
            ax_pr.plot(recall, precision, label=split)
        else:
            auc = float('nan')

        metrics['roc_auc'] = auc
        results[split] = metrics

    # Diagonal = performance of a random classifier.
    ax_roc.plot([0, 1], [0, 1], linestyle='--', color='grey')
    ax_roc.set_title(f'ROC curve - {nm_modelo}')
    ax_roc.set_xlabel('False positive rate')
    ax_roc.set_ylabel('True positive rate')
    ax_roc.legend(loc='lower right')

    ax_pr.set_title(f'Precision-Recall curve - {nm_modelo}')
    ax_pr.set_xlabel('Recall')
    ax_pr.set_ylabel('Precision')
    ax_pr.legend(loc='lower left')

    fig.tight_layout()
    plt.show()

    return results


8. Calculate KS Statistic (calcular_ks_statistic)
We'll use PySpark window functions to compute the KS statistic.

In [None]:
from pyspark.sql.functions import col, when, sum as spark_sum, desc, lit
from pyspark.sql import Window


def calcular_ks_statistic(spark_df, label_col, score_col):
    """
    Calculates the Kolmogorov-Smirnov (KS) statistic using a PySpark DataFrame.

    Rows are ranked by score (descending). At each distinct score threshold,
    the cumulative share of events (label == 1) and of non-events (label == 0)
    is computed. KS is the largest absolute gap between the two shares.

    :param spark_df: PySpark DataFrame containing actual labels and scores.
    :param label_col: Name of the label column.
    :param score_col: Name of the score (prediction probability) column.
    :return: KS statistic in [0, 1], or ``float('nan')`` when there are no
             events or no non-events (the cumulative rates are undefined).
    """
    # Spark's column-wise abs/max; Python's builtin abs() cannot take a Column.
    from pyspark.sql.functions import abs as spark_abs, max as spark_max

    df = (
        spark_df
        .withColumn('event', when(col(label_col) == 1, 1).otherwise(0))
        .withColumn('non_event', when(col(label_col) == 0, 1).otherwise(0))
    )

    # Both class totals in a single aggregation (NULL over zero rows -> 0).
    totals = df.agg(
        spark_sum('event').alias('total_events'),
        spark_sum('non_event').alias('total_non_events'),
    ).collect()[0]
    total_events = totals['total_events'] or 0
    total_non_events = totals['total_non_events'] or 0
    if total_events == 0 or total_non_events == 0:
        return float('nan')

    # RANGE frame: all rows sharing the same score are counted together. The
    # curves are then evaluated only at real thresholds, and tied scores are
    # never split arbitrarily (which a ROWS frame would do, inflating KS).
    window = Window.orderBy(desc(score_col)).rangeBetween(Window.unboundedPreceding, Window.currentRow)

    df = (
        df
        .withColumn('cum_event', spark_sum('event').over(window))
        .withColumn('cum_non_event', spark_sum('non_event').over(window))
        .withColumn('cum_event_rate', col('cum_event') / total_events)
        .withColumn('cum_non_event_rate', col('cum_non_event') / total_non_events)
        .withColumn('ks', spark_abs(col('cum_event_rate') - col('cum_non_event_rate')))
    )

    return df.agg(spark_max('ks')).collect()[0][0]


In [None]:
# Example: 'df' needs a binary 'label' column and a numeric 'score' column.
ks_stat = calcular_ks_statistic(df, label_col='label', score_col='score')
print("KS Statistic:", ks_stat)
