In [0]:
%pyspark
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, IntegerType, FloatType, DoubleType
from pyspark.sql.functions import col, when, isnull, count
from pyspark.ml.feature import VectorAssembler, StandardScaler, OneHotEncoder, StringIndexer
from pyspark.ml import Pipeline
from pyspark.sql.functions import col
from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, GBTRegressor, RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator

%matplotlib inline


In [1]:
%pyspark
# Load the CO2 dataset from CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
# NOTE(review): `spark` is not created before this cell; presumably it is the
# session injected by the %pyspark interpreter — confirm.
df = spark.read.csv("/data/co2.csv", header=True, inferSchema=True)
# Preview the first 5 rows.
df.show(5)


In [2]:
%pyspark
# Dataset dimensions: the row count runs a Spark job, the column count is
# read from the schema.
rows = df.count()
cols = len(df.columns)
print(f"Nombre de lignes: {rows}, Nombre de colonnes: {cols}")

In [3]:
%pyspark
# Print the summary statistics that DataFrame.describe() computes for the loaded data.
df.describe().show()

In [4]:
%pyspark
# Print the column names together with the types inferred when the CSV was read.
df.printSchema()

In [5]:
%pyspark
# df_CO2 is a second name for the same DataFrame object as df (no copy is made).
df_CO2 = df
# Collect the whole dataset to the driver as a pandas DataFrame; the
# seaborn/matplotlib cells read from this pandas copy.
df_CO2_pd = df_CO2.toPandas()


In [6]:
%pyspark
# Pairwise relationships between the columns: regression fits off the
# diagonal, KDE curves on the diagonal, both coloured red.
pairplot_options = {
    "kind": "reg",
    "diag_kind": "kde",
    "diag_kws": {"color": "red"},
    "plot_kws": {"line_kws": {"color": "red"}},
}
sns.pairplot(df_CO2_pd, **pairplot_options)

In [7]:
%pyspark

# Average CO2 emissions per make, computed with one grouped aggregation (a
# single Spark job) instead of one filter + aggregate job per make.
make_rows = df_CO2.groupBy('Make').agg({'CO2_Emissions': 'avg'}).collect()

# Skip groups with a null make or a null mean (they can be neither sorted nor
# drawn), then order ascending by mean emissions.
make_means = sorted(
    ((row[0], row[1]) for row in make_rows if row[0] is not None and row[1] is not None),
    key=lambda pair: pair[1],
)
makes_sorted = [make for make, _ in make_means]
co2_means_sorted = [mean for _, mean in make_means]

# Scale the figure height with the number of makes so the labels stay legible.
num_makes = len(makes_sorted)
fig_height = max(1, num_makes * 0.2)

plt.figure(figsize=(8, fig_height))
bars = plt.barh(makes_sorted, co2_means_sorted, color='skyblue')
plt.title('Émissions moyennes de CO2 par marque', fontsize=16)
plt.xlabel('Émissions de CO2 (g/km)', fontsize=14)
plt.ylabel('Marque', fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)

# Annotate each bar with its value, placed at the bar's right-hand end and
# vertically centred on the bar.
for bar in bars:
    plt.text(bar.get_width(), bar.get_y() + bar.get_height() / 2,
             f'{bar.get_width():.2f}', va='center', ha='left', fontsize=10)

plt.tight_layout()
plt.show()


In [8]:
%pyspark
# Columns plotted in the distribution and boxplot grids.
# NOTE(review): 'Fuel_Consumption_Comb9' must match a CSV column name exactly —
# confirm the trailing 9 is intended.
numerical_vars = ['Engine_Size', 'Cylinders', 'Fuel_Consumption_City', 'Fuel_Consumption_Hwy', 'Fuel_Consumption_Comb9', 'CO2_Emissions']

# Three plots per row; the row count is a ceiling division so the grid always
# has room for every variable in the list.
n_cols = 3
n_rows = -(-len(numerical_vars) // n_cols)

plt.figure(figsize=(12, 8))

# One histogram with a KDE overlay per variable.
for i, var in enumerate(numerical_vars, 1):
    plt.subplot(n_rows, n_cols, i)
    sns.histplot(data=df_CO2_pd, x=var, kde=True)
    # These panels are histograms, so the title no longer says "Boxplot".
    plt.title(f'Histogram of {var}')

plt.tight_layout()
plt.show()


In [9]:
%pyspark
# One boxplot per numerical variable, laid out on a 2 x 3 grid.
fig = plt.figure(figsize=(12, 8))

for position, column in enumerate(numerical_vars, start=1):
    ax = fig.add_subplot(2, 3, position)
    sns.boxplot(data=df_CO2_pd, y=column, ax=ax)
    ax.set_title(f'Boxplot of {column}')

fig.tight_layout()
plt.show()

In [10]:
%pyspark

# Mean CO2 emissions per vehicle class, aggregated in Spark and then collected
# to pandas for plotting.
df_CO2_grouped = df_CO2.groupBy('Vehicle_Class').avg('CO2_Emissions')
df_CO2_grouped_pd = df_CO2_grouped.toPandas()

fig, ax = plt.subplots(figsize=(12, 6))
sns.barplot(data=df_CO2_grouped_pd, x='Vehicle_Class', y='avg(CO2_Emissions)', ax=ax)
ax.set_title('Average CO2_Emissions by Vehicle_Class')
# Tilt the class names so neighbouring labels do not overlap.
plt.setp(ax.get_xticklabels(), rotation=45)
plt.show()


In [11]:
%pyspark

# Engine size against CO2 emissions, with points coloured by cylinder count.
fig, ax = plt.subplots(figsize=(8, 6))
sns.scatterplot(
    data=df_CO2_pd,
    x='Engine_Size',
    y='CO2_Emissions',
    hue='Cylinders',
    palette='viridis',
    alpha=0.7,
    ax=ax,
)
ax.set_title('Taille du moteur par rapport aux émissions de CO2 ')
ax.set_xlabel('Engine_Size(L)')
ax.set_ylabel('CO2_Emissions(g/km)')
plt.show()


In [12]:
%pyspark
# Count the null entries of every column in one select: `when` yields a
# non-null value only for null cells, and `count` ignores the rest.
null_counts = [count(when(isnull(name), name)).alias(name) for name in df.columns]
missing_values = df.select(null_counts)
missing_values.show()


In [13]:
%pyspark

# Keep only the columns Spark typed as int or double.
numeric_columns = [name for name, dtype in df.dtypes if dtype in ('int', 'double')]
numeric_df_CO2 = df.select(numeric_columns)

# Collect to pandas and compute the pairwise correlations there.
correlation_matrix = numeric_df_CO2.toPandas().corr()

# Draw the correlation matrix as an annotated heatmap.
fig, ax = plt.subplots(figsize=(6, 4))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', fmt=".2f", ax=ax)
ax.set_title('Matrice de Corrélation')
plt.show()


In [15]:
%pyspark

# Partition the columns by Spark data type: string columns are listed as
# categorical; integer, float and double columns as numerical.
numeric_types = (IntegerType, FloatType, DoubleType)
cat_cols = [field.name for field in df_CO2.schema.fields if isinstance(field.dataType, StringType)]
num_cols = [field.name for field in df_CO2.schema.fields if isinstance(field.dataType, numeric_types)]

print("Categorical Variables:")
print(cat_cols)
print("\n")
print("Numerical Variables:")
print(num_cols)


In [16]:
%pyspark
# Remove the Make, Model, Vehicle_Class and Transmission columns from df_CO2.
# NOTE(review): df_CO2 is rebound here, so the earlier cells that read 'Make'
# or 'Vehicle_Class' from df_CO2 will fail if re-run after this cell.
columns_to_drop = ['Make', 'Model', 'Vehicle_Class', 'Transmission']
df_CO2 = df_CO2.drop(*columns_to_drop)
# Preview the first 5 rows of the reduced frame.
df_CO2.show(5)

In [17]:
%pyspark

from pyspark.sql.functions import when

# Distinct fuel type values present in the data.
fuel_types = [row[0] for row in df_CO2.select("Fuel_Type").distinct().collect()]

# Manual one-hot encoding: add one boolean indicator column per fuel type.
for fuel_type in fuel_types:
    is_this_fuel = df_CO2["Fuel_Type"] == fuel_type
    df_CO2 = df_CO2.withColumn(f"Fuel_Type_{fuel_type}", when(is_this_fuel, True).otherwise(False))

# The source column is dropped once the indicator columns exist.
df_CO2_ft = df_CO2.drop("Fuel_Type")
df_CO2_ft.show()


In [18]:
%pyspark
# Every column except the target is a model input; y holds the target alone.
X = df_CO2_ft.drop('CO2_Emissions')
y = df_CO2_ft.select('CO2_Emissions')

# Pack the input columns into one vector column named "features".
assembler = VectorAssembler(inputCols=X.columns, outputCol="features")
df_transformed = assembler.transform(df_CO2_ft)

# 70/30 train/test split with a fixed seed.
train_data, test_data = df_transformed.randomSplit([0.7, 0.3], seed=42)

# Cache both splits: the counts below and every later fit/evaluate would
# otherwise re-read the CSV and redo the whole transformation chain.
train_data = train_data.cache()
test_data = test_data.cache()

print(f"Ensemble d'entraînement : {train_data.count()} échantillons")
print(f"Ensemble de test : {test_data.count()} échantillons")


In [19]:
%pyspark
# All four regressors read the "features" vector column and are trained
# against the "CO2_Emissions" column.
estimator_columns = {"featuresCol": "features", "labelCol": "CO2_Emissions"}

lr = LinearRegression(**estimator_columns)
dt = DecisionTreeRegressor(**estimator_columns)
rf = RandomForestRegressor(**estimator_columns)
gbt = GBTRegressor(**estimator_columns)

In [20]:
%pyspark
def train_and_evaluate(model, train_data, test_data, label_col="CO2_Emissions", evaluator=None):
    """Fit ``model`` on ``train_data`` and score its predictions on ``test_data``.

    Args:
        model: Estimator exposing ``fit(train_data)``; the fitted object must
            expose ``transform(test_data)``.
        train_data: Dataset passed to ``model.fit``.
        test_data: Dataset passed to the fitted model's ``transform``.
        label_col: Name of the label column handed to the default evaluator.
            Ignored when ``evaluator`` is supplied.
        evaluator: Optional evaluator exposing a ``metricName`` param and
            ``evaluate(dataset, params)``. Defaults to a ``RegressionEvaluator``
            reading ``label_col`` and the ``"prediction"`` column.

    Returns:
        Tuple ``(mse, rmse)`` measured on the predictions for ``test_data``.
    """
    import math

    fitted_model = model.fit(train_data)
    predictions = fitted_model.transform(test_data)

    if evaluator is None:
        evaluator = RegressionEvaluator(labelCol=label_col, predictionCol="prediction")

    # Select the metric through a param override so the evaluator itself is
    # left unmodified.
    mse = evaluator.evaluate(predictions, {evaluator.metricName: "mse"})

    # RMSE is by definition sqrt(MSE); deriving it avoids a second evaluation
    # pass over the predictions.
    rmse = math.sqrt(mse)

    return mse, rmse


In [21]:
%pyspark
# Fit and score each regressor, collecting one (name, MSE, RMSE) row per model.
candidates = {
    "LinearRegression": lr,
    "DecisionTree": dt,
    "RandomForest": rf,
    "GradientBoosting": gbt,
}

results = []
for model_name, model in candidates.items():
    mse, rmse = train_and_evaluate(model, train_data, test_data)
    results.append((model_name, mse, rmse))

# Tabulate the scores for printing and for the comparison chart.
results_df = pd.DataFrame(results, columns=["Model", "MSE", "RMSE"])
print(results_df)

In [22]:
%pyspark
# Bar chart comparing the RMSE of each model.
fig, ax = plt.subplots(figsize=(8, 6))
ax.bar(results_df["Model"], results_df["RMSE"], color='skyblue')
ax.set_xlabel('Modèles')
ax.set_ylabel('RMSE')
ax.set_title('Comparaison des modèles de régression')
# Tilt the model names so they do not overlap.
plt.setp(ax.get_xticklabels(), rotation=45)
plt.show()

In [23]:
%pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("Read-and-write-data-to-Hive-table-spark")
sc = SparkContext.getOrCreate(conf=conf)

# Obtain a Hive-enabled session pointed at the project's metastore.
# NOTE(review): a session named `spark` is already in use by the earlier cells,
# and getOrCreate() may hand back that existing session — confirm that the
# metastore URI and Hive support set here actually take effect.
spark = SparkSession.builder \
    .appName("Hive Connection Diagnostic") \
    .config("hive.metastore.uris", "thrift://projet-hive-metastore-1:9083") \
    .enableHiveSupport() \
    .getOrCreate()

# Create the target database if needed, then list the databases as a
# connectivity check.
database_name = "co2"
spark.sql(f"CREATE DATABASE IF NOT EXISTS {database_name}")

# Use a dedicated name so the dataset frame `df`, which earlier cells read,
# is not overwritten by this listing.
databases_df = spark.sql("SHOW DATABASES")
databases_df.show(truncate=False)

In [24]:
%pyspark
# Convert the pandas results table to a Spark DataFrame so it can be written
# through the session.
spark_df = spark.createDataFrame(results_df)

# Write it as the parquet-backed table "resultat" in the co2 database,
# replacing any previous contents.
spark.sql("USE co2")
(
    spark_df.write
    .format("parquet")
    .mode("overwrite")
    .saveAsTable("resultat")
)

# Read the table back to confirm the write.
resultat_df = spark.sql("SELECT * FROM co2.resultat")
resultat_df.show()