<a href="https://colab.research.google.com/github/shaikhjavedofficial/IITJ/blob/main/Assignment2-G24AI2039-ShaikhJaved.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [69]:
# --- Initialization ---
%%capture
# Install Hadoop (pseudo-distributed) & mrjob, fetch datasets for reproducibility.
!pip install mrjob
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
# --- Download datasets ---
!wget -O cruise.csv "https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/cruise.csv"
!wget -O customer_churn.csv "https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/customer_churn.csv"
!wget -O ecom_customer.csv "https://raw.githubusercontent.com/TakMashhido/PGD-BigData-Tutorial/refs/heads/main/Dataset/e-com_customer.csv"


### Q1. Cruiseline Aggregations

For each Cruise Line in `cruise.csv`, compute:
- Total number of ships,
- Average Tonnage (rounded to 2 decimals),
- Maximum crew size.

Implemented using `mrjob`, with Combiner for efficiency.


In [70]:
q1_test_data = """Ship_name,Cruise_line,Age,Tonnage,passengers,length,cabins,passenger_density,crew
Journey,Azamara,6,30.276999999999997,6.94,5.94,3.55,42.64,3.55
Quest,Azamara,6,30.276999999999997,6.94,5.94,3.55,42.64,3.55
Celebration,Carnival,26,47.262,14.86,7.22,7.43,31.8,6.7
Conquest,Carnival,11,110.0,29.74,9.53,14.88,36.99,19.1
Destiny,Carnival,17,101.353,26.42,8.92,13.21,38.36,10.0
Ecstasy,Carnival,22,70.367,20.52,8.55,10.2,34.29,9.2
Elation,Carnival,15,70.367,20.52,8.55,10.2,34.29,9.2
Fantasy,Carnival,23,70.367,20.56,8.55,10.22,34.23,9.2
Fascination,Carnival,19,70.367,20.52,8.55,10.2,34.29,9.2
Freedom,Carnival,6,110.23899999999999,37.0,9.51,14.87,29.79,11.5
"""
with open("q1_test.csv", "w") as f: f.write(q1_test_data)


In [71]:
%%file CruiseLineAgg.py

from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class CruiseLineAgg(MRJob):
    """
    MapReduce job to compute:
    - Total ships, average tonnage, and max crew per Cruise Line.
    Usage:
      !python CruiseLineAgg.py cruise.csv
    """

    def mapper(self, _, line):
        row = next(csv.reader([line]))
        if row[0] == "Ship_name":  # skip header
            return
        cruise_line = row[1]
        tonnage = float(row[2])
        crew = float(row[4])
        yield cruise_line, (1, tonnage, crew)

    def combiner(self, cruise_line, values):
        count = 0
        ton_sum = 0.0
        max_crew = 0.0
        for ships, ton, crew in values:
            count += ships
            ton_sum += ton
            max_crew = max(max_crew, crew)
        yield cruise_line, (count, ton_sum, max_crew)

    def reducer(self, cruise_line, values):
        count = 0
        ton_sum = 0.0
        max_crew = 0.0
        for ships, ton, crew in values:
            count += ships
            ton_sum += ton
            max_crew = max(max_crew, crew)
        avg_ton = round(ton_sum / count, 2)
        yield cruise_line, {"ships": count, "avg_tonnage": avg_ton, "max_crew": max_crew}

if __name__ == '__main__':
    CruiseLineAgg.run()


Overwriting CruiseLineAgg.py


In [72]:
# TEST RUN ON SMALL DATA
!python3 CruiseLineAgg.py q1_test.csv

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/CruiseLineAgg.root.20250729.180521.520229
Running step 1 of 1...
job output is in /tmp/CruiseLineAgg.root.20250729.180521.520229/output
Streaming final output from /tmp/CruiseLineAgg.root.20250729.180521.520229/output...
"Azamara"	{"ships": 2, "avg_tonnage": 6.0, "max_crew": 6.94}
"Carnival"	{"ships": 8, "avg_tonnage": 17.38, "max_crew": 37.0}
Removing temp directory /tmp/CruiseLineAgg.root.20250729.180521.520229...


### Q2. Company Churn Rate
Description
Input: `customer_churn.csv`

For each company, output: churn rate = CHURNED / TOTAL, float (4 decimals)

Only output for companies in VIP list.

Use MultiStepJob:

Step1: Map company, (churned and total)

Step2: Compute rates, filter using VIP file in distributed cache

In [73]:
q2_test_data = """Names,Age,Total_Purchase,Account_Manager,Years,Num_Sites,Onboard_date,Location,Company,Churn
Cameron Williams,42.0,11066.8,0,7.22,8.0,2013-08-30 07:00:40,"10265 Elizabeth Mission Barkerburgh, AK 89518",Harvey LLC,1
Kevin Mueller,41.0,11916.22,0,6.5,11.0,2013-08-13 00:38:46,"6157 Frank Gardens Suite 019 Carloshaven, RI 17756",Wilson PLC,1
Eric Lozano,38.0,12884.75,0,6.67,12.0,2016-06-29 06:20:07,"1331 Keith Court Alyssahaven, DE 90114","Miller, Johnson and Wallace",1
Phillip White,42.0,8010.76,0,6.71,10.0,2014-04-22 12:43:12,"13120 Daniel Mount Angelabury, WY 30645-4695",Smith Inc,1
Cynthia Norton,37.0,9191.58,0,5.56,9.0,2016-01-19 15:31:15,"765 Tricia Row Karenshire, MH 71730",Love-Jones,1
Michael Lam,40.0,7675.52,1,4.31,11.0,2012-05-27 00:16:23,"63714 Sawyer Glens New Tonimouth, FL 73286-2490",Olson-Davis,0
Nancy Thompson,36.0,13308.01,0,5.07,8.0,2009-01-20 00:52:46,"847 Holly Loaf Apt. 839 East Nancyberg, DC 64194-2557",Garcia-Mckinney,0
Steve Lewis,37.0,7961.21,0,2.91,9.0,2014-06-12 12:47:15,"865 Mitchell Causeway Suite 183 Meredithshire, AR 80502-3040",Clarke-Gonzalez,0
Jessica Williams,48.0,10356.02,0,5.12,8.0,2009-03-03 23:13:37,"6187 Olson Mountains East Vincentborough, PR 74359",Kelly-Warren,1
Eric Butler,44.0,11331.58,1,5.23,11.0,2016-12-05 03:35:43,"4846 Savannah Road West Justin, IA 87713-3460",Reynolds-Sheppard,1
Zachary Walsh,32.0,9885.12,1,6.92,9.0,2006-03-09 14:50:20,"25271 Roy Expressway Suite 147 Brownport, FM 59852-6150",Singh-Cole,1
"""
with open("q2_test.csv", "w") as f: f.write(q2_test_data)
with open("VIP_companies.txt", "w") as f: f.write("Harvey LLC\nWilson PLC\nSmith Inc\nMiller, Johnson and Wallace\nReynolds-Sheppard")


In [95]:
%%file CompanyChurnRate.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class CompanyChurnRate(MRJob):
    """For each company in VIP_companies.txt, output churn rate to 4 decimals."""
    FILES = ['VIP_companies.txt']

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_counts,
                   reducer=self.reducer_aggregate),
            MRStep(reducer_init=self.reducer_init,
                   reducer=self.reducer_churn_rate)
        ]

    def mapper_get_counts(self, _, line):
        try:
            row = next(csv.reader([line]))
            # skip lines that do not have at least 3 columns
            if len(row) < 10:
                return
            # skip header or any rows where churn is not '0' or '1'
            if row[9] not in ('0', '1'):
                return
            company = row[8]
            churn = int(row[9])
            # yield company with counts: (1 for total, churn status 0 or 1)
            yield company, (1, churn)
        except Exception:
            return

    def reducer_aggregate(self, company, values):
        total = 0
        churned = 0
        for cnt, ch in values:
            total += cnt
            churned += ch
        # yield company with total customers and churned customers
        yield company, (total, churned)

    def reducer_init(self):
        # Load VIP companies in set for filtering results
        with open("VIP_companies.txt") as f:
            self.vipset = set(line.strip() for line in f if line.strip())

    def reducer_churn_rate(self, company, vals):
        for total, churned in vals:
            if company in self.vipset:
                rate = churned / total if total else 0.0
                # yield company with churn rate rounded to 4 decimals
                yield company, round(rate, 4)

if __name__ == '__main__':
    import sys
    from io import StringIO
    import contextlib
    import json

    if len(sys.argv) > 1:
      mr_job = CompanyChurnRate(args=sys.argv[1:])
      with mr_job.make_runner() as runner:
          runner.run()  # Run the MRJob
          for line in runner.cat_output():
            line = line.decode('utf-8')  # convert bytes → string
            line = line.strip()
            if not line or '\t' not in line:
              continue  # skip empty lines
            line = line.decode('utf-8') if isinstance(line, bytes) else line
            try:
                key, value = line.split('\t',1)
                print(f"{key}: {value}")
            except Exception as e:
                print(f"Failed to decode line: {line!r} — {e}")

    else:
        print("Please provide input file(s) as command line arguments.")


Overwriting CompanyChurnRate.py


In [96]:
!python3 CompanyChurnRate.py q2_test.csv


No configs specified for inline runner
"Miller, Johnson and Wallace": 1.0
"Reynolds-Sheppard": 1.0
"Smith Inc": 1.0
"Wilson PLC": 1.0
"Harvey LLC": 1.0


### Q3. State-wise Spending

Extract state from address (e.g., "123 Main St, Houston, TX 77002")

For each STATE, sum Yearly Amount Spent, output top 5 by total spending.

In [76]:
q3_test_data = """Email,Address,Avatar,Avg Session Length,Time on App,Time on Website,Length of Membership,Yearly Amount Spent
mstephenson@fernandez.com,"835 Frank TunnelWrightmouth, MI 82180-9605",Violet,34.49726772511229,12.65565114916675,39.57766801952616,4.0826206329529615,587.9510539684005
hduke@hotmail.com,"4547 Archer CommonDiazchester, CA 06566-8576",DarkGreen,31.92627202636016,11.109460728682564,37.268958868297744,2.66403418213262,392.2049334443264
pallen@yahoo.com,"24645 Valerie Unions Suite 582Cobbborough, DC 99414-7564",Bisque,33.000914755642675,11.330278057777512,37.110597442120856,4.104543202376424,487.54750486747207
riverarebecca@gmail.com,"1414 David ThroughwayPort Jason, OH 22070-1220",SaddleBrown,34.30555662975554,13.717513665142507,36.72128267790313,3.120178782748092,581.8523440352177
mstephens@davidson-herman.com,"14023 Rodriguez PassagePort Jacobville, PR 37242-1057",MediumAquaMarine,33.33067252364639,12.795188551078114,37.53665330059473,4.446308318351434,599.4060920457634
alvareznancy@lucas.biz,"645 Martha Park Apt. 611Jeffreychester, MN 67218-7250",FloralWhite,33.871037879341976,12.026925339755056,34.47687762925054,5.493507201364199,637.102447915074
katherine20@yahoo.com,"68388 Reyes Lights Suite 692Josephbury, WV 92213-0247",DarkSlateBlue,32.02159550138701,11.366348309710526,36.68377615286961,4.685017246570912,521.5721747578274
awatkins@yahoo.com,Unit 6538 Box 8980DPO AP 09026-4941,Aqua,32.739142938380326,12.35195897300293,37.37335885854755,4.4342734348999375,549.9041461052942
vchurch@walter-martinez.com,"860 Lee KeyWest Debra, SD 97450-0495",Salmon,33.98777289568564,13.386235275676436,37.534497341555735,3.2734335777477144,570.2004089636196
bonnie69@lin.biz,"PSC 2734, Box 5255APO AA 98456-7482",Brown,31.936548618448917,11.814128294972196,37.14516822352819,3.202806071553459,427.1993848953282
"""
with open("q3_test.csv", "w") as f: f.write(q3_test_data)


In [88]:
%%file StateSpendingTop5.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class StateSpendingTop5(MRJob):

    def mapper(self, _, line):
        row = next(csv.reader([line]))
        if row[0] == "Email": return
        address = row[1]
        try:
            # State code is second-last after ','
            state = address.strip().split(",")[-2].strip().split()[-1]
            spent = float(row[7])
            yield state, spent
        except:
            pass

    def reducer(self, state, amounts):
        yield None, (state, sum(amounts))

    def reducer_find_top5(self, _, state_amounts):
        # state_amounts: iterable of (state, total)
        top5 = sorted(state_amounts, key=lambda x: x[1], reverse=True)[:5]
        for state, total in top5:
            yield state, round(total, 2)

    def steps(self):
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(reducer=self.reducer_find_top5)
        ]
if __name__ == '__main__':
    import sys
    from io import StringIO
    import contextlib
    import json

    if len(sys.argv) > 1:
      mr_job = StateSpendingTop5(args=sys.argv[1:])
      with mr_job.make_runner() as runner:
          runner.run()  # Run the MRJob
          for line in runner.cat_output():
            line = line.decode('utf-8')  # convert bytes → string
            line = line.strip()
            if not line or '\t' not in line:
              continue  # skip empty lines
            line = line.decode('utf-8') if isinstance(line, bytes) else line
          try:
            key, value = line.split('\t',1)
            print(f"{key}: {value}")
          except Exception as e:
            print(f"Failed to decode line: {line!r} — {e}")

    else:
        print("Please provide input file(s) as command line arguments.")

Overwriting StateSpendingTop5.py


In [89]:
!python3 StateSpendingTop5.py q3_test.csv

No configs specified for inline runner
"Debra": 570.2


###Q4. Two-step Ship Filter & Median Length

Step 1: Filter all ships with passenger density > 35.0, emit ⟨cruise line, length⟩

Step 2: For each cruise line, compute median length

Output 2 decimals; handle even/odd number of values



In [79]:
q4_test_data = """Ship_name,Cruise_line,Age,Tonnage,passengers,length,cabins,passenger_density,crew
Journey,Azamara,6,30.276999999999997,6.94,5.94,3.55,42.64,3.55
Quest,Azamara,6,30.276999999999997,6.94,5.94,3.55,42.64,3.55
Celebration,Carnival,26,47.262,14.86,7.22,7.43,31.8,6.7
Conquest,Carnival,11,110.0,29.74,9.53,14.88,36.99,19.1
Destiny,Carnival,17,101.353,26.42,8.92,13.21,38.36,10.0
Ecstasy,Carnival,22,70.367,20.52,8.55,10.2,34.29,9.2
Elation,Carnival,15,70.367,20.52,8.55,10.2,34.29,9.2
Fantasy,Carnival,23,70.367,20.56,8.55,10.22,34.23,9.2
Fascination,Carnival,19,70.367,20.52,8.55,10.2,34.29,9.2
Freedom,Carnival,6,110.23899999999999,37.0,9.51,14.87,29.79,11.5
"""
## Compute density = passengers/(tonnage/100)
## line A: 200/10=20
## line C: 60/15=4
## line E: 36/8=4.5
## line B: 30/13=2.31
## line D: 30/6=5
with open("q4_test.csv", "w") as f: f.write(q4_test_data)


In [92]:
%%file ShipDensityMedian.py
from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

class ShipDensityMedian(MRJob):
    def steps(self):
        return [
            MRStep(mapper=self.mapper_filter_density,
                   reducer=self.reducer_gather_lengths),
            MRStep(reducer=self.reducer_median)
        ]

    def mapper_filter_density(self, _, line):
        row = next(csv.reader([line]))
        if row[0] == "Ship_name": return
        cruise_line = row[1]
        tonnage = float(row[3])
        passengers = float(row[4])
        length = float(row[5])
        density = passengers / (tonnage/100)
        if density > 35.0:
            yield cruise_line, length

    def reducer_gather_lengths(self, cruise_line, lengths):
        # Pass all lengths as a list for median calculation
        yield cruise_line, list(lengths)

    def reducer_median(self, cruise_line, lengths_list):
        # lengths_list will be (list_of_lengths,)
        for lengths in lengths_list:
            L = sorted(lengths)
            n = len(L)
            if n==0:
                continue
            if n%2==1:
                median = L[n//2]
            else:
                median = (L[n//2-1]+L[n//2])/2
            yield cruise_line, round(median,2)
if __name__ == '__main__':
    import sys
    from io import StringIO
    import contextlib
    import json

    if len(sys.argv) > 1:
      mr_job = ShipDensityMedian(args=sys.argv[1:])
      with mr_job.make_runner() as runner:
          runner.run()  # Run the MRJob
          for line in runner.cat_output():
            line = line.decode('utf-8')  # convert bytes → string
            line = line.strip()
            if not line or '\t' not in line:
              continue  # skip empty lines
            line = line.decode('utf-8') if isinstance(line, bytes) else line
            try:
              key, value = line.split('\t',1)
              print(f"{key}: {value}")
            except Exception as e:
              print(f"Failed to decode line: {line!r} — {e}")

    else:
        print("Please provide input file(s) as command line arguments.")

Overwriting ShipDensityMedian.py


In [93]:
!python3 ShipDensityMedian.py q4_test.csv

No configs specified for inline runner


In [97]:
print("Q1 Final Output:")
!python3 CruiseLineAgg.py cruise.csv | head -20

print("\nQ2 Final Output (provide your real VIP companies list!):")
!python3 CompanyChurnRate.py customer_churn.csv | head -10

print("\nQ3 Final Output:")
!python3 StateSpendingTop5.py ecom_customer.csv

print("\nQ4 Final Output:")
!python3 ShipDensityMedian.py cruise.csv


Q1 Final Output:
No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/CruiseLineAgg.root.20250729.181722.521079
Running step 1 of 1...
job output is in /tmp/CruiseLineAgg.root.20250729.181722.521079/output
Streaming final output from /tmp/CruiseLineAgg.root.20250729.181722.521079/output...
"Disney"	{"ships": 2, "avg_tonnage": 14.5, "max_crew": 17.5}
"Holland_American"	{"ships": 14, "avg_tonnage": 17.07, "max_crew": 21.04}
"MSC"	{"ships": 8, "avg_tonnage": 15.12, "max_crew": 39.59}
"Norwegian"	{"ships": 13, "avg_tonnage": 17.46, "max_crew": 23.94}
"Oceania"	{"ships": 3, "avg_tonnage": 14.33, "max_crew": 6.84}
"Orient"	{"ships": 1, "avg_tonnage": 48.0, "max_crew": 8.26}
"P&O"	{"ships": 6, "avg_tonnage": 14.0, "max_crew": 35.74}
"Princess"	{"ships": 17, "avg_tonnage": 12.94, "max_crew": 37.82}
"Regent_Seven_Seas"	{"ships": 5, "avg_tonnage": 15.8, "max_crew": 7.0}
"Royal_Caribbean"	{"ships": 23, "avg_tonnage": 13.65, "max_