In [1]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import sum, count, avg, collect_set
from typing import List, Tuple, Dict
import math

spark = SparkSession.builder.appName("DecisionTree").getOrCreate()

In [2]:


def transpose(cols: List[str], df: DataFrame, colgroup: str) -> DataFrame:
    """
    Función para rotar el dataframe
    
    Parameters:
        cols(List[str]): Lsita de las columnas del DataFrame
        df(DataFrame): Dataframe al cual se le hará transpose
        colgroup(str): columna usada para agrupar
    
    Returns:
        DataFrame: Una copia del dataframe aplicando Transpose
    """
    rows = []
    columns = df.agg(collect_set(colgroup)).collect()[0][0]
    columns.insert(0, 'temp-col')
    for col in cols:
        if col == colgroup:
            continue
        count_v = df.groupBy(colgroup, col).agg(count("*")).collect()
        for item in count_v:
            temp_row = [f"{col}-{item[col]}"]
            for i in range(1, len(columns)):
                if columns[i] == item[colgroup]:
                    temp_row.append(item['count(1)'])
                else:
                    temp_row.append(0)
            rows.append(temp_row)

    transpose_df = spark.createDataFrame(rows, columns)
    return transpose_df


def read_csv(path:str) -> List[str]:
    """
    Función para leer el archivo csv
    Parameters:
        path(str): path de ubicación del archivo csv
    Returns:
        List[DataFrame, List[str]]: retorna el DataFrame y la lista de las columnas
    """
    
    try:
        df = spark.read.csv(path, inferSchema=True, header=True)
        cols = df.columns

    except Exception as e:
        print(f'Error leyendo el archivo')
        print(str(e))
    else:
        return df, cols

In [3]:
def get_prediction_vect(df: DataFrame) -> Dict[str, str]:
    print("Columnas disponibles en el dataframe:")
    print(df.columns)
    
    target_col = input("Selecciona la columna a predecir (target): ").strip()
    
    feature_cols_input = input("Selecciona las columnas a usar como predictores, separadas por comas: ").strip()
    predictors = [col.strip() for col in feature_cols_input.split(",") if col.strip() != ""]
    
    return predictors, target_col

In [20]:
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.sql.types import StringType

def prepare_data(df, target_col, predictors):
    data = df

    if isinstance(df.schema[target_col].dataType, StringType):
        si_target = StringIndexer(inputCol=target_col, outputCol="label")
        si_model_target = si_target.fit(data)
        data = si_model_target.transform(data)
        print(f"\nCodificación de '{target_col}':")
        for i, lbl in enumerate(si_model_target.labels):
            print(f"{i} -> {lbl}")
    else:
        data = data.withColumnRenamed(target_col, "label")

    feat_cols = []
    for col_name in predictors:
        if col_name == target_col or col_name.lower() == "id":
            continue

        if isinstance(df.schema[col_name].dataType, StringType):
            si_feat = StringIndexer(inputCol=col_name, outputCol=f"{col_name}_idx")
            si_model_feat = si_feat.fit(data)
            data = si_model_feat.transform(data)
            feat_cols.append(f"{col_name}_idx")
            print(f"\nCodificación de '{col_name}':")
            for i, lbl in enumerate(si_model_feat.labels):
                print(f"{i} -> {lbl}")
        else:
            feat_cols.append(col_name)

    assembler = VectorAssembler(inputCols=feat_cols, outputCol="features")
    data = assembler.transform(data)

    train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

    print("\nMuestra de train_data:")
    train_data.show(5, truncate=False)

    return train_data, test_data


In [21]:
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.sql.functions import col

def fit_model(train_data, test_data):

    max_depth = float(input("Ingresa max_depth a usar: "))
    min_instances = float(input("Ingresa min_instances a usar: "))
    
    dt = DecisionTreeClassifier(
        labelCol="label",
        featuresCol="features",
        maxDepth=max_depth,
        minInstancesPerNode=min_instances
    )

    # Entrenar
    model = dt.fit(train_data)
    predictions = model.transform(test_data)

    # Accuracy
    correct = predictions.filter(col("prediction") == col("label")).count()
    total = predictions.count()
    accuracy = correct / total
    print(f"Test Accuracy: {accuracy}")

    # Falsos positivos por clase
    labels = [row[0] for row in predictions.select("label").distinct().collect()]
    print("Falsos Positivos por clase:")
    for l in labels:
        fp = predictions.filter((col("prediction") == l) & (col("label") != l)).count()
        print(f"Clase {l}: {fp}")

    # arbol
    print("\nÁrbol de decisión:")
    print(model.toDebugString)

    return model, predictions


In [22]:
from pyspark.ml.linalg import Vectors

def predecir(model):
    while True:
        input_str = input("Ingresa las características separadas por comas (ej: 0.0,0.2,3.5): ")
        if input_str == "":
            break
        
        features = [float(x.strip()) for x in input_str.split(",")]
    
        df = spark.createDataFrame([(Vectors.dense(features),)], ["features"])
    
        prediction = model.transform(df).collect()[0]["prediction"]
        print(f"Predicción del modelo: {prediction}")


In [23]:
print("Bienvenido al uso del modelo Decision Tree")
path = input("Por favor provee el path de ubicación del archivo csv: ")
df, cols = read_csv(path)

print("\n\n")
print("Muestra del Dataframe")
df.show(5)

"""while True:
    try:
        ans_trans = int(input("Desea hacer transpose al Dataframe: 1. Sí\t2. No: "))
        if ans_trans == 1:
            # Fix
        if ans_trans == 2:
            break
        else:
            print("Por favor, ingrese 1 para Sí o 2 para No.")
    except ValueError:
        print("Por favor, ingrese un número válido (1 o 2).")"""

predictors, target_col = get_prediction_vect(df)
train_data, test_data = prepare_data(df, target_col, predictors)

model, predictions = fit_model(train_data, test_data)
predecir(model)

Bienvenido al uso del modelo Decision Tree


Por favor provee el path de ubicación del archivo csv:  iris.csv





Muestra del Dataframe
+---+-------------+------------+-------------+------------+-----------+
| Id|SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|    Species|
+---+-------------+------------+-------------+------------+-----------+
|  1|          5.1|         3.5|          1.4|         0.2|Iris-setosa|
|  2|          4.9|         3.0|          1.4|         0.2|Iris-setosa|
|  3|          4.7|         3.2|          1.3|         0.2|Iris-setosa|
|  4|          4.6|         3.1|          1.5|         0.2|Iris-setosa|
|  5|          5.0|         3.6|          1.4|         0.2|Iris-setosa|
+---+-------------+------------+-------------+------------+-----------+
only showing top 5 rows

Columnas disponibles en el dataframe:
['Id', 'SepalLengthCm', 'SepalWidthCm', 'PetalLengthCm', 'PetalWidthCm', 'Species']


Selecciona la columna a predecir (target):  Species
Selecciona las columnas a usar como predictores, separadas por comas:  PetalWidthCm



Codificación de 'Species':
0 -> Iris-setosa
1 -> Iris-versicolor
2 -> Iris-virginica

Muestra de train_data:
+---+-------------+------------+-------------+------------+-----------+-----+--------+
|Id |SepalLengthCm|SepalWidthCm|PetalLengthCm|PetalWidthCm|Species    |label|features|
+---+-------------+------------+-------------+------------+-----------+-----+--------+
|1  |5.1          |3.5         |1.4          |0.2         |Iris-setosa|0.0  |[0.2]   |
|2  |4.9          |3.0         |1.4          |0.2         |Iris-setosa|0.0  |[0.2]   |
|4  |4.6          |3.1         |1.5          |0.2         |Iris-setosa|0.0  |[0.2]   |
|5  |5.0          |3.6         |1.4          |0.2         |Iris-setosa|0.0  |[0.2]   |
|6  |5.4          |3.9         |1.7          |0.4         |Iris-setosa|0.0  |[0.4]   |
+---+-------------+------------+-------------+------------+-----------+-----+--------+
only showing top 5 rows



Ingresa max_depth a usar:  1
Ingresa min_instances a usar:  1


Test Accuracy: 0.7083333333333334
Falsos Positivos por clase:
Clase 0.0: 0
Clase 1.0: 7
Clase 2.0: 0

Árbol de decisión:
DecisionTreeClassificationModel: uid=DecisionTreeClassifier_b52d9fb4969b, depth=1, numNodes=3, numClasses=3, numFeatures=1
  If (feature 0 <= 0.8)
   Predict: 0.0
  Else (feature 0 > 0.8)
   Predict: 1.0



Ingresa las características separadas por comas (ej: 0.0,0.2,3.5):  0.1


Predicción del modelo: 0.0


Ingresa las características separadas por comas (ej: 0.0,0.2,3.5):  1


Predicción del modelo: 1.0


Ingresa las características separadas por comas (ej: 0.0,0.2,3.5):  5


Predicción del modelo: 1.0


Ingresa las características separadas por comas (ej: 0.0,0.2,3.5):  0


Predicción del modelo: 0.0


Ingresa las características separadas por comas (ej: 0.0,0.2,3.5):  0.5


Predicción del modelo: 0.0


Ingresa las características separadas por comas (ej: 0.0,0.2,3.5):  3.2


Predicción del modelo: 1.0


Ingresa las características separadas por comas (ej: 0.0,0.2,3.5):  
