In [None]:
# Colab-only cell: asks the notebook user to upload file(s) into the runtime.
# NOTE(review): presumably this is the CSV later passed to multithreaded_map
# -- confirm against the cells that call it.
from google.colab import files
# The return value is kept in the notebook-global `uploaded`; nothing in the
# visible cells reads it.
uploaded = files.upload()


In [None]:
import csv
import math
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
import pandas as pd

# === Multi-threaded MapReduce Simulation
# This solution mimics the parallel nature of MapReduce using Python threads.

# Map Phase: Processes a chunk of rows and maps passenger flights
def map_chunk(chunk):
    """
    Map function: Converts a list of rows into key-value pairs (PassengerID, 1)

    Args:
        chunk: Iterable of rows, each a sequence of fields (as produced by
            ``csv.reader``). The first field of a row is used as the key.

    Returns:
        A list of ``(row[0], 1)`` tuples, one per non-empty row, in the same
        order as the input rows.
    """
    # csv.reader yields [] for blank lines; indexing such a row would raise
    # IndexError, so rows without any fields are skipped.
    return [(row[0], 1) for row in chunk if row]

def multithreaded_map(filepath, num_threads=4):
    """
    Split the dataset into chunks and process them in parallel using threads

    Args:
        filepath: Path of the CSV file to read.
        num_threads: Number of chunks the rows are split into, also used as
            the thread pool's ``max_workers``.

    Returns:
        A flat list with the pairs returned by ``map_chunk`` for every chunk,
        in file order. An empty list when the file contains no rows.
    """
    # newline='' is what the csv module asks for, so that newlines embedded
    # in quoted fields are parsed correctly.
    with open(filepath, mode='r', newline='') as file:
        rows = list(csv.reader(file))

    # An empty file would give chunk_size == 0, and range() rejects a zero
    # step with ValueError -- there is nothing to map anyway.
    if not rows:
        return []

    # Divide data into chunks for parallel mapping
    chunk_size = math.ceil(len(rows) / num_threads)
    chunks = [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]

    mapped_results = []
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        # executor.map yields results in the order of `chunks`, so the
        # flattened output keeps the original row order.
        for result in executor.map(map_chunk, chunks):
            mapped_results.extend(result)

    return mapped_results

In [None]:
# Reduce Phase: sums the counts of one chunk of mapped pairs
def reduce_chunk(mapped_data):
    """
    Reduce function: adds up the counts of every key found in a chunk

    Args:
        mapped_data: Iterable of ``(key, count)`` pairs.

    Returns:
        A ``defaultdict(int)`` mapping each key to the sum of its counts.
    """
    totals = defaultdict(int)
    for pair in mapped_data:
        key, amount = pair
        totals[key] = totals[key] + amount
    return totals

def merge_counts(partials):
    """
    Merge function: folds several partial count mappings into one

    Args:
        partials: Iterable of mappings from key to count.

    Returns:
        A ``defaultdict(int)`` where each key maps to the sum of its counts
        across all the given mappings.
    """
    combined = defaultdict(int)
    for counts in partials:
        for key, amount in counts.items():
            combined[key] = combined[key] + amount
    return combined

def multithreaded_reduce(mapped_data, num_threads=4):
    """
    Run reduce step in parallel across chunks of mapped data

    Args:
        mapped_data: Sequence of ``(key, count)`` pairs; it must support
            ``len()`` and slicing.
        num_threads: Number of chunks the pairs are split into, also used as
            the thread pool's ``max_workers``.

    Returns:
        A ``defaultdict(int)`` with the total count per key, obtained by
        merging the per-chunk results. Empty when ``mapped_data`` is empty.
    """
    # With no pairs chunk_size would be 0, and range() rejects a zero step
    # with ValueError; the merged result of nothing is an empty counter.
    if not mapped_data:
        return defaultdict(int)

    chunk_size = math.ceil(len(mapped_data) / num_threads)
    chunks = [mapped_data[i:i + chunk_size] for i in range(0, len(mapped_data), chunk_size)]

    with ThreadPoolExecutor(max_workers=num_threads) as executor:
        # Materialize inside the `with` so every chunk is reduced before the
        # pool shuts down.
        partial_results = list(executor.map(reduce_chunk, chunks))

    return merge_counts(partial_results)