In [2]:
# Import the pandas and NumPy libraries
import pandas as pd
import numpy as np

# Read the data file for the MapReduce job.
# NOTE: read_csv is called with its default header handling here, so the first
# record of the file is consumed as the column labels (visible in the header
# row of the output below). The file is re-read with header=None in a later cell.

df = pd.read_csv('AComp_Passenger_data_no_error.csv')
df

Unnamed: 0,UES9151GS5,SQU6245R,DEN,FRA,1420564460,1049
0,UES9151GS5,XXQ4064B,JFK,FRA,1420563917,802
1,EZC9678QI6,SOH3431A,ORD,MIA,1420563649,250
2,ONL0812DH1,SOH3431A,ORD,MIA,1420563649,250
3,CYJ0225CH1,PME8178S,DEN,PEK,1420564409,1322
4,POP2875LH3,MBA8071P,KUL,PEK,1420563856,572
...,...,...,...,...,...,...
494,BWI0520BG6,BER7172M,KUL,LAS,1420565167,1848
495,LLZ3798PE3,EWH6301Y,CAN,DFW,1420564967,1683
496,KKP5277HZ7,KJR6646J,IAH,BKK,1420565203,1928
497,JJM4724RF7,XXQ4064B,JFK,FRA,1420563917,802


In [3]:
# (rows, columns) of the frame as currently loaded
df.shape

(499, 6)

In [4]:
# Labels for the six fields of each passenger record, in file order
column_names = [
    'Passenger id',
    'Flight id',
    'From airport IATA/FAA code',
    'Destination airport IATA/FAA code',
    'Departure time (GMT)',
    'Total flight time (mins)',
]

# Load the CSV again with header inference switched off and the labels
# supplied explicitly, so the first record stays in the data.
df = pd.read_csv(
    'AComp_Passenger_data_no_error.csv',
    names=column_names,
    header=None,
)

In [5]:
# Display the re-read frame to confirm the column labels were applied
df

Unnamed: 0,Passenger id,Flight id,From airport IATA/FAA code,Destination airport IATA/FAA code,Departure time (GMT),Total flight time (mins)
0,UES9151GS5,SQU6245R,DEN,FRA,1420564460,1049
1,UES9151GS5,XXQ4064B,JFK,FRA,1420563917,802
2,EZC9678QI6,SOH3431A,ORD,MIA,1420563649,250
3,ONL0812DH1,SOH3431A,ORD,MIA,1420563649,250
4,CYJ0225CH1,PME8178S,DEN,PEK,1420564409,1322
...,...,...,...,...,...,...
495,BWI0520BG6,BER7172M,KUL,LAS,1420565167,1848
496,LLZ3798PE3,EWH6301Y,CAN,DFW,1420564967,1683
497,KKP5277HZ7,KJR6646J,IAH,BKK,1420565203,1928
498,JJM4724RF7,XXQ4064B,JFK,FRA,1420563917,802


In [6]:
# Collect every row that exactly repeats an earlier row of the dataset
duplicates = df.loc[df.duplicated()]

# Each collected row is one redundant record, so the row count is the answer
num_duplicates = len(duplicates)

print(f"Number of duplicates: {num_duplicates}")

Number of duplicates: 111


In [7]:
# Removing duplicates: with default arguments drop_duplicates() keeps the first
# occurrence of each fully repeated row and returns a new frame, so df itself
# is left unchanged and df_nd holds the de-duplicated records.
df_nd = df.drop_duplicates()

In [8]:
# (rows, columns) of the de-duplicated frame
df_nd.shape

(389, 6)

In [9]:
from concurrent.futures import ThreadPoolExecutor

# Mapper function
def mapper(rows):
    """Run the map step over one chunk of records.

    Args:
        rows: iterable of records, each in the form accepted by map_row.

    Returns:
        A list holding the map_row result for every record, in input order.
    """
    return list(map(map_row, rows))

def map_row(row):
    """Emit the key/value pair for a single flight record.

    Args:
        row: mapping that contains a 'Passenger id' entry.

    Returns:
        A ``(passenger_id, 1)`` tuple, i.e. one counted flight for that passenger.

    Raises:
        KeyError: if the record has no 'Passenger id' entry.
    """
    return (row['Passenger id'], 1)

# Shuffle function
def shuffle(mapper_output):
    """Group the mapped values by key across all mapper chunks.

    Args:
        mapper_output: iterable of chunks, each an iterable of (key, value) pairs.

    Returns:
        A dict mapping each key to the list of its values, in the order the
        pairs were encountered; keys appear in first-seen order.
    """
    grouped = {}
    for pairs in mapper_output:
        for key, value in pairs:
            # setdefault creates the bucket on first sight of a key
            grouped.setdefault(key, []).append(value)
    return grouped

# Reducer function
def reducer(kv_pair):
    """Collapse one shuffled group into a total.

    Args:
        kv_pair: a ``(key, values)`` pair where values is an iterable of numbers.

    Returns:
        A ``(key, total)`` tuple with the sum of the group's values.
    """
    key, values = kv_pair
    total = sum(values)
    return (key, total)

def split_data(data, num_splits):
    """Deal the items of a sequence round-robin into ``num_splits`` chunks.

    Chunk ``i`` receives items ``i, i + num_splits, i + 2 * num_splits, ...``.
    When there are fewer items than chunks, the trailing chunks are empty.

    Args:
        data: a sliceable sequence (e.g. a list of records).
        num_splits: number of chunks to produce; must be at least 1.

    Returns:
        A list of exactly ``num_splits`` slices of ``data``.

    Raises:
        ValueError: if ``num_splits`` is less than 1. Without this check the
            function would return an empty list and silently drop every item.
    """
    if num_splits < 1:
        raise ValueError(f"num_splits must be at least 1, got {num_splits}")
    return [data[i::num_splits] for i in range(num_splits)]

# Parallelize the mapper step using ThreadPoolExecutor.
# The de-duplicated records are split into one chunk per worker thread.
# NOTE: on a standard (GIL) CPython build, threads do not execute pure-Python
# code in parallel, so this illustrates the MapReduce structure rather than
# providing a speed-up.
num_threads = 4
chunks = split_data(df_nd.to_dict(orient='records'), num_threads)
with ThreadPoolExecutor(max_workers=num_threads) as executor:
    # executor.map returns results in the order of `chunks`
    mapper_output = list(executor.map(mapper, chunks))

# Group the (passenger id, 1) pairs from all chunks by passenger id
shuffled_data = shuffle(mapper_output)

# Parallelize the reducer step using ThreadPoolExecutor, with the same worker
# count as the mapper step so both stages honour the num_threads setting.
with ThreadPoolExecutor(max_workers=num_threads) as executor:
    reduced_data = dict(executor.map(reducer, shuffled_data.items()))


In [10]:
# Show the intermediate result of each MapReduce stage.
# These print the complete structures, so the output grows with the dataset.
print(f"Mapper output: {mapper_output}")

print(f"Shuffle output: {shuffled_data}")

print(f"Reducer output: {reduced_data}")

Mapper output: [[('UES9151GS5', 1), ('CYJ0225CH1', 1), ('UES9151GS5', 1), ('PAJ3974RK1', 1), ('MXU9187YC7', 1), ('BWI0520BG6', 1), ('YMH6360YP0', 1), ('MXU9187YC7', 1), ('EZC9678QI6', 1), ('CXN7304ER2', 1), ('IEG9308EA5', 1), ('HCA3158QA6', 1), ('KKP5277HZ7', 1), ('DAZ3029XA0', 1), ('VZY2993ME1', 1), ('UES9151GS5', 1), ('SPR4484HA6', 1), ('PUD8209OG3', 1), ('POP2875LH3', 1), ('HGO4350KK1', 1), ('LLZ3798PE3', 1), ('WBE6935NU3', 1), ('POP2875LH3', 1), ('CDC0302NN5', 1), ('BWI0520BG6', 1), ('KKP5277HZ7', 1), ('EDV2089LK5', 1), ('HCA3158QA6', 1), ('SPR4484HA6', 1), ('HGO4350KK1', 1), ('UES9151GS5', 1), ('ONL0812DH1', 1), ('SPR4484HA6', 1), ('WTC9125IE5', 1), ('DAZ3029XA0', 1), ('BWI0520BG6', 1), ('CDC0302NN5', 1), ('JJM4724RF7', 1), ('WYU2010YH8', 1), ('VZY2993ME1', 1), ('HGO4350KK1', 1), ('YMH6360YP0', 1), ('BWI0520BG6', 1), ('HGO4350KK1', 1), ('POP2875LH3', 1), ('JBE2302VO4', 1), ('UES9151GS5', 1), ('CKZ3132BR4', 1), ('CYJ0225CH1', 1), ('POP2875LH3', 1), ('MXU9187YC7', 1), ('CXN7304ER2',

In [11]:
# Find the passenger(s) with the highest number of flights.
# default=0 keeps this cell from raising ValueError when reduced_data is empty;
# in that case no passenger matches and only the heading is printed.
max_flights = max(reduced_data.values(), default=0)

# Several passengers can tie for the maximum, so collect all of them
highest_flight_passengers = [
    passenger_id
    for passenger_id, flight_count in reduced_data.items()
    if flight_count == max_flights
]

print("Passenger(s) with the highest number of flights:")
for passenger in highest_flight_passengers:
    print(f"Passenger ID: {passenger}, Number of flights: {max_flights}")

Passenger(s) with the highest number of flights:
Passenger ID: UES9151GS5, Number of flights: 17
Passenger ID: EZC9678QI6, Number of flights: 17
Passenger ID: HCA3158QA6, Number of flights: 17
Passenger ID: DAZ3029XA0, Number of flights: 17
Passenger ID: SPR4484HA6, Number of flights: 17
