# Criando a sessão Spark e o DataFrame com o dataset Titanic

In [1]:
from pyspark.sql import SparkSession

# Local Spark session; "local[*]" runs Spark in-process with one worker
# thread per available logical core.
spark = SparkSession.builder.master("local[*]").appName("sparkML").getOrCreate()

# Load the Titanic CSV (path relative to the notebook's working directory).
# The first line is treated as the header and column types are inferred
# from the data.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("titanic.csv")
)

df.show()

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/21 20:02:41 WARN Utils: Your hostname, spark-bigdata, resolves to a loopback address: 127.0.1.1; using 192.168.0.9 instead (on interface enp0s3)
25/07/21 20:02:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/21 20:02:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
                                                                                

+--------+------+-----+----+
|survived|status|  age| sex|
+--------+------+-----+----+
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
|     yes| first|adult|male|
+--------+------+-----+----+
only showing top 20 rows


# Transformando campos string em índices

In [2]:
from pyspark.ml.feature import StringIndexer

# Label-encode every column of the raw frame, including the target `survived`.
# Each input column `c` gets a numeric companion column `c_index`; by default
# StringIndexer orders labels by descending frequency (most frequent -> 0.0).
string_indexer_columns = list(df.columns)

string_indexer = StringIndexer(
    inputCols=string_indexer_columns,
    outputCols=[f"{name}_index" for name in string_indexer_columns],
)
model_string_indexer = string_indexer.fit(df)
df = model_string_indexer.transform(df)

df.show()

                                                                                

+--------+------+-----+----+--------------+------------+---------+---------+
|survived|status|  age| sex|survived_index|status_index|age_index|sex_index|
+--------+------+-----+----+--------------+------------+---------+---------+
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|

# Tratando campos categóricos (one-hot encoding)

In [3]:
from pyspark.ml.feature import OneHotEncoder

# One-hot encode the indexed *feature* columns. `survived_index` is the label,
# so it is deliberately excluded from the encoding.
one_hot_encoder_columns = [
    name
    for name in df.columns
    if name.endswith("_index") and name != "survived_index"
]

# With OneHotEncoder's default dropLast=True, a column with k categories becomes
# a sparse vector of size k-1 (e.g. the two-valued `age`/`sex` become size-1
# vectors), which avoids perfectly collinear dummy columns.
one_hot_encoder = OneHotEncoder(
    inputCols=one_hot_encoder_columns,
    outputCols=[f"{name}_ohe" for name in one_hot_encoder_columns],
)
model_one_hot_encoder = one_hot_encoder.fit(df)
df = model_one_hot_encoder.transform(df)

df.show()

+--------+------+-----+----+--------------+------------+---------+---------+----------------+-------------+-------------+
|survived|status|  age| sex|survived_index|status_index|age_index|sex_index|status_index_ohe|age_index_ohe|sex_index_ohe|
+--------+------+-----+----+--------------+------------+---------+---------+----------------+-------------+-------------+
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|   (3,[2],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|   (3,[2],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|   (3,[2],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|   (3,[2],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|   (3,[2],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|
|     yes| first|adult|m

# Criando vetor das features

In [4]:
from pyspark.ml.feature import VectorAssembler

# Concatenate every one-hot encoded column into the single `features` vector
# column that the Spark ML classifiers below read from.
vector_assembler = VectorAssembler(
    outputCol="features",
    inputCols=[name for name in df.columns if name.endswith("_ohe")],
)

df = vector_assembler.transform(df)

df.show()

+--------+------+-----+----+--------------+------------+---------+---------+----------------+-------------+-------------+--------------------+
|survived|status|  age| sex|survived_index|status_index|age_index|sex_index|status_index_ohe|age_index_ohe|sex_index_ohe|            features|
+--------+------+-----+----+--------------+------------+---------+---------+----------------+-------------+-------------+--------------------+
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|   (3,[2],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|[0.0,0.0,1.0,1.0,...|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|   (3,[2],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|[0.0,0.0,1.0,1.0,...|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|   (3,[2],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|[0.0,0.0,1.0,1.0,...|
|     yes| first|adult|male|           1.0|         2.0|      0.0|      0.0|   (3,[2],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|[0.0,0.0,1.0,1.0,...|

# Treinamento do modelo Random Forest

In [5]:
from pyspark.ml.classification import RandomForestClassifier

# Fixed seed so both the train/test split and the forest's internal random
# sampling are reproducible across Restart & Run All; without it the reported
# F1 changes on every run.
SEED = 42

# 70% train / 30% test. The same split is reused by later models so their
# metrics are comparable.
(training_data, test_data) = df.randomSplit([0.7, 0.3], seed=SEED)

random_forest = RandomForestClassifier(
    labelCol="survived_index",
    featuresCol="features",
    seed=SEED,
)

modelo_treinado = random_forest.fit(training_data)

df_previsao = modelo_treinado.transform(test_data)

df_previsao.show()

[Stage 24:>                                                         (0 + 1) / 1]

+--------+------+-----+------+--------------+------------+---------+---------+----------------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|survived|status|  age|   sex|survived_index|status_index|age_index|sex_index|status_index_ohe|age_index_ohe|sex_index_ohe|            features|       rawPrediction|         probability|prediction|
+--------+------+-----+------+--------------+------------+---------+---------+----------------+-------------+-------------+--------------------+--------------------+--------------------+----------+
|      no|  crew|adult|female|           0.0|         0.0|      0.0|      1.0|   (3,[0],[1.0])|(1,[0],[1.0])|    (1,[],[])| (5,[0,3],[1.0,1.0])|[2.81541705786445...|[0.14077085289322...|       1.0|
|      no|  crew|adult|  male|           0.0|         0.0|      0.0|      0.0|   (3,[0],[1.0])|(1,[0],[1.0])|(1,[0],[1.0])|[1.0,0.0,0.0,1.0,...|[15.7765242749810...|[0.78882621374905...|       0.0|
|      no|

                                                                                

# Avaliação do modelo

In [6]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Score predictions by comparing the `prediction` column with the indexed label.
# NOTE(review): Spark's "f1" metric is documented as an average over classes
# (weighted), not the binary F1 of the positive class — confirm for your version.
# `evaluator` is reused below to score the logistic regression as well.
evaluator = MulticlassClassificationEvaluator(
    metricName="f1",
    labelCol="survived_index",
    predictionCol="prediction",
)

f1_score = evaluator.evaluate(df_previsao)
print(f"F1-Score do modelo: {f1_score:.4f}")

F1-Score do modelo: 0.7836


# Treinamento do modelo Logistic Regression

In [7]:
from pyspark.ml.classification import LogisticRegression

# Same features, same train/test split and same `evaluator` as the random
# forest, so the two F1 values can be compared directly.
logistic_regression = LogisticRegression(
    featuresCol="features",
    labelCol="survived_index",
)

# Fit on the training split, then predict on the held-out split.
modelo_logistic = logistic_regression.fit(training_data)
df_previsao_logistic = modelo_logistic.transform(test_data)

f1_score_logistic = evaluator.evaluate(df_previsao_logistic)
print(f"F1-Score do modelo Logistic Regression: {f1_score_logistic:.4f}")

F1-Score do modelo Logistic Regression: 0.7780
