
# Introduction to Hadoop


## The building blocks of Hadoop

- NameNode
- DataNode
- Secondary NameNode
- JobTracker
- TaskTracker

## NameNode

- The NameNode is the master of HDFS that directs the worker DataNode daemons to perform the low-level I/O tasks.

- The NameNode keeps track of how your files are broken down into file blocks, which nodes store those blocks, and the overall health of the distributed file system.

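- The NameNode is a single point of failure of your Hadoop cluster unless NameNode High Availability (an active/standby NameNode pair, available since Hadoop 2) is configured.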
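To make the NameNode's bookkeeping concrete, here is a toy Python model (not HDFS code) of the metadata it keeps: how a file is cut into fixed-size blocks and which DataNodes hold each block's replicas. The 128 MB block size and replication factor of 3 are the HDFS defaults; the round-robin placement and the node names `dn1`…`dn4` are hypothetical simplifications.

```python
import math

BLOCK_SIZE_MB = 128  # default dfs.blocksize since Hadoop 2 (128 MB)
REPLICATION = 3      # default dfs.replication

def split_into_blocks(file_size_mb, block_size_mb=BLOCK_SIZE_MB):
    """Return the sizes (in MB) of the blocks a file is broken into."""
    n_blocks = math.ceil(file_size_mb / block_size_mb)
    sizes = [block_size_mb] * n_blocks
    remainder = file_size_mb % block_size_mb
    if remainder:
        sizes[-1] = remainder  # the last block only holds what is left over
    return sizes

def place_replicas(n_blocks, datanodes, replication=REPLICATION):
    """Toy placement: block i goes to `replication` consecutive DataNodes.

    Real HDFS uses a rack-aware placement policy; this round-robin scheme
    only illustrates the block -> DataNodes mapping the NameNode keeps.
    """
    return {
        block: [datanodes[(block + r) % len(datanodes)] for r in range(replication)]
        for block in range(n_blocks)
    }

blocks = split_into_blocks(300)
print(blocks)  # [128, 128, 44]
print(place_replicas(len(blocks), ["dn1", "dn2", "dn3", "dn4"]))
```

The replicas of one block land on distinct nodes only while `replication` does not exceed the number of DataNodes.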

## Secondary NameNode

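- An assistant daemon for monitoring the state of the cluster HDFS. Unlike the NameNode, it does not receive or record real-time changes to HDFS; instead, it communicates with the NameNode to take snapshots of the HDFS metadata at intervals defined by the cluster configuration.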

- Each cluster has one Secondary NameNode.

- The Secondary NameNode's snapshots help minimize downtime and data loss when the NameNode fails. It is not a hot standby, though: recovering from a NameNode failure still requires manual intervention.
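The Secondary NameNode's snapshot is usually called a *checkpoint*: it periodically fetches the NameNode's namespace image (fsimage) and its log of edits, merges them, and hands back a fresh fsimage so the edit log does not grow without bound. The sketch below models only that merge step, with a dict standing in for the namespace; the paths, block IDs, and the three operation names are hypothetical, and the real fsimage and edit files are binary and far richer.

```python
def checkpoint(fsimage, edit_log):
    """Return a new namespace snapshot with every logged edit replayed.

    fsimage:  dict mapping a file path to its list of block IDs
    edit_log: list of (operation, path, *args) tuples, oldest first
    """
    image = dict(fsimage)  # never mutate the previous snapshot
    for op, path, *args in edit_log:
        if op == "create":
            image[path] = args[0]              # args[0]: list of block IDs
        elif op == "delete":
            image.pop(path, None)
        elif op == "rename":
            image[args[0]] = image.pop(path)   # args[0]: new path
    return image

fsimage = {"/data/a.txt": ["blk_1"]}
edits = [
    ("create", "/data/b.txt", ["blk_2", "blk_3"]),
    ("rename", "/data/a.txt", "/data/a_old.txt"),
    ("delete", "/data/b.txt"),
]
new_image = checkpoint(fsimage, edits)
print(new_image)  # {'/data/a_old.txt': ['blk_1']}
```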

## DataNode

- Each worker machine in your cluster will host a DataNode daemon to perform the grunt work of the distributed file system: reading and writing HDFS blocks to actual files on the local file system.

- DataNodes are constantly reporting to the NameNode.

- On startup, each DataNode informs the NameNode of the blocks it's currently storing. After this mapping is complete, the DataNodes continually poll the NameNode to provide information regarding local changes as well as receive instructions to create, move, or delete blocks from the local disk.

## JobTracker

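- There is only one JobTracker daemon per Hadoop cluster. It's typically run on a server as a master node of the cluster. JobTracker and TaskTracker belong to the classic MapReduce (MRv1) architecture of Hadoop 1; since Hadoop 2 (including the 3.3.6 release used below), YARN's ResourceManager, NodeManagers, and a per-job ApplicationMaster take over these roles.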

- The JobTracker determines the execution plan by determining which files to process, assigns nodes to different tasks, and monitors all tasks as they're running. Should a task fail, the JobTracker will automatically relaunch the task, possibly on a different node, up to a predefined limit of retries.
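The retry behaviour can be sketched as a small loop. This is a hypothetical Python model, not JobTracker code: `flaky_task`, the tracker names, and the limit of 4 attempts are illustrative (the limit is configurable on real clusters).

```python
import itertools

MAX_ATTEMPTS = 4  # illustrative retry limit

def run_with_retries(task, nodes, max_attempts=MAX_ATTEMPTS):
    """Run task(node), relaunching on the next node after each failure.

    Returns (node, result) for the first successful attempt and raises
    RuntimeError once max_attempts attempts have all failed.
    """
    node_cycle = itertools.cycle(nodes)
    for attempt in range(1, max_attempts + 1):
        node = next(node_cycle)  # a relaunch may land on a different node
        try:
            return node, task(node)
        except Exception as err:
            print(f"attempt {attempt} on {node} failed: {err}")
    raise RuntimeError(f"task failed after {max_attempts} attempts")

def flaky_task(node):
    """Hypothetical task that only succeeds on tracker 'tt3'."""
    if node != "tt3":
        raise IOError("disk error")
    return "done"

print(run_with_retries(flaky_task, ["tt1", "tt2", "tt3"]))  # ('tt3', 'done')
```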

## TaskTracker


- Each TaskTracker is responsible for executing the individual tasks that the JobTracker assigns.

- If the JobTracker fails to receive a heartbeat from a TaskTracker within a specified amount of time, it will assume the TaskTracker has crashed and will resubmit the corresponding tasks to other nodes in the cluster.
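The heartbeat check and the resubmission it triggers fit in a few lines. In this toy model (not Hadoop code) the timestamps, tracker names, task names, and the 600-second timeout are all hypothetical, and it assumes at least one tracker is still alive.

```python
HEARTBEAT_TIMEOUT_S = 600  # illustrative timeout, in seconds

def find_dead_trackers(last_heartbeat, now, timeout=HEARTBEAT_TIMEOUT_S):
    """Return the trackers whose last heartbeat is older than `timeout`."""
    return sorted(t for t, seen in last_heartbeat.items() if now - seen > timeout)

def reassign_tasks(assignments, dead):
    """Move every task of a dead tracker to the live trackers, round-robin."""
    live = [t for t in assignments if t not in dead]
    new = {t: list(assignments[t]) for t in live}
    orphaned = [task for t in dead for task in assignments.get(t, [])]
    for i, task in enumerate(orphaned):
        new[live[i % len(live)]].append(task)
    return new

last_seen = {"tt1": 1000, "tt2": 1590, "tt3": 1200}  # hypothetical timestamps
tasks = {"tt1": ["map_0", "map_1"], "tt2": ["map_2"], "tt3": []}
dead = find_dead_trackers(last_seen, now=1700)
print(dead)                         # ['tt1']
print(reassign_tasks(tasks, dead))  # {'tt2': ['map_2', 'map_0'], 'tt3': ['map_1']}
```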

## Setting up Hadoop

- Basic requirements

    - Linux server
    - SSH
    - Set up environment variables: `JAVA_HOME`, `HADOOP_HOME`

- Cluster setup ref: https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/ClusterSetup.html

- Core Hadoop configuration files

    - `core-site.xml`
    - `mapred-site.xml`
    - `hdfs-site.xml`
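Each of these files is a `configuration` element holding `property` entries with a `name` and a `value`. As a minimal sketch, the Python below writes the two properties the Apache single-node guide uses for pseudo-distributed mode (`fs.defaultFS` pointing at `hdfs://localhost:9000` and `dfs.replication` set to 1). The `example-conf` directory and the `write_hadoop_config` helper are hypothetical; writing these values into the real `$HADOOP_HOME/etc/hadoop` would take Hadoop out of the standalone mode this notebook relies on.

```python
import os
import xml.etree.ElementTree as ET

def write_hadoop_config(path, properties):
    """Write a Hadoop-style *-site.xml file from a {name: value} dict."""
    root = ET.Element("configuration")
    for name, value in properties.items():
        prop = ET.SubElement(root, "property")
        ET.SubElement(prop, "name").text = name
        ET.SubElement(prop, "value").text = str(value)
    ET.ElementTree(root).write(path, encoding="utf-8", xml_declaration=True)

conf_dir = "example-conf"  # hypothetical; NOT $HADOOP_HOME/etc/hadoop
os.makedirs(conf_dir, exist_ok=True)
write_hadoop_config(os.path.join(conf_dir, "core-site.xml"),
                    {"fs.defaultFS": "hdfs://localhost:9000"})
write_hadoop_config(os.path.join(conf_dir, "hdfs-site.xml"),
                    {"dfs.replication": 1})

with open(os.path.join(conf_dir, "core-site.xml")) as f:
    print(f.read())
```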

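However, our scope here is how to use Hadoop in `colab`, where it runs in standalone (local) mode: jobs use the local job runner and the local file system instead of HDFS and cluster daemons.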

#### First, download and extract Hadoop into our working session

In [None]:
!wget https://downloads.apache.org/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz

--2023-07-01 07:59:11--  https://downloads.apache.org/hadoop/common/hadoop-3.3.6/hadoop-3.3.6.tar.gz
Resolving downloads.apache.org (downloads.apache.org)... 135.181.214.104, 88.99.95.219, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|135.181.214.104|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 730107476 (696M) [application/x-gzip]
Saving to: ‘hadoop-3.3.6.tar.gz’


2023-07-01 07:59:39 (24.7 MB/s) - ‘hadoop-3.3.6.tar.gz’ saved [730107476/730107476]



In [None]:
!ls

hadoop-3.3.6.tar.gz  sample_data


In [None]:
!tar -xzvf hadoop-3.3.6.tar.gz

Streaming output truncated to the last 5000 lines.
hadoop-3.3.6/share/doc/hadoop/hadoop-project-dist/hadoop-hdfs-client/build/source/hadoop-hdfs-project/hadoop-hdfs-client/target/api/org/apache/hadoop/hdfs/web/resources/BufferSizeParam.html
hadoop-3.3.6/share/doc/hadoop/hadoop-project-dist/hadoop-hdfs-client/build/source/hadoop-hdfs-project/hadoop-hdfs-client/target/api/org/apache/hadoop/hdfs/web/resources/DeleteOpParam.Op.html
hadoop-3.3.6/share/doc/hadoop/hadoop-project-dist/hadoop-hdfs-client/build/source/hadoop-hdfs-project/hadoop-hdfs-client/target/api/org/apache/hadoop/hdfs/web/resources/PutOpParam.html
hadoop-3.3.6/share/doc/hadoop/hadoop-project-dist/hadoop-hdfs-client/build/source/hadoop-hdfs-project/hadoop-hdfs-client/target/api/org/apache/hadoop/hdfs/web/resources/RenameOptionSetParam.html
hadoop-3.3.6/share/doc/hadoop/hadoop-project-dist/hadoop-hdfs-client/build/source/hadoop-hdfs-project/hadoop-hdfs-client/target/api/org/apache/hadoop/hdfs/web/resources/XAttr

In [None]:
!ls

hadoop-3.3.6  hadoop-3.3.6.tar.gz  sample_data


#### Find the Java installation path

In [None]:
# To find the default Java path
!readlink -f /usr/bin/java | sed "s:bin/java::"

/usr/lib/jvm/java-11-openjdk-amd64/


#### Set up `JAVA_HOME` and `HADOOP_HOME`

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["HADOOP_HOME"] = "/content/hadoop-3.3.6"
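Instead of hard-coding the JDK path, it can be derived from the `java` binary on the `PATH`, the same idea as the `readlink -f /usr/bin/java | sed "s:bin/java::"` cell. A minimal sketch; `java_home_from_binary` is a hypothetical helper, and the cell does nothing if no `java` is found.

```python
import os
import shutil

def java_home_from_binary(java_path):
    """Map .../<jdk>/bin/java to .../<jdk> by dropping the last two components."""
    return os.path.dirname(os.path.dirname(java_path))

java_bin = shutil.which("java")  # None when java is not on the PATH
if java_bin:
    # Resolve symlinks such as /usr/bin/java -> /etc/alternatives/java -> real JDK binary
    os.environ["JAVA_HOME"] = java_home_from_binary(os.path.realpath(java_bin))
    print(os.environ["JAVA_HOME"])
```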

In [None]:
!echo $JAVA_HOME && echo $HADOOP_HOME

/usr/lib/jvm/java-11-openjdk-amd64
/content/hadoop-3.3.6


In [None]:
!cat ./hadoop-3.3.6/etc/hadoop/hadoop-env.sh

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Set Hadoop-specific environment variables here.

##
## THIS FILE ACTS AS THE MASTER FILE FOR ALL HADOOP PROJECTS.
## SETTINGS HERE WILL BE READ BY ALL HADOOP COMMANDS.  THEREFORE,
## ONE CAN USE THIS FILE TO SET

In [None]:
# Running Hadoop
!./hadoop-3.3.6/bin/hadoop

Usage: hadoop [OPTIONS] SUBCOMMAND [SUBCOMMAND OPTIONS]
 or    hadoop [OPTIONS] CLASSNAME [CLASSNAME OPTIONS]
  where CLASSNAME is a user-provided Java class

  OPTIONS is none or any of:

buildpaths                       attempt to add class files from build tree
--config dir                     Hadoop config directory
--debug                          turn on shell script debug mode
--help                           usage information
hostnames list[,of,host,names]   hosts to use in worker mode
hosts filename                   list of hosts to use in worker mode
loglevel level                   set the log4j level for this command
workers                          turn on worker mode

  SUBCOMMAND is one of:


    Admin Commands:

daemonlog     get/set the log level for each daemon

    Client Commands:

archive       create a Hadoop archive
checknative   check native Hadoop and compression libraries availability
classpath     prints the class path needed to get the Hadoop jar and the
  

### Upload and download data for later examples

In [None]:
from google.colab import files
uploaded = files.upload()

Saving bill_authentication.csv to bill_authentication.csv
Saving loan_data.csv to loan_data.csv
Saving mapper_stock.py to mapper_stock.py
Saving mapper.py to mapper.py
Saving reducer_stock.py to reducer_stock.py
Saving reducer.py to reducer.py
Saving shakespeare.txt to shakespeare.txt
Saving stocks.txt to stocks.txt


In [None]:
!wget http://qwone.com/~jason/20Newsgroups/20news-18828.tar.gz
!tar -xzvf 20news-18828.tar.gz

Streaming output truncated to the last 5000 lines.
20news-18828/sci.space/61079
20news-18828/sci.space/60809
20news-18828/sci.space/61497
20news-18828/sci.space/60156
20news-18828/sci.space/61475
20news-18828/sci.space/59873
20news-18828/sci.space/61541
20news-18828/sci.space/61489
20news-18828/sci.space/61133
20news-18828/sci.space/62418
20news-18828/sci.space/61289
20news-18828/sci.space/60864
20news-18828/sci.space/61443
20news-18828/sci.space/60189
20news-18828/sci.space/60251
20news-18828/sci.space/61356
20news-18828/sci.space/61255
20news-18828/sci.space/61345
20news-18828/sci.space/61382
20news-18828/sci.space/60847
20news-18828/sci.space/62481
20news-18828/sci.space/60783
20news-18828/sci.space/60854
20news-18828/sci.space/62131
20news-18828/sci.space/60921
20news-18828/sci.space/62317
20news-18828/sci.space/61372
20news-18828/sci.space/61406
20news-18828/sci.space/60247
20news-18828/sci.space/62114
20news-18828/sci.space/60791
20news-18828/sci.space/61362
20news-

In [None]:
!ls

20news-18828		 hadoop-3.3.6.tar.gz  reducer.py	stocks.txt
20news-18828.tar.gz	 loan_data.csv	      reducer_stock.py
bill_authentication.csv  mapper.py	      sample_data
hadoop-3.3.6		 mapper_stock.py      shakespeare.txt


## Working with the Hadoop File System

In [None]:
%cd ./hadoop-3.3.6/bin
!./hadoop

/content/hadoop-3.3.6/bin
Usage: hadoop [OPTIONS] SUBCOMMAND [SUBCOMMAND OPTIONS]
 or    hadoop [OPTIONS] CLASSNAME [CLASSNAME OPTIONS]
  where CLASSNAME is a user-provided Java class

  OPTIONS is none or any of:

buildpaths                       attempt to add class files from build tree
--config dir                     Hadoop config directory
--debug                          turn on shell script debug mode
--help                           usage information
hostnames list[,of,host,names]   hosts to use in worker mode
hosts filename                   list of hosts to use in worker mode
loglevel level                   set the log4j level for this command
workers                          turn on worker mode

  SUBCOMMAND is one of:


    Admin Commands:

daemonlog     get/set the log level for each daemon

    Client Commands:

archive       create a Hadoop archive
checknative   check native Hadoop and compression libraries availability
classpath     prints the class path needed to get

In [None]:
!./hadoop version

Hadoop 3.3.6
Source code repository https://github.com/apache/hadoop.git -r 1be78238728da9266a4f88195058f08fd012bf9c
Compiled by ubuntu on 2023-06-18T08:22Z
Compiled on platform linux-x86_64
Compiled with protoc 3.7.1
From source with checksum 5652179ad55f76cb287d9c633bb53bbd
This command was run using /content/hadoop-3.3.6/share/hadoop/common/hadoop-common-3.3.6.jar


In [None]:
!echo $JAVA_HOME

/usr/lib/jvm/java-11-openjdk-amd64


In [None]:
!echo $HADOOP_HOME

/content/hadoop-3.3.6


In [None]:
!./hadoop fs

Usage: hadoop fs [generic options]
	[-appendToFile [-n] <localsrc> ... <dst>]
	[-cat [-ignoreCrc] <src> ...]
	[-checksum [-v] <src> ...]
	[-chgrp [-R] GROUP PATH...]
	[-chmod [-R] <MODE[,MODE]... | OCTALMODE> PATH...]
	[-chown [-R] [OWNER][:[GROUP]] PATH...]
	[-concat <target path> <src path> <src path> ...]
	[-copyFromLocal [-f] [-p] [-l] [-d] [-t <thread count>] [-q <thread pool queue size>] <localsrc> ... <dst>]
	[-copyToLocal [-f] [-p] [-crc] [-ignoreCrc] [-t <thread count>] [-q <thread pool queue size>] <src> ... <localdst>]
	[-count [-q] [-h] [-v] [-t [<storage type>]] [-u] [-x] [-e] [-s] <path> ...]
	[-cp [-f] [-p | -p[topax]] [-d] [-t <thread count>] [-q <thread pool queue size>] <src> ... <dst>]
	[-createSnapshot <snapshotDir> [<snapshotName>]]
	[-deleteSnapshot <snapshotDir> <snapshotName>]
	[-df [-h] [<path> ...]]
	[-du [-s] [-h] [-v] [-x] <path> ...]
	[-expunge [-immediate] [-fs <path>]]
	[-find <path> ... <expression> ...]
	[-get [-f] [-p] [-crc] [-ignoreCrc] [-t <thread c

In [None]:
!./hadoop fs -ls /

Found 27 items
-rwxr-xr-x   1 root root          0 2023-07-01 07:57 /.dockerenv
-rw-r--r--   1 root root      17294 2023-06-21 00:39 /NGC-DL-CONTAINER-LICENSE
drwxr-xr-x   - root root       4096 2023-06-29 13:28 /bin
drwxr-xr-x   - root root       4096 2020-04-15 11:09 /boot
drwxr-xr-x   - root root       4096 2023-07-01 08:53 /content
drwxr-xr-x   - root root       4096 2023-06-29 13:45 /datalab
drwxr-xr-x   - root root        360 2023-07-01 07:57 /dev
drwxr-xr-x   - root root       4096 2023-07-01 07:57 /etc
drwxr-xr-x   - root root       4096 2020-04-15 11:09 /home
drwxr-xr-x   - root root       4096 2023-06-29 13:24 /lib
drwxr-xr-x   - root root       4096 2023-06-29 13:20 /lib32
drwxr-xr-x   - root root       4096 2023-06-05 14:05 /lib64
drwxr-xr-x   - root root       4096 2023-06-05 14:03 /libx32
drwxr-xr-x   - root root       4096 2023-06-05 14:03 /media
drwxr-xr-x   - root root       4096 2023-06-05 14:03 /mnt
drwxr-xr-x   - root root       4096 2023-06-29 13:46 /opt
dr-xr-xr-x
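The listing above is the container's own root (`/.dockerenv`, `/content`, ...), because the default file system is whatever `fs.defaultFS` in `core-site.xml` says, and when it is not set Hadoop falls back to `file:///`, the local disk. A small sketch to check this from Python; `default_fs` is a hypothetical helper and the conf path assumes this notebook's layout.

```python
import os
import xml.etree.ElementTree as ET

def default_fs(conf_dir):
    """Return fs.defaultFS from conf_dir/core-site.xml, or Hadoop's built-in default."""
    path = os.path.join(conf_dir, "core-site.xml")
    if os.path.exists(path):
        for prop in ET.parse(path).getroot().iter("property"):
            if prop.findtext("name") == "fs.defaultFS":
                return (prop.findtext("value") or "").strip()
    return "file:///"  # core-default.xml default: the local file system

print(default_fs("/content/hadoop-3.3.6/etc/hadoop"))
```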

In [None]:
!./hadoop fs -ls /usr

Found 14 items
drwxr-xr-x   - root root       4096 2023-06-29 13:28 /usr/bin
drwxr-xr-x   - root root       4096 2023-06-29 13:46 /usr/colab
drwxr-xr-x   - root root       4096 2020-04-15 11:09 /usr/games
drwxr-xr-x   - root root       4096 2023-06-29 13:24 /usr/grte
drwxr-xr-x   - root root       4096 2023-06-29 13:24 /usr/include
drwxr-xr-x   - root root       4096 2023-06-29 13:24 /usr/lib
drwxr-xr-x   - root root       4096 2023-06-29 13:20 /usr/lib32
drwxr-xr-x   - root root       4096 2023-06-05 14:05 /usr/lib64
drwxr-xr-x   - root root       4096 2023-06-29 13:18 /usr/libexec
drwxr-xr-x   - root root       4096 2023-06-05 14:03 /usr/libx32
drwxr-xr-x   - root root       4096 2023-06-29 13:46 /usr/local
drwxr-xr-x   - root root       4096 2023-07-01 07:57 /usr/sbin
drwxr-xr-x   - root root       4096 2023-06-29 13:27 /usr/share
drwxr-xr-x   - root root       4096 2023-06-29 13:23 /usr/src


In [None]:
!pwd

/content/hadoop-3.3.6/bin


In [None]:
!mkdir ~/input
!cp /content/hadoop-3.3.6/etc/hadoop/*.xml ~/input

In [None]:
!ls ~/input

capacity-scheduler.xml	hdfs-rbf-site.xml  kms-acls.xml     yarn-site.xml
core-site.xml		hdfs-site.xml	   kms-site.xml
hadoop-policy.xml	httpfs-site.xml    mapred-site.xml


In [None]:
!./hadoop jar /content/hadoop-3.3.6/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.6.jar grep ~/input ~/grep_example 'allowed[.]*'

2023-07-01 08:55:18,711 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-07-01 08:55:19,188 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-07-01 08:55:19,188 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-07-01 08:55:19,807 INFO input.FileInputFormat: Total input files to process : 10
2023-07-01 08:55:19,860 INFO mapreduce.JobSubmitter: number of splits:10
2023-07-01 08:55:20,484 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local48655754_0001
2023-07-01 08:55:20,484 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-07-01 08:55:20,852 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2023-07-01 08:55:20,853 INFO mapreduce.Job: Running job: job_local48655754_0001
2023-07-01 08:55:20,863 INFO mapred.LocalJobRunner: OutputCommitter set in config null
2023-07-01 08:55:20,874 INFO output.PathOutputCommitterFactory: No output committer factory defined, defaultin

In [None]:
!ls ~

grep_example  input


In [None]:
!cat ~/grep_example/*

22	allowed.
1	allowed
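The `grep` example counts every match of the regular expression in the input files and then orders the matches by frequency, which is why `allowed.` and `allowed` appear as separate rows: `[.]*` means "zero or more literal dots". The Python below only mimics that logic on a few made-up lines; it is not the example's actual implementation.

```python
import re
from collections import Counter

def grep_count(lines, pattern):
    """Count each distinct regex match, then rank matches by frequency (highest first)."""
    regex = re.compile(pattern)
    counts = Counter()
    for line in lines:
        counts.update(m.group(0) for m in regex.finditer(line))
    ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
    return [(n, match) for match, n in ranked]

sample = [  # hypothetical input lines
    "Comma separated list of users allowed.",
    "are allowed.",
    "users allowed to submit",
]
for n, match in grep_count(sample, r"allowed[.]*"):
    print(f"{n}\t{match}")
```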


### One more counting example

In [None]:
from google.colab import files
uploaded = files.upload()

Saving alice.txt to alice.txt


In [None]:
!ls ../../

20news-18828		 hadoop-3.3.6.tar.gz  reducer.py	stocks.txt
20news-18828.tar.gz	 loan_data.csv	      reducer_stock.py
bill_authentication.csv  mapper.py	      sample_data
hadoop-3.3.6		 mapper_stock.py      shakespeare.txt


In [None]:
%cd ../../

/content


In [None]:
!ls

20news-18828		 hadoop-3.3.6.tar.gz  reducer.py	stocks.txt
20news-18828.tar.gz	 loan_data.csv	      reducer_stock.py
bill_authentication.csv  mapper.py	      sample_data
hadoop-3.3.6		 mapper_stock.py      shakespeare.txt


In [None]:
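# Copy the file into the Hadoop file system. In this standalone setup the default
# file system is the local disk, so `hadoop fs` paths are ordinary local paths;
# alice.txt is already in place here, hence the "File exists" message.
!./hadoop-3.3.6/bin/hadoop fs -copyFromLocal alice.txt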

copyFromLocal: `alice.txt': File exists


In [None]:
!ls /content/hadoop-3.3.6/share/hadoop/mapreduce

hadoop-mapreduce-client-app-3.3.6.jar
hadoop-mapreduce-client-common-3.3.6.jar
hadoop-mapreduce-client-core-3.3.6.jar
hadoop-mapreduce-client-hs-3.3.6.jar
hadoop-mapreduce-client-hs-plugins-3.3.6.jar
hadoop-mapreduce-client-jobclient-3.3.6.jar
hadoop-mapreduce-client-jobclient-3.3.6-tests.jar
hadoop-mapreduce-client-nativetask-3.3.6.jar
hadoop-mapreduce-client-shuffle-3.3.6.jar
hadoop-mapreduce-client-uploader-3.3.6.jar
hadoop-mapreduce-examples-3.3.6.jar
jdiff
lib-examples
sources


In [None]:
# Run the bundled WordCount example on alice.txt. As the job executes, Hadoop
# prints its progress in terms of Map and Reduce; when WordCount completes,
# both reach 100%. The results are written to the `count` directory.
!./hadoop-3.3.6/bin/hadoop jar /content/hadoop-3.3.6/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.6.jar wordcount alice.txt count

2023-07-01 08:58:44,378 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-07-01 08:58:44,572 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-07-01 08:58:44,572 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-07-01 08:58:44,917 INFO input.FileInputFormat: Total input files to process : 1
2023-07-01 08:58:45,000 INFO mapreduce.JobSubmitter: number of splits:1
2023-07-01 08:58:45,356 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local528499928_0001
2023-07-01 08:58:45,356 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-07-01 08:58:45,686 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2023-07-01 08:58:45,687 INFO mapreduce.Job: Running job: job_local528499928_0001
2023-07-01 08:58:45,701 INFO mapred.LocalJobRunner: OutputCommitter set in config null
2023-07-01 08:58:45,713 INFO output.PathOutputCommitterFactory: No output committer factory defined, defaultin

In [None]:
!ls

20news-18828		 count		      mapper.py		sample_data
20news-18828.tar.gz	 hadoop-3.3.6	      mapper_stock.py	shakespeare.txt
alice.txt		 hadoop-3.3.6.tar.gz  reducer.py	stocks.txt
bill_authentication.csv  loan_data.csv	      reducer_stock.py


In [None]:
# Copy the WordCount results to the local file system. The file part-r-00000
# contains the output of the reducer.
!./hadoop-3.3.6/bin/hadoop fs -copyToLocal count/part-r-00000 count.txt

In [None]:
# View the first lines of the WordCount results.
!head -n 21 count/part-r-00000

"'TIS	1
"--SAID	1
"Come	1
"Coming	1
"Defects,"	1
"Edwin	1
"French,	1
"HOW	1
"He's	1
"How	1
"I	7
"I'll	2
"Information	1
"Keep	1
"Let	1
"Plain	2
"Project	5
"Right	1
"Such	1
"THEY	1
"There	2
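Tokens such as `"'TIS` and `"Defects,"` keep their quotes and punctuation, and `"HOW` and `"How` are counted separately, because the bundled WordCount example splits each line on whitespace only (Java's `StringTokenizer`) and compares keys case-sensitively. A rough Python equivalent on a made-up sentence; `str.split()` approximates `StringTokenizer`'s default delimiters, and for plain ASCII text Python's string order matches the byte order Hadoop uses to sort the keys.

```python
from collections import Counter

def word_count(text):
    """Whitespace-only, case-sensitive word count, returned sorted by key."""
    counts = Counter(text.split())
    return sorted(counts.items())

for word, n in word_count('"I am," said Alice. "I am" Alice'):
    print(f"{word}\t{n}")
```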

### Another example: word median

In [None]:
from google.colab import files
uploaded = files.upload()

Saving shakespeare.txt to shakespeare (1).txt


In [None]:
!./hadoop-3.3.6/bin/hadoop fs -copyFromLocal shakespeare.txt

!./hadoop-3.3.6/bin/hadoop jar /content/hadoop-3.3.6/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.3.6.jar wordmedian shakespeare.txt wordmedian

copyFromLocal: `shakespeare.txt': File exists
2023-07-01 09:01:39,670 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-07-01 09:01:39,842 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-07-01 09:01:39,842 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-07-01 09:01:40,115 INFO input.FileInputFormat: Total input files to process : 1
2023-07-01 09:01:40,162 INFO mapreduce.JobSubmitter: number of splits:1
2023-07-01 09:01:40,534 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local453669557_0001
2023-07-01 09:01:40,534 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-07-01 09:01:40,853 INFO mapreduce.Job: The url to track the job: http://localhost:8080/
2023-07-01 09:01:40,855 INFO mapreduce.Job: Running job: job_local453669557_0001
2023-07-01 09:01:40,867 INFO mapred.LocalJobRunner: OutputCommitter set in config null
2023-07-01 09:01:40,878 INFO output.PathOutputCommitterFactory: 

In [None]:
!./hadoop-3.3.6/bin/hadoop fs -copyToLocal wordmedian/part-r-00000 wordmedian.txt

In [None]:
!cat wordmedian/part-r-00000

1	547
2	2871
3	3215
4	4012
5	2715
6	1744
7	1075
8	692
9	394
10	190
11	70
12	31
13	15
14	13
15	2
16	1
17	1
18	1
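Each row of `wordmedian/part-r-00000` is a word length followed by the number of words of that length, i.e. a histogram. The median length can be read off it by walking the cumulative counts until half of all words are covered. A small sketch using the numbers printed above; `median_from_histogram` is a hypothetical helper that returns the lower median when the total is even.

```python
def median_from_histogram(hist):
    """hist maps word length -> number of words; return the (lower) median length."""
    total = sum(hist.values())
    target = (total + 1) // 2  # 1-based position of the (lower) median word
    running = 0
    for length in sorted(hist):
        running += hist[length]
        if running >= target:
            return length

# (length, count) pairs copied from wordmedian/part-r-00000 above
hist = {1: 547, 2: 2871, 3: 3215, 4: 4012, 5: 2715, 6: 1744, 7: 1075, 8: 692,
        9: 394, 10: 190, 11: 70, 12: 31, 13: 15, 14: 13, 15: 2, 16: 1, 17: 1, 18: 1}
print(sum(hist.values()), median_from_histogram(hist))  # 17589 4
```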


## Our own Hadoop MapReduce program: word count with Hadoop Streaming

In [None]:
!find / -name 'hadoop-streaming*.jar'

find: ‘/proc/59/task/59/net’: Invalid argument
find: ‘/proc/59/net’: Invalid argument
/content/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar
/content/hadoop-3.3.6/share/hadoop/tools/sources/hadoop-streaming-3.3.6-sources.jar
/content/hadoop-3.3.6/share/hadoop/tools/sources/hadoop-streaming-3.3.6-test-sources.jar


In [None]:
# print out the content of `mapper.py`
!cat ./mapper.py

# -*- coding: utf-8 -*-
"""mapper.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/1yCwGyMXJT2qt3_58aLOOiJXO0GIaPcJd
"""



import sys
import io
import re
import nltk
nltk.download('stopwords',quiet=True)
from nltk.corpus import stopwords
punctuations = '''!()-[]{};:'"\,<>./?@#$%^&*_~'''

stop_words = set(stopwords.words('english'))
input_stream = io.TextIOWrapper(sys.stdin.buffer, encoding='latin1')
for line in input_stream:
  line = line.strip()
  line = re.sub(r'[^\w\s]', '',line)
  line = line.lower()
  for x in line:
    if x in punctuations:
      line=line.replace(x, " ") 

  words=line.split()
  for word in words: 
    if word not in stop_words:
      print('%s\t%s' % (word, 1))

In [None]:
# print out the content of `reducer.py`
!cat ./reducer.py

# -*- coding: utf-8 -*-
"""reducer.ipynb

Automatically generated by Colaboratory.

Original file is located at
    https://colab.research.google.com/drive/1YzJ-vUsO5VYCyMrfPMow3s2IdxXkyQ0i
"""

from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    line=line.lower()

    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    try:
      count = int(count)
    except ValueError:
      #count was not a number, so silently
      #ignore/discard this line
      continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print ('%s\t%s' % (current_word, current_count))
        current_count 
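The pattern that `mapper.py` and `reducer.py` follow can be tried out locally without Hadoop. The sketch below is simplified (no NLTK stopword removal or punctuation stripping) and chains a mapper, a sort by key, and a reducer in memory; that sort is what Hadoop does between the two scripts, and the shell pipeline `cat input | python mapper.py | sort | python reducer.py` approximates the same flow.

```python
from itertools import groupby

def mapper(lines):
    """Emit 'word TAB 1' for every lowercase, whitespace-separated word."""
    for line in lines:
        for word in line.lower().split():
            yield f"{word}\t1"

def reducer(sorted_lines):
    """Sum the counts of each run of identical keys; input must be sorted by key."""
    pairs = (line.split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda kv: kv[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"

text = ["the cat sat", "The dog sat"]  # hypothetical input
# Like Hadoop Streaming, sort the mapper output before handing it to the reducer
for out in reducer(sorted(mapper(text))):
    print(out)
```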

In [None]:
!chmod u+rwx /content/mapper.py
!chmod u+rwx /content/reducer.py

In [None]:
!./hadoop-3.3.6/bin/hadoop jar /content/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar -input /content/20news-18828/alt.atheism/49960 -output /content/output -file /content/mapper.py  -file /content/reducer.py  -mapper 'python mapper.py'  -reducer 'python reducer.py'

2023-07-01 09:06:08,253 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/content/mapper.py, /content/reducer.py] [] /tmp/streamjob9318214195873376200.jar tmpDir=null
2023-07-01 09:06:09,144 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-07-01 09:06:09,402 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-07-01 09:06:09,402 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-07-01 09:06:09,433 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-07-01 09:06:09,672 INFO mapred.FileInputFormat: Total input files to process : 1
2023-07-01 09:06:09,712 INFO mapreduce.JobSubmitter: number of splits:1
2023-07-01 09:06:10,061 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1125106200_0001
2023-07-01 09:06:10,061 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-07-01 09:06:10,736 INFO mapred.Loc
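The log warns that `-file` is deprecated in favour of the generic option `-files`. Generic options must come before the streaming options, and `-files` ships the scripts into each task's working directory, so the mapper and reducer can still be invoked by their base names. A sketch of a helper that builds such a command; `streaming_command` and the output directory `/content/output_files` are hypothetical, and the output directory must not exist before the job runs.

```python
import os
import shlex

def streaming_command(hadoop_bin, jar, input_path, output_path, mapper, reducer):
    """Build a Hadoop Streaming argv that ships the scripts with -files.

    -files is a generic option, so it is placed before the
    streaming-specific options (-input, -output, -mapper, -reducer).
    """
    return [
        hadoop_bin, "jar", jar,
        "-files", f"{mapper},{reducer}",
        "-input", input_path,
        "-output", output_path,
        # the shipped scripts appear in the task's working directory
        "-mapper", f"python {os.path.basename(mapper)}",
        "-reducer", f"python {os.path.basename(reducer)}",
    ]

cmd = streaming_command(
    "/content/hadoop-3.3.6/bin/hadoop",
    "/content/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar",
    "/content/20news-18828/alt.atheism/49960",
    "/content/output_files",  # hypothetical; must not exist yet
    "/content/mapper.py",
    "/content/reducer.py",
)
print(shlex.join(cmd))
# To launch it from Python: subprocess.run(cmd, check=True)
```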

In [None]:
!ls ./output

part-00000  _SUCCESS


In [None]:
!cat ./output/part-00000

034529887x	1
0511211216	1
071	5
080182494x	1
0801834074	1
0877226423	1
0877227675	1
0908	1
0910309264	1
1	1
10	1
11	1
1266	1
1271	1
14	1
140195	1
14215	1
14226	1
142282197	1
17701900	1
1881	1
1977	1
1981	1
1986	1
1988	1
1989	1
1990	1
1992	1
20th	1
226	1
24hour	1
2568900	1
272	1
273	1
2nd	1
3005	1
316	1
372	1
3d	1
3nl	1
4	1
41	2
430	2
4581244	1
4679525	1
490	1
495	2
4rh	1
4rl	1
512	2
53701	1
541	1
59	1
608	1
664	1
700	1
702	1
7119	1
716	1
7215	1
7251	1
750	1
7723	1
787140195	1
787522973	1
831	1
8372475	1
88	1
880	2
8964079	1
8ew	1
91605	1
aah	1
aap	2
abortions	1
absurdities	1
accompanied	1
accounts	1
address	1
addresses	2
adulteries	1
aesthetics	1
african	3
africanamericans	1
agnostic	1
al	1
alien	1
allen	2
also	3
altatheism	1
altatheismarchivename	1
altatheismmoderated	1
alternate	1
alternative	1
although	2
america	1
american	5
americans	2
amherst	1
amongst	1
amusing	1
ancient	1
andor	1
another	1
anselm	1
anthology	3
anyone	1
appendix	2
approachable	1
archive	1
archivename	1
archives	1

### Next example: stock data (`mapper_stock.py` / `reducer_stock.py`)

In [None]:
!pwd

/content


In [None]:
!ls ../../

20news-18828		 hadoop-3.3.0	      mapper_stock.py	sample_data
20news-18828.tar.gz	 hadoop-3.3.0.tar.gz  output		shakespeare.txt
alice.txt		 loan_data.csv	      reducer.py	stocks.txt
bill_authentication.csv  mapper.py	      reducer_stock.py


In [None]:
!./hadoop-3.3.6/bin/hadoop jar /content/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar -input /content/stocks.txt -output /content/output_stock -file /content/mapper_stock.py -file /content/reducer_stock.py -mapper 'python mapper_stock.py' -reducer 'python reducer_stock.py'

2023-07-01 09:08:05,916 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/content/mapper_stock.py, /content/reducer_stock.py] [] /tmp/streamjob15080647743453414545.jar tmpDir=null
2023-07-01 09:08:06,807 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-07-01 09:08:06,991 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-07-01 09:08:06,991 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-07-01 09:08:07,045 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-07-01 09:08:07,328 INFO mapred.FileInputFormat: Total input files to process : 1
2023-07-01 09:08:07,355 INFO mapreduce.JobSubmitter: number of splits:1
2023-07-01 09:08:07,691 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local256495990_0001
2023-07-01 09:08:07,691 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-07-01 09:08:08,271 INF

In [None]:
!ls ./output_stock

part-00000  _SUCCESS


In [None]:
!cat ./output_stock/part-00000

AAPL	10
CSCO	10
GOOG	5
MSFT	10
YHOO	10


## Another way: write the Python mapper and reducer directly in Colab

In [None]:
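%%writefile stock_mapper.py
# -*- coding: utf-8 -*-
"""Mapper: emit a tab-separated (symbol, 1) pair for every stock record."""

import sys

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue  # skip blank lines
    symbol = line.split(',')[0]
    # Hadoop Streaming treats everything before the first tab as the key
    print('%s\t%s' % (symbol, 1))

Writing stock_mapper.py


In [None]:
%%writefile stock_reducer.py
# -*- coding: utf-8 -*-
"""Reducer: sum the counts emitted by stock_mapper.py for each symbol."""

import sys
from operator import itemgetter

wordcount = {}

for line in sys.stdin:
    parts = line.strip().split('\t', 1)
    if len(parts) != 2:
        continue  # skip blank or malformed lines
    word, count = parts
    try:
        wordcount[word] = wordcount.get(word, 0) + int(count)
    except ValueError:
        continue  # count was not a number

# Emit the totals sorted by symbol
sorted_wordcount = sorted(wordcount.items(), key=itemgetter(0))

for word, count in sorted_wordcount:
    print('%s\t%s' % (word, count))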

Writing stock_reducer.py


In [None]:
!ls

 20news-18828		   loan_data.csv      sample_data
 20news-18828.tar.gz	   mapper.py	     'shakespeare (1).txt'
 alice.txt		   mapper_stock.py    shakespeare.txt
 bill_authentication.csv   output	      stock_mapper.py
 count			   output_stock       stock_reducer.py
 hadoop-3.3.6		   reducer.py	      stocks.txt
 hadoop-3.3.6.tar.gz	   reducer_stock.py   wordmedian


In [None]:
from google.colab import files
uploaded = files.upload()

Saving stocks.txt to stocks.txt


In [None]:
!ls ../../

20news-18828		 hadoop-3.3.0	      mapper_stock.py  reducer_stock.py
20news-18828.tar.gz	 hadoop-3.3.0.tar.gz  output	       sample_data
alice.txt		 loan_data.csv	      output_stock     shakespeare.txt
bill_authentication.csv  mapper.py	      reducer.py       stocks.txt


In [None]:
!pwd
!ls .

/content/hadoop-3.3.0/bin
container-executor  hdfs.cmd	     oom-listener      test-container-executor
count		    mapper.py	     reducer.py        wordmedian
hadoop		    mapper_stock.py  reducer_stock.py  yarn
hadoop.cmd	    mapred	     stock_mapper.py   yarn.cmd
hdfs		    mapred.cmd	     stock_reducer.py


In [None]:
%%bash

./hadoop-3.3.6/bin/hadoop jar /content/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar \
        -input /content/stocks.txt \
        -output /content/new_output2 \
        -file stock_mapper.py \
        -file stock_reducer.py \
        -mapper 'python stock_mapper.py' \
        -reducer 'python stock_reducer.py'

packageJobJar: [stock_mapper.py, stock_reducer.py] [] /tmp/streamjob6559096818612108963.jar tmpDir=null


2023-07-01 09:11:06,685 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
2023-07-01 09:11:07,508 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2023-07-01 09:11:07,685 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2023-07-01 09:11:07,685 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2023-07-01 09:11:07,713 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2023-07-01 09:11:07,985 INFO mapred.FileInputFormat: Total input files to process : 1
2023-07-01 09:11:08,013 INFO mapreduce.JobSubmitter: number of splits:1
2023-07-01 09:11:08,373 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_local1592926162_0001
2023-07-01 09:11:08,373 INFO mapreduce.JobSubmitter: Executing with tokens: []
2023-07-01 09:11:08,925 INFO mapred.LocalDistributedCacheManager: Localized file:/content/stock_mapper.py as file:/tmp/hadoop-root/mapred/local/job_l

In [None]:
!ls ./new_output2

part-00000  _SUCCESS




#### Alternatively, write the command to a shell script and run it

In [None]:
sh = """
./hadoop-3.3.6/bin/hadoop jar /content/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar \
        -input /content/stocks.txt \
        -output /content/new_output3 \
        -file stock_mapper.py \
        -file stock_reducer.py \
        -mapper 'python stock_mapper.py' \
        -reducer 'python stock_reducer.py'
"""
with open('run_stock_script.sh', 'w') as file:
  file.write(sh)
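In a regular (non-raw) Python string, a backslash at the end of a line is swallowed together with the newline, so the script above ends up as one long line; it still runs, but a raw string keeps the shell line continuations readable. Re-running also fails if the output directory already exists, and in standalone mode that directory is on the local disk, so it can be cleared with `shutil`. A sketch under those assumptions: `write_script` is a hypothetical helper, the script uses `-files` instead of the deprecated `-file`, and it must be run from `/content` because of the relative `./hadoop-3.3.6` path.

```python
import os
import shutil
import stat

OUTPUT_DIR = "/content/new_output3"  # Hadoop refuses to overwrite an existing output dir

script = r"""#!/bin/bash
set -e
./hadoop-3.3.6/bin/hadoop jar /content/hadoop-3.3.6/share/hadoop/tools/lib/hadoop-streaming-3.3.6.jar \
    -files stock_mapper.py,stock_reducer.py \
    -input /content/stocks.txt \
    -output {out} \
    -mapper 'python stock_mapper.py' \
    -reducer 'python stock_reducer.py'
""".format(out=OUTPUT_DIR)

def write_script(path, text):
    """Write a shell script and add the user-execute permission bit."""
    with open(path, "w") as f:
        f.write(text)
    os.chmod(path, os.stat(path).st_mode | stat.S_IXUSR)

shutil.rmtree(OUTPUT_DIR, ignore_errors=True)  # local mode: output is a local directory
write_script("run_stock_script.sh", script)
# then run it with: !bash run_stock_script.sh
```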

In [None]:
!ls
!bash run_stock_script.sh

20news-18828		 mapper.py	      sample_data
20news-18828.tar.gz	 mapper_stock.py      shakespeare.txt
alice.txt		 new_output2	      stock_mapper.py
bill_authentication.csv  output		      stock_reducer.py
count			 output_stock	      stocks.txt
hadoop-3.3.0		 reducer.py	      wordmedian
hadoop-3.3.0.tar.gz	 reducer_stock.py
loan_data.csv		 run_stock_script.sh
2021-11-20 07:46:29,469 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [stock_mapper.py, stock_reducer.py] [] /tmp/streamjob15155124206237022169.jar tmpDir=null
2021-11-20 07:46:30,222 INFO impl.MetricsConfig: Loaded properties from hadoop-metrics2.properties
2021-11-20 07:46:30,383 INFO impl.MetricsSystemImpl: Scheduled Metric snapshot period at 10 second(s).
2021-11-20 07:46:30,384 INFO impl.MetricsSystemImpl: JobTracker metrics system started
2021-11-20 07:46:30,411 WARN impl.MetricsSystemImpl: JobTracker metrics system already initialized!
2021-11-20 07:46:30,552 INF

In [None]:
!ls ./new_output3
!cat ./new_output3/part-00000

part-00000  _SUCCESS
AAPL	10
CSCO	10
GOOG	5
MSFT	10
YHOO	10
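The reducer output is plain tab-separated text, so it is easy to load back into Python for further analysis. A minimal sketch; `read_counts` is a hypothetical helper, and the cell only reads the file if the job above has produced it.

```python
import os

def read_counts(path):
    """Parse reducer output (one 'key TAB count' pair per line) into a dict."""
    counts = {}
    with open(path) as f:
        for line in f:
            line = line.rstrip("\n")
            if not line:
                continue
            key, value = line.split("\t", 1)
            counts[key] = int(value)
    return counts

result_file = "/content/new_output3/part-00000"
if os.path.exists(result_file):
    counts = read_counts(result_file)
    print(counts)
    print("total records:", sum(counts.values()))
```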
