In [None]:
# Install the pinned mrjob version the jobs below are written against.
# %pip (not !pip) installs into the environment of the running kernel.
%pip install --quiet mrjob==0.7.4

[?25l     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m0.0/439.6 kB[0m [31m?[0m eta [36m-:--:--[0m[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m439.6/439.6 kB[0m [31m13.2 MB/s[0m eta [36m0:00:00[0m
[?25h

In [None]:
# Mount Google Drive at /content/gdrive so the jobs below can read the
# input files stored under "My Drive/datasets".
from google.colab import drive

drive.mount(mountpoint="/content/gdrive")

Mounted at /content/gdrive


1) Find the most popular browser (the one that made the most requests) in the “web access log” file from lab01.

In [None]:
%%file q1.py
from mrjob.job import MRJob       #Each line in the log file represents one HTTP request-response cycle, with detailed information about the request, the server's response, and additional metadata like the client's IP address, timestamp, referrer, and user-agent.

class BrowserCounter(MRJob):
    """Count requests per browser in the lab01 web access log.

    The browser key is the first space-separated token of the user-agent
    field (e.g. ``Mozilla/5.0``), with surrounding double quotes removed.
    """

    def mapper(self, key, line):
        """Emit (browser, 1) for each log line with at least 12 space-separated fields."""
        request = line.split(' ')
        if len(request) >= 12:
            # Field 11 is assumed to be the first token of the double-quoted
            # user-agent (combined log format - TODO confirm), so it carries a
            # leading '"'. Strip quotes so the key is Mozilla/5.0 rather than
            # "Mozilla/5.0 with a stray quote character.
            browser = request[11].strip('"')
            yield browser, 1

    def reducer(self, browser, count):
        """Emit the total request count for ``browser`` (``count`` iterates the 1s)."""
        yield browser, sum(count)

if __name__ == '__main__':
    BrowserCounter.run()


Overwriting browser_counter.py


In [None]:
%%file browser_counter.py
from mrjob.job import MRJob

class BrowserCounter(MRJob):
    """Find the single browser that made the most requests in a web access log.

    Step 1 counts requests per browser (with a combiner). Step 2 routes every
    (count, browser) pair to the one key ``None``, so exactly one reducer picks
    the global maximum. A per-reducer max (reducer_init/reducer_final) instead
    emits one "winner" per reduce task whenever there is more than one task.
    """

    def steps(self):
        """Declare the two-step pipeline: count per browser, then take the max."""
        # Local import keeps this %%file cell self-contained.
        from mrjob.step import MRStep
        return [
            MRStep(mapper=self.mapper,
                   combiner=self.combiner,
                   reducer=self.reducer),
            MRStep(reducer=self.reducer_max),
        ]

    def mapper(self, key, line):
        """Emit (browser, 1) for each log line with at least 12 space-separated fields."""
        request = line.split(' ')
        if len(request) >= 12:
            # Field 11 is assumed to be the first token of the double-quoted
            # user-agent (TODO confirm log format); strip the leading quote so
            # the key is Mozilla/5.0 rather than "Mozilla/5.0.
            browser = request[11].strip('"')
            yield browser, 1

    def combiner(self, browser, counts):
        """Pre-aggregate counts on the map side."""
        yield browser, sum(counts)

    def reducer(self, browser, counts):
        """Send the per-browser total to the single key None for step 2."""
        yield None, (sum(counts), browser)

    def reducer_max(self, _, count_browser_pairs):
        """Emit the browser with the largest request count.

        Pairs arrive JSON-decoded as [count, browser] lists; ``max`` compares
        the count first, and ties go to the lexicographically larger name.
        """
        total_requests, most_popular_browser = max(count_browser_pairs)
        yield most_popular_browser, total_requests

if __name__ == '__main__':
    BrowserCounter.run()


Overwriting browser_counter.py


In [None]:
# Run the job with mrjob's in-process inline runner over the lab01 web access log.
!python browser_counter.py -r inline "/content/gdrive/My Drive/datasets/mr/web_access_log.txt"


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/browser_counter.root.20230915.054305.203069
Running step 1 of 1...
job output is in /tmp/browser_counter.root.20230915.054305.203069/output
Streaming final output from /tmp/browser_counter.root.20230915.054305.203069/output...
"\"Mozilla/5.0"	86479
"\"python-requests/1.2.3"	1733
Removing temp directory /tmp/browser_counter.root.20230915.054305.203069...


2) Suppose you are given a set of data vectors in the file iris.csv. Read each row of the data
file as a data vector. You are also given a file of class vectors, iris_classes.csv.
Read each row of this file as a class vector. For each data vector, your program should output the ID of its class vector (the closest class vector by Euclidean distance). Class vector IDs are 1, 2, 3, … in the order in which the vectors occur in the class file.


In [None]:

%%file classification.py
from mrjob.job import MRJob
import math
import csv
import re

def is_float(num):
    """Return True if ``num`` can be converted with ``float()``, else False.

    Used to skip header rows and other non-numeric CSV fields. Inputs that
    ``float()`` rejects with TypeError (e.g. ``None``) return False instead
    of raising, so the function is a safe predicate.
    """
    try:
        float(num)
    except (TypeError, ValueError):
        return False
    return True

class Classify(MRJob):
    """Assign each data vector the ID of its nearest class vector.

    Class vectors are read from ``CLASSES_PATH`` in ``mapper_init``; numeric
    rows are numbered 1, 2, 3, ... in file order. Each numeric input line is
    emitted as ``(data_vector, class_id)``, where ``class_id`` is the class
    vector at the smallest Euclidean distance (0 if no class vectors loaded).
    """

    # Location of the class-vector file; override in a subclass to change it.
    CLASSES_PATH = '/content/gdrive/My Drive/datasets/iris/iris_classes.csv'

    def mapper_init(self):
        """Load the class vectors into ``self.classes`` as {class_id: [floats]}."""
        self.classes = {}
        class_id = 0
        # newline='' is what the csv module expects for files it reads.
        with open(self.CLASSES_PATH, mode='r', newline='') as file:
            for row in csv.reader(file):
                # csv.reader yields [] for blank lines; skip those and any
                # header / non-numeric rows.
                if row and is_float(row[0]):
                    class_id += 1
                    self.classes[class_id] = [float(k) for k in row]

    def mapper(self, key, line):
        """Emit (data_vector, nearest_class_id) for each numeric input line."""
        fields = line.split(',')
        if not is_float(fields[0]):
            return  # header or otherwise non-numeric line
        # Keep only numeric fields so a trailing label column or an empty
        # trailing cell does not crash the float conversion.
        record = [float(v) for v in fields if is_float(v)]
        min_distance = math.inf
        cls = 0
        for class_no, class_vector in self.classes.items():
            distance = math.sqrt(sum((class_vector[i] - record[i]) ** 2
                                     for i in range(len(class_vector))))
            if distance < min_distance:
                min_distance = distance
                cls = class_no
        # Emit the parsed data vector and its nearest class ID.
        yield record, cls

if __name__ == '__main__':
    Classify.run()


Overwriting classification.py


In [None]:
# Classify the data vectors in iris.csv. The class file is read inside the job,
# so it must not be passed as the input here.
!python classification.py "/content/gdrive/My Drive/datasets/iris/iris.csv"


No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/classification.root.20230915.055219.289266
Running step 1 of 1...
job output is in /tmp/classification.root.20230915.055219.289266/output
Streaming final output from /tmp/classification.root.20230915.055219.289266/output...
[6.8462, 3.0821, 5.7026, 2.0795]	3
[5.8885, 2.7377, 4.3967, 1.418]	1
[5.006, 3.418, 1.464, 0.244]	2
Removing temp directory /tmp/classification.root.20230915.055219.289266...


3) Compute the top 10 earners from “employee.csv” of lab01. Listing only their employee numbers is sufficient.
To compute the top 10, each mapper can maintain a “sorted map” of size 10 (see the lecture slides).


In [None]:

%%file toptenearners.py
from heapq import heappush, heappop
from mrjob.job import MRJob
import re

class toptenearners(MRJob):
  """Emit the N highest-paid employees as (emp_no, salary), highest first.

  Each mapper keeps a bounded min-heap of its N largest (salary, emp_no)
  pairs and emits it once from mapper_final under the single key None. One
  reducer then merges all partial heaps into the global top N.
  """

  # Number of top earners to keep; shared by the mapper and reducer sides.
  N = 10

  def mapper_init(self):
    self.min_heap = []      # (salary, emp_no) pairs; smallest salary at index 0

  def mapper(self, _, line):
    """Add one employee record to this mapper's bounded top-N heap."""
    record = re.split(',', line)
    salary = int(record[3])
    emp_no = record[0]
    # Numeric IDs are used as-is; otherwise only the last 5 characters are
    # parsed (presumably to drop a stray prefix on some IDs - TODO confirm).
    emp_no = int(emp_no) if emp_no.isdigit() else int(emp_no[-5:])
    self._keep_top(self.min_heap, (salary, emp_no))

  def mapper_final(self):
    # One record per mapper: its partial top-N heap under the shared key None.
    yield None, self.min_heap

  def reducer_init(self):
    self.min_heap = []

  def reducer(self, _, heaps):
    """Merge every mapper's partial heap into one global top-N heap."""
    for heap in heaps:
      for value in heap:
        # Values arrive JSON-decoded as [salary, emp_no] lists, which compare
        # element-wise just like the mapper's tuples.
        self._keep_top(self.min_heap, value)

  def reducer_final(self):
    # Highest salary first.
    for salary, emp_no in sorted(self.min_heap, reverse=True):
      yield emp_no, salary

  def _keep_top(self, heap, item):
    """Push item onto heap, then drop the smallest entry if heap exceeds N."""
    heappush(heap, item)
    if len(heap) > self.N:
      heappop(heap)

if __name__ == '__main__':
  toptenearners.run()


Overwriting toptenearners.py


In [None]:
# Run the top-10 earners job with the inline runner over the lab01 employee file.
!python toptenearners.py -r inline "/content/gdrive/My Drive/datasets/mr/employee.csv"

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/toptenearners.root.20230915.055717.076672
Running step 1 of 1...
job output is in /tmp/toptenearners.root.20230915.055717.076672/output
Streaming final output from /tmp/toptenearners.root.20230915.055717.076672/output...
10089	250000
10010	220450
10272	180000
10015	178000
10019	170500
10288	157000
10086	150290
10222	148999
10198	140920
10028	138888
Removing temp directory /tmp/toptenearners.root.20230915.055717.076672...


4) Compute the standard deviation of salary for each department from “employee.csv”.

In [None]:
%%file sd.py
from mrjob.job import MRJob
import math
import re
class EmployeeSd(MRJob):
  """Population standard deviation of salary for each department.

  Input lines are comma-separated employee records: field 2 is the
  department number and field 3 the integer salary.
  """

  def mapper(self, _, line):
    """Emit (department, salary) for one employee record."""
    fields = re.split(',', line)
    yield fields[2], int(fields[3])

  def reducer(self, dno, salaries):
    """Emit sqrt(sum((x - mean)^2) / n) over the department's salaries."""
    values = list(salaries)
    count = len(values)
    mean = sum(values) / count
    # Accumulate in input order with a plain loop (not sum()) so the float
    # result is bit-for-bit the same on every Python version.
    squared_dev_total = 0
    for value in values:
      squared_dev_total += (value - mean) ** 2
    yield dno, math.sqrt(squared_dev_total / count)

if __name__ == '__main__':
  EmployeeSd.run()

Writing sd.py


In [None]:
# Run the per-department salary standard-deviation job with the inline runner.
!python sd.py -r inline "/content/gdrive/My Drive/datasets/mr/employee.csv"

No configs found; falling back on auto-configuration
No configs specified for inline runner
Creating temp directory /tmp/sd.root.20230915.055756.186692
Running step 1 of 1...
job output is in /tmp/sd.root.20230915.055756.186692/output
Streaming final output from /tmp/sd.root.20230915.055756.186692/output...
"1"	19722.68484258672
"2"	0.0
"3"	32875.83877242373
"4"	8754.93245490792
"5"	11420.800779947584
"6"	20702.734864531638
Removing temp directory /tmp/sd.root.20230915.055756.186692...
