In [1]:
from collections import defaultdict  
from concurrent.futures import ThreadPoolExecutor  
  
# Simulates the Map phase that processes a single row of data  
def process_line(line):
    """Map one CSV row to a ``(passenger_id, 1)`` pair.

    The passenger ID is the text before the first comma, with surrounding
    whitespace removed so that ``"ABC ,..."`` and ``"ABC,..."`` are counted
    as the same passenger.

    Args:
        line: One raw line of the passenger CSV, with or without its
            trailing newline.

    Returns:
        A tuple ``(passenger_id, 1)``; the ``1`` is the flight-count
        increment contributed by this row. A blank line yields an empty
        passenger ID, so callers that want to ignore blank rows must
        filter them out themselves.
    """
    # Only the first field is needed, so cut at the first comma instead of
    # splitting the whole row.
    passenger_id = line.partition(',')[0].strip()
    return passenger_id, 1
  
def map_phase_parallel(passenger_data_file, max_workers):  
    passenger_flights = defaultdict(int)  
    with open(passenger_data_file, 'r') as file:  
        with ThreadPoolExecutor(max_workers=max_workers) as executor:   
            tasks = (line for line in file) 
            for result in executor.map(process_line, tasks):  
                passenger_id, increment = result  
                passenger_flights[passenger_id] += increment  
    return passenger_flights  

def reduce_phase(passenger_flights):
    """Find the highest flight count and every passenger who reaches it.

    Args:
        passenger_flights: Mapping of passenger ID to flight count.

    Returns:
        A tuple ``(max_flights, max_flights_passengers)``. The count never
        drops below 0, and the passengers are listed in the mapping's
        iteration order. An empty mapping gives ``(0, [])``.
    """
    # Seed the comparison with 0 so the result is floored at zero and an
    # empty mapping is handled without a special case.
    top_count = max([0, *passenger_flights.values()])
    top_passengers = [
        pid for pid, count in passenger_flights.items() if count == top_count
    ]
    return top_count, top_passengers
   
def main(passenger_data_file=None, max_workers=4):
    """Run the map and reduce phases and print the busiest passenger(s).

    Args:
        passenger_data_file: Path of the passenger CSV file. When omitted,
            the original hard-coded coursework path is used, so calling
            ``main()`` with no arguments behaves as before.
        max_workers: Number of worker threads for the map phase
            (defaults to 4).
    """
    if passenger_data_file is None:
        passenger_data_file = 'C:/Users/Administrator/Desktop/李锦怡研一（下）/大数据与云计算/cw/coursework/AComp_Passenger_data_no_error(1).csv'
    # Map phase: per-passenger row counts, computed with a thread pool.
    passenger_flights = map_phase_parallel(passenger_data_file, max_workers)
    # Reduce phase: highest count and the passenger(s) that reach it.
    max_flights, max_flights_passengers = reduce_phase(passenger_flights)
    print(f"Passenger(s) with the highest number of flights: {max_flights_passengers}")
    # The value printed here is the highest per-passenger count.
    print(f"Total flights: {max_flights}")
  
# Run the analysis only when this file is executed as a script (or notebook
# cell), not when it is imported as a module.
if __name__ == '__main__':  
    main()

Passenger(s) with the highest number of flights: ['UES9151GS5']
Total flights: 25
