In [1]:
!pip install plotly



# **Import Libraries**

In [2]:
from pyspark.sql.functions import col, avg, sum,count,round,dayofweek, desc, when
from pyspark.sql.types import IntegerType, FloatType
from pyspark.sql.window import Window
import plotly.express as px
import seaborn as sns
import pandas as pd
import matplotlib.pyplot as plt

# **Init SparkSession**

In [3]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Test HDFS") \
    .getOrCreate()

https://drive.google.com/file/d/1uzIrvqLTBSYrcB2XmJZD7bbdNjuAvFY_/view?usp=sharing


# **Chargement CSV**

In [4]:
flights_bronze = spark.read.csv("hdfs://namenode:8020/user/data/flights.csv", header=True, inferSchema=True)
airlines_bronze = spark.read.csv("hdfs://namenode:8020/user/data/airlines.csv", header=True, inferSchema=True)
airports_bronze = spark.read.csv("hdfs://namenode:8020/user/data/airports.csv", header=True, inferSchema=True)

**Afficher 10 lignes**

In [6]:
   flights_bronze.limit(10).toPandas()

Unnamed: 0,YEAR,MONTH,DAY,DAY_OF_WEEK,AIRLINE,FLIGHT_NUMBER,TAIL_NUMBER,ORIGIN_AIRPORT,DESTINATION_AIRPORT,SCHEDULED_DEPARTURE,...,ARRIVAL_TIME,ARRIVAL_DELAY,DIVERTED,CANCELLED,CANCELLATION_REASON,AIR_SYSTEM_DELAY,SECURITY_DELAY,AIRLINE_DELAY,LATE_AIRCRAFT_DELAY,WEATHER_DELAY
0,2015,1,1,4,AS,98,N407AS,ANC,SEA,5,...,408,-22,0,0,,,,,,
1,2015,1,1,4,AA,2336,N3KUAA,LAX,PBI,10,...,741,-9,0,0,,,,,,
2,2015,1,1,4,US,840,N171US,SFO,CLT,20,...,811,5,0,0,,,,,,
3,2015,1,1,4,AA,258,N3HYAA,LAX,MIA,20,...,756,-9,0,0,,,,,,
4,2015,1,1,4,AS,135,N527AS,SEA,ANC,25,...,259,-21,0,0,,,,,,
5,2015,1,1,4,DL,806,N3730B,SFO,MSP,25,...,610,8,0,0,,,,,,
6,2015,1,1,4,NK,612,N635NK,LAS,MSP,25,...,509,-17,0,0,,,,,,
7,2015,1,1,4,US,2013,N584UW,LAX,CLT,30,...,753,-10,0,0,,,,,,
8,2015,1,1,4,AA,1112,N3LAAA,SFO,DFW,30,...,532,-13,0,0,,,,,,
9,2015,1,1,4,DL,1173,N826DN,LAS,ATL,30,...,656,-15,0,0,,,,,,


# Data Prep


**Casting et nettoyage**


In [7]:
# flights
cols_to_cast = [
    "DEPARTURE_DELAY", "TAXI_OUT", "ELAPSED_TIME", "AIR_TIME",
    "DISTANCE", "TAXI_IN", "ARRIVAL_DELAY",
    "AIR_SYSTEM_DELAY", "SECURITY_DELAY", "AIRLINE_DELAY",
    "LATE_AIRCRAFT_DELAY", "WEATHER_DELAY"
]

flights_df = flights_bronze.select(*[
    col(c).cast(IntegerType()) if c in cols_to_cast else col(c)
    for c in flights_df.columns
])


#airports
cols_to_cast_airports = ["LATITUDE", "LONGITUDE"]
airports_df = airlines_bronze.select(*[
    col(c).cast(FloatType()) if c in cols_to_cast_airports else col(c)
    for c in airports_df.columns
]).dropna(subset=["IATA_CODE"]).dropDuplicates(["IATA_CODE"])

#Suppression Douplon Airlines
airlines_df = airlines_df.dropna(subset=["IATA_CODE", "AIRLINE"]).dropDuplicates(["IATA_CODE"])

NameError: name 'flights_df' is not defined

# Analytics

**Retards moyens par compagnie**

In [None]:
from pyspark.sql.functions import avg

avg_delay_by_airline = flights_df.groupBy("AIRLINE").agg(
    round(avg("DEPARTURE_DELAY"), 2).alias("AVG_DEPARTURE_DELAY"),
    round(avg("ARRIVAL_DELAY"), 2).alias("AVG_ARRIVAL_DELAY")
)
avg_df = avg_delay_by_airline.alias("avg")
airlines_df_alias = airlines_df.alias("air")

joined_df = avg_df.join(
    airlines_df_alias,
    avg_df["AIRLINE"] == airlines_df_alias["IATA_CODE"],
    "left"
)

result_df = joined_df.select(
    avg_df["AIRLINE"].alias("AIRLINE_CODE"),
    airlines_df_alias["AIRLINE"].alias("AIRLINE_NAME"),
    avg_df["AVG_DEPARTURE_DELAY"],
    avg_df["AVG_ARRIVAL_DELAY"]
).orderBy("AVG_ARRIVAL_DELAY", ascending=False)


result_df.show(30)


**Graphique retards à l’arrivée**

In [None]:
pdf = result_df.toPandas()

fig = px.bar(
    pdf,
    x='AIRLINE_NAME',
    y='AVG_ARRIVAL_DELAY',
    labels={'AIRLINE_NAME': 'Compagnie aérienne', 'AVG_ARRIVAL_DELAY': 'Retard moyen à l\'arrivée (minutes)'},
    title='Retard moyen à l\'arrivée par compagnie aérienne',
    text=pdf['AVG_ARRIVAL_DELAY'].round(2)
)

fig.update_traces(textposition='outside')
fig.update_layout(xaxis_tickangle=-45, yaxis=dict(title='Retard moyen (min)'), uniformtext_minsize=8)
fig.show()

**Graphique retards au départ**

In [None]:
pdf = result_df.toPandas()

fig = px.bar(
    pdf,
    x='AIRLINE_NAME',
    y='AVG_DEPARTURE_DELAY',
    labels={'AIRLINE_NAME': 'Compagnie aérienne', 'AVG_DEPARTURE_DELAY': 'Retard moyen à l\'arrivée (minutes)'},
    title='Retard moyen au départ par compagnie aérienne',
    text=pdf['AVG_DEPARTURE_DELAY'].round(2)
)

fig.update_traces(textposition='outside')
fig.update_layout(xaxis_tickangle=-45, yaxis=dict(title='Retard moyen (min)'), uniformtext_minsize=8)
fig.show()

**Nombre de vols en retard par compagnie**


In [None]:
delay_count_by_airline = flights_df.filter(col("ARRIVAL_DELAY") > 0) \
    .groupBy("AIRLINE") \
    .agg(sum((col("ARRIVAL_DELAY") > 0).cast("int")).alias("DELAYED_COUNT"))

delay_df = delay_count_by_airline.alias("delay")
airlines_df_alias = airlines_df.alias("air")

joined_delay_df = delay_df.join(
    airlines_df_alias,
    delay_df["AIRLINE"] == airlines_df_alias["IATA_CODE"],
    "left"
)

final_delay_df = joined_delay_df.select(
    delay_df["AIRLINE"].alias("AIRLINE_CODE"),
    airlines_df_alias["AIRLINE"].alias("AIRLINE_NAME"),
    delay_df["DELAYED_COUNT"]
).orderBy("DELAYED_COUNT", ascending=False)


**Barres horizontales retards**

In [None]:
plot_df = final_delay_df.toPandas()

fig = px.bar(
    plot_df,
    x="DELAYED_COUNT",
    y="AIRLINE_NAME",
    orientation="h",
    title="Nombre de vols en retard par compagnie",
    labels={"AIRLINE_NAME": "Compagnie", "DELAYED_COUNT": "Nombre de retards"}
)

fig.show()


**Nombre de vols déviés par compagnie**

In [None]:
from pyspark.sql.functions import col, sum as _sum

diverted_count_df = flights_df.filter(col("DIVERTED") == 1) \
    .groupBy("AIRLINE") \
    .agg(_sum((col("DIVERTED") == 1).cast("int")).alias("DIVERTED_COUNT"))

div_df = diverted_count_df.alias("div")
airlines_df_alias = airlines_df.alias("air")

joined_diverted_df = div_df.join(
    airlines_df_alias,
    div_df["AIRLINE"] == airlines_df_alias["IATA_CODE"],
    "left"
)

final_diverted_df = joined_diverted_df.select(
    div_df["AIRLINE"].alias("AIRLINE_CODE"),
    airlines_df_alias["AIRLINE"].alias("AIRLINE_NAME"),
    div_df["DIVERTED_COUNT"]
).orderBy("DIVERTED_COUNT", ascending=False)


**Barres horizontales vols déroutés**

In [None]:
plot_df = final_diverted_df.toPandas()

fig = px.bar(
    plot_df,
    x="DIVERTED_COUNT",
    y="AIRLINE_NAME",
    orientation="h",
    title="Nombre de vols déroutés par compagnie",
    labels={"AIRLINE_NAME": "Compagnie", "DIVERTED_COUNT": "Nombre de déroutements"}
)

fig.show()


**Retard moyen arrivée par compagnie**

In [None]:
avg_delay_df = flights_df.groupBy("AIRLINE") \
    .agg(avg("ARRIVAL_DELAY").alias("avg_arrival_delay"))

avg_delay_pd = avg_delay_df.toPandas()

fig = px.bar(
    avg_delay_pd.sort_values(by="avg_arrival_delay", ascending=False),
    x="AIRLINE",
    y="avg_arrival_delay",
    title="Retard moyen à l'arrivée par compagnie",
    labels={"avg_arrival_delay": "Retard moyen (min)", "AIRLINE": "Compagnie"}
)
fig.show()

**Retard moyen par jour de semaine**

In [None]:
df_delay_day = flights_df.groupBy("DAY_OF_WEEK") \
    .agg(avg("ARRIVAL_DELAY").alias("avg_arrival_delay")) \
    .toPandas()

fig = px.bar(df_delay_day.sort_values("DAY_OF_WEEK"),
             x="DAY_OF_WEEK", y="avg_arrival_delay",
             title="Retard moyen à l'arrivé par jour de la semaine")
fig.show()

**Retard moyen départ par aéroport**

In [None]:
from pyspark.sql.functions import avg, round

# Moyenne des retards au départ par aéroport
avg_delay_origin = flights_df.groupBy("ORIGIN_AIRPORT").agg(
    round(avg("DEPARTURE_DELAY"), 2).alias("AVG_DEP_DELAY")
)

# Joins
avg_df = avg_delay_origin.alias("avg")
airports_df_alias = airports_df.alias("air")

# Join sur IATA
joined_df = avg_df.join(
    airports_df_alias,
    avg_df["ORIGIN_AIRPORT"] == airports_df_alias["IATA_CODE"],
    "left"
)
final_df = joined_df.select(
    avg_df["ORIGIN_AIRPORT"],
    airports_df_alias["AIRPORT"].alias("AIRPORT_NAME"),
    avg_df["AVG_DEP_DELAY"]
).orderBy("AVG_DEP_DELAY", ascending=False)


**Top 10 aéroports retards départ**

In [None]:
# Top 10
top_10_df = final_df.orderBy(col("AVG_DEP_DELAY").desc()).limit(17)

# Pandas -> Plotly
plot_df = top_10_df.toPandas()

# Graphique
import plotly.express as px

fig = px.bar(
    plot_df,
    x="AIRPORT_NAME",
    y="AVG_DEP_DELAY",
    title="Les 10 aéroports avec le plus de retards au départ",
    labels={"AIRPORT_NAME": "Aéroport", "AVG_DEP_DELAY": "Retard moyen (min)"},
    text="AVG_DEP_DELAY"
)
fig.update_traces(texttemplate='%{text:.1f} min', textposition='outside')
fig.update_layout(xaxis_tickangle=-45)

fig.show()


**Taux annulation par compagnie**

In [None]:
cancel_rate = flights_df.groupBy("AIRLINE").agg(
    ((sum("CANCELLED") / count("*")) * 100).alias("cancel_rate")
)

plot4 = cancel_rate.toPandas()

fig4 = px.bar(
    plot4.sort_values("cancel_rate", ascending=False),
    x="AIRLINE", y="cancel_rate",
    title="Taux d’annulation par compagnie ",
    labels={"AIRLINE": "Compagnie", "cancel_rate": "Taux d’annulation en %"}
)
fig4.show()

**Retard moyen par compagnie et jour**

In [None]:
delay_grouped = flights_df.groupBy("AIRLINE", "DAY_OF_WEEK") \
    .agg(avg("ARRIVAL_DELAY").alias("avg_arrival_delay")) \
    .toPandas()


fig = px.bar(
    delay_grouped,
    x="AIRLINE",
    y="avg_arrival_delay",
    color="DAY_OF_WEEK",
    barmode="group",
    title="Retard moyen à l’arrivée par compagine et par jour de la semaine",
    labels={
        "AIRLINE": "Aéroport d'origine",
        "avg_arrival_delay": "Retard moyen (min)",
        "DAY_OF_WEEK": "Jour de la semaine"
    },
    color_discrete_sequence=px.colors.qualitative.Set2
)

fig.update_layout(xaxis_tickangle=-45)
fig.show()

**Somme retards par cause**

In [None]:
delay_by_cause = (
    flights_df
    .select([sum(c).alias(c) for c in [
        'AIR_SYSTEM_DELAY',
        'SECURITY_DELAY',
        'AIRLINE_DELAY',
        'LATE_AIRCRAFT_DELAY',
        'WEATHER_DELAY'
    ]])
)

delay_by_cause.show()

**Taux vols retardés**

In [None]:
# Récupère le DF
delays = delay_by_cause.collect()[0].asDict()

cause_names = list(delays.keys())
minutes = list(delays.values())
pdf = pd.DataFrame({'cause': cause_names, 'minutes': minutes})

import plotly.express as px
fig = px.bar(pdf, x='cause', y='minutes',
             title="Total de minutes de retard par cause")
fig.show()


**Moyenne retard arrivée par compagnie**

In [None]:
avg_delay_airline = (
    flights_df
    .groupBy('AIRLINE')
    .agg(
        avg('ARRIVAL_DELAY').alias('avg_arr_delay'),
        count('*').alias('n_flights')
    )
    .orderBy(desc('avg_arr_delay'))
)

avg_delay_airline.show(10)


Viz - Barre Horizontal

In [None]:
# ->  pandas
pdf_airline = avg_delay_airline.toPandas()

# Visualisation
fig = px.bar(
    pdf_airline,
    x="avg_arr_delay",
    y="AIRLINE",
    orientation='h',
    color="avg_arr_delay",
    color_continuous_scale="Reds",
    title="Retard moyen à l’arrivée par compagnie",
    labels={"avg_arr_delay": "Retard moyen (min)", "AIRLINE": "Compagnie"},
    text=pdf_airline["avg_arr_delay"].round(1)
)

fig.update_traces(textposition="outside")
fig.update_layout(
    xaxis_title="Retard moyen (min)",
    yaxis_title="Compagnie",
    template="plotly_white"
)
fig.show()

**Retard moyen par jour & mois**

In [None]:
#  moy day/week
avg_delay_dow = (
    flights_df
    .groupBy('DAY_OF_WEEK')
    .agg(avg('ARRIVAL_DELAY').alias('avg_delay'))
    .orderBy('DAY_OF_WEEK')
)

#  moy month
avg_delay_month = (
    flights_df
    .groupBy('MONTH')
    .agg(avg('ARRIVAL_DELAY').alias('avg_delay'))
    .orderBy('MONTH')
)

#to pandas
pdf_dow = avg_delay_dow.toPandas()
pdf_month = avg_delay_month.toPandas()

# Ajout label
day_labels = {1: "Lun", 2: "Mar", 3: "Mer", 4: "Jeu", 5: "Ven", 6: "Sam", 7: "Dim"}
pdf_dow['DAY_LABEL'] = pdf_dow['DAY_OF_WEEK'].astype(int).map(day_labels)

month_labels = {
    1: "Jan", 2: "Fév", 3: "Mar", 4: "Avr", 5: "Mai", 6: "Juin",
    7: "Juil", 8: "Août", 9: "Sept", 10: "Oct", 11: "Nov", 12: "Déc"
}
pdf_month['MONTH_LABEL'] = pdf_month['MONTH'].astype(int).map(month_labels)

# dataViz -delay/month
fig_dow = px.bar(
    pdf_dow,
    x="DAY_LABEL", y="avg_delay",
    color="avg_delay", color_continuous_scale="RdBu_r",
    title="Retard moyen par jour de la semaine",
    labels={"avg_delay": "Retard moyen (min)", "DAY_LABEL": "Jour"},
    text=pdf_dow["avg_delay"].round(1)
)
fig_dow.update_traces(textposition="outside")
fig_dow.update_layout(
    yaxis_title="Retard moyen (min)",
    xaxis_title="Jour",
    uniformtext_minsize=8,
    template="plotly_white"
)
fig_dow.show()

#dataViz - delay/month
fig_month = px.bar(
    pdf_month,
    x="MONTH_LABEL", y="avg_delay",
    color="avg_delay", color_continuous_scale="Blues",
    title="Retard moyen par mois",
    labels={"avg_delay": "Retard moyen (min)", "MONTH_LABEL": "Mois"},
    text=pdf_month["avg_delay"].round(1)
)
fig_month.update_traces(textposition="outside")
fig_month.update_layout(
    yaxis_title="Retard moyen (min)",
    xaxis_title="Mois",
    uniformtext_minsize=8,
    template="plotly_white"
)
fig_month.show()

# **Corrélation** des variables de vol

In [None]:
# 1. Sélection des colonnes numériques pertinentes
numeric_cols = [
    "DEPARTURE_DELAY", "ARRIVAL_DELAY", "AIR_TIME", "DISTANCE",
    "TAXI_OUT", "TAXI_IN", "ELAPSED_TIME"
]

# 2. Conversion vers Pandas (avec limitation si nécessaire pour performance)
flights_sample = flights_df.select(numeric_cols).dropna().limit(10000).toPandas()

# 3. Matrice de corrélation
correlation_matrix = flights_sample.corr(numeric_only=True)

# 4. Affichage
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix, annot=True, cmap="coolwarm", fmt=".2f")
plt.title("Matrice de corrélation des variables de vol")
plt.show()

# Training

In [None]:
! pip install tensorflow

In [None]:
numeric_feature_names = ['SCHEDULED_DEPARTURE', 'DEPARTURE_TIME', 'DISTANCE', 'SCHEDULED_ARRIVAL', 'ARRIVAL_TIME','AIRLINE', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT']

In [None]:
df_cleaned = flights_df.select(numeric_feature_names).dropna()

MAX_SIZE = 10000

df_limited = df_cleaned.limit(MAX_SIZE)
X = df_limited.toPandas()
y = flights_df.select(['DIVERTED', 'CANCELLED', 'ARRIVAL_DELAY']).dropna().limit(MAX_SIZE).toPandas()

In [None]:
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)


In [None]:
for col in ['AIRLINE', 'ORIGIN_AIRPORT', 'DESTINATION_AIRPORT']:
    unique_values = X[col].unique()
    mapping = {val: idx for idx, val in enumerate(unique_values)}
    X_train[col] = X_train[col].map(mapping).fillna(-1).astype(int)
    X_test[col] = X_test[col].map(mapping).fillna(-1).astype(int)

y_train['ARRIVAL_DELAY'] = y_train['ARRIVAL_DELAY'].apply(lambda x: 1 if int(x) > 0 else 0)
y_test['ARRIVAL_DELAY'] = y_test['ARRIVAL_DELAY'].apply(lambda x: 1 if int(x) > 0 else 0)

y_train = y_train.to_numpy()
y_test = y_test.to_numpy()

X_train = scaler.fit_transform(X_train)
X_test = scaler.transform(X_test)

In [None]:
model = tf.keras.Sequential([
    tf.keras.layers.InputLayer(input_shape=(X_train.shape[1],)),
    tf.keras.layers.Dense(64, activation='relu'),
    tf.keras.layers.Dense(32, activation='relu'),
    tf.keras.layers.Dense(3, activation='sigmoid')
])

In [None]:
model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

In [None]:
model.fit(X_train, y_train, validation_data=(X_test, y_test), epochs=20, batch_size=32)


In [None]:
model.save('model.h5')
from google.colab import files
files.download('model.h5')