# Establecer el entorno Pyspark en Colab

In [1]:
# Install the headless OpenJDK 8 JDK (the JAVA_HOME set further down points at it).
# -qq and the redirect to /dev/null suppress apt-get's standard output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
# Descargar Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
# Unzip the file
!tar xf spark-3.4.0-bin-hadoop3.tgz

In [3]:
import os
# Export the locations of the Java installation and of the unpacked Spark
# distribution through the JAVA_HOME and SPARK_HOME environment variables.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.4.0-bin-hadoop3",
})

In [4]:
# Install findspark quietly (-q), then initialise it.
# findspark.init() is called before pyspark is imported in the next cell;
# presumably it locates the Spark installation through SPARK_HOME -- TODO confirm.
!pip install -q findspark
import findspark
findspark.init()

In [5]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Create the SparkSession (or reuse an existing one) with master "local[*]"
# and 16g configured for both the driver and executor memory settings.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("My Spark Application")
    .config("spark.executor.memory", "16g")
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)

### Leer los datos desde Drive

In [6]:
# Mount Google Drive under /content/gdrive so the CSV loaded below is reachable.
# NOTE(review): force_remount=True presumably re-mounts even when a mount already exists -- confirm.
from google.colab import drive
drive.mount('/content/gdrive', force_remount=True)

Mounted at /content/gdrive


In [7]:
# Read cleaned_data.csv from the mounted Drive: the first row is used as the
# header and Spark is asked to infer each column's type.
df = (
    spark.read
    .format('csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load('/content/gdrive/MyDrive/Colab Notebooks/Minería/cleaned_data.csv')
)

In [8]:
# Print the first 5 rows as a quick check of the loaded data
df.show(5)

+------------+---------------+--------------------+-----------------+------------+----------------+-------------------+-----------+----------------+--------------+-------------+---------+------+----------+------+-------------+---------+---------------+-------+----------+-------+------------------+------+------------+--------------------+---------+-----+----+-------------+---------+------------+------------+-----------------+----------------+------------+-------------+---------------------+---------------+-----------------------+-----------------+----------------------+
|back_legroom|      body_type|                city|city_fuel_economy|daysonmarket|engine_cylinders|engine_displacement|engine_type|franchise_dealer|franchise_make|front_legroom|fuel_type|height|horsepower|is_new|listing_color|make_name|maximum_seating|mileage|model_name|  price|     seller_rating|torque|transmission|wheel_system_display|wheelbase|width|year|Backup Camera|Bluetooth|Alloy Wheels|Heated Seats|Navigation Sys

In [9]:
# Build the "extras" column
from pyspark.sql.functions import col

# Columns that are added together, row by row, to form 'extras'
columns_to_sum = [
    'Backup Camera', 'Bluetooth', 'Alloy Wheels', 'Heated Seats',
    'Navigation System', 'Sunroof/Moonroof', 'Remote Start', 'Leather Seats',
    'Blind Spot Monitoring', 'Parking Sensors', 'Adaptive Cruise Control',
    'Third Row Seating',
]

# Cast each of those columns to double before they are added up
for name in columns_to_sum:
    as_double = df[name].cast("double")
    df = df.withColumn(name, as_double)

# Python's built-in sum() chains the Column objects with "+", producing a
# single column expression holding the row-wise total.
extras_total = sum(df[name] for name in columns_to_sum)
df = df.withColumn('extras', extras_total)

# The individual columns are dropped once 'extras' has been created
df = df.drop(*columns_to_sum)

In [10]:
# Columns that are cast to integer
columns_to_convert = [
    'city_fuel_economy', 'width', 'wheelbase', 'torque',
    'seller_rating', 'price', 'mileage', 'maximum_seating',
    'horsepower', 'height', 'front_legroom',
]

# Replace each listed column by its int-cast version, keeping the same name
for name in columns_to_convert:
    as_int = col(name).cast("int")
    df = df.withColumn(name, as_int)

In [11]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

In [12]:
# Drop every row that contains a null value so the models below can be fitted
df = df.na.drop()

In [13]:
# Keep only used cars (rows whose is_new flag is not True); the flag column
# is removed right after filtering.
df = (
    df
    .filter(col('is_new') != True)
    .drop('is_new')
)

### Agrupar las etiquetas con frecuencia menor o igual al 1% en la categoría "Other"

In [14]:
from pyspark.sql.functions import count, when, col, desc

# Categorical features to profile
categorical_features = ['city', 'model_name', 'make_name', 'body_type','fuel_type',
                        'listing_color','transmission', 'wheel_system_display','engine_type']

# Summary statistics per feature, keyed by feature name
results = {}

# The total row count is the same for every feature, so it is computed once
# here instead of launching two extra full counts per feature inside the loop.
total_rows = df.count()

for feature in categorical_features:
    # Row count of each category, most frequent first
    value_counts = df.groupBy(feature).count().orderBy(desc('count'))

    # value_counts has exactly one row per distinct value of the feature,
    # so its row count is the number of unique values.
    unique_count = value_counts.count()

    # Most and least frequent categories (first row of each ordering)
    most_freq_row = value_counts.first()
    least_freq_row = value_counts.orderBy('count').first()

    # Store the results, with frequencies expressed as a percentage of all rows
    results[feature] = {
        'unique_count': unique_count,
        'most_freq_category': most_freq_row[feature],
        'most_freq_percentage': 100.0 * most_freq_row['count'] / total_rows,
        'least_freq_category': least_freq_row[feature],
        'least_freq_percentage': 100.0 * least_freq_row['count'] / total_rows
    }

# Print the summary of every feature
for feature, result in results.items():
    print(f"{feature}:")
    print(f"  Unique count: {result['unique_count']}")
    print(f"  Most frequent category: {result['most_freq_category']} ({result['most_freq_percentage']:.2f}%)")
    print(f"  Least frequent category: {result['least_freq_category']} ({result['least_freq_percentage']:.2f}%)")
    print()


city:
  Unique count: 4061
  Most frequent category: Houston (1.36%)
  Least frequent category: North Las Vegas (0.00%)

model_name:
  Unique count: 602
  Most frequent category: F-150 (2.93%)
  Least frequent category: Seville (0.00%)

make_name:
  Unique count: 44
  Most frequent category: Ford (13.11%)
  Least frequent category: Isuzu (0.00%)

body_type:
  Unique count: 9
  Most frequent category: SUV / Crossover (46.38%)
  Least frequent category: Van (0.07%)

fuel_type:
  Unique count: 6
  Most frequent category: Gasoline (91.46%)
  Least frequent category: Compressed Natural Gas (0.00%)

listing_color:
  Unique count: 15
  Most frequent category: BLACK (20.86%)
  Least frequent category: PINK (0.01%)

transmission:
  Unique count: 4
  Most frequent category: A (83.83%)
  Least frequent category: Dual Clutch (0.39%)

wheel_system_display:
  Unique count: 5
  Most frequent category: Front-Wheel Drive (47.51%)
  Least frequent category: 4X2 (3.41%)

engine_type:
  Unique count: 24
 

In [15]:
# Group the least frequent categories of each column under a single 'Other' label
from pyspark.sql.functions import when, col, lit, count
def replace_rare_categories(df, columns, threshold=0.01):
    """Replace infrequent values of the given columns with the label 'Other'.

    Args:
        df: Spark DataFrame to transform.
        columns: Names of the columns to process.
        threshold: Relative-frequency cut-off. A value whose share of the
            rows is less than or equal to this number is replaced.
            Defaults to 0.01.

    Returns:
        A DataFrame in which, for every listed column, the rare values have
        been replaced by 'Other'; all other values are left unchanged.
    """
    # withColumn never adds or removes rows, so the total is the same for
    # every column and only needs to be counted once.
    total_count = df.count()

    for col_name in columns:
        # Relative frequency of each distinct value of the column
        freq = df.groupBy(col_name).count()
        freq = freq.withColumn('freq', freq['count'] / total_count)

        # Collect the values at or below the threshold to the driver
        rare_rows = freq.filter(freq['freq'] <= threshold).select(col_name).collect()
        rare_labels = [row[0] for row in rare_rows]

        # Replace them with 'Other'
        df = df.withColumn(col_name, when(col(col_name).isin(rare_labels), 'Other').otherwise(col(col_name)))
    return df

# Apply the grouping to every categorical column, using the default threshold
df_replaced = replace_rare_categories(
    df=df,
    columns=[
        'city', 'model_name', 'make_name',
        'body_type', 'fuel_type',
        'listing_color', 'transmission',
        'wheel_system_display', 'engine_type',
    ],
)


### Codificar las variables categóricas para los modelos de predicción

In [16]:
from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml import Pipeline

# Categorical columns to encode
categorical_features = [
    'city', 'model_name', 'make_name', 'body_type', 'fuel_type',
    'listing_color', 'transmission', 'wheel_system_display', 'engine_type',
]

# For every categorical column add two stages: a StringIndexer writing
# "<column>Index" and a OneHotEncoder turning that index into "<column>classVec".
stages = []
for name in categorical_features:
    index_col = name + "Index"
    stages.append(StringIndexer(inputCol=name, outputCol=index_col))
    stages.append(OneHotEncoder(inputCols=[index_col], outputCols=[name + "classVec"], dropLast=True))

# Fit the encoding pipeline and apply it to the same DataFrame
pipeline = Pipeline(stages=stages)
pipeline_model = pipeline.fit(df_replaced)
df_encoded = pipeline_model.transform(df_replaced)

# Remove the intermediate "<column>Index" columns in a single call
index_cols = [name + "Index" for name in categorical_features]
df_encoded = df_encoded.drop(*index_cols)

df_encoded.show()


+------------+---------------+-----+-----------------+------------+--------------------+-------------------+--------------------+----------------+--------------+-------------+-----------------+------+----------+-------------+----------+---------------+-------+--------------+-----+-------------+------+------------+--------------------+---------+-----+----+----------------------+------+-------------+------------------+-----------------+-----------------+-----------------+---------------------+--------------------+----------------------------+-------------------+
|back_legroom|      body_type| city|city_fuel_economy|daysonmarket|    engine_cylinders|engine_displacement|         engine_type|franchise_dealer|franchise_make|front_legroom|        fuel_type|height|horsepower|listing_color| make_name|maximum_seating|mileage|    model_name|price|seller_rating|torque|transmission|wheel_system_display|wheelbase|width|year|word_count_description|extras| cityclassVec|model_nameclassVec|make_nameclas

In [17]:
# From here on `df` refers to the encoded DataFrame
df = df_encoded

# Construcción de modelos de predicción

In [18]:
# Train/test split.
# The target column 'price' is renamed to 'label', the column name the
# regressors and evaluators below are configured with.
df = df.withColumnRenamed('price', 'label')

# Random 80% / 20% split with a fixed seed.
# Each of `train` and `test` holds the features and the label together
# (the equivalent of sklearn's X_train + y_train and X_test + y_test).
train, test = df.randomSplit(weights=[0.8, 0.2], seed=42)



# Regresión Lineal

In [19]:
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.regression import LinearRegression
from pyspark.ml import Pipeline

# Continuous columns used as inputs of the linear model
continuous_feature_cols = [
    'back_legroom', 'city_fuel_economy', 'daysonmarket',
    'engine_displacement', 'front_legroom', 'height',
    'horsepower', 'maximum_seating', 'mileage', 'seller_rating',
    'torque', 'wheelbase', 'width', 'year', 'word_count_description',
    'extras',
]

# Stage 1: gather the continuous columns into one "features" vector
assembler = VectorAssembler(outputCol="features", inputCols=continuous_feature_cols)

# Stage 2: standardise the vector (centre on the mean, scale by the std)
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=True,
    withStd=True,
)

# Stage 3: linear regression with elasticNetParam=0 (ridge) and regParam=1.0
ridge = LinearRegression(
    labelCol="label",
    featuresCol="scaledFeatures",
    regParam=1.0,
    elasticNetParam=0,
)

# Chain the three stages and fit them on the training split
pipeline = Pipeline(stages=[assembler, scaler, ridge])
model = pipeline.fit(train)

# Apply the fitted pipeline to the test split
test_scaled = model.transform(test)


In [20]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator

# Hyperparameter grid to explore.
# Only one regParam value is listed for simplicity; more can be added.
paramGrid = ParamGridBuilder() \
    .addGrid(ridge.regParam, [1.0]) \
    .build()

# 5-fold cross-validation of the whole pipeline, scored with RMSE.
# A fixed seed is passed so the fold assignment, and therefore the reported
# CV metric, is the same every time the cell is run.
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName="rmse"),
                          numFolds=5,
                          seed=42)

# Run the cross-validation on the training split
cvModel = crossval.fit(train)

# avgMetrics holds one averaged metric per grid entry; the grid has a single entry
rmse = cvModel.avgMetrics[0]
print(f'Ridge Regression CV RMSE: {rmse}') 


Ridge Regression CV RMSE: 5475.736785613694


#### Ver las métricas en train y test

In [None]:
from pyspark.sql.functions import col, abs
from pyspark.ml.evaluation import RegressionEvaluator

# Evaluators comparing the "prediction" column against the "label" column:
# one for RMSE and one for R-squared.
rmse_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction",
)
r2_evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="label",
    predictionCol="prediction",
)

# Mean absolute percentage error (MAPE)
def calculate_mape(df):
    """Return the MAPE, in percent, between the "label" and "prediction" columns.

    Args:
        df: DataFrame containing a "label" and a "prediction" column.

    Returns:
        The average over all rows of 100 * |label - prediction| / label.
    """
    # `abs` here is the Spark column function imported in this cell, not the built-in
    pct_error = (100 * abs(col("label") - col("prediction")) / col("label")).alias("pct_error")
    averaged = df.select(pct_error).agg({"pct_error": "avg"})
    # An aggregation without grouping yields exactly one row with one value
    return averaged.first()[0]

# Metrics of the fitted ridge pipeline on the training split
train_pred = model.transform(train)
r2_train = r2_evaluator.evaluate(train_pred)
mape_train = calculate_mape(train_pred)
rmse_train = rmse_evaluator.evaluate(train_pred)
print(f'Ridge Regression Train RMSE: {rmse_train}, Train MAPE: {mape_train}%, Train R-squared: {r2_train}')

# Same metrics on the held-out test split
test_pred = model.transform(test)
r2_test = r2_evaluator.evaluate(test_pred)
mape_test = calculate_mape(test_pred)
rmse_test = rmse_evaluator.evaluate(test_pred)
print(f'Ridge Regression Test RMSE: {rmse_test}, Test MAPE: {mape_test}%, Test R-squared: {r2_test}')


Ridge Regression Train RMSE: 5474.615215734273, Train MAPE: 24.170553344236666%, Train R-squared: 0.7632112138161617
Ridge Regression Test RMSE: 5442.804854944251, Test MAPE: 23.7189723348648%, Test R-squared: 0.7688984175876103


# Random Forest

In [None]:
from pyspark.ml.feature import VectorAssembler

# Input columns of the Random Forest: numeric columns plus the one-hot
# encoded ("...classVec") categorical columns.
feature_cols = [ 'city_fuel_economy', 'daysonmarket', 
                'engine_displacement','franchise_dealer','front_legroom', 
                'height', 'horsepower','maximum_seating', 
                'mileage', 'seller_rating', 'torque', 
                'wheelbase', 'width', 'year', 'word_count_description', 'extras', 'cityclassVec', 
                'model_nameclassVec', 'make_nameclassVec', 'body_typeclassVec', 'fuel_typeclassVec', 
                'listing_colorclassVec', 'transmissionclassVec', 'wheel_system_displayclassVec', 'engine_typeclassVec']

# Assembler that packs all those columns into a single "features" vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Add the "features" column to both splits.
# Any existing "features" column is dropped first (a no-op when it is absent)
# so the cell can be re-run: without it, a second execution fails because the
# assembler's output column already exists on `train` and `test`.
train = assembler.transform(train.drop("features"))
test = assembler.transform(test.drop("features"))


In [None]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col, abs

# Random Forest regressor reading the "features" vector and the "label" column.
# A fixed seed makes the forest's random sampling repeatable between runs.
rf = RandomForestRegressor(featuresCol="features", labelCol="label", seed=42)

# Hyperparameter grid (a single value of numTrees)
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [10]) \
    .build()

# 5-fold cross-validation scored with RMSE; the seed fixes the fold assignment
crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(metricName="rmse"),
                          numFolds=5,
                          seed=42)

# Run the cross-validation on the training split
cvModel = crossval.fit(train)

# Mean absolute percentage error (MAPE)
def calculate_mape(df):
    """Return the MAPE, in percent, between the "label" and "prediction" columns.

    Args:
        df: DataFrame containing a "label" and a "prediction" column.

    Returns:
        The average over all rows of 100 * |label - prediction| / label.
    """
    # `abs` here is the Spark column function imported in this cell, not the built-in
    pct_error = (100 * abs(col("label") - col("prediction")) / col("label")).alias("pct_error")
    averaged = df.select(pct_error).agg({"pct_error": "avg"})
    # An aggregation without grouping yields exactly one row with one value
    return averaged.first()[0]

# Evaluators comparing the "prediction" column against the "label" column:
# one for RMSE and one for R-squared.
rmse_evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="label",
    predictionCol="prediction",
)
r2_evaluator = RegressionEvaluator(
    metricName="r2",
    labelCol="label",
    predictionCol="prediction",
)

In [None]:
# Metrics of the cross-validated Random Forest on the training split
train_pred = cvModel.transform(train)
r2_train = r2_evaluator.evaluate(train_pred)
mape_train = calculate_mape(train_pred)
rmse_train = rmse_evaluator.evaluate(train_pred)
print(f'Random Forest Train RMSE: {rmse_train}, Train MAPE: {mape_train}%, Train R-squared: {r2_train}')

# Same metrics on the held-out test split
test_pred = cvModel.transform(test)
r2_test = r2_evaluator.evaluate(test_pred)
mape_test = calculate_mape(test_pred)
rmse_test = rmse_evaluator.evaluate(test_pred)
print(f'Random Forest Test RMSE: {rmse_test}, Test MAPE: {mape_test}%, Test R-squared: {r2_test}')


Random Forest Train RMSE: 4927.262089810299, Train MAPE: 19.153222462177542%, Train R-squared: 0.8081926440807524
Random Forest Test RMSE: 4943.073808221099, Test MAPE: 19.181414064034183%, Test R-squared: 0.8093874099113136
