In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Ferramentas") \
    .getOrCreate()

In [3]:
df = spark.read.csv("penguins_size.csv", header=True,inferSchema=True)
df.show(5)

+-------+---------+----------------+---------------+-----------------+-----------+------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+---------+----------------+---------------+-----------------+-----------+------+
| Adelie|Torgersen|            39.1|           18.7|              181|       3750|  MALE|
| Adelie|Torgersen|            39.5|           17.4|              186|       3800|FEMALE|
| Adelie|Torgersen|            40.3|             18|              195|       3250|FEMALE|
| Adelie|Torgersen|              NA|             NA|               NA|         NA|    NA|
| Adelie|Torgersen|            36.7|           19.3|              193|       3450|FEMALE|
+-------+---------+----------------+---------------+-----------------+-----------+------+
only showing top 5 rows



In [4]:
df.printSchema()

root
 |-- species: string (nullable = true)
 |-- island: string (nullable = true)
 |-- culmen_length_mm: string (nullable = true)
 |-- culmen_depth_mm: string (nullable = true)
 |-- flipper_length_mm: string (nullable = true)
 |-- body_mass_g: string (nullable = true)
 |-- sex: string (nullable = true)



In [6]:
from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType

df = df.withColumn("culmen_length_mm", col("culmen_length_mm").cast(DoubleType()))
df = df.withColumn("culmen_depth_mm", col("culmen_depth_mm").cast(DoubleType()))
df = df.withColumn("flipper_length_mm", col("flipper_length_mm").cast(DoubleType()))
df = df.withColumn("body_mass_g", col("body_mass_g").cast(DoubleType()))
df.printSchema()

root
 |-- species: string (nullable = true)
 |-- island: string (nullable = true)
 |-- culmen_length_mm: double (nullable = true)
 |-- culmen_depth_mm: double (nullable = true)
 |-- flipper_length_mm: double (nullable = true)
 |-- body_mass_g: double (nullable = true)
 |-- sex: string (nullable = true)



In [10]:
df.show(5)

+-------+---------+----------------+---------------+-----------------+-----------+------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+---------+----------------+---------------+-----------------+-----------+------+
| Adelie|Torgersen|            39.1|           18.7|            181.0|     3750.0|  MALE|
| Adelie|Torgersen|            39.5|           17.4|            186.0|     3800.0|FEMALE|
| Adelie|Torgersen|            40.3|           18.0|            195.0|     3250.0|FEMALE|
| Adelie|Torgersen|            NULL|           NULL|             NULL|       NULL|    NA|
| Adelie|Torgersen|            36.7|           19.3|            193.0|     3450.0|FEMALE|
+-------+---------+----------------+---------------+-----------------+-----------+------+
only showing top 5 rows



In [11]:
df_limpo = df.dropna()
df_limpo.show(5)

+-------+---------+----------------+---------------+-----------------+-----------+------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|
+-------+---------+----------------+---------------+-----------------+-----------+------+
| Adelie|Torgersen|            39.1|           18.7|            181.0|     3750.0|  MALE|
| Adelie|Torgersen|            39.5|           17.4|            186.0|     3800.0|FEMALE|
| Adelie|Torgersen|            40.3|           18.0|            195.0|     3250.0|FEMALE|
| Adelie|Torgersen|            36.7|           19.3|            193.0|     3450.0|FEMALE|
| Adelie|Torgersen|            39.3|           20.6|            190.0|     3650.0|  MALE|
+-------+---------+----------------+---------------+-----------------+-----------+------+
only showing top 5 rows



In [18]:
from pyspark.ml.feature import StringIndexer
indexador_species = StringIndexer(inputCol="species", outputCol="species_idx", handleInvalid="keep")
indexador_island = StringIndexer(inputCol="island", outputCol="island_idx", handleInvalid="keep")
indexador_sex = StringIndexer(inputCol="sex", outputCol="sex_idx", handleInvalid="keep")

df_indexado = indexador_species.fit(df_limpo).transform(df_limpo)
df_indexado = indexador_island.fit(df_limpo).transform(df_indexado)
df_indexado = indexador_sex.fit(df_limpo).transform(df_indexado)

In [20]:
df_indexado.show(3)

+-------+---------+----------------+---------------+-----------------+-----------+------+-----------+----------+-------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|species_idx|island_idx|sex_idx|
+-------+---------+----------------+---------------+-----------------+-----------+------+-----------+----------+-------+
| Adelie|Torgersen|            39.1|           18.7|            181.0|     3750.0|  MALE|        0.0|       2.0|    0.0|
| Adelie|Torgersen|            39.5|           17.4|            186.0|     3800.0|FEMALE|        0.0|       2.0|    1.0|
| Adelie|Torgersen|            40.3|           18.0|            195.0|     3250.0|FEMALE|        0.0|       2.0|    1.0|
+-------+---------+----------------+---------------+-----------------+-----------+------+-----------+----------+-------+
only showing top 3 rows



In [23]:
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(
 inputCols= ["species_idx","island_idx","sex_idx"],
 outputCols = ["species_vec","island_vec","sex_vec"]
)
df_codificado = encoder.fit(df_indexado).transform(df_indexado)

In [24]:
df_codificado.show(3)

+-------+---------+----------------+---------------+-----------------+-----------+------+-----------+----------+-------+-------------+-------------+-------------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|species_idx|island_idx|sex_idx|  species_vec|   island_vec|      sex_vec|
+-------+---------+----------------+---------------+-----------------+-----------+------+-----------+----------+-------+-------------+-------------+-------------+
| Adelie|Torgersen|            39.1|           18.7|            181.0|     3750.0|  MALE|        0.0|       2.0|    0.0|(3,[0],[1.0])|(3,[2],[1.0])|(4,[0],[1.0])|
| Adelie|Torgersen|            39.5|           17.4|            186.0|     3800.0|FEMALE|        0.0|       2.0|    1.0|(3,[0],[1.0])|(3,[2],[1.0])|(4,[1],[1.0])|
| Adelie|Torgersen|            40.3|           18.0|            195.0|     3250.0|FEMALE|        0.0|       2.0|    1.0|(3,[0],[1.0])|(3,[2],[1.0])|(4,[1],[1.0])|
+-------+---------+---

In [27]:
from pyspark.ml.feature import StandardScaler, VectorAssembler

vetor_numericos = VectorAssembler(
    inputCols= ["culmen_length_mm", "culmen_depth_mm", "flipper_length_mm", "body_mass_g"],
    outputCol= "raw_numericos"
)
df_com_raw = vetor_numericos.transform(df_codificado)

In [28]:
df_com_raw.show(2)

+-------+---------+----------------+---------------+-----------------+-----------+------+-----------+----------+-------+-------------+-------------+-------------+--------------------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|species_idx|island_idx|sex_idx|  species_vec|   island_vec|      sex_vec|       raw_numericos|
+-------+---------+----------------+---------------+-----------------+-----------+------+-----------+----------+-------+-------------+-------------+-------------+--------------------+
| Adelie|Torgersen|            39.1|           18.7|            181.0|     3750.0|  MALE|        0.0|       2.0|    0.0|(3,[0],[1.0])|(3,[2],[1.0])|(4,[0],[1.0])|[39.1,18.7,181.0,...|
| Adelie|Torgersen|            39.5|           17.4|            186.0|     3800.0|FEMALE|        0.0|       2.0|    1.0|(3,[0],[1.0])|(3,[2],[1.0])|(4,[1],[1.0])|[39.5,17.4,186.0,...|
+-------+---------+----------------+---------------+-----------------+----------

In [29]:
scaler = StandardScaler(
    inputCol="raw_numericos",
    outputCol="numericos_padronizados",
    withMean=True,
    withStd=True
    
)
scaler_model = scaler.fit(df_com_raw)
df_escalado  = scaler_model.transform(df_com_raw) 

In [34]:
df_escalado.show()

+-------+---------+----------------+---------------+-----------------+-----------+------+-----------+----------+-------+-------------+-------------+-------------+--------------------+----------------------+
|species|   island|culmen_length_mm|culmen_depth_mm|flipper_length_mm|body_mass_g|   sex|species_idx|island_idx|sex_idx|  species_vec|   island_vec|      sex_vec|       raw_numericos|numericos_padronizados|
+-------+---------+----------------+---------------+-----------------+-----------+------+-----------+----------+-------+-------------+-------------+-------------+--------------------+----------------------+
| Adelie|Torgersen|            39.1|           18.7|            181.0|     3750.0|  MALE|        0.0|       2.0|    0.0|(3,[0],[1.0])|(3,[2],[1.0])|(4,[0],[1.0])|[39.1,18.7,181.0,...|  [-0.8832046685650...|
| Adelie|Torgersen|            39.5|           17.4|            186.0|     3800.0|FEMALE|        0.0|       2.0|    1.0|(3,[0],[1.0])|(3,[2],[1.0])|(4,[1],[1.0])|[39.5,17.4

In [32]:
df_escalado.select('raw_numericos', 'numericos_padronizados').show(5,truncate=False)

+------------------------+---------------------------------------------------------------------------------+
|raw_numericos           |numericos_padronizados                                                           |
+------------------------+---------------------------------------------------------------------------------+
|[39.1,18.7,181.0,3750.0]|[-0.8832046685650078,0.7843000691036092,-1.4162715251128077,-0.5633167041965338] |
|[39.5,17.4,186.0,3800.0]|[-0.8099390093207578,0.12600327710160344,-1.0606960871531967,-0.5009690301398301]|
|[40.3,18.0,195.0,3250.0]|[-0.6634076908322577,0.4298325657179143,-0.42066029882589706,-1.1867934447635713]|
|[36.7,19.3,193.0,3450.0]|[-1.3227986240305092,1.08812935771992,-0.5628904740097415,-0.9374027485367562]   |
|[39.3,20.6,190.0,3650.0]|[-0.8465718389428835,1.7464261497219258,-0.776235736785508,-0.6880120523099412]  |
+------------------------+---------------------------------------------------------------------------------+
only showing top 5 

In [36]:
assembler_final = VectorAssembler(
    inputCols=['numericos_padronizados','species_vec','island_vec', 'sex_vec'],
    outputCol="features"
)
df_final = assembler_final.transform(df_escalado) \
            .select("features",'body_mass_g') \
            .withColumnRenamed("body_mass_g", "label")

df_final.show(5,truncate=False)


+-------------------------------------------------------------------------------------------------------------------+------+
|features                                                                                                           |label |
+-------------------------------------------------------------------------------------------------------------------+------+
|(14,[0,1,2,3,4,9,10],[-0.8832046685650078,0.7843000691036092,-1.4162715251128077,-0.5633167041965338,1.0,1.0,1.0]) |3750.0|
|(14,[0,1,2,3,4,9,11],[-0.8099390093207578,0.12600327710160344,-1.0606960871531967,-0.5009690301398301,1.0,1.0,1.0])|3800.0|
|(14,[0,1,2,3,4,9,11],[-0.6634076908322577,0.4298325657179143,-0.42066029882589706,-1.1867934447635713,1.0,1.0,1.0])|3250.0|
|(14,[0,1,2,3,4,9,11],[-1.3227986240305092,1.08812935771992,-0.5628904740097415,-0.9374027485367562,1.0,1.0,1.0])   |3450.0|
|(14,[0,1,2,3,4,9,10],[-0.8465718389428835,1.7464261497219258,-0.776235736785508,-0.6880120523099412,1.0,1.0,1.0])  |3650.0|


In [37]:
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

In [46]:
treino, teste = df_final.randomSplit([0.8,0.2],seed=42)

In [56]:
rf1 = RandomForestRegressor(
    featuresCol="features",
    labelCol="label",
    maxDepth=1,
    numTrees=10,
    seed=42
)

In [39]:
evaluator_rmse_rf1 = RegressionEvaluator(
    labelCol= "label",
    predictionCol="prediction",
    metricName="rmse"
)
evaluator_r2_rf1 = RegressionEvaluator(
    labelCol= "label",
    predictionCol="prediction",
    metricName="r2"
)

In [51]:
modelo1 = rf1.fit(treino)

In [52]:
previsoes1 = modelo1.transform(teste)

In [44]:
rsme1 = evaluator_rmse_rf1.evaluate(previsoes1)
r2 = evaluator_r2_rf1.evaluate(previsoes1)

In [45]:
print(f"RMSE do modelo1: {rsme1:.2f}")
print(f"R2 do modelo1: {r2:.2f}")

RMSE do modelo1: 69.32
R2 do modelo1: 0.99


In [49]:
rsme1 = evaluator_rmse_rf1.evaluate(previsoes1)
r2 = evaluator_r2_rf1.evaluate(previsoes1)
print(f"RMSE do modelo1: {rsme1:.2f}")
print(f"R2 do modelo1: {r2:.2f}")

RMSE do modelo1: 154.04
R2 do modelo1: 0.96


In [53]:
rsme1 = evaluator_rmse_rf1.evaluate(previsoes1)
r2 = evaluator_r2_rf1.evaluate(previsoes1)
print(f"RMSE do modelo1: {rsme1:.2f}")
print(f"R2 do modelo1: {r2:.2f}")

RMSE do modelo1: 504.99
R2 do modelo1: 0.60


In [54]:
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder


In [55]:
rf2 = RandomForestRegressor(
    featuresCol="features",
    labelCol="label",
    seed=42
)

In [57]:
param_grid = ParamGridBuilder() \
    .addGrid(rf2.maxDepth,[1,3,5,10,15]) \
    .addGrid(rf2.numTrees,[1,5,10,20]) \
    .build()


In [58]:
evaluator_novo = RegressionEvaluator(
    labelCol= "label",
    predictionCol="prediction",
    metricName="rmse"
)

In [62]:
tvs = TrainValidationSplit(
    estimator=rf2,
    evaluator=evaluator_novo,
    estimatorParamMaps=param_grid,
    trainRatio=0.8,
    parallelism=2
)

In [63]:
tvs_modelo = tvs.fit(treino)

In [65]:
predicoes = tvs_modelo.transform(teste)

In [68]:
evaluator_r2_novo = RegressionEvaluator(
    labelCol= "label",
    predictionCol="prediction",
    metricName="r2"
)
r2_grid = evaluator_r2_novo.evaluate(predicoes)
rsme_grid = evaluator_novo.evaluate(predicoes)


In [69]:
print(f"RMSE do modelo1: {rsme_grid:.2f}")
print(f"R2 do modelo1: {r2_grid:.2f}")

RMSE do modelo1: 88.47
R2 do modelo1: 0.99


In [70]:
melhor_modelo = tvs_modelo.bestModel

In [73]:
print("\n Melhor Combinacao de Hiperparametros:")
print(f" -> MaxDepth: {melhor_modelo.getOrDefault('maxDepth')} ")
print(f" -> NumTrees: {melhor_modelo.getOrDefault('numTrees') } ")


 Melhor Combinacao de Hiperparametros:
 -> MaxDepth: 5 
 -> NumTrees: 1 


In [78]:
from pyspark.ml.tuning import CrossValidator

rf = RandomForestRegressor(featuresCol="features",labelCol="label",seed=42)

In [79]:
param_grid = ParamGridBuilder() \
        .addGrid(rf.maxDepth,[3,5,10])\
        .addGrid(rf.numTrees,[10,20])\
        .build()

In [80]:
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")


In [83]:
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=param_grid,
    evaluator=evaluator,
    numFolds=3,
    parallelism=2
)

In [84]:
modelo_cv = cv.fit(df_final)

In [85]:
predicoes = modelo_cv.transform(df_final)

In [86]:
rmse = evaluator.evaluate(predicoes)
r2 = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="r2").evaluate(predicoes)


In [88]:
melhor_modelo = modelo_cv.bestModel
print(f"RMSE do melhor Modelo: {rsme_grid:.2f}")
print(f"R2 do Melohor Modelo : {r2_grid:.2f}")
print("\n Melhor Combinacao de Hiperparametros:")
print(f" -> MaxDepth: {melhor_modelo.getOrDefault('maxDepth')} ")
print(f" -> NumTrees: {melhor_modelo.getOrDefault('numTrees') } ")

RMSE do melhor Modelo: 88.47
R2 do Melohor Modelo : 0.99

 Melhor Combinacao de Hiperparametros:
 -> MaxDepth: 10 
 -> NumTrees: 20 
