In [1]:
import io
import os
import subprocess
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
import psycopg2
from psycopg2 import sql
from multiprocessing import Pool, cpu_count

In [2]:
DEBUG = os.environ.get("DEBUG", "True") == "True"
SCRIPT_ROOT = "/root/scripts"

In [3]:
# Unzip input files
subprocess.run([SCRIPT_ROOT + "/unzip.sh"])

Archive:  /root/data/input/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2020_1.zip
  inflating: /tmp/tmp.r1H9gk3vGF/On_Time_Marketing_Carrier_On_Time_Performance_(Beginning_January_2018)_2020_1.csv  
  inflating: /tmp/tmp.r1H9gk3vGF/readme.html  
Archive:  /root/data/input/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2021_1.zip
  inflating: /tmp/tmp.P6yLjDYFbA/On_Time_Marketing_Carrier_On_Time_Performance_(Beginning_January_2018)_2021_1.csv  
  inflating: /tmp/tmp.P6yLjDYFbA/readme.html  
Archive:  /root/data/input/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2022_1.zip
  inflating: /tmp/tmp.HWCZiITzXu/On_Time_Marketing_Carrier_On_Time_Performance_(Beginning_January_2018)_2022_1.csv  
  inflating: /tmp/tmp.HWCZiITzXu/readme.html  
Archive:  /root/data/input/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2023_1.zip
  inflating: /tmp/tmp.P0kovWfO9J/On_Time_Marketing_Carrier_On_Time_Performance_(

CompletedProcess(args=['/root/scripts/unzip.sh'], returncode=0)

In [4]:
# Upload CSV files to HDFS
subprocess.run([SCRIPT_ROOT + "/upload.sh"])

Tous les fichiers CSV ont été traités avec succès


CompletedProcess(args=['/root/scripts/upload.sh'], returncode=0)

In [5]:
class DBConfig:
    HOST = os.environ.get("DB_HOST", "localhost")
    PORT = os.environ.get("DB_PORT", "5432")
    DATABASE = os.environ.get("DB_NAME", "postgres")
    USER = os.environ.get("DB_USER", "postgres")
    PASSWORD = os.environ.get("DB_PASSWORD", "password")

    def get_conn(self):
        """Create a connection to the PostgreSQL database."""
        return psycopg2.connect(
            host=self.HOST,
            port=self.PORT,
            dbname=self.DATABASE,
            user=self.USER,
            password=self.PASSWORD,
        )

In [6]:
class File:
    def __init__(self, path: str, size: int, owner: str, date: str):
        self.path = path
        self.extension = path.split(".")[-1]
        self.size = size
        self.name = path.split("/")[-1].split(".")[0]
        self.owner = owner
        self.datetime = datetime.strptime(date, "%Y-%m-%d %H:%M")

    def __repr__(self):
        return f"{self.name} ({self.size} bytes) by {self.owner} on {self.datetime}"

    def __str__(self):
        return self.__repr__()

In [7]:
spark = SparkSession.builder.appName("FlightDataAnalysis").master("spark://hadoop-namenode:7077").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [8]:
def create_file_from_stdout(line: str) -> File:
    parts = line.split()
    path = parts[-1]
    size = int(parts[4])
    owner = parts[2]
    date = f"{parts[5]} {parts[6]}"
    return File(path, size, owner, date)

In [9]:
def list_hdfs_files(filepath: str, extensions: list[str] = ["csv"]) -> list[File]:
    try:
        lines = (
            subprocess.check_output(["hdfs", "dfs", "-ls", filepath], text=True)
            .strip()
            .split("\n")
        )
        files = [
            create_file_from_stdout(line) for line in lines if line.startswith("-")
        ]
        return [file for file in files if file.extension in extensions]
    except subprocess.CalledProcessError as e:
        print(f"Error listing files in {filepath}: {e}\n") if DEBUG else None
        return []

In [10]:
def moove_hdfs_files(files: list[File], destination: str) -> None:
    for file in files:
        try:
            subprocess.run(["hdfs", "dfs", "-mv", file.path, destination], check=True)
            print(f"Moved {file} to {destination}\n") if DEBUG else None
        except subprocess.CalledProcessError as e:
            print(f"Error moving {file} to {destination}: {e}\n") if DEBUG else None

In [11]:
def copy_to_postgres(table_name: str, file: File) -> None:
    conn = DBConfig().get_conn()
    cursor = conn.cursor()

    try:
        hdfs_file_content = subprocess.check_output(
            ["hdfs", "dfs", "-cat", file.path], text=True
        )
        file_like_object = io.StringIO(hdfs_file_content)
        cursor.copy_expert(
            sql.SQL("COPY {} FROM STDIN WITH CSV HEADER").format(
                sql.Identifier(table_name)
            ),
            file_like_object,
        )
        conn.commit()  # Valider les changements
    except Exception as e:
        print(f"Error copying {file} to {table_name}: {e}\n") if DEBUG else None
    finally:
        cursor.close()
        conn.close()

In [12]:
def create_temp_chunks_dir():
    output_dir = f"/tmp/spark_output/{datetime.now().strftime('%Y%m%d%H%M%S')}"
    subprocess.run(["hdfs", "dfs", "-mkdir", "-p", output_dir], check=True)
    print(f"Created HDFS directory: {output_dir}\n") if DEBUG else None
    return output_dir

In [13]:
def create_temp_chunks_files(df, output_dir: str, partition_factor=2) -> list[File]:
    num_partitions = cpu_count() * partition_factor  # Dépend du nombre de CPU
    (
        print(
            f"Writing DataFrame in {num_partitions} partitions to HDFS at {output_dir}"
        )
        if DEBUG
        else None
    )
    df.repartition(num_partitions).write.option("maxRecordsPerFile", 100000).mode(
        "overwrite"
    ).csv(output_dir)

    return list_hdfs_files(output_dir)

In [14]:
def clean_temp_chunks_files(output_dir: str) -> None:
    try:
        subprocess.run(["hdfs", "dfs", "-rm", "-r", output_dir], check=True)
        print(f"Cleaned HDFS directory: {output_dir}") if DEBUG else None
    except subprocess.CalledProcessError as e:
        print(f"Error cleaning HDFS directory {output_dir}: {e}") if DEBUG else None

In [15]:
create_on_time_performance_table = """
    CREATE TABLE IF NOT EXISTS on_time_performance (
        FlightDate DATE,
        Year INT,
        Month INT,
        DayofMonth INT,
        DayOfWeek INT,
        OriginAirportID INT,
        DestAirportID INT,
        Operating_Airline CHAR(2),
        CRSDepTime INT,
        DepTime INT,
        CRSArrTime INT,
        ArrTime INT,
        WheelsOff INT,
        WheelsOn INT,
        Cancelled BOOLEAN,
        CRSElapsedTime INT,
        ActualElapsedTime INT,
        AirTime INT,
        Flights INT,
        Distance INT,
        CarrierDelay INT,
        WeatherDelay INT,
        NASDelay INT,
        SecurityDelay INT,
        LateAircraftDelay INT
    );
    """

In [16]:
def ensure_on_time_performance_table_exists():
    conn = DBConfig().get_conn()
    cursor = conn.cursor()

    try:
        cursor.execute(create_on_time_performance_table)
        conn.commit()
        print("Table on_time_performance created successfully") if DEBUG else None
    except Exception as e:
        print(f"Error creating table on_time_performance: {e}") if DEBUG else None
    finally:
        cursor.close()
        conn.close()

In [17]:
def clean_and_transform_on_time_performance_data(df):
    """
    Transforme les données avant l'insertion dans PostgreSQL.
    """

    # Supprimer les espaces blancs dans les noms de colonnes
    df = df.select([col(c).alias(c.strip()) for c in df.columns])

    # Garder uniquement les colonnes nécessaires
    df = df.select(
        "FlightDate",
        "Year",
        "Month",
        "DayofMonth",
        "DayOfWeek",
        "OriginAirportID",
        "DestAirportID",
        "Operating_Airline",
        "CRSDepTime",
        "DepTime",
        "CRSArrTime",
        "ArrTime",
        "WheelsOff",
        "WheelsOn",
        "Cancelled",
        "CRSElapsedTime",
        "ActualElapsedTime",
        "AirTime",
        "Flights",
        "Distance",
        "CarrierDelay",
        "WeatherDelay",
        "NASDelay",
        "SecurityDelay",
        "LateAircraftDelay",
    )

    # Supprimer les doublons
    df = df.dropDuplicates()

    int_columns = ["Year", "Month", "DayofMonth", "DayOfWeek", "OriginAirportID", "DestAirportID", 
                   "Operating_Airline", "CRSElapsedTime", "ActualElapsedTime", "AirTime", "Flights", 
                   "Distance", "CarrierDelay", "WeatherDelay", "NASDelay", "SecurityDelay", "LateAircraftDelay",
                   "CRSDepTime", "DepTime", "CRSArrTime", "ArrTime", "WheelsOff", "WheelsOn"]
    
    for col_name in int_columns:
        df = df.withColumn(col_name, col(col_name).cast("int"))
    
    # Convertir Cancelled en BOOLEAN
    df = df.withColumn("Cancelled", when(col("Cancelled") == 1, True).otherwise(False))
    
    # Gérer les vols annulés (remplacer les colonnes liées au temps par NULL si annulé)
    df = df.withColumn("DepTime", when(col("Cancelled"), None).otherwise(col("DepTime")))
    df = df.withColumn("ArrTime", when(col("Cancelled"), None).otherwise(col("ArrTime")))
    df = df.withColumn("ActualElapsedTime", when(col("Cancelled"), None).otherwise(col("ActualElapsedTime")))
    df = df.withColumn("AirTime", when(col("Cancelled"), None).otherwise(col("AirTime")))
    df = df.withColumn("CarrierDelay", when(col("Cancelled"), None).otherwise(col("CarrierDelay")))
    df = df.withColumn("WeatherDelay", when(col("Cancelled"), None).otherwise(col("WeatherDelay")))
    df = df.withColumn("NASDelay", when(col("Cancelled"), None).otherwise(col("NASDelay")))
    df = df.withColumn("SecurityDelay", when(col("Cancelled"), None).otherwise(col("SecurityDelay")))
    df = df.withColumn("LateAircraftDelay", when(col("Cancelled"), None).otherwise(col("LateAircraftDelay")))

    return df

In [18]:
def process_on_time_performance_csv(file: File):
    print(f"Processing {file.path}...\n") if DEBUG else None
    try:
        df = spark.read.csv(file.path, header=True, inferSchema=True)
        df = clean_and_transform_on_time_performance_data(df)
        tmp_dir = create_temp_chunks_dir()
        chunks = create_temp_chunks_files(df, tmp_dir)
        with Pool(cpu_count()) as pool:
            pool.starmap(
                copy_to_postgres,
                [("on_time_performance", chunk) for chunk in chunks],
            )

        clean_temp_chunks_files(tmp_dir)
        print(f"{len(chunks)} chunks processed from {file.path}") if DEBUG else None
    except Exception as e:
        print(f"Error processing {file.path}: {e}\n") if DEBUG else None

In [22]:
def process_data(path: str, fn) -> None:
    files = list_hdfs_files(path)
    print(f"{len(files)} files found in {path}\n") if DEBUG else None

    for file in files:
        fn(file)

    moove_hdfs_files(files, "/completed/")

In [20]:
ensure_on_time_performance_table_exists()

Table on_time_performance created successfully


In [21]:
process_data(
        path="/staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018*",
        fn=process_on_time_performance_csv,
    )

6 files found in /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018*

Processing /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2019_1.csv...



                                                                                

Created HDFS directory: /tmp/spark_output/20241020205514

Writing DataFrame in 32 partitions to HDFS at /tmp/spark_output/20241020205514


24/10/20 20:55:17 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

Deleted /tmp/spark_output/20241020205514
Cleaned HDFS directory: /tmp/spark_output/20241020205514
32 chunks processed from /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2019_1.csv
Processing /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2020_1.csv...



                                                                                

Created HDFS directory: /tmp/spark_output/20241020205738

Writing DataFrame in 32 partitions to HDFS at /tmp/spark_output/20241020205738


                                                                                

Deleted /tmp/spark_output/20241020205738
Cleaned HDFS directory: /tmp/spark_output/20241020205738
32 chunks processed from /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2020_1.csv
Processing /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2021_1.csv...



                                                                                

Created HDFS directory: /tmp/spark_output/20241020205948

Writing DataFrame in 32 partitions to HDFS at /tmp/spark_output/20241020205948


                                                                                

Deleted /tmp/spark_output/20241020205948
Cleaned HDFS directory: /tmp/spark_output/20241020205948
32 chunks processed from /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2021_1.csv
Processing /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2022_1.csv...



                                                                                

Created HDFS directory: /tmp/spark_output/20241020210200

Writing DataFrame in 32 partitions to HDFS at /tmp/spark_output/20241020210200


                                                                                

Deleted /tmp/spark_output/20241020210200
Cleaned HDFS directory: /tmp/spark_output/20241020210200
32 chunks processed from /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2022_1.csv
Processing /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2023_1.csv...



                                                                                

Created HDFS directory: /tmp/spark_output/20241020210415

Writing DataFrame in 32 partitions to HDFS at /tmp/spark_output/20241020210415


                                                                                

Deleted /tmp/spark_output/20241020210415
Cleaned HDFS directory: /tmp/spark_output/20241020210415
32 chunks processed from /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2023_1.csv
Processing /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2024_1.csv...



                                                                                

Created HDFS directory: /tmp/spark_output/20241020210630

Writing DataFrame in 32 partitions to HDFS at /tmp/spark_output/20241020210630


                                                                                

Deleted /tmp/spark_output/20241020210630
Cleaned HDFS directory: /tmp/spark_output/20241020210630
32 chunks processed from /staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2024_1.csv
Moved On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2019_1 (316759084 bytes) by root on 2024-10-20 19:03:00 to /completed



mv: `/completed': File exists


Error moving On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2020_1 (327859854 bytes) by root on 2024-10-20 20:54:00 to /completed: Command '['hdfs', 'dfs', '-mv', '/staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2020_1.csv', '/completed']' returned non-zero exit status 1.



mv: `/completed': File exists


Error moving On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2021_1 (188159563 bytes) by root on 2024-10-20 20:54:00 to /completed: Command '['hdfs', 'dfs', '-mv', '/staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2021_1.csv', '/completed']' returned non-zero exit status 1.



mv: `/completed': File exists


Error moving On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2022_1 (278168409 bytes) by root on 2024-10-20 20:54:00 to /completed: Command '['hdfs', 'dfs', '-mv', '/staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2022_1.csv', '/completed']' returned non-zero exit status 1.



mv: `/completed': File exists


Error moving On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2023_1 (284746166 bytes) by root on 2024-10-20 20:54:00 to /completed: Command '['hdfs', 'dfs', '-mv', '/staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2023_1.csv', '/completed']' returned non-zero exit status 1.



mv: `/completed': File exists


Error moving On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2024_1 (288685356 bytes) by root on 2024-10-20 20:54:00 to /completed: Command '['hdfs', 'dfs', '-mv', '/staging/On_Time_Marketing_Carrier_On_Time_Performance_Beginning_January_2018_2024_1.csv', '/completed']' returned non-zero exit status 1.

