**Examen 2B Computación Paralela**

Instalación e importación de librerías

In [2]:
#!pip install pyspark #Instalar Apache Spark y sus dependencias
#Librerias
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, isnan, when, count, avg
from sklearn.metrics import classification_report
import pandas as pd
import matplotlib.pyplot as plt
#Librerias MLlib
from pyspark.ml.regression import LinearRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator, RegressionEvaluator, ClusteringEvaluator
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, DecisionTreeRegressionModel
from pyspark.ml.clustering import KMeans
from pyspark.ml import Pipeline
from pyspark.ml.feature import PCA


**1. Exploración de Datos:**

Pregunta 1: ¿Cuántos pasajeros hay en el conjunto de datos y cuántas características contiene?


*   Número de pasajeros: 418
*   Número de características: 12



In [3]:
#Carga del archivo csv
spark = SparkSession.builder.appName("Examen2B").getOrCreate()
df = spark.read.csv('titanic.csv', header=True, inferSchema=True)
df.show(5)
df.describe().show()
print("Pregunta 1:")
print("Número de pasajeros:", df.count())
print("Número de características:", len(df.columns))

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|        892|       0|     3|    Kelly, Mr. James|  male|34.5|    0|    0| 330911| 7.8292| NULL|       Q|
|        893|       1|     3|Wilkes, Mrs. Jame...|female|47.0|    1|    0| 363272|    7.0| NULL|       S|
|        894|       0|     2|Myles, Mr. Thomas...|  male|62.0|    0|    0| 240276| 9.6875| NULL|       Q|
|        895|       0|     3|    Wirz, Mr. Albert|  male|27.0|    0|    0| 315154| 8.6625| NULL|       S|
|        896|       1|     3|Hirvonen, Mrs. Al...|female|22.0|    1|    1|3101298|12.2875| NULL|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows

+-------+------------

**2. Preparación de Datos:**

 Pregunta 2: ¿Qué estrategias utilizaste para manejar los valores nulos en las columnas 'Age' y 'Embarked'?

*   Para la columna 'Age', se maneja valor faltante con la media de la edad entre los registros no nulos.
*   Para la columna 'Embarked', se maneja valor faltante con la moda (el valor más frecuente) entre los registros no nulos.

In [4]:
# Manejo de valores nulos en la columna 'Age'
median_age = df.select("Age").dropna().approxQuantile("Age", [0.5], 0.25)[0]
df = df.withColumn("Age", when(col("Age").isNull(), median_age).otherwise(col("Age")))
# Manejo de valores nulos en la columna 'Embarked'
mode_embarked = df.groupBy("Embarked").count().orderBy('count', ascending=False).first()[0]
df = df.withColumn("Embarked", when(col("Embarked").isNull(), mode_embarked).otherwise(col("Embarked")))
# Mostrar la cantidad de valores nulos en cada columna
df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns]).show()
# Manejo de valores nulos para columnas numéricas
numero = [c for c, t in df.dtypes if t in ('int', 'double')]
for c in numero:
    prom = df.select(avg(col(c))).collect()[0][0]  # Para obtener el promedio
    df = df.na.fill({c: prom})
# Manejo de valores nulos para columnas de tipo string
dato = [c for c, t in df.dtypes if t == 'string']
df = df.dropna(subset=dato)  # Para eliminar filas con nulos en datos tipo string
df.show()

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   1|  327|       0|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+

+-----------+--------+------+--------------------+------+----+-----+-----+-----------+-------+---------------+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|     Ticket|   Fare|          Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-----------+-------+---------------+--------+
|        904|       1|     1|Snyder, Mrs. John...|female|23.0|    1|    0|      21228|82.2667|            B45|       S|
|        906|       1|     1|Chaffee, Mrs. Her...|female|47.0|    1|    0|W.E.P. 5734| 61.175|            E31

**3. Ingeniería de Características:**

Pregunta 3: ¿Cómo codificaste las variables categóricas 'Sex' y 'Embarked' para que pudieran ser utilizadas en el modelo?

Se utilizó la técnica de codificación de índice de cadenas para convertir las variables categóricas 'Sex' y 'Embarked' en valores numéricos. Esta técnica crea un índice para cada categoría única y asigna un valor numérico correspondiente a cada registro.

In [5]:
index = [StringIndexer(inputCol=c, outputCol=c+"_indexed") for c in dato] #Lista de etiquetas
for indexer in index:
    df = indexer.fit(df).transform(df) #Transforma en numéricas las etiquetas
#Codificación y transformación en binario de las etiquetas
encoder = OneHotEncoder(inputCols=[c+"_indexed" for c in dato], outputCols=[c+"_encoded" for c in dato])
df = encoder.fit(df).transform(df)
#Para combinar las columnas, Normaliza las caracteristicas
combina = VectorAssembler(inputCols=[c+"_encoded" for c in dato] + numero, outputCol="features_vector")
df = combina.transform(df)
scaler = StandardScaler(inputCol="features_vector", outputCol="caracteristicaEscalada") #Escalar las caracteristicas (media a cero y desviación estandar a 1)
scaler_model = scaler.fit(df)
df = scaler_model.transform(df) #Ajusta y transforma las caracterisiticas
df.select("caracteristicaEscalada").show()

+----------------------+
|caracteristicaEscalada|
+----------------------+
|  (244,[76,102,162,...|
|  (244,[16,222,236,...|
|  (244,[70,92,160,2...|
|  (244,[64,114,183,...|
|  (244,[10,90,113,1...|
|  (244,[61,90,98,16...|
|  (244,[37,90,116,2...|
|  (244,[52,119,212,...|
|  (244,[18,90,148,1...|
|  (244,[13,124,211,...|
|  (244,[73,90,100,1...|
|  (244,[35,101,165,...|
|  (244,[1,90,137,23...|
|  (244,[17,92,188,2...|
|  (244,[68,90,92,16...|
|  (244,[85,90,134,2...|
|  (244,[36,101,165,...|
|  (244,[65,90,143,2...|
|  (244,[40,91,194,2...|
|  (244,[50,90,91,19...|
+----------------------+
only showing top 20 rows



**4. Entrenamiento del Modelo:**

Pregunta 4: ¿Cuáles fueron los pasos para dividir los datos en conjuntos de entrenamiento y prueba, y qué proporción de los datos se utilizó para cada conjunto?


*   El conjunto de datos se dividio en 70% para entrenamiento y 30% para pruebas
*   La porción de datos utilizados se las establece de forma randomica, en este caso 65 datos para train y 26 para test




In [7]:
train, test = df.randomSplit([0.7, 0.3], seed=2)
print(f"Tamaño del conjunto de entrenamiento: {train.count()}")
print(f"Tamaño del conjunto de prueba: {test.count()}")

Tamaño del conjunto de entrenamiento: 65
Tamaño del conjunto de prueba: 26


Entrenamiento del Modelo con regresión logística

In [8]:
label_column = 'Survived'

if label_column not in df.columns:
    raise ValueError(f"La columna '{label_column}' no existe en el DataFrame.")

# Crear una columna indexada para la etiqueta si es una variable categórica
label_indexer = StringIndexer(inputCol=label_column, outputCol="indexed_label")
label_model = label_indexer.fit(train)
train = label_model.transform(train)
test = label_model.transform(test)
lr = LogisticRegression(featuresCol='caracteristicaEscalada', labelCol='indexed_label')
lr_model = lr.fit(train)

**5. Evaluación del Modelo**

Utilizando la métrica de AUC-ROC

In [9]:
predic = lr_model.transform(test)
predic.select('caracteristicaEscalada', 'indexed_label', 'prediction').show(5)
#Evaluador de clasificación binaria
evaluador = BinaryClassificationEvaluator(labelCol='indexed_label', metricName='areaUnderROC')
auc_roc = evaluador.evaluate(predic) #calcula la métrica AUC-ROC para las predicciones
print(f"Área bajo la curva ROC (AUC-ROC): {auc_roc}")

+----------------------+-------------+----------+
|caracteristicaEscalada|indexed_label|prediction|
+----------------------+-------------+----------+
|  (244,[10,90,113,1...|          0.0|       0.0|
|  (244,[18,90,148,1...|          0.0|       0.0|
|  (244,[68,90,92,16...|          0.0|       1.0|
|  (244,[36,101,165,...|          1.0|       1.0|
|  (244,[24,123,163,...|          1.0|       1.0|
+----------------------+-------------+----------+
only showing top 5 rows

Área bajo la curva ROC (AUC-ROC): 1.0


Métrica para determinar la Precisión del Modelo

Precisión: 0.9615384615384616

In [10]:
evaluador_precision = MulticlassClassificationEvaluator(labelCol='indexed_label', metricName='accuracy')
precision = evaluador_precision.evaluate(predic)
print(f"Precisión del modelo: {precision}")

Precisión del modelo: 0.9615384615384616


Resultado de predicciones del conjunto de prueba

In [11]:
predictions = lr_model.transform(test)
predictions.select('indexed_label', 'prediction', 'probability').show(5)

+-------------+----------+--------------------+
|indexed_label|prediction|         probability|
+-------------+----------+--------------------+
|          0.0|       0.0|[0.99992753844553...|
|          0.0|       0.0|[0.99996594734867...|
|          0.0|       1.0|[0.44289867113317...|
|          1.0|       1.0|[3.69930424608692...|
|          1.0|       1.0|[1.00599836806475...|
+-------------+----------+--------------------+
only showing top 5 rows

