In [200]:

from pyspark.sql import SparkSession
spark = SparkSession.\
        builder.\
        appName("pyspark-nb-1").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        config("spark.eventLog.enabled", "true").\
        config("spark.eventLog.dir", "file:///opt/workspace/events").\
        getOrCreate()

In [201]:
# Only log errors at ERROR, skip INFO  and WARN
spark.sparkContext.setLogLevel("ERROR")

In [202]:
# import date_format fuction and col ("return a column") function
from pyspark.sql.functions import date_format, col
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType


In [203]:
import os
current_path = os.getcwd()
print("Current Working Directory:", current_path)


Current Working Directory: /opt/workspace/notebooks/algorithia


In [204]:
import os

file_path = "/opt/workspace/datain/algorithia/example_1000.csv"

print("Does the file exist?", os.path.exists(file_path))

Does the file exist? True


In [205]:
# Load data

# Cargar datos en un DataFrame Spark
df = spark.read.format("csv").option("header", "true").load(file_path)


#df = spark.read.option("inferSchema", True).option("header", True).csv("/opt/workspace/notebooks/algorithia/addresses.csv")

                                                                                

In [206]:
df

DataFrame[id_master: string, cod_mov: string, mto_operacion: string, fec_operacion: string, tm_fec_operacion: string, num_periodo_mes: string]

In [207]:
df.count()

1000

In [208]:
df.show(30)

+---------+--------------------+-------------+-------------+--------------------+---------------+
|id_master|             cod_mov|mto_operacion|fec_operacion|    tm_fec_operacion|num_periodo_mes|
+---------+--------------------+-------------+-------------+--------------------+---------------+
| 68704981|       NOMINA_COD_N2|      1503.94|   2023-05-22|2023-05-22 11:03:...|         202305|
| 68704981|       NOMINA_COD_N2|      1503.94|   2023-04-24|2023-04-24 11:04:...|         202304|
| 68704981|       NOMINA_COD_N2|      1455.94|   2023-04-03|2023-04-03 13:10:...|         202304|
| 68704981|       NOMINA_COD_N2|      1503.94|   2023-08-28|2023-08-28 15:37:...|         202308|
| 68704981|       NOMINA_COD_N2|      1503.94|   2023-06-19|2023-06-19 12:37:...|         202306|
| 68704981|       NOMINA_COD_N2|      1503.94|   2023-05-29|2023-05-29 13:55:...|         202305|
| 68704981|       NOMINA_COD_N2|      1503.94|   2023-04-29|2023-04-29 10:33:...|         202304|
| 68704981|       NO

In [209]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *
import pyspark.sql.functions as F

def limits_calculation(df):
    #window_spec = Window.partitionBy("id_master","cod_mov").orderBy("tm_fec_operacion")
    window_spec = Window.partitionBy("id_master","cod_mov")
    # Calculate the lower and upper bounds for each partition
    quantile_cols = F.expr("percentile_approx(dias_dif, array(0.25, 0.5, 0.75))")
    df_with_quantiles = df.withColumn("quantiles", quantile_cols.over(window_spec))

    # Extract quantiles
    df_with_bounds = df_with_quantiles.withColumn("Q1", df_with_quantiles.quantiles[0])\
                                     .withColumn("Q2", df_with_quantiles.quantiles[1])\
                                     .withColumn("Q3", df_with_quantiles.quantiles[2])\
                                     .withColumn("IQR", df_with_quantiles.quantiles[2] - df_with_quantiles.quantiles[0])\
                                     .withColumn("lower_bound_dias_dif", F.when(df_with_quantiles.quantiles[0] == df_with_quantiles.quantiles[2], df_with_quantiles.quantiles[0] - 5)\
                                                                         .otherwise(df_with_quantiles.quantiles[1] - 1.5 * (df_with_quantiles.quantiles[2] - df_with_quantiles.quantiles[0])))\
                                     .withColumn("upper_bound_dias_dif", F.when(df_with_quantiles.quantiles[0] == df_with_quantiles.quantiles[2], df_with_quantiles.quantiles[0] + 5)\
                                                                         .otherwise(df_with_quantiles.quantiles[1] + 1.5 * (df_with_quantiles.quantiles[2] - df_with_quantiles.quantiles[0])))

    # Drop unnecessary columns
    #df_with_bounds = df_with_bounds.drop("quantiles", "Q1", "Q2", "Q3", "IQR")

    return df_with_bounds




def processing_dataset(df):
    df = df.withColumn("tm_fec_operacion", F.to_timestamp("fec_operacion"))
    window_spec = Window.partitionBy("id_master","cod_mov").orderBy("tm_fec_operacion")

    df = df.withColumn("dias_dif", F.datediff("tm_fec_operacion", F.lag("tm_fec_operacion").over(window_spec)))
    df = df.withColumn("num_txm_sin_filtro_outliers", F.count("cod_mov").over(Window.partitionBy("id_master","cod_mov")))

    # Calculate limits and add them to the DataFrame
    #df_with_bounds = limits_calculation(df.select("id_master","cod_mov","tm_fec_operacion","dias_dif","num_txm_sin_filtro_outliers","mto_operacion"))
    df_with_bounds = limits_calculation(df)

    # Calculate the percentage change of the amount compared to the previous movement
    df_with_bounds = df_with_bounds.withColumn("porcentaje_dif_monto", F.abs((F.col("mto_operacion") / F.lag("mto_operacion").over(window_spec)) - 1))

    return df_with_bounds




In [210]:
def remove_outliers(df):
    
    specific_id_master='20347214'
    specific_cod_mov='NOMINA_TRANSFERENCIA'
    specific_df = df.filter((df["id_master"] == specific_id_master) & (df["cod_mov"] == specific_cod_mov))
    
    # Count the number of rows before filtering
    count_before = specific_df.count()

    #remove outliers
    df_filtered = df.filter((df["dias_dif"] >= df["lower_bound_dias_dif"]) & (df["dias_dif"] <= df["upper_bound_dias_dif"]))
    
    # Count the number of rows after filtering for the specific id_master and cod_mov
    count_after = df_filtered.filter((df_filtered["id_master"] == specific_id_master) & (df_filtered["cod_mov"] == specific_cod_mov)).count()

    # Print the count before and after
    print(f"For id_master={specific_id_master}, cod_mov={specific_cod_mov}:")
    print(f"Number of rows before filtering: {count_before}")
    print(f"Number of rows after filtering: {count_after}")

    return df_filtered



In [211]:
def dataset_processing_without_outliers(df, window_spec):
    # Obtener nuevamente el delta en días con el df filtrado de outliers
    # Quedarse con el mínimo entre el nuevo delta y el original sin filtrar outliers (de los registros filtrados)
    df = df.withColumn("dias_dif_pre2", F.datediff("tm_fec_operacion", F.lag("tm_fec_operacion").over(window_spec)).cast(IntegerType()))
    df = df.withColumn("dias_dif_pre2", F.when(F.isnan(df["dias_dif_pre2"]), df["dias_dif"]).otherwise(df["dias_dif_pre2"]))

    # Quedarse con el mínimo entre el nuevo delta y el original sin filtrar outliers (de los registros filtrados)
    #df = df.withColumn("dias_dif_pre2", F.lead("dias_dif").over(window_spec))
    #df = df.withColumn("dias_dif2", F.when(F.col("dias_dif") <= F.col("dias_dif_pre2"), F.col("dias_dif")).otherwise(F.col("dias_dif_pre2")))

    
    window_spec = Window.partitionBy("id_master","cod_mov")
    # Obtener el promedio, mediana y desviación estándar del delta en días actualizado
    df = df.withColumn("avg_dias_dif", F.avg("dias_dif").over(window_spec))
    df = df.withColumn("median_dias_dif", F.expr("percentile(dias_dif, 0.5)").over(window_spec))
    df = df.withColumn("std_dias_dif", F.stddev("dias_dif").over(window_spec))

    
   
    # Guardar en una columna el número de transacciones ya filtrados los outliers
    df = df.withColumn("num_txm_con_filtro_outliers", F.count("dias_dif").over(window_spec))

    # Calcular nuevamente el porcentaje de cambio del monto respecto al movimiento previo
    df = df.withColumn("porcentaje_dif_monto2", F.abs(F.col("mto_operacion") / F.lag("mto_operacion").over(window_spec.orderBy("tm_fec_operacion")) - 1))

    # Para el caso del registro más antiguo, quedarnos con el coalesce respecto al calculo anterior
    df = df.withColumn("porcentaje_dif_monto2", F.coalesce(df["porcentaje_dif_monto2"], df["porcentaje_dif_monto"]))

    # Verificar que el campo de fecha-hora de operación sea de tipo timestamp
    df = df.withColumn("tm_fec_operacion", F.to_timestamp(df["tm_fec_operacion"]))

    # Calcular el delta más lejano dentro del rango esperado
    df = df.withColumn("delta_fecha_estimada", F.col("median_dias_dif") + (2 * F.col("std_dias_dif")))

    # Obtener la fecha estimada de su próximo depósito
    df = df.withColumn("prox_fecha_estimada", F.expr("date_add(tm_fec_operacion, CAST(median_dias_dif AS INT))"))
    df = df.withColumn("prox_fecha_estimada_max", F.expr("date_add(tm_fec_operacion, CAST(delta_fecha_estimada AS INT))"))

 

    # Se obtiene num_periodo_mes máximo
    #df = df.withColumn("max_num_periodo_mes", F.max("num_periodo_mes").over(Window.partitionBy("id_master", "cod_mov")))
    df = df.withColumn("max_num_periodo_mes", F.max("num_periodo_mes").over(Window.partitionBy("id_master", "cod_mov")))
    # df = df.withColumn("max_num_periodo_mes", F.min("num_periodo_mes").over(Window.partitionBy("id_master", "cod_mov")))
    
    #show_df_columns(df.select("id_master","cod_mov","tm_fec_operacion","num_periodo_mes","max_num_periodo_mes"),columns=3)
    return df


In [226]:
def calculation_of_periodicities(df):
   # Definir las condiciones
    dias_periodicidad = {
             'bimestral': 60,
             'mensual': 30,
             'quincenal': 15,
             'catorcenal': 14,
             'semanal': 7,
             'diario': 1,
         }
    condiciones = [
        (F.col("std_dias_dif") >= F.col("median_dias_dif")),
        (F.abs(F.col("median_dias_dif") - 60) <= (F.col("std_dias_dif") * 1.5)),
        (F.abs(F.col("median_dias_dif") - 30) <= (F.col("std_dias_dif") * 1.5)),
        (F.abs(F.col("median_dias_dif") - 14) <= (F.col("std_dias_dif") * 1.5)),
        (F.abs(F.col("median_dias_dif") - 7) <= (F.col("std_dias_dif") * 1.5)),
        (F.abs(F.col("median_dias_dif") - 1) <= (F.col("std_dias_dif") * 1.5))
    ]

    # Definir las opciones
    opciones = [
        "muy_esporadico", "bimestral", "mensual", "quincenal", "semanal", "diario"
    ]

    # Aplicar las condiciones y obtener la periodicidad
    df = df.withColumn("periodicidad", F.when(F.coalesce(*condiciones), F.lit("otro"))
                      .otherwise(F.expr("CASE WHEN median_dias_dif = 60 THEN 'bimestral' WHEN median_dias_dif = 30 THEN 'mensual' WHEN median_dias_dif = 14 THEN 'quincenal' WHEN avg_dias_dif = 7 THEN 'semanal' WHEN median_dias_dif = 1 THEN 'diario' ELSE 'muy_esporadico' END")))

    # Obtener la periodicidad en días
    #df = df.withColumn("periodicidad_dias", F.when(( F.col("periodicidad") == "otro") | (F.col("periodicidad") == "muy_esporadico"), F.col("median_dias_dif")).otherwise(F.col("periodicidad").map(dias_periodicidad)))
    # Apply the mapping using a series of when() statements
    periodicidad_mapping = F.when(F.col("periodicidad") == "bimestral", F.lit(dias_periodicidad['bimestral'])) \
                            .when(F.col("periodicidad") == "mensual", F.lit(dias_periodicidad['mensual'])) \
                            .when(F.col("periodicidad") == "quincenal", F.lit(dias_periodicidad['quincenal'])) \
                            .when(F.col("periodicidad") == "catorcenal", F.lit(dias_periodicidad['catorcenal'])) \
                            .when(F.col("periodicidad") == "semanal", F.lit(dias_periodicidad['semanal'])) \
                            .when(F.col("periodicidad") == "diario", F.lit(dias_periodicidad['diario']))

    # Apply the conditional logic
    df = df.withColumn("periodicidad_dias", F.when(F.col("periodicidad").isin(["otro", "muy_esporadico"]), F.col("median_dias_dif")).otherwise(periodicidad_mapping))

    #print("----test-----")
    #show_df_columns(df.select("id_master","cod_mov","std_dias_dif","median_dias_dif","periodicidad","periodicidad_dias"),columns=4)
    #df.select("id_master","cod_mov","dias_dif","tm_fec_operacion","std_dias_dif","median_dias_dif","periodicidad","periodicidad_dias").show(100)
    return df


def calculation_of_periodicities_window(df, window_spec):
    # Calculate median and standard deviation within each window
    #df = df.withColumn("median_dias_dif", F.expr("percentile_approx(dias_dif, 0.5)").over(window_spec))
    #df = df.withColumn("std_dias_dif", F.stddev("dias_dif").over(window_spec))

    # Defining conditions
    condiciones = [
        (F.col("std_dias_dif") >= F.col("median_dias_dif")),
        (F.abs(F.col("median_dias_dif") - 60) <= (F.col("std_dias_dif") * 1.5)),
        (F.abs(F.col("median_dias_dif") - 30) <= (F.col("std_dias_dif") * 1.5)),
        (F.abs(F.col("median_dias_dif") - 14) <= (F.col("std_dias_dif") * 1.5)),
        (F.abs(F.col("median_dias_dif") - 7) <= (F.col("std_dias_dif") * 1.5)),
        (F.abs(F.col("median_dias_dif") - 1) <= (F.col("std_dias_dif") * 1.5))
    ]

    # Defining options (periodicities)
    opciones = [
        "muy_esporadico", "bimestral", "mensual", "quincenal", "semanal", "diario"
    ]

    # Apply conditions to obtain periodicity
    df = df.withColumn("periodicidad", F.coalesce(
        *(F.when(cond, F.lit(opt)) for cond, opt in zip(condiciones, opciones)),
        F.lit("otro")
    ))

    # Obtain periodicity in days
    df = df.withColumn("periodicidad_dias", F.when(F.col("periodicidad") != "otro", F.col("median_dias_dif")).otherwise(F.lit(None)))
    
    
    print("----test periodicidades-----")
    show_df_columns(df,columns=6)
    #df.show(50)
    
    return df


In [265]:
def calculate_amount(df, n_periodos=3):
    
  # Definir la ventana para cálculos basados en periodos
    window_spec =Window.partitionBy("id_master","cod_mov").orderBy("num_periodo_mes")

    # Obtener la suma de todos los depósitos por mes
    #df = df.groupBy("num_periodo_mes").agg(F.sum("mto_operacion").alias("mto_operacion_mes"))
    
    df = df.withColumn("mto_operacion_mes", F.mean("mto_operacion").over(window_spec))

     
    # Obtener el promedio mensual de los depósitos de los últimos n (3) meses previos
    window_spec_mensual =Window.partitionBy("id_master","cod_mov",).orderBy("num_periodo_mes")
    #df = df.withColumn("mnto_promedio_mes_estable", F.mean("mto_operacion_mes").over(window_spec.rowsBetween(-n_periodos, -1)))

    window_count_spec = window_spec_mensual.rowsBetween(-n_periodos, -1)
    df = df.withColumn("count_periods", F.count("num_periodo_mes").over(window_count_spec))

    # Calculate the rolling average conditionally
    df = df.withColumn("mnto_promedio_mes_estable", 
                       F.when(F.col("count_periods") >= n_periodos, 
                              F.mean("mto_operacion_mes").over(window_spec))
                       .otherwise(0))  # Or another default value as appropriate

    #print("----test-----")
    #show_df_columns(df.select("id_master","cod_mov","std_dias_dif","median_dias_dif","periodicidad","periodicidad_dias"),columns=4)
    #df.select("id_master","cod_mov","dias_dif","tm_fec_operacion","median_dias_dif","periodicidad","periodicidad_dias","num_periodo_mes","mto_operacion_mes","mnto_promedio_mes_estable").show(100)
    
       
    # Obtener el factor para convertir el monto a mensual
    df = df.withColumn("mnto_factor_mes", F.lit(30) / F.col("median_dias_dif"))

    print("----test-----")
    #show_df_columns(df.select("id_master","cod_mov","std_dias_dif","median_dias_dif","periodicidad","periodicidad_dias"),columns=4)
    df.select("id_master","cod_mov","dias_dif","median_dias_dif","periodicidad","num_periodo_mes","mto_operacion_mes","count_periods","mnto_promedio_mes_estable","mnto_factor_mes").show(100)
    
    # Convertir el monto original a su equivalente mensual
    df = df.withColumn("_mnto_mensual", F.col("mto_operacion") * F.col("mnto_factor_mes"))

    # Hacer ajuste si la periodicidad es bimestral
    df = df.withColumn("mnto_promedio_mes_estable", F.when(F.col("periodicidad_dias") >= 40, F.col("mnto_promedio_mes_estable") / (F.col("periodicidad_dias") / 30)).otherwise(F.col("mnto_promedio_mes_estable")))

    # Obtener el porcentaje de cambio del monto equivalente mensual respecto al promedio mensual estable
    df = df.withColumn("_monto_diff_prom_mes_estable", F.abs((F.col("_mnto_mensual") - F.col("mnto_promedio_mes_estable")) / F.col("mnto_promedio_mes_estable")))

    # Obtener el promedio y la desviación estándar del porcentaje de cambio del monto equivalente mensual respecto al promedio mensual estable
    df = df.withColumn("_monto_diff_prom_mes_estable_avg", F.mean("_monto_diff_prom_mes_estable").over(Window.partitionBy("cod_mov")))
    df = df.withColumn("_monto_diff_prom_mes_estable_std", F.stddev("_monto_diff_prom_mes_estable").over(Window.partitionBy("cod_mov")))

    # Obtener en una columna temporal "metric" la suma del promedio + la desv. est. del porcentaje de cambio del monto equivalente mensual respecto al promedio mensual estable
    df = df.withColumn("_monto_diff_metric", F.col("_monto_diff_prom_mes_estable_avg") + F.col("_monto_diff_prom_mes_estable_std"))

    # Cuando el porcentaje de cambio del monto equivalente mensual respecto al promedio mensual estable sea > que "metric",
    # se pone el monto mensual estable, en otro caso, se conserva el monto equivalente mensual
    df = df.withColumn("_monto_final_prev", F.when(F.col("_monto_diff_prom_mes_estable") > F.col("_monto_diff_metric"), F.col("mnto_promedio_mes_estable")).otherwise(F.col("_mnto_mensual")))

    # Se crea la columna para determinar si el monto es estable con base a la chi2
    # Se calcula el porcentaje de cambio respecto al periodo previo del monto equivalente mensual tratado
    df = df.withColumn("porcentaje_dif_monto_chi2", F.abs(F.col("_monto_final_prev") / F.lag("_monto_final_prev").over(window_spec) - 1))

    # **Experimental se podría ver el resultado de la chi2 tomando el min entre la columna anterior y la diferencia porcentual respecto al promedio mensual estable
    df = df.withColumn("porcentaje_dif_monto_chi2_2", F.least(F.col("porcentaje_dif_monto_chi2"), F.col("_monto_diff_prom_mes_estable")))

    # Obtener el monto correspondiente del periodo, a partir de la columna tratada
    df = df.withColumn("_mnto_sin_factor_mes", F.col("_monto_final_prev") / F.col("mnto_factor_mes"))

    # Obtener la suma por mes de los montos tratados
    df_mensual_tratado = df.groupBy("num_periodo_mes").agg(F.sum("_mnto_sin_factor_mes").alias("_mnto_sin_factor_mes_sum"))

    # Obtener el promedio mensual de los depósitos tratados de los últimos n (3) meses, considerando el actual
    df_mensual_tratado = df_mensual_tratado.withColumn("_monto_final_prev2", F.mean("_mnto_sin_factor_mes_sum").over(Window.orderBy("num_periodo_mes").rowsBetween(-n_periodos, 0)))

    # Unir las columnas de promedios mensuales tratados al df de movimientos
    df = df.join(df_mensual_tratado.select("num_periodo_mes", "_monto_final_prev2"), on="num_periodo_mes", how="left")

    # Hacer ajuste si la periodicidad es bimestral
    df = df.withColumn("_mnto_sin_factor_mes", F.when(F.col("periodicidad_dias") >= 40, F.col("_mnto_sin_factor_mes") / (F.col("periodicidad_dias") / 30)).otherwise(F.col("_mnto_sin_factor_mes")))
    df = df.withColumn("_monto_final_prev2", F.when(F.col("periodicidad_dias") >= 40, F.col("_monto_final_prev2") / (F.col("periodicidad_dias") / 30)).otherwise(F.col("_monto_final_prev2")))

    # Obtener el monto equivalente mensual final a informar,
    # tomando el min entre el promedio mensual de los depósitos tratados de los

    # Obtener el monto equivalente mensual final a informar
    df = df.withColumn("monto_final_mensual", F.least("_monto_final_prev2", "_monto_final_prev"))

    # Obtener el monto final a informar del periodo
    df = df.withColumn("monto_final_periodo", F.col("monto_final_mensual") / F.col("mnto_factor_mes"))

    return df





In [214]:
!pip install scipy
from scipy.stats import chi2

[0m

In [215]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
from scipy.stats import chi2, chisquare

def chi2_test(statistic, df_length, alpha):
    valor_critico = chi2.ppf(1-alpha, df_length - 1)
    return int(statistic < valor_critico)

# Register the UDF
chi2_test_udf = udf(chi2_test, IntegerType())

# Assuming df is your Spark DataFrame and you have calculated chi2_periodicidad and chi2_monto for each row
# Add the results as new columns in the DataFrame
#df = df.withColumn("es_recurrente", chi2_test_udf(df["chi2_periodicidad"], len(df.columns), lit(0.95)))
#df = df.withColumn("es_estable", chi2_test_udf(df["chi2_monto"], len(df.columns), lit(0.95)))

# Show the results
#df.show()

In [223]:

from IPython.display import display, HTML

def show_html_df(spark_df, num_rows=30):
    pandas_df = spark_df.limit(num_rows).toPandas()
    display(HTML(pandas_df.to_html()))



def show_df(spark_df, num_rows=30, truncate=30):
    if truncate:
        for row in spark_df.limit(num_rows).collect():
            print(" | ".join(str(item).ljust(truncate) for item in row))
    else:
        for row in spark_df.limit(num_rows).collect():
            print(" | ".join(str(item) for item in row))

            
def show_df_columns(df,columns=6):
    
    all_columns = df.columns
    first_5_columns = all_columns[:columns-3]
    last_5_columns = all_columns[-(columns):]

    # Select these columns from the DataFrame and show
    # Combine the first 5 and last 5 columns
    selected_columns = first_5_columns + last_5_columns

    # Create a new DataFrame with these selected columns
    new_df = df.select(*selected_columns)

    # Show the new DataFrame
    new_df.show(80)
# Usage
#show_df(spark_df, num_rows=10, truncate=30)  # Set 'truncate=None' to show full content


In [237]:
from pyspark.sql.window import Window

def analyze_recurrence(df):
    processed_df = processing_dataset(df)
    df_no_outliers = remove_outliers(processed_df)
    window_spec = Window.partitionBy("id_master", "cod_mov").orderBy("tm_fec_operacion")
    df_ready_without_outliers = dataset_processing_without_outliers(df_no_outliers, window_spec)
    df_with_periodicities = calculation_of_periodicities(df_ready_without_outliers)
    df_amount = calculate_amount(df_with_periodicities)
    #df_amount = calculate_amount(df_with_periodicities)

    # Create temporary views
    #processed_df.createOrReplaceTempView("processed_df_view")
    #df_no_outliers.createOrReplaceTempView("df_no_outliers_view")
    #df_ready_without_outliers.createOrReplaceTempView("df_ready_without_outliers_view")
    #df_with_periodicities.createOrReplaceTempView("df_with_periodicities_view")
    #df_amount.createOrReplaceTempView("df_amount_view")

    # Example of how to query these views using Spark SQL
    print("PROCESSING DATA SET")
    show_df_columns(processed_df)
    #spark.sql("SELECT * FROM processed_df_view LIMIT 30").show()
    # Select the first 5 and last 5 columns by name
    
    print("DATA SET WITHOUT OUTLIERS")
    show_df_columns(df_no_outliers,columns=8)
    #df_no_outliers.show(10)
    #spark.sql("SELECT * FROM df_no_outliers_view LIMIT 30").show()
    print("NEW DATASET READY  WITHOUT OUTLIERS")
    show_df_columns(df_ready_without_outliers,columns=8)
    #show_df_columns(df_ready_without_outliers,columns=6)
    #spark.sql("SELECT * FROM df_ready_without_outliers_view LIMIT 30").show()
    #print("CALCULATION OF PERIODICITIES")
    #show_df_columns(df_with_periodicities,columns=5)
    #spark.sql("SELECT * FROM df_with_periodicities_view LIMIT 30").show()
    #print("CALCULATE AMOUNT")
    #show_df_columns(df_amount,columns=5)
    #spark.sql("SELECT * FROM df_amount_view LIMIT 30").show()
    return df_amount
    # Continue with your processing...


In [266]:
df_amount=analyze_recurrence(df)

                                                                                

For id_master=20347214, cod_mov=NOMINA_TRANSFERENCIA:
Number of rows before filtering: 48
Number of rows after filtering: 40
----test-----
+---------+--------------------+--------+---------------+--------------+---------------+------------------+-------------+-------------------------+-----------------+
|id_master|             cod_mov|dias_dif|median_dias_dif|  periodicidad|num_periodo_mes| mto_operacion_mes|count_periods|mnto_promedio_mes_estable|  mnto_factor_mes|
+---------+--------------------+--------+---------------+--------------+---------------+------------------+-------------+-------------------------+-----------------+
| 20347214|NOMINA_TRANSFERENCIA|       9|            6.5|muy_esporadico|         202210|             540.0|            0|                      0.0|4.615384615384615|
| 20347214|NOMINA_TRANSFERENCIA|       5|            6.5|muy_esporadico|         202210|             540.0|            1|                      0.0|4.615384615384615|
| 20347214|NOMINA_TRANSFERENCIA

In [219]:
!pip install scipy

[0m

In [220]:
from scipy.stats import chi2, chisquare

In [221]:
from pyspark.sql.functions import udf, col
from pyspark.sql.types import MapType, StringType, IntegerType
from scipy.stats import chi2, chisquare
import numpy as np


window_spec = Window.partitionBy("id_master", "cod_mov").orderBy("tm_fec_operacion")

# Define a UDF to perform chi-squared tests
def chi2_test(dias_dif, porcentaje_dif_monto):
    chi2_recurrente = {}
    chi2_monto_estable = {}

    # Assuming dias_dif and porcentaje_dif_monto are lists of values for each group
    for x in [0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95, 0.975, 0.99, 0.999]:
        valor_critico = chi2.ppf(1-x, len(dias_dif) - 1)
        chi2_periodicidad = chisquare(f_obs=np.array(dias_dif), ddof=len(dias_dif)-1).statistic
        chi2_monto = chisquare(f_obs=np.array(porcentaje_dif_monto), ddof=len(porcentaje_dif_monto)-1).statistic
        es_recurrente = chi2_periodicidad < valor_critico
        es_estable = chi2_monto < valor_critico
        chi2_recurrente[str(x)] = int(es_recurrente)
        chi2_monto_estable[str(x)] = int(es_estable)

    return {"chi2_recurrente": chi2_recurrente, "chi2_monto_estable": chi2_monto_estable}

chi2_udf = udf(chi2_test, MapType(StringType(), MapType(StringType(), IntegerType())))

# Collect lists of values for each group defined by the window spec
df_amount = df_amount.withColumn("dias_dif_list", F.collect_list("dias_dif2").over(window_spec))
df_amount = df_amount.withColumn("porcentaje_dif_monto_list", F.collect_list("porcentaje_dif_monto_chi2_2").over(window_spec))

# Apply the UDF to each group
df_amount = df_amount.withColumn("chi2_results", chi2_udf("dias_dif_list", "porcentaje_dif_monto_list"))

# Now df contains the results of the chi-squared test for each group
#df_amount.select("id_master", "cod_mov", "chi2_results").show(truncate=False)
show_df_columns(df_amount,columns=5)
# For the rest of the statistics, use window functions or aggregate functions as required


[Stage 2214:>                                                       (0 + 1) / 1]23/12/11 22:52:36 ERROR TaskSetManager: Task 0 in stage 2214.0 failed 4 times; aborting job


Py4JJavaError: An error occurred while calling o21139.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2214.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2214.0 (TID 33302, 172.19.0.4, executor 69): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 366, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 241, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 168, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 587, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'scipy'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage42.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:365)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:3389)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset$$anonfun$52.apply(Dataset.scala:3370)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3369)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2550)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2764)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:254)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:291)
	at sun.reflect.GeneratedMethodAccessor106.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 366, in main
    func, profiler, deserializer, serializer = read_udfs(pickleSer, infile, eval_type)
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 241, in read_udfs
    arg_offsets, udf = read_single_udf(pickleSer, infile, eval_type, runner_conf)
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 168, in read_single_udf
    f, return_type = read_command(pickleSer, infile)
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 69, in read_command
    command = serializer._read_with_length(file)
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/usr/bin/spark-2.4.5-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 587, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'scipy'

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:456)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:81)
	at org.apache.spark.sql.execution.python.PythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:64)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage42.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:858)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:346)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:310)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
