In [47]:
!pip install pyspark



In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, avg
from pyspark.sql.types import DateType, FloatType, DoubleType, IntegerType
from pyspark.ml.feature import StringIndexer, Imputer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.stat import Correlation

In [3]:
spark = SparkSession.builder.master("spark://spark-master:7077").getOrCreate()
df = spark.read.csv('flaskapp/data/hwc.csv', header=True, inferSchema=True)
df.show()

+--------------------+---------------+--------------------+------+----------+---------+----------------+----------------+------------+----------------+--------+------------------+------------------+--------------+---------+------------------+------------------+--------------+-----------------+---------------------------+---------------------------+-----------------------+--------------+------------------------+------------------------+--------------------+-------------+-----------------------+-----------------------+-------------------+-------+-----------------+-----------------+-------------+-------------------+---------+----------+--------+---------+----------+------------+-------------+-------+---------------+---------------+----------+--------------------+--------------------+-------------+-----------------------+-----------------------+-------------------+------+----------------+----------------+------------+--------+------------------+------------------+--------------+-----------

In [7]:
compact_data = df.drop('P_GEO_ALBEDO', 'P_DETECTION_MASS', 'P_DETECTION_RADIUS', 'P_ALT_NAMES', 'P_ATMOSPHERE', 'S_DISC', 'S_MAGNETIC_FIELD',
                 'P_TEMP_MEASURED', 'P_TPERI',
'P_DENSITY', 'P_ESCAPE', 'P_GRAVITY', 'P_POTENTIAL', 'P_OMEGA', 'P_INCLINATION', 'P_IMPACT_PARAMETER', 'P_HILL_SPHERE',
                 'P_MASS')

error_columns = [col for col in compact_data.columns if 'ERROR' in col]
df_cleaned = compact_data.drop(*error_columns)
df_cleaned.show()


+--------------------+---------------+--------------------+------+----------+------------+----------------+--------+--------------+---------+--------------+-----------------+-----------------------+--------------+--------------------+-------------------+-------------+-------------------+---------+----------+--------+---------+----------+------------+-------------+-------+----------+-------------+-------------------+------+------------+--------+--------------+-------------+-------------------+-----+-----------+---------+---------------+-------+-------------+-----------+------------+----------+--------------+------------+------------+------------+------------+----------------+----------------+-----------+---------------+---------------+-----------+-----------+-----------+------------+------------+------------+------------+------------+------------+-------------+-------------+-------------+-------------+-----------+-----------+------------+-------------+-------------+-----------+---------

In [8]:
df_cleaned.dtypes

[('P_NAME', 'string'),
 ('P_DETECTION', 'string'),
 ('P_DISCOVERY_FACILITY', 'string'),
 ('P_YEAR', 'int'),
 ('P_UPDATE', 'date'),
 ('P_MASS_LIMIT', 'int'),
 ('P_MASS_ORIGIN', 'string'),
 ('P_RADIUS', 'double'),
 ('P_RADIUS_LIMIT', 'int'),
 ('P_PERIOD', 'double'),
 ('P_PERIOD_LIMIT', 'int'),
 ('P_SEMI_MAJOR_AXIS', 'double'),
 ('P_SEMI_MAJOR_AXIS_LIMIT', 'int'),
 ('P_ECCENTRICITY', 'double'),
 ('P_ECCENTRICITY_LIMIT', 'int'),
 ('P_INCLINATION_LIMIT', 'int'),
 ('P_OMEGA_LIMIT', 'int'),
 ('S_NAME', 'string'),
 ('S_NAME_HD', 'string'),
 ('S_NAME_HIP', 'string'),
 ('S_TYPE', 'string'),
 ('S_RA', 'double'),
 ('S_DEC', 'double'),
 ('S_RA_STR', 'string'),
 ('S_DEC_STR', 'string'),
 ('S_MAG', 'double'),
 ('S_DISTANCE', 'double'),
 ('S_TEMPERATURE', 'double'),
 ('S_TEMPERATURE_LIMIT', 'int'),
 ('S_MASS', 'double'),
 ('S_MASS_LIMIT', 'int'),
 ('S_RADIUS', 'double'),
 ('S_RADIUS_LIMIT', 'int'),
 ('S_METALLICITY', 'double'),
 ('S_METALLICITY_LIMIT', 'int'),
 ('S_AGE', 'double'),
 ('S_AGE_LIMIT', 'i

In [52]:
df_cleaned.count()

5599

In [53]:
#calcul des valeurs null dans chaque colonne du dataFrame
null_counts = df_cleaned.select([count(when(col(c).isNull(), c)).alias(c) for c in df_cleaned.columns])

In [54]:
null_counts.show()

+------+-----------+--------------------+------+--------+------------+-------------+--------+--------------+--------+--------------+-----------------+-----------------------+--------------+--------------------+-------------------+-------------+------+---------+----------+------+----+-----+--------+---------+-----+----------+-------------+-------------------+------+------------+--------+--------------+-------------+-------------------+-----+-----------+---------+---------------+-------+-------------+----------+------------+----------+--------------+------+----------+----------+------------+----------------+----------------+-----------+---------------+---------------+------+-----------+--------+---------+------------+------------+------------+------------+------------+-------------+-------------+-------------+-------------+-----------+-----------+------------+-------------+-------------+-----------+-----------+-----+---------------+-------------------+-------------------+
|P_NAME|P_DETEC

In [56]:
mean_values = {
    col_name: df_cleaned.select(avg(col_name)).first()[0]
    for col_name, dtype in df_cleaned.dtypes
    if dtype != 'string' and not isinstance(df_cleaned.schema[col_name].dataType, DateType)
}

# Remplir les valeurs manquantes avec les moyennes calculées
compact_data = df_cleaned.na.fill(mean_values)

In [57]:
compact_data.show()

+--------------------+---------------+--------------------+------+----------+------------+----------------+--------+--------------+----------------+--------------+-----------------+-----------------------+-------------------+--------------------+-------------------+-------------+-------------------+---------+----------+--------+---------+----------+------------+-------------+------------------+-----------------+-----------------+-------------------+------+------------+----------------+--------------+--------------------+-------------------+-----------------+-----------+--------------------+---------------+----------------+-------------+-----------+------------+----------+--------------+-----------------+----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+----------------+-----------+-----------+-----------+------------+-----------------+-----------------+----------------+------------------+------------------+-----

# Convert Categorical Features to Numerical

In [60]:
# Identifier les colonnes catégorielles
categorical_columns = [col_name for col_name, dtype in compact_data.dtypes if dtype == 'string']

# Créer un indexeur pour chaque colonne catégorielle
indexers = [StringIndexer(inputCol=column, outputCol=column + "_indexed").setHandleInvalid("keep") for column in categorical_columns]

# Créer le pipeline
pipeline = Pipeline(stages=indexers)

# Appliquer le pipeline au DataFrame
compact_data = pipeline.fit(compact_data).transform(compact_data)

#supprimer les anciennes colonnes catégorielles si nécessaire
for column in categorical_columns:
    compact_data = compact_data.drop(column)

In [63]:
# Identifier les colonnes numériques pour l'imputation
numeric_cols = [col_name for col_name, dtype in compact_data.dtypes if dtype in ['int', 'double']]

# Créer un imputer pour les colonnes numériques identifiées
imputer = Imputer(
    inputCols=numeric_cols,
    outputCols=["{}_imputed".format(col_name) for col_name in numeric_cols]
).setStrategy("mean")  # ou "median" selon le cas d'usage

# Appliquer l'imputation
compact_data = imputer.fit(compact_data).transform(compact_data)

compact_data.show()

+------+----------+------------+--------+--------------+----------------+--------------+-----------------+-----------------------+-------------------+--------------------+-------------------+-------------+---------+----------+------------------+-----------------+-----------------+-------------------+------+------------+----------------+--------------+--------------------+-------------------+-----------------+-----------+--------------------+---------------+----------------+-------------+-----------+------------+----------+--------------+-----------------+----------------+-----------------+-----------------+-----------------+-----------------+-----------------+-----------------+----------------+-----------------+-----------------+----------------+------------------+------------------+------------------+------------------+------------------+------------------+------------------+--------------------+------------+-------------+-------------+-----------+-------------------+--------------+--

# Removing Multicollinearity

In [65]:
# Identifier les colonnes numériques
numeric_cols = [col_name for col_name, dtype in compact_data.dtypes if dtype in ['int', 'double']]

# Assembler les colonnes numériques dans un vecteur
assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
feature_vector_df = assembler.transform(compact_data)

# Calculer la matrice de corrélation
correlation_matrix = Correlation.corr(feature_vector_df, "features").head()[0]

# Extraire la matrice de corrélation en tant que tableau Numpy (si nécessaire)
correlation_array = correlation_matrix.toArray()

# Afficher la matrice de corrélation
print(correlation_array)

[[ 1.          0.00902213 -0.21140022 ... -0.00558671 -0.00558671
  -0.00619957]
 [ 0.00902213  1.         -0.05395405 ... -0.01573223 -0.01573223
  -0.01567132]
 [-0.21140022 -0.05395405  1.         ...  0.44583817  0.44583817
   0.44617999]
 ...
 [-0.00558671 -0.01573223  0.44583817 ...  1.          1.
   0.99989613]
 [-0.00558671 -0.01573223  0.44583817 ...  1.          1.
   0.99989613]
 [-0.00619957 -0.01567132  0.44617999 ...  0.99989613  0.99989613
   1.        ]]


In [66]:
import numpy as np

# Seuil de corrélation haute pour retirer les colonnes
high_corr_threshold = 0.9
cols_to_remove = []

for i in range(len(correlation_array)):
    for j in range(i+1, len(correlation_array)):
        if abs(correlation_array[i, j]) > high_corr_threshold:
            cols_to_remove.append(numeric_cols[j])

# Supprimer les colonnes sélectionnées
cols_to_remove = list(set(cols_to_remove))  # Enlever les doublons
compact_data = compact_data.drop(*cols_to_remove)

# Removal of Outliers

In [69]:
# Calculer Q1 et Q3 pour chaque colonne numérique et en déduire l'IQR
quantile_probs = [0.25, 0.75]
rel_error = 0.05  # Erreur relative pour le calcul approximatif des quantiles

# Calculer les quantiles pour chaque colonne numérique
Q1 = {col_name: compact_data.approxQuantile(col_name, [0.25], rel_error)[0] for col_name in compact_data.columns if compact_data.schema[col_name].dataType in [FloatType(), DoubleType(), IntegerType()]}
Q3 = {col_name: compact_data.approxQuantile(col_name, [0.75], rel_error)[0] for col_name in compact_data.columns if compact_data.schema[col_name].dataType in [FloatType(), DoubleType(), IntegerType()]}

# Calculer l'IQR pour chaque colonne numérique
IQR = {col_name: Q3[col_name] - Q1[col_name] for col_name in Q1}

# Filtrer les outliers
for col_name in IQR:
    lower_bound = Q1[col_name] - 1.5 * IQR[col_name]
    upper_bound = Q3[col_name] + 1.5 * IQR[col_name]
    compact_data = compact_data.filter((col(col_name) >= lower_bound) & (col(col_name) <= upper_bound))

# Final state of dataframe

In [70]:
compact_data.show()

+------+----------+------------+--------+--------------+---------+--------------+-----------------+-----------------------+-------------------+--------------------+-------------------+-------------+---------+----------+------+----------+-------------+-------------------+------+------------+--------+--------------+-------------+-------------------+-----------------+-----------+---------+---------------+-------+---------+------------+-----------------+------------+------------+-----------+------------+-------------+-------------+-----------+-----------+--------------+-------------------+----------------------------+---------------------+--------------+-----------------+------------------+--------------+----------------+-----------------+--------------+-------------------+-------------------+-----------------------+---------------------+--------------------+----------------------+-----------------------+-----------------------------+----------------------------+---------------------------