
# Advanced Certification Program in Computational Data Science
## A program by IISc and TalentSprint
### Assignment 3: Hadoop MapReduce

## Learning Objectives

At the end of the experiment, you will be able to

* understand what Hadoop is and what its components are
* perform various Hadoop-HDFS shell commands
* perform MapReduce operations on data

## Information

### Introduction

Hadoop is an open-source framework that allows the storage and processing of big data in a distributed environment across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage.

Hadoop is developed by the Apache Software Foundation, and its co-founders are Doug Cutting and Mike Cafarella.
Doug Cutting named it after his son’s toy elephant. In October 2003, Google published its paper on the Google File System. In January 2006, MapReduce development started within Apache Nutch, which consisted of around 6,000 lines of code for MapReduce and around 5,000 lines of code for HDFS. Hadoop 0.1.0 was released in April 2006.

Apache Hadoop is a collection of open-source software utilities that facilitates using a network of many computers to solve problems involving massive amounts of data and computation. It provides a software framework for distributed storage and processing of big data using the MapReduce programming model. Hadoop was originally designed for computer clusters built from commodity hardware, which is still the common use. It has since also found use on clusters of higher-end hardware. All the modules in Hadoop are designed with a fundamental assumption that hardware failures are common occurrences and should be automatically handled by the framework.

### Components of Hadoop

The core of Apache Hadoop consists of a storage part, known as the Hadoop Distributed File System (HDFS), and a processing part, which is the MapReduce programming model. Hadoop splits files into large blocks and distributes them across the nodes in a cluster. It then ships the packaged code to those nodes so that they process the data in parallel. This approach takes advantage of data locality: each node processes the data it stores locally, which lets the dataset be processed faster and more efficiently than if the data had to be moved over the network to the code.
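To make the block idea concrete, here is a toy sketch of how a file could be cut into fixed-size blocks and how each block could be replicated on several nodes. The 128 MB block size and replication factor of 3 match the HDFS defaults (`dfs.blocksize`, `dfs.replication`), but the round-robin placement and the node names `dn1`..`dn4` are simplifying assumptions; real HDFS uses a rack-aware placement policy.

```python
from typing import Dict, List

BLOCK_SIZE = 128 * 1024 * 1024   # HDFS default block size (dfs.blocksize), in bytes
REPLICATION = 3                  # HDFS default replication factor (dfs.replication)


def split_into_blocks(file_size: int, block_size: int = BLOCK_SIZE) -> List[int]:
    """Return the sizes of the blocks a file of `file_size` bytes is cut into."""
    full, rest = divmod(file_size, block_size)
    # the last block only holds the remaining bytes
    return [block_size] * full + ([rest] if rest else [])


def place_replicas(num_blocks: int, nodes: List[str],
                   replication: int = REPLICATION) -> Dict[int, List[str]]:
    """Toy round-robin placement: block i is copied to `replication` distinct nodes."""
    if replication > len(nodes):
        raise ValueError("need at least as many nodes as replicas")
    return {
        i: [nodes[(i + r) % len(nodes)] for r in range(replication)]
        for i in range(num_blocks)
    }


# Example: a 300 MB file on a hypothetical 4-node cluster
blocks = split_into_blocks(300 * 1024 * 1024)
print([b // (1024 * 1024) for b in blocks])   # [128, 128, 44]
print(place_replicas(len(blocks), ["dn1", "dn2", "dn3", "dn4"]))
```

Because every block lives on several nodes, a map task can usually be scheduled on a node that already holds its block, which is the data-locality benefit described above.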

The components of Hadoop are as follows:

* Storage unit - Hadoop HDFS (Hadoop Distributed File System)
* Processing unit - Hadoop MapReduce
* Resource management unit - Hadoop YARN (Yet Another Resource Negotiator)

#### Hadoop HDFS

The Hadoop Distributed File System is based on a distributed file system design and runs on commodity hardware. Unlike many other distributed systems, HDFS is highly fault tolerant while being designed for low-cost hardware.

HDFS holds very large amounts of data and provides easy access to it. To store such huge data, files are spread across multiple machines. They are stored redundantly to protect the system from data loss in case of failure. HDFS also makes the data available to applications for parallel processing.

**Features of HDFS**

* Suitable for distributed storage and processing
* Hadoop provides a command interface to interact with HDFS
* The built-in servers of the NameNode and DataNode help users easily check the status of the cluster
* Streaming access to file system data
* HDFS provides file permissions and authentication

**Name Node**

HDFS has a single (active) Name Node, which is called the master node. The master node tracks files, manages the file system namespace, and holds the metadata of all the data stored in the cluster. In particular, the Name Node keeps the number of blocks, the locations of the Data Nodes that store each block, where the replicas are stored, and other details. Clients contact the Name Node directly.

**Data Node**

A Data Node stores data as blocks. It is also known as a slave node: it stores the actual data in HDFS and serves the clients' read and write requests. Data Nodes run as slave daemons. Every Data Node sends a heartbeat message to the Name Node every 3 seconds to signal that it is alive. If the Name Node does not receive a heartbeat from a Data Node for a prolonged period (about 10.5 minutes with the default settings), it considers that Data Node dead and starts re-replicating its blocks on other Data Nodes.
<figure>
<img src='https://cdn.iisc.talentsprint.com/CDS/Images/Datanode.png' width="500"/>
</figure>
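The heartbeat mechanism can be sketched as a small tracker on the Name Node side that remembers when each Data Node last reported in. This is a toy model, not HDFS code: the class name `HeartbeatMonitor`, the node names, and the timeout value in the example are illustrative assumptions, and the timeout is a parameter because HDFS derives it from configuration.

```python
from typing import Dict, List


class HeartbeatMonitor:
    """Toy Name Node-side tracker that flags Data Nodes whose heartbeats stopped."""

    def __init__(self, timeout: float) -> None:
        self.timeout = timeout                 # seconds of silence before a node counts as dead
        self.last_seen: Dict[str, float] = {}  # node name -> time of last heartbeat

    def heartbeat(self, node: str, now: float) -> None:
        """Record that `node` reported in at time `now` (in seconds)."""
        self.last_seen[node] = now

    def dead_nodes(self, now: float) -> List[str]:
        """Nodes silent for longer than the timeout; their blocks need re-replication."""
        return sorted(n for n, t in self.last_seen.items() if now - t > self.timeout)


monitor = HeartbeatMonitor(timeout=630.0)   # illustrative timeout in seconds
monitor.heartbeat("dn1", now=0.0)
monitor.heartbeat("dn2", now=0.0)
monitor.heartbeat("dn1", now=600.0)         # dn1 keeps reporting, dn2 has gone silent
print(monitor.dead_nodes(now=700.0))        # ['dn2']
```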

#### Hadoop MapReduce

It is the processing unit of Hadoop. In the MapReduce approach, the processing is done on the slave nodes, and the final result is sent to the master node.
<figure>
<img src="https://cdn.iisc.talentsprint.com/CDS/Images/MapReduce.jpg" />
</figure>
    
A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then used as input to the reduce tasks. At the reduce phase, the aggregation takes place, and the final output is obtained.
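The three stages described above (map, sort/shuffle, reduce) can be imitated in a single Python process with a word count. This is only a sketch of the data flow: here all "map tasks" run sequentially in one process, whereas Hadoop runs them on separate nodes and partitions the keys across reducers.

```python
from collections import defaultdict
from itertools import chain
from typing import Dict, Iterable, Iterator, List, Tuple


def map_phase(chunk: str) -> Iterator[Tuple[str, int]]:
    """Map: emit (word, 1) for every word in one input chunk."""
    for word in chunk.lower().split():
        yield word, 1


def shuffle_and_sort(pairs: Iterable[Tuple[str, int]]) -> List[Tuple[str, List[int]]]:
    """Group all map outputs by key and sort by key, as the framework does."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(key: str, values: List[int]) -> Tuple[str, int]:
    """Reduce: aggregate all values that share a key."""
    return key, sum(values)


def map_reduce(chunks: List[str]) -> List[Tuple[str, int]]:
    # in Hadoop, each chunk would be handled by its own map task in parallel
    mapped = chain.from_iterable(map_phase(c) for c in chunks)
    return [reduce_phase(k, vs) for k, vs in shuffle_and_sort(mapped)]


print(map_reduce(["the cat sat", "the dog sat down"]))
# [('cat', 1), ('dog', 1), ('down', 1), ('sat', 2), ('the', 2)]
```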

#### Hadoop YARN

Hadoop YARN stands for Yet Another Resource Negotiator and is the resource management unit of Hadoop, introduced in Hadoop version 2. It sits on top of HDFS in the Hadoop stack and is responsible for managing cluster resources to prevent overloading. It also performs job scheduling to make sure that jobs run in the right place.

Whenever a client machine wants to run a query or submit some code, the job request goes to the ResourceManager of Hadoop YARN, which is responsible for resource allocation and management.

<figure>
<img src='https://cdn.iisc.talentsprint.com/CDS/Images/yarn_architecture.gif' />
</figure>
    
On the worker side, each node runs a NodeManager, which manages the node and monitors its resource usage. A container is a collection of physical resources on a node, such as RAM, CPU, or disk. When a job is submitted, an ApplicationMaster is started for it; the ApplicationMaster requests containers from the ResourceManager and then asks the NodeManagers to launch them. The NodeManagers report the resource usage of their nodes back to the ResourceManager.
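The idea of granting containers out of each node's free resources can be sketched with a first-fit allocator. Everything here is hypothetical: `ToyResourceManager`, `NodeManager`, and the capacities are made up for illustration, and YARN's real schedulers (such as the Capacity and Fair schedulers) use queues, priorities, and locality preferences instead of plain first-fit. Containers are sized by memory and virtual cores, two resource types YARN tracks.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class NodeManager:
    """Free resources on one node (hypothetical figures)."""
    name: str
    free_memory_mb: int
    free_vcores: int
    containers: List[str] = field(default_factory=list)


class ToyResourceManager:
    """First-fit container allocation; a simplification of YARN scheduling."""

    def __init__(self, nodes: List[NodeManager]) -> None:
        self.nodes = nodes

    def allocate(self, container_id: str, memory_mb: int, vcores: int) -> Optional[str]:
        """Place a container on the first node with enough free resources."""
        for node in self.nodes:
            if node.free_memory_mb >= memory_mb and node.free_vcores >= vcores:
                node.free_memory_mb -= memory_mb
                node.free_vcores -= vcores
                node.containers.append(container_id)
                return node.name
        return None  # no node can host it right now; the request has to wait


rm = ToyResourceManager([NodeManager("node1", 4096, 2), NodeManager("node2", 8192, 4)])
print(rm.allocate("app_master", 1024, 1))   # node1
print(rm.allocate("map_task_1", 4096, 2))   # node2 (node1 has only 3072 MB left)
print(rm.allocate("map_task_2", 8192, 4))   # None
```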

### Setup Steps:

In [1]:
#@title Please enter your registration id to start: { run: "auto", display-mode: "form" }
Id = "2236624" #@param {type:"string"}

In [2]:
#@title Please enter your password (your registered phone number) to continue: { run: "auto", display-mode: "form" }
password = "8240187807" #@param {type:"string"}

In [3]:
#@title Run this cell to complete the setup for this Notebook
from IPython import get_ipython

ipython = get_ipython()

notebook= "M5_AST_03_Hadoop_MapReduce_C" #name of the notebook

def setup():
#  ipython.magic("sx pip3 install torch")
    ipython.magic("sx wget https://cdn.iisc.talentsprint.com/CDSE_experiments_data/ecommerce_uci.csv")
    ipython.magic("sx wget http://qwone.com/~jason/20Newsgroups/20news-18828.tar.gz")
    ipython.magic("sx tar -xzvf 20news-18828.tar.gz")
    from IPython.display import HTML, display
    display(HTML('<script src="https://dashboard.talentsprint.com/aiml/record_ip.html?traineeId={0}&recordId={1}"></script>'.format(getId(),submission_id)))
    print("Setup completed successfully")
    return

def submit_notebook():
    ipython.magic("notebook -e "+ notebook + ".ipynb")

    import requests, json, base64, datetime

    url = "https://dashboard.talentsprint.com/xp/app/save_notebook_attempts"
    if not submission_id:
      data = {"id" : getId(), "notebook" : notebook, "mobile" : getPassword()}
      r = requests.post(url, data = data)
      r = json.loads(r.text)

      if r["status"] == "Success":
          return r["record_id"]
      elif "err" in r:
        print(r["err"])
        return None
      else:
        print ("Something is wrong, the notebook will not be submitted for grading")
        return None

    elif getAnswer() and getComplexity() and getAdditional() and getConcepts() and getComments() and getMentorSupport():
      f = open(notebook + ".ipynb", "rb")
      file_hash = base64.b64encode(f.read())

      data = {"complexity" : Complexity, "additional" :Additional,
              "concepts" : Concepts, "record_id" : submission_id,
              "answer" : Answer, "id" : Id, "file_hash" : file_hash,
              "notebook" : notebook,
              "feedback_experiments_input" : Comments,
              "feedback_mentor_support": Mentor_support}
      r = requests.post(url, data = data)
      r = json.loads(r.text)
      if "err" in r:
        print(r["err"])
        return None
      else:
        print("Your submission is successful.")
        print("Ref Id:", submission_id)
        print("Date of submission: ", r["date"])
        print("Time of submission: ", r["time"])
        print("View your submissions: https://cds.iisc.talentsprint.com/notebook_submissions")
        #print("For any queries/discrepancies, please connect with mentors through the chat icon in LMS dashboard.")
        return submission_id
    else: return submission_id


def getAdditional():
  try:
    if not Additional:
      raise NameError
    else:
      return Additional
  except NameError:
    print ("Please answer Additional Question")
    return None

def getComplexity():
  try:
    if not Complexity:
      raise NameError
    else:
      return Complexity
  except NameError:
    print ("Please answer Complexity Question")
    return None

def getConcepts():
  try:
    if not Concepts:
      raise NameError
    else:
      return Concepts
  except NameError:
    print ("Please answer Concepts Question")
    return None


# def getWalkthrough():
#   try:
#     if not Walkthrough:
#       raise NameError
#     else:
#       return Walkthrough
#   except NameError:
#     print ("Please answer Walkthrough Question")
#     return None

def getComments():
  try:
    if not Comments:
      raise NameError
    else:
      return Comments
  except NameError:
    print ("Please answer Comments Question")
    return None


def getMentorSupport():
  try:
    if not Mentor_support:
      raise NameError
    else:
      return Mentor_support
  except NameError:
    print ("Please answer Mentor support Question")
    return None

def getAnswer():
  try:
    if not Answer:
      raise NameError
    else:
      return Answer
  except NameError:
    print ("Please answer Question")
    return None


def getId():
  try:
    return Id if Id else None
  except NameError:
    return None

def getPassword():
  try:
    return password if password else None
  except NameError:
    return None

submission_id = None
### Setup
if getPassword() and getId():
  submission_id = submit_notebook()
  if submission_id:
    setup()
else:
  print ("Please complete Id and Password cells before running setup")



Setup completed successfully


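**Note: Hadoop installation in this assignment is performed on top of the Linux file system available in Google Colab, so it is strongly recommended to execute it only on Google Colab; it may not work in a local environment. Because Hadoop runs here in standalone (local) mode, the `hdfs dfs` commands below operate on Colab's local file system (shown as `file:///`) rather than on a distributed HDFS cluster.**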

### Installing Hadoop

In [4]:
# Downloading the Hadoop 3.3.3 release archive (older releases are served from archive.apache.org)
!wget -qq https://archive.apache.org/dist/hadoop/common/hadoop-3.3.3/hadoop-3.3.3.tar.gz

In [5]:
# Extracting the Hadoop archive
!tar -xzvf hadoop-3.3.3.tar.gz

Since Colaboratory is built on top of Ubuntu, all the Ubuntu files are available in the Colab file section (left panel of the Colab notebook).

Copying the Hadoop directory to `/usr/local`:

In [6]:
# copy the Hadoop directory to /usr/local
!cp -r hadoop-3.3.3/ /usr/local/

### Configuring Hadoop’s Java Home

Hadoop requires that you set the path to Java, either as an environment variable or in the Hadoop configuration file.

In [7]:
#To find the default Java path
!readlink -f /usr/bin/java | sed "s:bin/java::"

/usr/lib/jvm/java-11-openjdk-amd64/


Set the `JAVA_HOME` environment variable using `os`:

In [8]:
import os
os.environ['JAVA_HOME'] = '/usr/lib/jvm/java-11-openjdk-amd64/'
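Instead of hard-coding the path, the same result as the `readlink -f` cell above can be computed in Python, which keeps working if Colab ships a different JDK. This is a sketch that assumes the resolved `java` binary sits in a `bin/` directory directly under the Java home; `java_home_from_binary` is a hypothetical helper name.

```python
import os
import shutil


def java_home_from_binary(java_binary: str) -> str:
    """Given a resolved path like .../<jdk>/bin/java, return .../<jdk>."""
    bin_dir = os.path.dirname(java_binary)   # .../<jdk>/bin
    return os.path.dirname(bin_dir)          # .../<jdk>


java = shutil.which("java")                  # e.g. /usr/bin/java, usually a symlink
if java is not None:
    # realpath follows the symlink chain, like `readlink -f` does
    os.environ["JAVA_HOME"] = java_home_from_binary(os.path.realpath(java))
    print(os.environ["JAVA_HOME"])
```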

### Running Hadoop

From the `/usr/local` directory where we copied Hadoop, run the `hadoop` command. Without arguments, it prints its usage information.

In [9]:
#Running Hadoop
!/usr/local/hadoop-3.3.3/bin/hadoop

Usage: hadoop [OPTIONS] SUBCOMMAND [SUBCOMMAND OPTIONS]
 or    hadoop [OPTIONS] CLASSNAME [CLASSNAME OPTIONS]
  where CLASSNAME is a user-provided Java class

  OPTIONS is none or any of:

buildpaths                       attempt to add class files from build tree
--config dir                     Hadoop config directory
--debug                          turn on shell script debug mode
--help                           usage information
hostnames list[,of,host,names]   hosts to use in slave mode
hosts filename                   list of hosts to use in slave mode
loglevel level                   set the log4j level for this command
workers                          turn on worker mode

  SUBCOMMAND is one of:


    Admin Commands:

daemonlog     get/set the log level for each daemon

    Client Commands:

archive       create a Hadoop archive
checknative   check native Hadoop and compression libraries availability
classpath     prints the class path needed to get the Hadoop jar and the
    

### Perform various Hadoop-HDFS shell commands

#### Check the version of Hadoop



In [10]:
!/usr/local/hadoop-3.3.3/bin/hadoop version

Hadoop 3.3.3
Source code repository https://github.com/apache/hadoop.git -r d37586cbda38c338d9fe481addda5a05fb516f71
Compiled by stevel on 2022-05-09T16:36Z
Compiled with protoc 3.7.1
From source with checksum eb96dd4a797b6989ae0cdb9db6efc6
This command was run using /usr/local/hadoop-3.3.3/share/hadoop/common/hadoop-common-3.3.3.jar


#### List all the files/directories for the given hdfs destination path

In [11]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -ls /

Found 28 items
-rwxr-xr-x   1 root root          0 2023-07-27 16:12 /.dockerenv
-rw-r--r--   1 root root      17294 2023-06-21 00:39 /NGC-DL-CONTAINER-LICENSE
drwxr-xr-x   - root root       4096 2023-07-27 16:12 /bin
drwxr-xr-x   - root root       4096 2022-04-18 10:28 /boot
drwxr-xr-x   - root root       4096 2023-07-27 16:15 /content
-rw-r--r--   1 root root       4332 2023-06-21 00:40 /cuda-keyring_1.0-1_all.deb
drwxr-xr-x   - root root       4096 2023-07-25 13:51 /datalab
drwxr-xr-x   - root root        460 2023-07-27 16:12 /dev
drwxr-xr-x   - root root       4096 2023-07-27 16:12 /etc
drwxr-xr-x   - root root       4096 2022-04-18 10:28 /home
drwxr-xr-x   - root root       4096 2023-07-25 13:26 /lib
drwxr-xr-x   - root root       4096 2023-07-25 13:24 /lib32
drwxr-xr-x   - root root       4096 2023-06-05 14:05 /lib64
drwxr-xr-x   - root root       4096 2023-06-05 14:02 /libx32
drwxr-xr-x   - root root       4096 2023-06-05 14:02 /media
drwxr-xr-x   - root root       4096 2023-06-0

#### Display free space at given hdfs destination

In [12]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -df /

Filesystem         Size         Used    Available  Use%
file:///    83955703808  29519650816  54436052992   35%


#### HDFS command to create a directory in HDFS

In [13]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -mkdir /hadoop

#### HDFS command to remove the entire directory and all of its content from HDFS

In [14]:
!/usr/local/hadoop-3.3.3/bin/hdfs dfs -rm -r /hadoop

2023-07-27 16:17:05,172 INFO Configuration.deprecation: io.bytes.per.checksum is deprecated. Instead, use dfs.bytes-per-checksum
Deleted /hadoop


### Perform MapReduce operation

We will use an e-commerce sales dataset from the UCI Machine Learning Repository containing real-life transaction data from a UK retailer.

Here we will perform a MapReduce operation to calculate the total sales for each country.

In [15]:
import pandas as pd
df = pd.read_csv('ecommerce_uci.csv')
df.head()

|   | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Total_Sales | Country |
|---|---|---|---|---|---|---|---|---|---|
| 0 | 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 12-01-2010 8.26 | 2.55 | 17850 | 15.3 | United Kingdom |
| 1 | 536365 | 71053 | WHITE METAL LANTERN | 6 | 12-01-2010 8.26 | 3.39 | 17850 | 20.34 | United Kingdom |
| 2 | 536365 | 84406B | CREAM CUPID HEARTS COAT HANGER | 8 | 12-01-2010 8.26 | 2.75 | 17850 | 22.0 | United Kingdom |
| 3 | 536370 | 21913 | VINTAGE SEASIDE JIGSAW PUZZLES | 12 | 12-01-2010 8.45 | 3.75 | 12583 | 45.0 | France |
| 4 | 536370 | 22540 | MINI JIGSAW CIRCUS PARADE | 24 | 12-01-2010 8.45 | 0.42 | 12583 | 10.08 | France |
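Before running Hadoop, it can help to have a reference answer to compare the MapReduce output against. The sketch below computes the per-country totals in a single pass with Python's `csv` module; it assumes the file has `Country` and `Total_Sales` header columns as shown above, and `total_sales_by_country` is a hypothetical helper name. Unlike the mapper, it keeps the spaces in country names.

```python
import csv
import os
from collections import defaultdict
from typing import Dict


def total_sales_by_country(csv_path: str) -> Dict[str, float]:
    """Sum Total_Sales per Country without Hadoop, highest total first."""
    totals: Dict[str, float] = defaultdict(float)
    with open(csv_path, newline="") as f:
        for row in csv.DictReader(f):
            totals[row["Country"]] += float(row["Total_Sales"])
    # sort by total in descending order, like the reducer in this notebook
    return dict(sorted(totals.items(), key=lambda kv: kv[1], reverse=True))


if os.path.exists("ecommerce_uci.csv"):      # file downloaded during setup
    print(total_sales_by_country("ecommerce_uci.csv"))
```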


### Hadoop Streaming

Here we use Hadoop Streaming to pass data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output). We simply use Python's `sys.stdin` to read input data and print output to `sys.stdout`; Hadoop Streaming takes care of everything else, including sorting the mapper output by key before it reaches the reducer.
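Because streaming scripts only talk to STDIN and STDOUT, they can be tested without Hadoop by emulating `cat input | mapper | sort | reducer`. The sketch below does this with `subprocess`; it models a single map task and a single reducer, and `run_streaming_locally` is a hypothetical helper, not part of Hadoop.

```python
import subprocess
import sys


def run_streaming_locally(mapper_path: str, reducer_path: str, input_path: str) -> str:
    """Emulate `cat input | mapper | sort | reducer` for Hadoop Streaming scripts."""
    with open(input_path, "rb") as f:
        raw_input = f.read()

    # Map phase: the mapper reads raw lines on STDIN and writes "key\tvalue" lines
    mapped = subprocess.run([sys.executable, mapper_path], input=raw_input,
                            capture_output=True, check=True).stdout

    # Shuffle/sort phase: order lines by key (text before the first tab),
    # which is the ordering Hadoop guarantees to the reducer
    lines = [line + b"\n" for line in mapped.splitlines()]
    lines.sort(key=lambda line: line.split(b"\t", 1)[0])

    # Reduce phase: the reducer reads the sorted lines on STDIN
    reduced = subprocess.run([sys.executable, reducer_path], input=b"".join(lines),
                             capture_output=True, check=True).stdout
    return reduced.decode("utf-8")
```

Once the scripts below are written, a call such as `print(run_streaming_locally("mapper.py", "reducer.py", "ecommerce_uci.csv"))` should print the same totals as the Hadoop job, which makes bugs in the scripts much quicker to find.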

In [16]:
!find / -name 'hadoop-streaming*.jar'

/usr/local/hadoop-3.3.3/share/hadoop/tools/lib/hadoop-streaming-3.3.3.jar
/usr/local/hadoop-3.3.3/share/hadoop/tools/sources/hadoop-streaming-3.3.3-sources.jar
/usr/local/hadoop-3.3.3/share/hadoop/tools/sources/hadoop-streaming-3.3.3-test-sources.jar
find: ‘/proc/53/task/53/net’: Invalid argument
find: ‘/proc/53/net’: Invalid argument
/content/hadoop-3.3.3/share/hadoop/tools/lib/hadoop-streaming-3.3.3.jar
/content/hadoop-3.3.3/share/hadoop/tools/sources/hadoop-streaming-3.3.3-sources.jar
/content/hadoop-3.3.3/share/hadoop/tools/sources/hadoop-streaming-3.3.3-test-sources.jar


### Write the mapper and reducer files

Mapper will

* read the CSV data line by line from STDIN
* parse each line into its fields, skipping the header row
* print the output as key-value pairs, i.e., country name and total sales

In [17]:
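%%writefile mapper.py
import csv
import sys

# Hadoop Streaming passes the CSV file to the mapper line by line on STDIN;
# csv.reader also handles quoted fields that contain commas
for fields in csv.reader(sys.stdin):
    if len(fields) != 9:
        continue                          # skip blank or malformed lines
    InvoiceNo, StockCode, Description, Quantity, InvoiceDate, UnitPrice, CustomerID, Total_Sales, Country = fields
    if InvoiceNo == 'InvoiceNo':
        continue                          # skip the header row, whichever input split it lands in
    # emit "<country>\t<sales>"; spaces in country names become underscores
    print('{0}\t{1}'.format(Country.strip().replace(' ', '_'), Total_Sales.strip()))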


Writing mapper.py


Reducer will

* read input from the mapper
* check for existing country key in the dictionary
* add the sales value to the running total for that country
* print all key-value pairs, sorted by total sales in descending order

In [18]:
%%writefile reducer.py
from operator import itemgetter
import sys
word2count = {}
# input arrives on STDIN from the mapper as "country<TAB>sales" lines
for line in sys.stdin:
    line = line.strip()
    try:
        country_name, sales = line.split('\t')
        sales = float(sales)
        # dict.get() returns the default (0) when the country is not in the dictionary yet
        word2count[country_name] = word2count.get(country_name, 0) + sales
    except ValueError:
        pass
# itemgetter(1) will specify that we need to sort based on dictionary values
sorted_word2count = sorted(word2count.items(), key=itemgetter(1), reverse=True)

for word, count in sorted_word2count:
    print('{0}\t{1}'.format(word, count))

Writing reducer.py


In [19]:
# Granting the user read, write and execute permissions on the scripts
!chmod u+rwx /content/mapper.py
!chmod u+rwx /content/reducer.py

#### Running the Python Code on Hadoop cluster

Now that everything is prepared, we can finally run our Python MapReduce job on the Hadoop cluster. As mentioned earlier, we use Hadoop streaming for passing data between our Map and Reduce code via STDIN (standard input) and STDOUT (standard output).

In [20]:
!/usr/local/hadoop-3.3.3/bin/hadoop jar /usr/local/hadoop-3.3.3/share/hadoop/tools/lib/hadoop-streaming-3.3.3.jar -input /content/ecommerce_uci.csv -output /content/output1 -file /content/mapper.py  -file /content/reducer.py  -mapper 'python mapper.py'  -reducer 'python reducer.py'

2023-07-27 16:18:19,092 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/content/mapper.py, /content/reducer.py] [] /tmp/streamjob7024708467496272536.jar tmpDir=null
2023-07-27 16:18:19,783 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-07-27 16:18:20,007 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-07-27 16:18:20,007 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-07-27 16:18:20,028 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-07-27 16:18:20,210 INFO mapred.FileInputFormat: Total input files to process : 1
2023-07-27 16:18:20,237 INFO mapreduce.JobSubmitter: number of splits:1
2023-07-27 16:18:20,588 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1869172079_0001
2023-07-27 16:18:20,588 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-07-27 16:18:20,918 INFO mapred.Loc

In [21]:
# Locating output directory
!ls /content/output1

part-00000  _SUCCESS


In [22]:
# Display output
!cat /content/output1/part-00000

United_Kingdom	119.64
Australia	77.0
France	55.08


Now we will consider the 20 Newsgroups dataset, a collection of newsgroup documents. In this version of the dataset, duplicate messages have been removed and the messages retain only the "From" and "Subject" headers (18,828 messages in total).

The bundle contains one directory per newsgroup, and each file in a directory is the text of a single document that was posted to that newsgroup.

Some of the newsgroups are as follows:

* comp.graphics
* comp.os.ms-windows.misc
* comp.sys.ibm.pc.hardware
* rec.motorcycles
* rec.sport.baseball
* rec.sport.hockey
* sci.crypt
* sci.electronics
* sci.med
* misc.forsale
* talk.politics.misc
* talk.politics.guns
* talk.politics.mideast
* talk.religion.misc
* alt.atheism
* soc.religion.christian

Here we will use one document from the `alt.atheism` newsgroup (file `49960`) and perform a MapReduce operation to count the words in it.

To know more about the dataset click [here](http://qwone.com/~jason/20Newsgroups/).

In [23]:
!cat /content/20news-18828/alt.atheism/49960

From: mathew <mathew@mantis.co.uk>
Subject: Alt.Atheism FAQ: Atheist Resources

Archive-name: atheism/resources
Alt-atheism-archive-name: resources
Last-modified: 11 December 1992
Version: 1.0

                              Atheist Resources

                      Addresses of Atheist Organizations

                                     USA

FREEDOM FROM RELIGION FOUNDATION

Darwin fish bumper stickers and assorted other atheist paraphernalia are
available from the Freedom From Religion Foundation in the US.

Write to:  FFRF, P.O. Box 750, Madison, WI 53701.
Telephone: (608) 256-8900

EVOLUTION DESIGNS

Evolution Designs sell the "Darwin fish".  It's a fish symbol, like the ones
Christians stick on their cars, but with feet and the word "Darwin" written
inside.  The deluxe moulded 3D plastic fish is $4.95 postpaid in the US.

Write to:  Evolution Designs, 7119 Laurel Canyon #4, North Hollywood,
           CA 91605.

People in the San Francisco Bay area can get Darwin Fish from Lynn Gold

#### Write the Mapper file

In [24]:
%%writefile mapper_news.py
import io
import re
import string
import sys

import nltk
nltk.download('stopwords', quiet=True)
from nltk.corpus import stopwords

# ASCII punctuation characters, each mapped to a space
punctuations = string.punctuation
to_space = str.maketrans(punctuations, ' ' * len(punctuations))

# English stopwords to drop from the counts
stop_words = set(stopwords.words('english'))

# read STDIN as latin-1 text so that non-UTF-8 bytes in the corpus do not raise errors
input_stream = io.TextIOWrapper(sys.stdin.buffer, encoding='latin1')
for line in input_stream:
    line = line.strip()                      # remove leading and trailing whitespace
    line = re.sub(r'[^\w\s]', '', line)      # delete characters that are neither word characters nor whitespace
    line = line.lower()                      # convert line to lowercase
    line = line.translate(to_space)          # replace remaining punctuation (e.g. '_') with spaces

    words = line.split()                     # split line into words
    for word in words:
        if word not in stop_words:
            print('%s\t%s' % (word, 1))      # emit "<word>\t1"

Writing mapper_news.py


#### Write the Reducer file

In [25]:
%%writefile reducer_news.py
import sys

current_word = None
current_count = 0

# input comes from STDIN as "<word>\t<count>" lines
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip().lower()

    # parse the input we got from mapper_news.py
    try:
        word, count = line.split('\t', 1)
        count = int(count)
    except ValueError:
        # no tab separator, or count was not a number,
        # so silently ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word is not None:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word (nothing is printed for empty input)
if current_word is not None:
    print('%s\t%s' % (current_word, current_count))

Writing reducer_news.py
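The `current_word` bookkeeping in the reducer above is a hand-written version of grouping consecutive equal keys, which `itertools.groupby` does directly. The sketch below is an alternative formulation under the same assumption (input sorted by word); `parse` and `reduce_sorted` are hypothetical names, and inside a streaming script you would loop over `reduce_sorted(sys.stdin)` and print each pair.

```python
from itertools import groupby
from typing import Iterable, Iterator, Tuple


def parse(lines: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Turn tab-separated 'word<TAB>count' lines into pairs, skipping malformed ones."""
    for line in lines:
        word, sep, count = line.rstrip("\n").partition("\t")
        if sep and count.isdigit():
            yield word, int(count)


def reduce_sorted(lines: Iterable[str]) -> Iterator[Tuple[str, int]]:
    """Sum counts per word; relies on equal words arriving next to each other."""
    for word, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


sample = ["atheism\t1\n", "atheism\t1\n", "darwin\t1\n"]
for word, total in reduce_sorted(sample):
    print(f"{word}\t{total}")
```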


In [26]:
# Granting the user read, write and execute permissions on the scripts
!chmod u+rwx /content/mapper_news.py
!chmod u+rwx /content/reducer_news.py

#### Running the Python Code on Hadoop cluster

As with the sales job, we submit the news mapper and reducer to Hadoop through the Hadoop Streaming jar, which passes data between them via STDIN and STDOUT.

In [27]:
!/usr/local/hadoop-3.3.3/bin/hadoop jar /usr/local/hadoop-3.3.3/share/hadoop/tools/lib/hadoop-streaming-3.3.3.jar -input /content/20news-18828/alt.atheism/49960 -output /content/output_news -file /content/mapper_news.py  -file /content/reducer_news.py  -mapper 'python mapper_news.py'  -reducer 'python reducer_news.py'

2023-07-27 16:18:55,540 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/content/mapper_news.py, /content/reducer_news.py] [] /tmp/streamjob18261274974686418129.jar tmpDir=null
2023-07-27 16:18:56,111 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-07-27 16:18:56,295 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-07-27 16:18:56,295 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-07-27 16:18:56,313 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-07-27 16:18:56,464 INFO mapred.FileInputFormat: Total input files to process : 1
2023-07-27 16:18:56,487 INFO mapreduce.JobSubmitter: number of splits:1
2023-07-27 16:18:56,738 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1296418146_0001
2023-07-27 16:18:56,738 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-07-27 16:18:57,179 INFO

In [28]:
# Locating output directory
!ls /content/output_news

part-00000  _SUCCESS


In [29]:
# Display output
!cat /content/output_news/part-00000

034529887x	1
0511211216	1
071	5
080182494x	1
0801834074	1
0877226423	1
0877227675	1
0908	1
0910309264	1
1	1
10	1
11	1
1266	1
1271	1
14	1
140195	1
14215	1
14226	1
142282197	1
17701900	1
1881	1
1977	1
1981	1
1986	1
1988	1
1989	1
1990	1
1992	1
20th	1
226	1
24hour	1
2568900	1
272	1
273	1
2nd	1
3005	1
316	1
372	1
3d	1
3nl	1
4	1
41	2
430	2
4581244	1
4679525	1
490	1
495	2
4rh	1
4rl	1
512	2
53701	1
541	1
59	1
608	1
664	1
700	1
702	1
7119	1
716	1
7215	1
7251	1
750	1
7723	1
787140195	1
787522973	1
831	1
8372475	1
88	1
880	2
8964079	1
8ew	1
91605	1
aah	1
aap	2
abortions	1
absurdities	1
accompanied	1
accounts	1
address	1
addresses	2
adulteries	1
aesthetics	1
african	3
africanamericans	1
agnostic	1
al	1
alien	1
allen	2
also	3
altatheism	1
altatheismarchivename	1
altatheismmoderated	1
alternate	1
alternative	1
although	2
america	1
american	5
americans	2
amherst	1
amongst	1
amusing	1
ancient	1
andor	1
another	1
anselm	1
anthology	3
anyone	1
appendix	2
approachable	1
archive	1
archivename	1
archives	1
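The reducer output is sorted alphabetically, so the most frequent words are hard to spot. A small post-processing step can rank them; this sketch assumes the `word<TAB>count` format shown above and UTF-8 output, and `top_words` is a hypothetical helper.

```python
import heapq
import os
from typing import Iterable, List, Tuple


def top_words(lines: Iterable[str], n: int = 10) -> List[Tuple[str, int]]:
    """Return the n most frequent (word, count) pairs from reducer output lines."""
    pairs = []
    for line in lines:
        word, sep, count = line.rstrip("\n").partition("\t")
        if sep and count.isdigit():
            pairs.append((word, int(count)))
    # nlargest behaves like a stable sort by count, so ties keep the input order
    return heapq.nlargest(n, pairs, key=lambda pair: pair[1])


path = "/content/output_news/part-00000"
if os.path.exists(path):
    with open(path, encoding="utf-8") as f:
        print(top_words(f, n=10))
```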

### Please answer the questions below to complete the experiment:




In [30]:
# @title Which of the following takes a set of data and converts it into another set of data, where individual elements are broken down into tuples? { run: "auto", form-width: "500px", display-mode: "form" }
Answer = "Map" #@param ["","Reduce","Node","Map"]

In [31]:
#@title How was the experiment? { run: "auto", form-width: "500px", display-mode: "form" }
Complexity = "Good, But Not Challenging for me" #@param ["","Too Simple, I am wasting time", "Good, But Not Challenging for me", "Good and Challenging for me", "Was Tough, but I did it", "Too Difficult for me"]


In [36]:
#@title If it was too easy, what more would you have liked to be added? If it was very difficult, what would you have liked to have been removed? { run: "auto", display-mode: "form" }
Additional = "Perfect for practice" #@param {type:"string"}


In [37]:
#@title Can you identify the concepts from the lecture which this experiment covered? { run: "auto", vertical-output: true, display-mode: "form" }
Concepts = "Yes" #@param ["","Yes", "No"]


In [38]:
#@title  Text and image description/explanation and code comments within the experiment: { run: "auto", vertical-output: true, display-mode: "form" }
Comments = "Very Useful" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [39]:
#@title Mentor Support: { run: "auto", vertical-output: true, display-mode: "form" }
Mentor_support = "Very Useful" #@param ["","Very Useful", "Somewhat Useful", "Not Useful", "Didn't use"]


In [40]:
#@title Run this cell to submit your notebook for grading { vertical-output: true }
try:
  if submission_id:
      return_id = submit_notebook()
      if return_id : submission_id = return_id
  else:
      print("Please complete the setup first.")
except NameError:
  print ("Please complete the setup first.")

Your submission is successful.
Ref Id: 6053
Date of submission:  27 Jul 2023
Time of submission:  21:52:12
View your submissions: https://cds.iisc.talentsprint.com/notebook_submissions
