# Ejercicio Módulo 6 (diamonds.csv)

## 1. Cargar del dataset con Schema.

In [3]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, MinMaxScaler, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import col

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType

spark = SparkSession.builder.appName("DiamondsAnalysis").getOrCreate()

schema = StructType([
    StructField("carat", FloatType(), True),  
    StructField("cut", StringType(), True), 
    StructField("color", StringType(), True),
    StructField("clarity", StringType(), True),
    StructField("depth", FloatType(), True),  
    StructField("table", FloatType(), True),   
    StructField("price", IntegerType(), True), 
    StructField("x", FloatType(), True),      
    StructField("y", FloatType(), True),      
    StructField("z", FloatType(), True),
])

file_path = "/Users/mariahidalgo/Desktop/Maria Hidalgo - github/Data/diamonds.csv"

df = spark.read.csv(file_path, header=True, schema=schema)

df.show(10)

25/02/28 09:35:21 WARN Utils: Your hostname, MacBook-Air-de-Maria.local resolves to a loopback address: 127.0.0.1; using 192.168.0.22 instead (on interface en0)
25/02/28 09:35:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/02/28 09:35:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+-----+---------+-----+-------+-----+-----+-----+----+----+----+
|carat|      cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
| 0.23|    Ideal|    E|    SI2| 61.5| 55.0|  326|3.95|3.98|2.43|
| 0.21|  Premium|    E|    SI1| 59.8| 61.0|  326|3.89|3.84|2.31|
| 0.23|     Good|    E|    VS1| 56.9| 65.0|  327|4.05|4.07|2.31|
| 0.29|  Premium|    I|    VS2| 62.4| 58.0|  334| 4.2|4.23|2.63|
| 0.31|     Good|    J|    SI2| 63.3| 58.0|  335|4.34|4.35|2.75|
| 0.24|Very Good|    J|   VVS2| 62.8| 57.0|  336|3.94|3.96|2.48|
| 0.24|Very Good|    I|   VVS1| 62.3| 57.0|  336|3.95|3.98|2.47|
| 0.26|Very Good|    H|    SI1| 61.9| 55.0|  337|4.07|4.11|2.53|
| 0.22|     Fair|    E|    VS2| 65.1| 61.0|  337|3.87|3.78|2.49|
| 0.23|Very Good|    H|    VS1| 59.4| 61.0|  338| 4.0|4.05|2.39|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+
only showing top 10 rows



## 2. Pipeline regresión 'price' con preprocesados.

### 2.1. Preprocesados.

In [15]:
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, MinMaxScaler, StandardScaler, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.ml.regression import LinearRegression
from pyspark.sql import functions as F

In [5]:
imputer = Imputer(inputCols=['carat', 'depth', 'table', 'x', 'y', 'z', 'price'], 
                  outputCols=['carat_imputed', 'depth_imputed', 'table_imputed', 'x_imputed', 'y_imputed', 'z_imputed', 'price_imputed'])

In [6]:
indexer_cut = StringIndexer(inputCol="cut", outputCol="cut_index")
encoder_cut = OneHotEncoder(inputCol="cut_index", outputCol="cut_encoded")

indexer_color = StringIndexer(inputCol="color", outputCol="color_index")
encoder_color = OneHotEncoder(inputCol="color_index", outputCol="color_encoded")

indexer_clarity = StringIndexer(inputCol="clarity", outputCol="clarity_index")
encoder_clarity = OneHotEncoder(inputCol="clarity_index", outputCol="clarity_encoded")

In [7]:
assembler_features = VectorAssembler(
    inputCols=["carat_imputed", "depth_imputed", "table_imputed", "x_imputed", "y_imputed", "z_imputed"],
    outputCol="features_temp"
)

scaler = StandardScaler(inputCol="features_temp", outputCol="features_scaled", withMean=True, withStd=True)

In [8]:
final_assembler = VectorAssembler(
    inputCols=["features_scaled", "cut_encoded", "color_encoded", "clarity_encoded"],
    outputCol="features"
)

In [11]:
lr = LinearRegression(featuresCol="features", labelCol="price_imputed")

### 2.2. Creación del pipeline.

In [29]:
pipeline = Pipeline(stages=[
    imputer,
    indexer_cut, encoder_cut,
    indexer_color, encoder_color,
    indexer_clarity, encoder_clarity,
    assembler_features, scaler,
    final_assembler,
    lr
])

In [30]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_data)
predictions = model.transform(test_data)


25/02/28 09:57:07 WARN Instrumentation: [a9d71a45] regParam is zero, which might cause numerical instability and overfitting.
25/02/28 09:57:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/02/28 09:57:07 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [31]:
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="price_imputed", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

print(f"RMSE: {rmse}")

RMSE: 1148.2259293250036


## 3. Pipeline clasificación multiclase sobre variable 'cut' con preprocesados.

### 3.1. Preprocesados.

In [32]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.feature import Imputer, StringIndexer, OneHotEncoder, MinMaxScaler, StandardScaler, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [33]:
imputer = Imputer(inputCols=["carat", "depth", "table", "price", "x", "y", "z"], 
                  outputCols=["carat_imputed", "depth_imputed", "table_imputed", 
                              "price_imputed", "x_imputed", "y_imputed", "z_imputed"]).setStrategy("median")

In [34]:
indexer_cut = StringIndexer(inputCol="cut", outputCol="cut_indexed")

In [35]:
indexer_color = StringIndexer(inputCol="color", outputCol="color_indexed")
indexer_clarity = StringIndexer(inputCol="clarity", outputCol="clarity_indexed")

encoder_color = OneHotEncoder(inputCol="color_indexed", outputCol="color_encoded")
encoder_clarity = OneHotEncoder(inputCol="clarity_indexed", outputCol="clarity_encoded")

In [36]:
assembler = VectorAssembler(inputCols=["carat_imputed", "depth_imputed", "table_imputed",
                                       "price_imputed", "x_imputed", "y_imputed", "z_imputed"],
                            outputCol="features_assembled")

In [37]:
scaler = MinMaxScaler(inputCol="features_assembled", outputCol="features_scaled")

In [38]:
final_assembler = VectorAssembler(inputCols=["features_scaled", "color_encoded", "clarity_encoded"], 
                                  outputCol="features")

### 3.2. Modelo de clasificación.

In [39]:
classifier = RandomForestClassifier(featuresCol="features", labelCol="cut_indexed", numTrees=100)

In [40]:
pipeline = Pipeline(stages=[imputer, indexer_cut, indexer_color, indexer_clarity,
                            encoder_color, encoder_clarity, assembler, scaler, 
                            final_assembler, classifier])

In [None]:
train_data, test_data = df.randomSplit([0.8, 0.2], seed=42)

model = pipeline.fit(train_data)

                                                                                

In [None]:
predictions = model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(labelCol="cut_indexed", predictionCol="prediction", metricName="accuracy")

accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")

Accuracy: 0.6924564796905223


                                                                                

## 4. Gridsearch con CrossValidation sobre cualquiera de los pipelines.

In [44]:
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
paramGrid = ParamGridBuilder() \
    .addGrid(classifier.numTrees, [50, 100, 150]) \
    .addGrid(classifier.maxDepth, [5, 10, 15]) \
    .build()


In [None]:
crossval = CrossValidator(
    estimator=pipeline, 
    estimatorParamMaps=paramGrid, 
    evaluator=MulticlassClassificationEvaluator(labelCol="cut_indexed", predictionCol="prediction", metricName="accuracy"), 
    numFolds=3, 
    parallelism=2
)

# Nota: Hola Alan, este último apartado genera un error que indica que Spark no tiene suficiente almacenamiento, creo que nunca antes me ha surgido un error similiar.