In [1]:
import os
import sys
from pyspark.sql.functions import date_format , split , col, unix_timestamp, from_unixtime, expr  ,when
from pyspark.sql import SparkSession
from hdfs import InsecureClient
import pandas as pd 


In [2]:
def get_hdfs_csv_files(client, folder_path):
    csv_files = {}
    # Listing files in the folder
    file_status = client.list(folder_path)
    # Iterating through files
    for file_or_folder_name in file_status:
        file_or_folder_full_path = folder_path + '/' + file_or_folder_name
        if client.status(file_or_folder_full_path)['type'] == 'DIRECTORY':
            # If it's a directory, recursively call the function
            csv_files.update(get_hdfs_csv_files(client, file_or_folder_full_path))
        elif file_or_folder_name.endswith('.csv'):
            # If it's a CSV file, add it to the dictionary along with its name
            csv_files[file_or_folder_name] = file_or_folder_full_path
    return csv_files


def add_filename_column(df, filename):
    df['Filename'] = filename
    return df

def process_csv_files(csv_files_dict, hdfs_client):
    dfs_with_filenames = []
    for filename, filepath in csv_files_dict.items():
        with hdfs_client.read(filepath) as reader:
            try:
                df = pd.read_csv(reader)
                if not df.empty:
                    df = add_filename_column(df, filename)
                    dfs_with_filenames.append(df) 
                else:
                    print(f"Empty DataFrame found in file: {filename}")
            except pd.errors.EmptyDataError:
                print(f"Empty file: {filename}")
            except pd.errors.ParserError as e:
                print(f"Error parsing file {filename}: {e}")
    concatenated_df = pd.concat(dfs_with_filenames, ignore_index=True)
    return concatenated_df


ARRIVAL DATA

In [3]:
# Example usage:
hdfs_url = 'http://localhost:50070'
folder_path = '/user/PFE_data/arrival_flights'  # Adjust this path accordingly

# Create an HDFS client
client = InsecureClient(hdfs_url)

# Get all CSV files from the specified folder and its subfolders
csv_files_dict = get_hdfs_csv_files(client, folder_path)
print()
# Process CSV files: open each CSV file, add a filename column, and store the DataFrame in a dictionary
dfs_arrival = process_csv_files(csv_files_dict, client)

dfs_arrival


Empty file: Cameroon2024-03-14.csv
Error parsing file Guinea2024-03-08.csv: Error tokenizing data. C error: Expected 11 fields in line 4, saw 14

Empty file: Guyana2024-03-14.csv
Empty file: Malaysia2024-03-14.csv
Error parsing file Norway2024-03-07.csv: Error tokenizing data. C error: Expected 11 fields in line 4, saw 14

Empty DataFrame found in file: Uruguay2024-03-13.csv
Error parsing file Wallis-And-Futuna2024-03-07.csv: Error tokenizing data. C error: Expected 11 fields in line 4, saw 14



Unnamed: 0,Date,Flight Time,Aircraft,Origin Airport,flight,Aircraft Model,Flight Status,time_arrival,Destination Aeroport,Filename
0,"Sunday, Mar 31",11:00,Kam Air,"Kabul International Airport, Afghanistan",RQ101,B733,Unknown,Unknown,Herat International Airport,Afghanistan2024-04-01.csv
1,"Sunday, Mar 31",13:10,Ariana Afghan Airlines,"Kabul International Airport, Afghanistan",FG251,B734,Unknown,Unknown,Herat International Airport,Afghanistan2024-04-01.csv
2,"Monday, Apr 01",11:00,Kam Air,"Kabul International Airport, Afghanistan",RQ101,B733,Unknown,Unknown,Herat International Airport,Afghanistan2024-04-01.csv
3,"Tuesday, Apr 02",11:00,Kam Air,"Kabul International Airport, Afghanistan",RQ101,737,Scheduled,Scheduled,Herat International Airport,Afghanistan2024-04-01.csv
4,"Tuesday, Apr 02",13:10,Ariana Afghan Airlines,"Kabul International Airport, Afghanistan",FG251,737,Scheduled,Scheduled,Herat International Airport,Afghanistan2024-04-01.csv
...,...,...,...,...,...,...,...,...,...,...
99456,"Saturday, Mar 09",19:20,Air France,"Paris Orly Airport, France",AF7566,320,Scheduled,Scheduled,,example.csv
99457,"Saturday, Mar 09",21:10,Air Corsica,"Nice Cote d'Azur Airport, France",XK105,AT7,Scheduled,Scheduled,,example.csv
99458,"Saturday, Mar 09",21:30,Air Corsica,"Marseille Provence Airport, France",XK157,32N,Scheduled,Scheduled,,example.csv
99459,"Sunday, Mar 10",00:06,AlbaStar,"Aqaba King Hussein International Airport, Jordan",LAV1876,B738,Scheduled,Scheduled,,example.csv


In [4]:
# Example usage:
hdfs_url = 'http://localhost:50070'
folder_path = '/user/PFE_data/departure_flights'  # Adjust this path accordingly

# Create an HDFS client
client = InsecureClient(hdfs_url)

# Get all CSV files from the specified folder and its subfolders
csv_files_dict = get_hdfs_csv_files(client, folder_path)
print()
# Process CSV files: open each CSV file, add a filename column, and store the DataFrame in a dictionary
dfs_departure = process_csv_files(csv_files_dict, client)

dfs_departure


Error parsing file Cayman-Islands2024-03-07.csv: Error tokenizing data. C error: Expected 11 fields in line 4, saw 20

Error parsing file Congo2024-03-07.csv: Error tokenizing data. C error: Expected 11 fields in line 4, saw 20

Error parsing file Dominican-Republic2024-03-07.csv: Error tokenizing data. C error: Expected 11 fields in line 4, saw 20

Empty DataFrame found in file: Grenada2024-03-19.csv
Empty DataFrame found in file: Guinea2024-03-25.csv
Empty DataFrame found in file: Montenegro2024-03-12.csv
Empty file: Solomon-Islands2024-03-14.csv
Empty file: Tajikistan2024-03-14.csv
Empty file: Timor-leste-east-Timor2024-03-14.csv
Empty file: Trinidad-And-Tobago2024-03-14.csv


Unnamed: 0,Date,Flight Time,Airline,Destination Aeroport,flight,Aircraft Model,Flight Status,time_arrival,Original Aeroport,Filename,Aircraft
0,"Sunday, Mar 10",23:05,Wizz Air,"London Luton Airport, United Kingdom",W94472,A21N,Departed,03:14,Tirana International Airport,Albania2024-03-12.csv,
1,"Monday, Mar 11",01:55,Pegasus,"Istanbul Sabiha Gokcen International Airport, ...",PC282,A21N,Departed,01:57,Tirana International Airport,Albania2024-03-12.csv,
2,"Monday, Mar 11",04:00,AirSERBIA,"Belgrade Nikola Tesla Airport, Serbia",JU217,AT76,Departed,04:01,Tirana International Airport,Albania2024-03-12.csv,
3,"Monday, Mar 11",04:55,Austrian Airlines,"Vienna International Airport, Austria",OS850,A320,Departed,04:55,Tirana International Airport,Albania2024-03-12.csv,
4,"Monday, Mar 11",06:00,Wizz Air,"Dortmund Airport, Germany",W43841,A320,Departed,06:02,Tirana International Airport,Albania2024-03-12.csv,
...,...,...,...,...,...,...,...,...,...,...,...
72544,"Saturday, Mar 09",13:10,Air Tanzania,"Lubumbashi International Airport, Democratic R...",TC213,223,Scheduled,Scheduled,Ndola Simon Mwansa Kapwepwe International Airport,Zambia2024-03-08.csv,
72545,"Saturday, Mar 09",13:15,Airlink,"Johannesburg OR Tambo International Airport, S...",4Z151,E135,Scheduled,Scheduled,Ndola Simon Mwansa Kapwepwe International Airport,Zambia2024-03-08.csv,
72546,"Saturday, Mar 09",13:45,Proflight Zambia,"Lusaka Kenneth Kaunda International Airport, Z...",P0335,J41,Scheduled,Scheduled,Ndola Simon Mwansa Kapwepwe International Airport,Zambia2024-03-08.csv,
72547,"Saturday, Mar 09",18:15,Proflight Zambia,"Lusaka Kenneth Kaunda International Airport, Z...",P0309,J41,Scheduled,Scheduled,Ndola Simon Mwansa Kapwepwe International Airport,Zambia2024-03-08.csv,


In [117]:
dfs_departure["Flight Status"].value_counts()

Flight Status
Departed          28808
Scheduled         25535
Estimated dep.    15589
Unknown            1929
Canceled            688
Name: count, dtype: int64

In [43]:
dfs_arrival= dfs_arrival.rename(columns={"Aircraft" : "Airline"})
dfs_arrival.columns

Index(['Date', 'Flight Time', 'Airline', 'Origin Airport', 'flight',
       'Aircraft Model', 'Flight Status', 'time_arrival',
       'Destination Aeroport', 'Filename'],
      dtype='object')

In [74]:
dfs_departure["time_arrival"]

0            03:14
1            01:57
2            04:01
3            04:55
4            06:02
           ...    
70960    Scheduled
70961    Scheduled
70962    Scheduled
70963    Scheduled
70964    Scheduled
Name: time_arrival, Length: 70965, dtype: object

In [73]:
dfs_arrival["time_arrival"]

0          Unknown
1          Unknown
2          Unknown
3        Scheduled
4        Scheduled
           ...    
99456    Scheduled
99457    Scheduled
99458    Scheduled
99459    Scheduled
99460    Scheduled
Name: time_arrival, Length: 99461, dtype: object

In [5]:
os.environ['PYSPARK_PYTHON'] = sys.executable
os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable

# Create a SparkSession
spark = SparkSession.builder \
    .appName("Read from Hadoop") \
    .getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# Set configurations 

arrival_data = spark.createDataFrame(dfs_arrival)


  if is_categorical_dtype(series.dtype):


In [17]:
from pyspark.sql.functions import when, create_map, lit, split, trim, regexp_extract, substring, concat, to_date, col
from itertools import chain  # Import chain function from itertools module
from pyspark.sql.types import TimestampType  # Add this import statement


def process_arrival_data(spark_df):
    # Drop rows with null values
    df = spark_df.na.drop()

    # Split the "date" column into "day", "month", and "month_number" columns
    df = df.withColumn("day", split(df["date"], ",")[0])
    df = df.withColumn("month", trim(split(df["date"], ",")[1]))
    df = df.withColumn("day_number", trim(split(df["date"], " ")[2]))
    df = df.withColumn('Origin Country', split(df['Origin Airport'], ',')[1])

    # Define a mapping dictionary for month names
    month_mapping = {
        "Jan": "January", "Feb": "February", "Mar": "March", "Apr": "April", 
        "May": "May", "Jun": "June", "Jul": "July", "Aug": "August", 
        "Sep": "September", "Oct": "October", "Nov": "November", "Dec": "December"
    }

    # Convert the dictionary to a map
    mapping_expr = create_map([lit(x) for x in chain(*month_mapping.items())])

    # Extract the first character of the "month" column
    first_word_month = split(col("month"), " ")[0]

    # Apply the mapping to the first character
    df = df.withColumn("complete_month", mapping_expr.getItem(first_word_month))

    # Drop the original "month" column
    df = df.drop("month")
    df = df.drop("Date")


    # Remove duplicate rows
    df = df.dropDuplicates()

    # Rename the "Aircraft" column to "Airline"
    df = df.withColumnRenamed("Aircraft", "Airline")
    df = df.withColumnRenamed("Destination Aeroport", "Destination Airport")


    # Add a new column "Airport country" based on filename content
    df = df.withColumn("Airport country",
                       when(df["Filename"].contains("_"),
                            split(col("Filename"), "_")[0])
                       .otherwise(regexp_extract(col("Filename"), "^(.*?)(?=[0-9])", 1)))
    df = df.drop("Filename")
    
    # Add a column "year" with constant value 2024
    df = df.withColumn("year", lit(2024))
    df = df.withColumn("Flight Time2", unix_timestamp(col("Flight Time"), "HH:mm").cast(TimestampType()))

    df = df.withColumn("time_arrival2", 
                       when(col("time_arrival").rlike("\\d"), 
                            unix_timestamp(col("time_arrival"), "HH:mm").cast(TimestampType()))
                       .otherwise(None))
    df = df.withColumn("delay", 
                               when(col("time_arrival2").isNotNull(), 
                                    expr("time_arrival2 - `Flight Time2`"))
                               .when(col("time_arrival") == "Scheduled", "Scheduled")
                               .when(col("time_arrival") == "Unknown", "Unknown")
                               .when(col("time_arrival") == "Canceled", "Canceled")
                               .otherwise("No"))
    df = df.withColumn("delay",
                               expr("CASE WHEN delay LIKE 'INTERVAL%' THEN regexp_extract(delay, '([0-9]+:[0-9]+:[0-9]+)', 1) ELSE delay END"))
    df =  df.drop("time_arrival2")
    df =  df.drop("Flight Time2")
    # Concatenate month, day, and year columns to create a date string

    df = df.filter(df["Flight Status"] == "Landed")
    df = df.withColumn("Flight Time", expr("CAST(`Flight Time` AS TIMESTAMP)")) \
       .withColumn("time_arrival", expr("CAST(`time_arrival` AS TIMESTAMP)"))

    df = df.withColumn("delay_minutes", (df["time_arrival"].cast("long") - df["Flight Time"].cast("long")) / 60)
    df = df.withColumn("delay_minutes", 
                   when(col("delay_minutes") < -1080, 1440 + col("delay_minutes"))
                   .when(col("delay_minutes") > 1080, col("delay_minutes") - 1440)
                   .otherwise(col("delay_minutes")))

    df= df.drop(*["delay","Flight Status" , "time_arrival" , ])

    return df



In [18]:
# Apply the function to your DataFrame
arrival_data_processed = process_arrival_data(arrival_data)

arrival_data_processed.toPandas()

  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):


Unnamed: 0,Flight Time,Airline,Origin Airport,flight,Aircraft Model,Destination Airport,day,day_number,Origin Country,complete_month,Airport country,year,delay_minutes
0,2024-04-02 15:10:00,St Barth Commuter,"St. Jean Gustaf III Airport, Guadeloupe",PV49,C208,Antigua V. C. Bird International Airport,Monday,18,Guadeloupe,March,Antigua-And-Barbuda,2024,7.0
1,2024-04-02 09:30:00,Biman Bangladesh Airlines,"Abu Dhabi Zayed International Airport, United ...",BG128,B738,Chittagong Shah Amanat International Airport,Sunday,17,United Arab Emirates,March,Bangladesh,2024,5.0
2,2024-04-02 10:40:00,Belavia (World of Tanks Livery),"Moscow Vnukovo International Airport, Russia",B2982,E195,Minsk National Airport,Tuesday,19,Russia,March,Belarus,2024,-12.0
3,2024-04-02 17:25:00,Boliviana de Aviacion,"La Paz El Alto International Airport, Bolivia",OB617,B738,Cochabamba Jorge Wilstermann International Air...,Tuesday,12,Bolivia,March,Bolivia,2024,-3.0
4,2024-04-02 07:00:00,Boliviana de Aviacion,"Santa Cruz Viru Viru International Airport, Bo...",OB961,B738,La Paz El Alto International Airport,Sunday,24,Bolivia,March,Bolivia,2024,-11.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
24725,2024-04-02 21:00:00,EVA Air,"Penghu Airport, Taiwan",B78626,A321,Taipei Songshan Airport,Monday,25,Taiwan,March,Taiwan,2024,-16.0
24726,2024-04-02 21:15:00,EVA Air,"Kinmen Airport, Taiwan",B78836,A321,Taipei Songshan Airport,Monday,25,Taiwan,March,Taiwan,2024,-24.0
24727,2024-04-02 19:15:00,Uganda Airlines,"Bujumbura International Airport, Burundi",UR361,CRJ9,Entebbe International Airport,Monday,18,Burundi,March,Uganda,2024,-19.0
24728,2024-04-02 12:25:00,Air India Express (Ellora-Ajanta Caves Livery),"Kozhikode Calicut International Airport, India",IX331,B738,Ras Al Khaimah International Airport,Thursday,29,India,February,United-Arab-Emirates,2024,0.0


DataFrame[Flight Time: timestamp, Airline: string, Origin Airport: string, flight: string, Aircraft Model: string, Flight Status: string, time_arrival: timestamp, Destination Airport: string, day: string, day_number: string, Origin Country: string, complete_month: string, Airport country: string, year: int, delay_minutes: double]

In [15]:
arrival_data_processed.toPandas()["delay_minutes"].idxmax()

  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):


23562

In [16]:
arrival_data_processed.toPandas().loc[23562]

  if not is_datetime64tz_dtype(pser.dtype):
  if is_datetime64tz_dtype(s.dtype):


Flight Time                                   2024-04-02 07:00:00
Airline                                         Emirates SkyCargo
Origin Airport         Hong Kong International Airport, Hong Kong
flight                                                     EK9343
Aircraft Model                                               B77L
time_arrival                                  2024-04-02 22:41:00
Destination Airport     Dubai World Central International Airport
day                                                        Friday
day_number                                                     01
Origin Country                                          Hong Kong
complete_month                                              March
Airport country                              United-Arab-Emirates
year                                                         2024
delay_minutes                                               941.0
Name: 23562, dtype: object