In [0]:
from pyspark.sql.functions import col, median, when, isnan, count, sum
from pyspark.sql.functions import mean, stddev
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.sql.types import IntegerType
from pyspark.sql.types import FloatType
from pyspark.sql.types import DoubleType
from pyspark.sql.functions import regexp_replace, col

In [0]:
# Load every column of the `honda` table into a Spark DataFrame.
# NOTE(review): `spark` is not created in any visible cell — presumably the
# notebook runtime provides the session; confirm before running elsewhere.
df = spark.sql("SELECT * from honda")
# Print the first four rows as a quick look at the raw data.
df.show(4)

### Delete columns that contain nulls

In [0]:
# Build one aggregate per column: each row contributes 1 when the value is
# null and 0 otherwise, so the total is that column's null count.
# `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
null_count_exprs = [
    sum(when(col(column_name).isNull(), 1).otherwise(0)).alias(column_name)
    for column_name in df.columns
]

# Single-row DataFrame holding the null count of every column.
null_counts = df.select(*null_count_exprs)

# Render the counts.
display(null_counts)

In [0]:
# Fetch the single row of null counts once. Calling `null_counts.first()`
# inside the comprehension would launch a separate Spark job for every column.
null_count_row = null_counts.first()

# A column is flagged as soon as it has at least one null value.
# `or 0` guards the empty-table case, where the aggregate itself is null
# and a direct `> 0` comparison would raise a TypeError.
columns_with_nulls = [
    c for c in null_counts.columns if (null_count_row[c] or 0) > 0
]

# Drop every flagged column; the original `df` is left untouched.
df_clean = df.drop(*columns_with_nulls)


In [0]:

# Preview the first five rows of the DataFrame left after dropping columns.
df_clean.show(5)

### Fix data types


In [0]:
# Print the column names and types before any casting is applied.
df_clean.printSchema()

In [0]:
# Target type for each column that needs converting
# (Year, Consumer_Rating, Consumer_Review_#).
column_types = {
    "Year": IntegerType(),
    "Consumer_Rating": FloatType(),
    "Consumer_Review_#": FloatType(),
}

# Replace each listed column, in place, with its cast version.
for column_name, target_type in column_types.items():
    df_clean = df_clean.withColumn(column_name, col(column_name).cast(target_type))

In [0]:
# Print the schema again to check the types of the cast columns.
df_clean.printSchema()

### Clean and normalize price

In [0]:
# Create a new numeric column "Price_int" from the text column "Price".
# Dollar signs and commas are stripped first so the rest can be cast.
# Despite its name, the column is stored as DoubleType.
price_without_symbols = regexp_replace(col("Price"), r"\$|,", "")
df_clean = df_clean.withColumn(
    "Price_int", price_without_symbols.cast(DoubleType())
)


In [0]:
# Preview five rows, now including the derived "Price_int" column.
df_clean.show(5)

In [0]:
# Print the schema to check the type of the new "Price_int" column.
df_clean.printSchema()


### Count models and add them as a new column


In [0]:
# Count the rows per "Model" value, most frequent first.
# The counts stay in their own DataFrame; this cell does not join them back
# onto df_clean.
rows_per_model = df_clean.groupBy("Model").count()
model_count = rows_per_model.orderBy("count", ascending=False)


### One Hot Encoding


In [0]:
# Column to encode and the names of the two columns the pipeline adds.
categorical_col = "Condition"
index_col = categorical_col + "_index"
vector_col = categorical_col + "_vec"

# Stage 1 maps each category string to a numeric index; stage 2 turns that
# index into a one-hot vector.
indexer = StringIndexer(inputCol=categorical_col, outputCol=index_col)
encoders = OneHotEncoder(inputCols=[indexer.getOutputCol()], outputCols=[vector_col])
pipeline = Pipeline(stages=[indexer, encoders])

# This cell overwrites df_clean, so on a re-run the output columns already
# exist and the pipeline would fail on the duplicate output column.
# Dropping them first makes the cell safe to run repeatedly; `drop` ignores
# names that are not in the schema, so the first run is unaffected.
encoder_input = df_clean.drop(index_col, vector_col)
df_clean = pipeline.fit(encoder_input).transform(encoder_input)

In [0]:
# Render the first three rows to inspect the columns added by the encoding pipeline.
display(df_clean.limit(3))

### Training and evaluating model

In [0]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import col
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import RegressionEvaluator
import pandas as pd
import matplotlib.pyplot as plt




In [0]:
# Model inputs: two numeric columns plus the one-hot encoded condition vector.
numeric_feature_cols = ["Year", "Consumer_Rating"]
feature_cols = numeric_feature_cols + ["Condition_vec"]
label_col = "Price_int"

# The numeric features and the price come from casts of text columns, which
# presumably yield null for values that cannot be parsed (confirm against the
# data). VectorAssembler raises on null/NaN inputs by default, and the
# regressor cannot train on a missing price, so those rows are removed here.
df_model_input = df_clean.dropna(subset=numeric_feature_cols + [label_col])

# Combine the feature columns into a single vector column named "features".
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

# Add the "features" column to the filtered DataFrame.
df_assembled = assembler.transform(df_model_input)

# Expose the target as "label"; it is a copy of the price, not a
# depreciation value.
df_assembled = df_assembled.withColumn("label", col(label_col))

In [0]:
# Split the data randomly: the first weight goes to training and the second
# to testing. The seed is fixed so the split uses the same random sequence
# on each run.
split_weights = [0.8, 0.2]
train_data, test_data = df_assembled.randomSplit(split_weights, seed=1234)


In [0]:


# Configure a linear regression that reads the assembled "features" vector
# and predicts the "Price_int" column directly.
lr = LinearRegression(
    labelCol="Price_int",
    featuresCol="features",
)

# Fit the model on the training split only.
lr_model = lr.fit(train_data)


In [0]:


# Score the held-out test rows with the fitted model.
predictions = lr_model.transform(test_data)

# One evaluator is reused for all three metrics; only its metric name is
# switched between calls.
evaluator = RegressionEvaluator(
    labelCol="Price_int",
    predictionCol="prediction",
    metricName="rmse",
)

# Root mean squared error.
rmse = evaluator.evaluate(predictions)
print(f"Root Mean Squared Error (RMSE): {rmse}")

# Mean absolute error.
mae = evaluator.setMetricName("mae").evaluate(predictions)
print(f"Mean Absolute Error (MAE): {mae}")

# Coefficient of determination (R^2).
r2 = evaluator.setMetricName("r2").evaluate(predictions)
print(f"R^2: {r2}")


In [0]:

# Convert the PySpark predictions to a pandas DataFrame so they can be plotted.
predictions_df = predictions.select("features", "Price_int", "prediction").toPandas()

# Series used on each axis, and the span of the actual prices.
actual_prices = predictions_df["Price_int"]
predicted_prices = predictions_df["prediction"]
price_range = [actual_prices.min(), actual_prices.max()]

# Scatter plot of actual price (x) against predicted price (y).
plt.figure(figsize=(10, 6))
plt.scatter(actual_prices, predicted_prices, alpha=0.5)

# Dashed red diagonal y = x: points on it are exact predictions.
plt.plot(price_range, price_range, color='red', lw=2, linestyle='--')

plt.xlabel("Precio Real")
plt.ylabel("Predicción")
plt.title("Comparación de Predicciones vs. Valores Reales")
plt.grid(True)
plt.show()
