# Assignment 2 - Fundamentals of Distributed Systems
**Student:** Sanjay Kumar Ranjan  
**Roll No:** G24AI2080

---
## Setup
Install the required packages and set up the Hadoop environment.

In [None]:
# Install mrjob
!pip install mrjob

# Setup Hadoop environment (Google Colab compatible)
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
!wget -q https://downloads.apache.org/hadoop/common/hadoop-3.3.1/hadoop-3.3.1.tar.gz
!tar -xzf hadoop-3.3.1.tar.gz
os.environ["HADOOP_HOME"] = os.path.join(os.getcwd(), "hadoop-3.3.1")
os.environ["PATH"] += os.pathsep + os.path.join(os.environ["HADOOP_HOME"], "bin")


## Load Datasets
Download all datasets using `wget`.

In [None]:
# Fetch the three course datasets into the working directory.
# -O sets the local file name; note that e-com_customer.csv is saved as
# ecom_customer.csv (no hyphen).
!wget https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/cruise.csv -O cruise.csv
!wget https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/customer_churn.csv -O customer_churn.csv
!wget https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/e-com_customer.csv -O ecom_customer.csv


## Question 1: Cruiseline Aggregations (5 marks)

**Objective:** Using `cruise.csv`, compute the following per Cruise line:
- Total number of ships
- Average tonnage (to two decimals)
- Maximum crew size

We use `mrjob` to implement this as a MapReduce job with an optional combiner.


In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class CruiseAggregation(MRJob):
    """Per-cruise-line ship count, average tonnage and maximum crew size.

    Records travel between phases as ``(count, tonnage_sum, max_crew)`` so
    that the combiner can pre-aggregate without losing the information the
    reducer needs to compute the average.

    NOTE(review): column positions (Tonnage at index 3, Crew at index 5,
    Cruise_line last) follow the header written to ``cruise_test.csv``;
    confirm that the real ``cruise.csv`` uses the same column order.
    """

    @staticmethod
    def _merge_partials(values):
        """Fold ``(count, tonnage_sum, max_crew)`` records into one record."""
        count, tonnage_sum, max_crew = 0, 0, 0
        for part_count, part_tonnage, part_crew in values:
            count += part_count
            tonnage_sum += part_tonnage
            max_crew = max(max_crew, part_crew)
        return count, tonnage_sum, max_crew

    def mapper(self, _, line):
        """Emit ``cruise_line, (1, tonnage, crew)`` for each parseable data row.

        The header row is skipped, and so is any row that is blank, too
        short, or carries non-numeric tonnage/crew values.
        """
        if "Cruise_line" in line:
            return
        fields = next(csv.reader([line]))
        try:
            # A blank line parses to an empty list, so every index lookup
            # (including fields[-1]) has to be guarded.
            cruise_line = fields[-1]
            tonnage = float(fields[3])
            crew = float(fields[5])
        except (IndexError, ValueError):
            return
        yield cruise_line, (1, tonnage, crew)

    def combiner(self, key, values):
        """Pre-aggregate the records of one cruise line."""
        yield key, self._merge_partials(values)

    def reducer(self, key, values):
        """Emit ``key, (ship_count, avg_tonnage, max_crew)``.

        The average tonnage is rounded to two decimals.
        """
        count, tonnage_sum, max_crew = self._merge_partials(values)
        avg_tonnage = round(tonnage_sum / count, 2)
        yield key, (count, avg_tonnage, max_crew)

if __name__ == "__main__":
    CruiseAggregation.run()


### ✅ Test Run for Question 1
We use a small test file `cruise_test.csv` with mock data to validate the MapReduce job.

In [None]:
# Mock input for CruiseAggregation: a header row plus three ships spread
# over two cruise lines.
cruise_rows = [
    "Ship_name,Line_name,Age,Tonnage,Cabins,Crew,Passengers,Cruise_line",
    "ShipA,LineA,10,30000,500,1200,1500,Carnival",
    "ShipB,LineB,8,50000,700,1300,2000,Carnival",
    "ShipC,LineC,15,20000,400,800,1200,Norwegian",
]
with open("cruise_test.csv", "w") as out:
    out.writelines(row + "\n" for row in cruise_rows)


In [None]:
# Save a standalone copy of the CruiseAggregation job as cruise_agg.py so it
# can be launched with `python cruise_agg.py <input.csv>`.
# The embedded mapper skips blank, short or non-numeric rows instead of
# crashing on them or swallowing every exception.
with open("cruise_agg.py", "w") as f:
    f.write("""from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class CruiseAggregation(MRJob):
    def mapper(self, _, line):
        if "Cruise_line" in line:
            return
        fields = next(csv.reader([line]))
        try:
            # A blank line parses to [], so fields[-1] must be guarded too.
            cruise_line = fields[-1]
            tonnage = float(fields[3])
            crew = float(fields[5])
        except (IndexError, ValueError):
            return
        yield cruise_line, (1, tonnage, crew)

    def combiner(self, key, values):
        count, tonnage_sum, max_crew = 0, 0, 0
        for v in values:
            count += v[0]
            tonnage_sum += v[1]
            max_crew = max(max_crew, v[2])
        yield key, (count, tonnage_sum, max_crew)

    def reducer(self, key, values):
        count, tonnage_sum, max_crew = 0, 0, 0
        for v in values:
            count += v[0]
            tonnage_sum += v[1]
            max_crew = max(max_crew, v[2])
        avg_tonnage = round(tonnage_sum / count, 2)
        yield key, (count, avg_tonnage, max_crew)

if __name__ == "__main__":
    CruiseAggregation.run()
""")

# Run the job on the test CSV
!python cruise_agg.py cruise_test.csv


### ✅ Fixed Test Runner for CruiseAggregation
This block uses the already defined class above and runs the job directly on `cruise_test.csv`.

In [None]:
# Create test CSV
with open("cruise_test.csv", "w") as f:
    f.write("Ship_name,Line_name,Age,Tonnage,Cabins,Crew,Passengers,Cruise_line\n")
    f.write("ShipA,LineA,10,30000,500,1200,1500,Carnival\n")
    f.write("ShipB,LineB,8,50000,700,1300,2000,Carnival\n")
    f.write("ShipC,LineC,15,20000,400,800,1200,Norwegian\n")

# Run the CruiseAggregation job inline
mr_job = CruiseAggregation(args=["cruise_test.csv"])
with mr_job.make_runner() as runner:
    runner.run()
    # parse_output(cat_output()) is the current mrjob API; it replaces the
    # legacy stream_output()/parse_output_line() pair, which recent mrjob
    # releases no longer provide.
    for key, value in mr_job.parse_output(runner.cat_output()):
        print(key, value)


## Question 2: Company Churn Rate (5 marks)

**Objective:** From `customer_churn.csv`, compute the churn rate for each company using a two-step MapReduce job.

- **Step 1:** Emit `(Company, TOTAL)` and `(Company, CHURNED)` based on `Churn == 1`.
- **Step 2:** Reducer calculates `CHURNED / TOTAL` (rounded to 4 decimals).
- **Constraint:** Only include companies listed in `VIP_companies.txt`, loaded via distributed cache.

We demonstrate the implementation using an `mrjob` `MRJob` whose steps are declared with `MRStep`.


In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class CompanyChurnRate(MRJob):
    """Churn rate (churned customers / total customers) per VIP company.

    Only companies listed in the file passed via ``--vip`` are reported.

    NOTE(review): column positions (Churn at index 3, Company at index 8)
    follow the header written to ``customer_churn_test.csv``; confirm that
    the real ``customer_churn.csv`` uses the same column order.
    """

    def configure_args(self):
        """Register the ``--vip`` file argument on top of the standard options."""
        super(CompanyChurnRate, self).configure_args()
        self.add_file_arg('--vip', help='Path to VIP companies file')

    def load_vip_companies(self):
        """Read the VIP file into ``self.vip_companies``, one name per line.

        Blank lines are ignored so that an empty company name can never be
        treated as a VIP entry.
        """
        with open(self.options.vip, 'r') as f:
            stripped = (line.strip() for line in f)
            self.vip_companies = {name for name in stripped if name}

    def mapper_init(self):
        """Load the VIP list once before any input line is mapped."""
        self.load_vip_companies()

    def mapper(self, _, line):
        """Emit ``("TOTAL", 1)`` per VIP customer and ``("CHURNED", 1)`` if churned.

        The header row is skipped, and so is any row that is blank, too
        short, or has a non-integer churn flag.
        """
        if line.startswith("Name"):  # skip header
            return
        fields = next(csv.reader([line]))
        try:
            company = fields[8]
            churned = int(fields[3])
        except (IndexError, ValueError):
            return
        if company not in self.vip_companies:
            return
        yield company, ("TOTAL", 1)
        if churned == 1:
            yield company, ("CHURNED", 1)

    def reducer(self, key, values):
        """Emit ``key, churned / total`` rounded to four decimals."""
        total = churned = 0
        for label, count in values:
            if label == "TOTAL":
                total += count
            elif label == "CHURNED":
                churned += count
        if total > 0:
            churn_rate = round(churned / total, 4)
            yield key, churn_rate

    def steps(self):
        """Declare the single map/reduce step with its mapper initialiser."""
        return [MRStep(mapper_init=self.mapper_init,
                       mapper=self.mapper,
                       reducer=self.reducer)]

if __name__ == "__main__":
    CompanyChurnRate.run()


### ✅ Test Run for Question 2
Use a sample `customer_churn_test.csv` and `VIP_companies.txt` to validate the churn rate computation.

In [None]:
# Create sample VIP companies file
with open("VIP_companies.txt", "w") as f:
    f.write("Kelly-Warren\n")
    f.write("Smith Inc\n")
    f.write("Wilson PLC\n")

# Create sample customer churn test CSV.
# Every row must carry all ten header columns: the job reads Churn from
# index 3 and Company from index 8, so a missing Account_Manager value would
# shift Location into the Company position and no VIP row would match.
with open("customer_churn_test.csv", "w") as f:
    f.write("Name,Age,Total_Purchase,Churn,Account_Manager,Years,Num_Sites,Onboard_date,Company,Location\n")
    f.write("Jessica Williams,48.0,10356.02,0,1,5.12,8.0,2009-03-03 23:13:37,Kelly-Warren,PR\n")
    f.write("Tom Brown,35.0,9876.45,1,0,4.50,5.0,2011-05-21 18:44:12,Smith Inc,CA\n")
    f.write("Anita Rao,42.0,7890.00,1,1,3.90,7.0,2012-07-15 14:10:25,Smith Inc,NY\n")
    f.write("Ravi Kumar,50.0,12000.00,0,0,5.70,10.0,2010-11-01 08:35:55,Wilson PLC,TX\n")


In [None]:
# Save a standalone copy of the CompanyChurnRate job as company_churn.py so
# it can be launched with
# `python company_churn.py <input.csv> --vip <vip_file>`.
# The embedded job ignores blank VIP lines and skips only rows that are too
# short or have a non-integer churn flag, rather than swallowing every error.
with open("company_churn.py", "w") as f:
    f.write("""from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class CompanyChurnRate(MRJob):

    def configure_args(self):
        super(CompanyChurnRate, self).configure_args()
        self.add_file_arg('--vip', help='Path to VIP companies file')

    def load_vip_companies(self):
        # Blank lines are ignored so '' is never treated as a VIP company.
        with open(self.options.vip, 'r') as f:
            stripped = (line.strip() for line in f)
            self.vip_companies = {name for name in stripped if name}

    def mapper_init(self):
        self.load_vip_companies()

    def mapper(self, _, line):
        if line.startswith("Name"):
            return
        fields = next(csv.reader([line]))
        try:
            company = fields[8]
            churned = int(fields[3])
        except (IndexError, ValueError):
            return
        if company not in self.vip_companies:
            return
        yield company, ("TOTAL", 1)
        if churned == 1:
            yield company, ("CHURNED", 1)

    def reducer(self, key, values):
        total = churned = 0
        for label, count in values:
            if label == "TOTAL":
                total += count
            elif label == "CHURNED":
                churned += count
        if total > 0:
            churn_rate = round(churned / total, 4)
            yield key, churn_rate

    def steps(self):
        return [MRStep(mapper_init=self.mapper_init,
                       mapper=self.mapper,
                       reducer=self.reducer)]

if __name__ == "__main__":
    CompanyChurnRate.run()
""")

# Run the job
!python company_churn.py customer_churn_test.csv --vip VIP_companies.txt


## Question 3: State-wise Spending (5 marks)

**Objective:** From `ecom_customer.csv`, extract the state code from the `Address` field and compute:

- Total yearly amount spent per state
- Output **top 5 states** by total spending

This task involves sorting the final results in descending order of spending.


In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class StateSpending(MRJob):
    """Top five states by total yearly amount spent.

    Step 1 sums the spending per state; step 2 funnels every state total to
    a single reducer, which sorts them and keeps the five largest.
    """

    def mapper(self, _, line):
        """Emit ``state, amount`` for each parseable customer row.

        The state code is taken as the second-to-last whitespace-separated
        token of the address (e.g. ``"... Springfield IL 62704"`` -> ``IL``).
        The header row and rows that are too short, have an address with
        fewer than two tokens, or a non-numeric amount are skipped.
        """
        if line.startswith("Email"):
            return
        fields = next(csv.reader([line]))
        try:
            # Address is the second column and Yearly Amount Spent the last
            # one, matching the CSV header and the state_spending.py script.
            state = fields[1].strip().split()[-2]
            amount = float(fields[-1])
        except (IndexError, ValueError):
            return
        yield state, amount

    def combiner(self, state, amounts):
        """Pre-sum the amounts of one state."""
        yield state, sum(amounts)

    def reducer(self, state, amounts):
        """Emit every state total under the shared key ``None``.

        Using one key routes all totals to the same step-2 reducer; the total
        comes first in the pair so that sorting orders by amount.
        """
        yield None, (sum(amounts), state)

    def final_reducer(self, _, state_amount_pairs):
        """Emit ``state, total`` (two decimals) for the five largest totals."""
        top_states = sorted(state_amount_pairs, reverse=True)[:5]
        for total, state in top_states:
            yield state, round(total, 2)

    def steps(self):
        """Declare the aggregation step followed by the top-5 selection step."""
        return [
            MRStep(mapper=self.mapper,
                   combiner=self.combiner,
                   reducer=self.reducer),
            MRStep(reducer=self.final_reducer)
        ]

if __name__ == "__main__":
    StateSpending.run()


### ✅ Test Run for Question 3
We use a sample `ecom_customer_test.csv` to validate the top 5 states by yearly spending.

In [None]:
# Create a test CSV for state-wise spending.
# The Python literals are single-quoted so that the CSV's double-quoted
# Address field can be written verbatim; unescaped double quotes inside a
# double-quoted literal are a SyntaxError.
with open("ecom_customer_test.csv", "w") as f:
    f.write('Email,Address,Avatar,Avg Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent\n')
    f.write('a@example.com,"123 Elm St Springfield IL 62704",avatar1,34.5,12.5,15.2,3.4,5000.00\n')
    f.write('b@example.com,"456 Oak St Albany NY 12207",avatar2,33.8,13.0,14.1,2.1,7000.00\n')
    f.write('c@example.com,"789 Pine St Austin TX 73301",avatar3,35.0,12.2,14.9,5.0,6500.00\n')
    f.write('d@example.com,"101 Maple St Miami FL 33101",avatar4,34.0,13.5,15.5,4.3,7200.00\n')
    f.write('e@example.com,"202 Birch St Seattle WA 98101",avatar5,33.0,13.8,14.0,3.7,8000.00\n')
    f.write('f@example.com,"303 Cedar St Buffalo NY 14201",avatar6,32.5,12.0,13.8,2.9,6800.00\n')


In [None]:
# Save a standalone copy of the StateSpending job as state_spending.py so it
# can be launched with `python state_spending.py <input.csv>`.
# The embedded mapper skips only rows that are too short, have an address
# with fewer than two tokens, or a non-numeric amount, rather than swallowing
# every error.
with open("state_spending.py", "w") as f:
    f.write("""from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class StateSpending(MRJob):

    def mapper(self, _, line):
        if line.startswith("Email"):
            return
        fields = next(csv.reader([line]))
        try:
            # Address is the second column; the amount is the last one.
            state = fields[1].strip().split()[-2]
            amount = float(fields[-1])
        except (IndexError, ValueError):
            return
        yield state, amount

    def combiner(self, state, amounts):
        yield state, sum(amounts)

    def reducer(self, state, amounts):
        yield None, (sum(amounts), state)

    def final_reducer(self, _, state_amount_pairs):
        top_states = sorted(state_amount_pairs, reverse=True)[:5]
        for total, state in top_states:
            yield state, round(total, 2)

    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   combiner=self.combiner,
                   reducer=self.reducer),
            MRStep(reducer=self.final_reducer)
        ]

if __name__ == "__main__":
    StateSpending.run()
""")

# Run the job
!python state_spending.py ecom_customer_test.csv


## Question 4: Two-step Ship Filter & Median Length (5 marks)

**Objective:** From `cruise.csv`, perform the following using a two-step MapReduce job:
- **Step 1:** Filter ships with **passenger density > 35.0** and emit `(Cruise Line, Length)`.
- **Step 2:** Compute the **median length** per Cruise Line, handling both odd and even counts.


In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class MedianShipLength(MRJob):
    """Median ship length per cruise line for ships with density above 35.0.

    NOTE(review): density is computed here as ``passengers / length`` from
    the columns at index 7 and 6, following the header written to
    ``cruise_test_median.csv``; confirm that this matches the definition and
    column order of the real ``cruise.csv``.
    """

    def mapper(self, _, line):
        """Emit ``cruise_line, length`` for each ship whose density exceeds 35.0.

        The header row is skipped, and so is any row that is blank, too
        short, non-numeric, or has a length of zero.
        """
        if "Cruise_line" in line:
            return
        fields = next(csv.reader([line]))
        try:
            cruise_line = fields[-1]
            length = float(fields[6])
            passengers = float(fields[7])
            density = passengers / length
        except (IndexError, ValueError, ZeroDivisionError):
            return
        if density > 35.0:
            yield cruise_line, length

    def reducer(self, key, values):
        """Emit ``key, median_length`` rounded to two decimals.

        With an odd count the middle value is used; with an even count the
        two middle values are averaged.
        """
        lengths = sorted(values)
        n = len(lengths)
        mid = n // 2
        if n % 2 == 1:
            median = round(lengths[mid], 2)
        else:
            median = round((lengths[mid - 1] + lengths[mid]) / 2.0, 2)
        yield key, median

    def steps(self):
        """Declare the single filter-then-median map/reduce step."""
        return [MRStep(mapper=self.mapper, reducer=self.reducer)]

if __name__ == "__main__":
    MedianShipLength.run()


### ✅ Test Run for Question 4
We use a sample `cruise_test_median.csv` to filter ships by passenger density and compute median length per cruise line.

In [None]:
# Mock input for the median-length job; each data row is annotated with its
# passengers / length density.
median_rows = [
    "Ship_name,Line_name,Age,Tonnage,Cabins,Crew,Length,Passengers,Cruise_line",
    "Ship1,Line1,5,30000,400,1000,200,8000,Carnival",     # density = 40.0
    "Ship2,Line1,6,31000,420,1100,250,8500,Carnival",     # density = 34.0
    "Ship3,Line1,7,32000,450,1200,190,7000,Carnival",     # density = 36.84
    "Ship4,Line2,8,35000,480,1250,210,7500,Norwegian",    # density = 35.71
    "Ship5,Line2,9,36000,500,1300,240,7000,Norwegian",    # density = 29.17
    "Ship6,Line2,10,37000,520,1350,195,7100,Norwegian",   # density = 36.41
]
with open("cruise_test_median.csv", "w") as out:
    out.writelines(row + "\n" for row in median_rows)


In [None]:
# Save a standalone copy of the MedianShipLength job as median_length.py so
# it can be launched with `python median_length.py <input.csv>`.
# The embedded mapper skips only rows that are too short, non-numeric or
# have a zero length, rather than swallowing every error.
with open("median_length.py", "w") as f:
    f.write("""from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class MedianShipLength(MRJob):

    def mapper(self, _, line):
        if "Cruise_line" in line:
            return
        fields = next(csv.reader([line]))
        try:
            cruise_line = fields[-1]
            length = float(fields[6])
            passengers = float(fields[7])
            density = passengers / length
        except (IndexError, ValueError, ZeroDivisionError):
            return
        if density > 35.0:
            yield cruise_line, length

    def reducer(self, key, values):
        lengths = sorted(values)
        n = len(lengths)
        if n % 2 == 1:
            median = round(lengths[n // 2], 2)
        else:
            median = round((lengths[n // 2 - 1] + lengths[n // 2]) / 2.0, 2)
        yield key, median

    def steps(self):
        return [MRStep(mapper=self.mapper, reducer=self.reducer)]

if __name__ == "__main__":
    MedianShipLength.run()
""")

# Run the job
!python median_length.py cruise_test_median.csv
