### Stellar Classification Dataset - SDSS17

**1. CARGA DE DATOS**

In [2]:
from pyspark.sql import SparkSession

# Crear una instancia de SparkSession
spark = SparkSession.builder.getOrCreate()

# Cargar el dataset como DataFrame
df = spark.read.csv(r"file:///home/jovyan/work/star_classification.csv", header=True, inferSchema=True)

# estructura del DataFrame
df.printSchema()

# 5 primeras filas
df.show(5)


root
 |-- obj_ID: double (nullable = true)
 |-- alpha: double (nullable = true)
 |-- delta: double (nullable = true)
 |-- u: double (nullable = true)
 |-- g: double (nullable = true)
 |-- r: double (nullable = true)
 |-- i: double (nullable = true)
 |-- z: double (nullable = true)
 |-- run_ID: integer (nullable = true)
 |-- rerun_ID: integer (nullable = true)
 |-- cam_col: integer (nullable = true)
 |-- field_ID: integer (nullable = true)
 |-- spec_obj_ID: double (nullable = true)
 |-- class: string (nullable = true)
 |-- redshift: double (nullable = true)
 |-- plate: integer (nullable = true)
 |-- MJD: integer (nullable = true)
 |-- fiber_ID: integer (nullable = true)

+--------------------+----------------+------------------+--------+--------+--------+--------+--------+------+--------+-------+--------+--------------------+------+---------+-----+-----+--------+
|              obj_ID|           alpha|             delta|       u|       g|       r|       i|       z|run_ID|rerun_ID|cam_co

In [2]:
# numero de filas
num_rows = df.count()

# numero de columnas
num_columns = len(df.columns)

print("Número de filas: ", num_rows)
print("Número de columnas: ", num_columns)


Número de filas:  100000
Número de columnas:  18


In [None]:
# valores unicos de la columna class
class_values = df.select("class").distinct()

print("Valores de la columna 'class': ")
class_values.show()

**2. Limpieza de valores nulos:**

Se utiliza una expresión lambda con la función select() para aplicar sum(col(column).isNull().cast("int")) a cada columna del DataFrame. 
Esto cuenta el número de valores nulos en cada columna y asigna el nombre de la columna a través de alias(column). 
Luego utilizamos show() para mostrar el resultado.

In [7]:
from pyspark.sql.functions import col, sum

# numero total de nulos por columna
null_counts = df.select([sum(col(column).isNull().cast("int")).alias(column) for column in df.columns])
null_counts.show()


+------+-----+-----+---+---+---+---+---+------+--------+-------+--------+-----------+-----+--------+-----+---+--------+
|obj_ID|alpha|delta|  u|  g|  r|  i|  z|run_ID|rerun_ID|cam_col|field_ID|spec_obj_ID|class|redshift|plate|MJD|fiber_ID|
+------+-----+-----+---+---+---+---+---+------+--------+-------+--------+-----------+-----+--------+-----+---+--------+
|     0|    0|    0|  0|  0|  0|  0|  0|     0|       0|      0|       0|          0|    0|       0|    0|  0|       0|
+------+-----+-----+---+---+---+---+---+------+--------+-------+--------+-----------+-----+--------+-----+---+--------+



In [9]:
# eliminar filas con valores faltantes en cualquier columna
df = df.dropna()

**3. Limpieza de valores duplicados:**

In [10]:
# numero total de filas en el DataFrame
total_rows = df.count()
print(f"Filas totales en el DataFrame: {total_rows}")

# numero de filas despues de eliminar los duplicados
unique_rows = df.dropDuplicates().count()
print(f"Filas únicas en el DataFrame: {unique_rows}")

# verificar si hay datos duplicados
if total_rows == unique_rows:
    print("No hay duplicados en el DataFrame.")
else:
    print(f"Hay {total_rows-unique_rows} duplicados en el DataFrame.")


Filas totales en el DataFrame: 100000
Filas únicas en el DataFrame: 100000
No hay duplicados en el DataFrame.


In [11]:
# se muestran los datos duplicados
duplicated_data = df.exceptAll(df.dropDuplicates())
duplicated_data.show()

print(f"Hay {total_rows-unique_rows} filas duplicadas en el DataFrame.")


+------+-----+-----+---+---+---+---+---+------+--------+-------+--------+-----------+-----+--------+-----+---+--------+
|obj_ID|alpha|delta|  u|  g|  r|  i|  z|run_ID|rerun_ID|cam_col|field_ID|spec_obj_ID|class|redshift|plate|MJD|fiber_ID|
+------+-----+-----+---+---+---+---+---+------+--------+-------+--------+-----------+-----+--------+-----+---+--------+
+------+-----+-----+---+---+---+---+---+------+--------+-------+--------+-----------+-----+--------+-----+---+--------+

Hay 0 filas duplicadas en el DataFrame.


In [16]:
# valores unicos de la columna class
class_values = df.select("class").distinct()

print("Valores de la columna 'class': ")
class_values.show()

Valores de la columna 'class': 
+------+
| class|
+------+
|GALAXY|
|   QSO|
|  STAR|
+------+



**Antes de ejecutar la siguiente celda que se conecta a MySQL y crea la tabla e inserta los 100.000 registros, es necesario crear un nuevo usuario ("user1") en MySQL.**

In [3]:
# this cell takes 1-2mins
url ="jdbc:mysql://192.168.1.42:3306/stellar_class"
tabla_stars = "stars"
propiedades = { 
    "user": "user1",
    "password": "abc123.",
    "driver": "com.mysql.jdbc.Driver"
}

df.select("obj_ID","alpha","delta","u","g","r","i","z","run_ID","rerun_ID","cam_col","field_ID","spec_obj_ID","class","redshift","plate","MJD","fiber_ID").write.format("jdbc").option("url", url).option("dbtable", tabla_stars).option("user", propiedades["user"]).option("password", propiedades["password"]).option("driver", propiedades["driver"]).save()