MapReduce Simulation in Python to Find the Passenger with the Most Flights

Student: Nadeem Al-Buqaeen 32838974

Date: 19/06/2025

Course: CSMBD

Description: This script simulates the MapReduce programming model in Python to determine which passenger has taken the highest number of flights in a CSV dataset.

In [1]:
# importing the required libraries
import pandas as pd
from collections import defaultdict
import threading

In [15]:
# Load the dataset (header=None: the file has no header row, so pandas labels the columns 0, 1, 2, ...)
passenger_df = pd.read_csv('/Users/nadeembaqaeen/Desktop/CSMBD_Coursework/AComp_Passenger_data_no_error.csv', header=None)
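With `header=None`, pandas does not treat the first line as column names and instead labels the columns with integers starting at 0. This is why the map phase can read the passenger ID as `row[0]`. The minimal sketch below uses a small in-memory CSV; its rows and the meaning of every column after the first are hypothetical and not taken from the coursework dataset.

```python
import io

import pandas as pd

# Hypothetical three-row CSV with no header line; only column 0 (passenger ID)
# mirrors how the notebook uses the real file.
csv_text = io.StringIO(
    "P001,FL100,ATL\n"
    "P002,FL200,DEN\n"
    "P001,FL300,LAX\n"
)

toy_df = pd.read_csv(csv_text, header=None)

print(list(toy_df.columns))  # [0, 1, 2]
print(toy_df[0].tolist())    # ['P001', 'P002', 'P001']
```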

In [25]:
# Map phase: emit a (passenger_id, 1) pair for each flight record (passenger ID is in column 0)
def map_phase(data_chunk):
    mapped = []
    for _, row in data_chunk.iterrows():
        passenger_id = row[0]
        mapped.append((passenger_id, 1))
    return mapped
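On a toy chunk, the map phase emits one `(passenger_id, 1)` pair per row. This sketch assumes, as the notebook does, that passenger IDs live in column 0. `map_phase_fast` is a hypothetical name for an equivalent that iterates over the column directly instead of calling `iterrows()`, which builds a new Series for every row and is comparatively slow on large files.

```python
import pandas as pd


def map_phase(data_chunk):
    # Same logic as the notebook: emit (passenger_id, 1) for each row
    mapped = []
    for _, row in data_chunk.iterrows():
        mapped.append((row[0], 1))
    return mapped


def map_phase_fast(data_chunk):
    # Hypothetical equivalent: iterate over column 0 without building per-row Series
    return [(passenger_id, 1) for passenger_id in data_chunk[0]]


# Toy chunk with integer column labels, as produced by read_csv(header=None)
chunk = pd.DataFrame([["P001", "FL100"], ["P002", "FL200"], ["P001", "FL300"]])

print(map_phase(chunk))       # [('P001', 1), ('P002', 1), ('P001', 1)]
print(map_phase_fast(chunk))  # same pairs, same order
```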

In [45]:
# Shuffle phase: group all emitted values by passenger_id
def shuffle_phase(mapped_data):
    shuffled = defaultdict(list)
    for key, value in mapped_data:
        shuffled[key].append(value)
    return shuffled
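Shuffling collects every value emitted for the same key into one list, so that each key can later be reduced independently of the others. A small sketch on hypothetical mapped pairs:

```python
from collections import defaultdict


def shuffle_phase(mapped_data):
    # Group values by key; keys keep first-seen order (dict insertion order)
    shuffled = defaultdict(list)
    for key, value in mapped_data:
        shuffled[key].append(value)
    return shuffled


pairs = [("P001", 1), ("P002", 1), ("P001", 1), ("P003", 1), ("P001", 1)]
grouped = shuffle_phase(pairs)

print(dict(grouped))  # {'P001': [1, 1, 1], 'P002': [1], 'P003': [1]}
```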

In [47]:
# Reduce phase: sum the flight counts for each passenger_id
def reduce_phase(shuffled_data):
    reduced = {}
    for key in shuffled_data:
        reduced[key] = sum(shuffled_data[key])
    return reduced
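Summing each grouped list yields the per-passenger flight counts. As a sanity check, the result should equal what `collections.Counter` produces when counting the same passenger IDs directly. The shuffled input below is hypothetical.

```python
from collections import Counter


def reduce_phase(shuffled_data):
    # Sum the 1s collected for each key to get that key's total
    return {key: sum(values) for key, values in shuffled_data.items()}


shuffled = {"P001": [1, 1, 1], "P002": [1], "P003": [1]}
reduced = reduce_phase(shuffled)

# Cross-check: rebuild the ID list (one entry per emitted 1) and count it directly
ids = [pid for pid, values in shuffled.items() for _ in values]
print(reduced)                          # {'P001': 3, 'P002': 1, 'P003': 1}
print(reduced == dict(Counter(ids)))    # True
```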

In [49]:
# Thread worker for the map phase: each thread writes its output to its own slot of the shared results list
def threaded_map(data, results, index):
    results[index] = map_phase(data)

In [51]:
# Split the data into one chunk per thread (CPython's GIL means these threads interleave rather than run CPU-bound work truly in parallel)
num_threads = 4
chunk_size = len(passenger_df) // num_threads
data_chunks = [passenger_df.iloc[i*chunk_size : (i+1)*chunk_size] for i in range(num_threads)]

In [53]:
# Add any leftover data to the last chunk
if len(passenger_df) % num_threads != 0:
    data_chunks[-1] = pd.concat([data_chunks[-1], passenger_df.iloc[num_threads*chunk_size:]])
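To see how the two chunking cells cover the data, the sketch below applies the same arithmetic to a hypothetical 10-row DataFrame with 4 threads: `chunk_size` is 10 // 4 = 2, so the first three chunks hold 2 rows each and the last chunk absorbs the 2 leftover rows.

```python
import pandas as pd

toy_df = pd.DataFrame({0: [f"P{i:03d}" for i in range(10)]})  # 10 hypothetical rows
num_threads = 4
chunk_size = len(toy_df) // num_threads  # 2

# Same slicing as the notebook: equal-sized chunks first
chunks = [toy_df.iloc[i * chunk_size:(i + 1) * chunk_size] for i in range(num_threads)]

# Append the remainder (rows 8 and 9) to the last chunk
if len(toy_df) % num_threads != 0:
    chunks[-1] = pd.concat([chunks[-1], toy_df.iloc[num_threads * chunk_size:]])

print([len(c) for c in chunks])  # [2, 2, 2, 4]
# Every row appears exactly once, in the original order
print(pd.concat(chunks).index.tolist() == list(range(10)))  # True
```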

In [55]:
# Run the map phase concurrently, one thread per chunk
mapped_results = [None] * num_threads
threads = []
for i in range(num_threads):
    t = threading.Thread(target=threaded_map, args=(data_chunks[i], mapped_results, i))
    threads.append(t)
    t.start()

In [57]:
# Wait for all threads to finish
for t in threads:
    t.join()
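The manual start/join pattern can also be written with `concurrent.futures.ThreadPoolExecutor`, whose `map` method returns results in the same order as the input chunks, so no shared results list or index bookkeeping is needed; `itertools.chain.from_iterable` then flattens the per-chunk lists. This is an alternative sketch, not what the notebook runs, and the toy chunks are hypothetical. As with plain threads, CPython's GIL means the pure-Python map work is interleaved rather than truly parallel.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import chain

import pandas as pd


def map_phase(data_chunk):
    # Emit (passenger_id, 1) for each row; passenger ID assumed to be in column 0
    return [(passenger_id, 1) for passenger_id in data_chunk[0]]


toy_df = pd.DataFrame({0: ["P001", "P002", "P001", "P003", "P001", "P002"]})
chunks = [toy_df.iloc[0:2], toy_df.iloc[2:4], toy_df.iloc[4:6]]

with ThreadPoolExecutor(max_workers=len(chunks)) as executor:
    # executor.map preserves input order, like indexing into mapped_results
    mapped_results = list(executor.map(map_phase, chunks))

# Flatten the per-chunk lists, equivalent to the nested list comprehension
combined_mapped = list(chain.from_iterable(mapped_results))
print(combined_mapped)
```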

In [59]:
# Combine all mapped results
combined_mapped = [item for sublist in mapped_results for item in sublist]

In [61]:
# Shuffle and Reduce
shuffled_data = shuffle_phase(combined_mapped)
reduced_data = reduce_phase(shuffled_data)

In [69]:
# Find the passenger(s) with the highest number of flights (all tied passengers are kept)
max_flights = max(reduced_data.values())
top_passengers = [pid for pid, count in reduced_data.items() if count == max_flights]
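Collecting every ID whose count equals the maximum keeps ties, whereas `max(reduced_data, key=reduced_data.get)` would return only one of the tied passengers (the first one encountered). A small sketch with hypothetical counts:

```python
reduced_data = {"P001": 3, "P002": 5, "P003": 5}  # hypothetical counts with a tie

max_flights = max(reduced_data.values())
top_passengers = [pid for pid, count in reduced_data.items() if count == max_flights]

single = max(reduced_data, key=reduced_data.get)  # picks one key only

print(max_flights, top_passengers)  # 5 ['P002', 'P003']
print(single)                       # P002
```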

In [71]:
# Output result
print("The Passenger with the highest number of flights:")
for pid in top_passengers:
    print(f"Passenger ID: {pid}, Flights: {max_flights}")
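Because the job is a plain count per key, its answer can be cross-checked against pandas directly: `value_counts()` on the passenger ID column gives the same per-passenger totals without the MapReduce stages. The sketch uses a hypothetical frame; on the real data the equivalent call would be `passenger_df[0].value_counts()`.

```python
import pandas as pd

toy_df = pd.DataFrame({0: ["P001", "P002", "P001", "P003", "P001", "P002"]})

counts = toy_df[0].value_counts()       # flights per passenger ID
top = counts[counts == counts.max()]    # keep all tied leaders

for pid, flights in top.items():
    print(f"Passenger ID: {pid}, Flights: {flights}")
# Passenger ID: P001, Flights: 3
```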

The Passenger with the highest number of flights:
Passenger ID: UES9151GS5, Flights: 25
