In [1]:
!docker ps -a

CONTAINER ID        IMAGE                                                    COMMAND                  CREATED             STATUS                     PORTS                    NAMES
78cd0dd5344e        bde2020/hadoop-nodemanager:1.1.0-hadoop2.7.1-java8       "/entrypoint.sh /run…"   16 hours ago        Up 16 hours (healthy)      8042/tcp                 nodemanager1
f9b4ef7c4437        bde2020/hadoop-resourcemanager:1.1.0-hadoop2.7.1-java8   "/entrypoint.sh /run…"   16 hours ago        Up 16 hours (healthy)      0.0.0.0:8089->8088/tcp   resourcemanager
241cc6651043        sakha002/namenode:latest                                 "/entrypoint.sh /run…"   16 hours ago        Up 16 hours (healthy)      0.0.0.0:9870->9870/tcp   namenode
b61fbbd6c982        bde2020/hadoop-historyserver:1.1.0-hadoop2.7.1-java8     "/entrypoint.sh /run…"   17 hours ago        Up 17 hours (healthy)      8188/tcp                 historyserver
e62f0db47c80        bde2020/hadoop-datanode:1.1.0-hadoop2.7.1-java8

# Using Hadoop Streaming in a container

https://medium.com/@rrfd/your-first-map-reduce-using-hadoop-with-python-and-osx-ca3b6f3dfe78

## Using Hadoop Streaming with Python



So it looks like Hadoop can also run MapReduce tasks that are not written in Java.
This is done via a utility called **hadoop-streaming**.

Now, hadoop-streaming is itself a jar file, so we would again run a command like:
```
hadoop jar /usr/local/Cellar/hadoop/3.1.0/libexec/share/hadoop/tools/lib/hadoop-*streaming*.jar \
-file /<path_to_mapper>/mapper.py \
-mapper /<path_to_mapper>/mapper.py \
-file /<path_to_reducer>/reducer.py \
-reducer /<path_to_reducer>/reducer.py \
-input daily/daily.csv \
-output daily/output
```

It looks to me like the command above generally has the form

```
hadoop jar path/to/file/hadoop-streaming.jar [options]
```

But the issue was that I could not find any such file in the Docker build of the Hadoop distribution.

Searching the web, I found that it should ship with the Hadoop distribution:

> the Hadoop streaming jar is still available in the latest release of EMR Hadoop. Starting with EMR release 4.0.0 it can be found at /usr/lib/hadoop-mapreduce/hadoop-streaming.jar.

https://stackoverflow.com/questions/32543734/how-to-find-jar-home-hadoop-contrib-streaming-hadoop-streaming-jar

or in 

> Hadoop streaming is a utility that comes with the Hadoop distribution. The utility allows you to create and run Map/Reduce jobs with any executable or script as the mapper and/or the reducer. For example:
$HADOOP_HOME/bin/hadoop  jar $HADOOP_HOME/hadoop-streaming.jar \

https://hadoop.apache.org/docs/r1.2.1/streaming.html

So I thought I would need to get the original Hadoop distribution.
(All of these troubles may sound laughable to someone with Java experience, but they are in fact confusing for someone like me.)

Okay, I got the Hadoop source from

https://www.apache.org/dyn/closer.cgi/hadoop/common/hadoop-3.1.4/hadoop-3.1.4-src.tar.gz

But it did not contain any hadoop-streaming.jar. (Presumably the source tarball has to be built first; I am not sure.) Anyway, it did not seem useful.

So I decided to use a prebuilt jar file instead, and found one here:

http://www.java2s.com/Code/Jar/h/Downloadhadoopstreamingjar.htm

https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-streaming/3.1.0/

Now the plan is to copy it into the Docker container along with the input data and see if it works. We will see in a bit!


## Before a tutorial example

We definitely need more detail on how the hadoop-streaming API works.

Some references to look at:

https://hadoop.apache.org/docs/r1.2.1/streaming.pdf

https://hadoop.apache.org/docs/r1.2.1/streaming.html


http://www.devx.com/opensource/introduction-to-hadoop-streaming.html


But a key question to address is this: if we can define a MapReduce task in any kind of executable, including Python, and Hadoop will do the job, then what essentially is this MapReduce task?
How does Hadoop HDFS help with running this task?
Can it be applied to more than one file?
How are files, and data in general, organized and placed in HDFS?


### What makes a python code a Map/Reduce task?


Again, the question is how MapReduce works with Hadoop. If we can write our own map and reduce scripts in Python, what should they look like?

Okay, I read through the following sources and looked at the Python code to get an idea of what a MapReduce task means. The short answer is that the map task produces a set of (key, value) pairs from processing a chunk of the input data (for now we can assume the chunk is all of the data),
and the reduce task processes and somehow sums up the outputs of the mappers (i.e. the sets of (key, value) pairs). In between, Hadoop collects the (key, value) pairs of all mappers, sorts them by key, and hands them to the reducers (the shuffle and sort step).

https://intellipaat.com/blog/tutorial/hadoop-tutorial/mapreduce-in-hadoop/


https://intellipaat.com/blog/tutorial/hadoop-tutorial/hadoop-streaming/

https://www.youtube.com/watch?v=qskfdqsK9fk&feature=emb_logo&ab_channel=Intellipaat

https://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/



https://cwiki.apache.org/confluence/display/HADOOP2/HadoopMapReduce#app-switcher


https://en.wikipedia.org/wiki/MapReduce



https://cwiki.apache.org/confluence/display/HADOOP2/HadoopStreaming



https://blog.matthewrathbone.com/2013/11/17/python-map-reduce-on-hadoop-a-beginners-tutorial.html

Among the sources I found in a quick search, this one did the best job of explaining what is going on:

https://intellipaat.com/blog/tutorial/hadoop-tutorial/mapreduce-in-hadoop/

and this one has a good example in action:

https://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/


which I will go through in more detail in the rest of this notebook.
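The (key, value) description above can be made concrete with a minimal in-memory sketch of the three stages of a word count: map, then the shuffle/sort that Hadoop performs between the stages, then reduce. It runs in a single Python process and ignores how Hadoop partitions keys across several reducers. The function names are made up for illustration and are not part of any Hadoop API.

```python
from collections import defaultdict


def map_phase(lines):
    """Emit (word, 1) for every word, like the word-count mapper."""
    for line in lines:
        for word in line.split():
            yield word, 1


def shuffle_phase(pairs):
    """Group values by key and order the keys by name.

    This roughly mimics what Hadoop does between map and reduce;
    partitioning across multiple reducers is deliberately left out.
    """
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())


def reduce_phase(grouped):
    """Sum the values collected for each key."""
    for key, values in grouped:
        yield key, sum(values)


if __name__ == "__main__":
    text = ["foo foo quux", "labs foo bar quux"]
    print(list(reduce_phase(shuffle_phase(map_phase(text)))))
```

For these words the result is bar 1, foo 3, labs 1, quux 2. These are the same counts that the shell pipeline with `sort` produces for the same input.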


# Writing a MapReduce task in Python

In [None]:
!ls

docker-hadoop				     hadoop_source
hadoop-mapreduce-examples-2.7.1-sources.jar  python_hadoop_example


In [None]:
%cd python_hadoop_examples/example2

/media/hossein/HDD/gdrive/Active_projects/my_dfs/python_hadoop_examples/example2


### Word Count example




https://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/



```
#!/usr/bin/env python3
"""mapper.py"""

import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        #
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))
```
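The mapper emits one `word<TAB>1` line per word. As I understand the Hadoop Streaming documentation, by default everything before the first tab is taken as the key and the rest of the line as the value. A line without a tab becomes a key with an empty (null) value. A small sketch of that rule (the helper name is hypothetical):

```python
def split_streaming_line(line, separator="\t"):
    """Split one line of streaming output into (key, value).

    Follows the default rule: key = text before the first separator,
    value = everything after it. A line without the separator yields
    (line, "") here; Hadoop itself treats the value as null.
    """
    line = line.rstrip("\n")
    # str.partition returns (line, "", "") when the separator is absent
    key, _, value = line.partition(separator)
    return key, value


if __name__ == "__main__":
    print(split_streaming_line("foo\t1\n"))
    print(split_streaming_line("a\tb\tc"))
    print(split_streaming_line("no-tab-here"))
```

Only the first tab matters, so a value may itself contain tabs (`"a\tb\tc"` gives key `a` and value `b\tc`). This is also why reducer.py splits with `split('\t', 1)`.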





```
#!/usr/bin/env python3
"""reducer.py"""

import sys

current_word = None
current_count = 0

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    try:
        # parse the input we got from mapper.py
        word, count = line.split('\t', 1)
        # convert count (currently a string) to int
        count = int(count)
    except ValueError:
        # line had no tab or count was not a number,
        # so silently ignore/discard this line
        continue

    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word:
    print('%s\t%s' % (current_word, current_count))
```
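The reducer's `if current_word == word` bookkeeping is a manual group-by over sorted input. As an alternative (a swapped-in technique, not the tutorial's code above), `itertools.groupby` can do the grouping. It works only if the input is already sorted by key, which Hadoop guarantees for reducer input. The helper names below are hypothetical.

```python
#!/usr/bin/env python3
"""Alternative reducer sketch using itertools.groupby."""
import sys
from itertools import groupby
from operator import itemgetter


def parse(lines):
    """Yield (word, count) pairs, skipping malformed lines."""
    for line in lines:
        word, sep, count = line.strip().partition("\t")
        if not sep:
            continue  # no tab: not a key/value line
        try:
            yield word, int(count)
        except ValueError:
            continue  # count was not a number


def reduce_counts(lines):
    """Sum the counts of consecutive identical keys (input must be sorted)."""
    for word, group in groupby(parse(lines), key=itemgetter(0)):
        yield word, sum(count for _, count in group)


if __name__ == "__main__":
    for word, total in reduce_counts(sys.stdin):
        print("%s\t%d" % (word, total))
```

It can be tested locally the same way as reducer.py, by piping `./mapper.py | sort -k1,1` into it.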



In [None]:
!ls
!wget http://www.gutenberg.org/ebooks/20417.txt.utf-8
!wget http://www.gutenberg.org/files/5000/5000-8.txt
!wget http://www.gutenberg.org/files/4300/4300-0.txt
!ls


In [None]:
!chmod +x ./mapper.py
!chmod +x ./reducer.py

In [None]:
!echo "foo foo quux labs foo bar quux" | ./mapper.py


foo	1
foo	1
quux	1
labs	1
foo	1
bar	1
quux	1


If it does not work, the Python setup on Ubuntu may be the problem; this could help:

http://openbookproject.net/thinkcs/python/english3e/app_c.html

In my case, I only needed to change `python` to `python3` in the first (shebang) line of mapper.py and reducer.py.



In [None]:
!echo "foo foo quux labs foo bar quux" | ./mapper.py | sort -k1,1 | ./reducer.py


bar	1
foo	3
labs	1
quux	2
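The `sort -k1,1` step stands in for Hadoop's shuffle/sort. The sketch below repeats the running-total logic of reducer.py as a function and feeds it the mapper output from above twice, once unsorted and once sorted, to show why the reducer relies on sorted input. Sorting whole lines with `sorted()` instead of by the first field is a simplification. It works here because every key is followed by a tab. `streaming_reduce` is a hypothetical name.

```python
def streaming_reduce(lines):
    """Running-total reducer, same logic as reducer.py.

    It only merges *adjacent* lines with the same key, so it is
    correct only when the input is sorted by key.
    """
    results = []
    current_word, current_count = None, 0
    for line in lines:
        word, count = line.split("\t", 1)
        if word == current_word:
            current_count += int(count)
        else:
            if current_word is not None:
                results.append((current_word, current_count))
            current_word, current_count = word, int(count)
    if current_word is not None:
        results.append((current_word, current_count))
    return results


mapped = ["foo\t1", "foo\t1", "quux\t1", "labs\t1", "foo\t1", "bar\t1", "quux\t1"]
print(streaming_reduce(mapped))          # unsorted: 'foo' and 'quux' are split
print(streaming_reduce(sorted(mapped)))  # sorted: one total per word
```

Without sorting, `foo` and `quux` each appear twice with partial counts. After sorting, the result matches the pipeline output above.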


In [None]:
!cat ./4300-0.txt | ./mapper.py

Okay, the next step is to run the code in Hadoop.
First, we copy the data and the scripts into the container.

In [None]:
!docker cp . namenode:media/example2

We should repeat the steps above for checking the Python code inside the container as well.


Okay, now my container does not recognize either `python` or `python3`,

so the advice in the link above does not help.

I tried things like `export PATH=$PATH:/usr/lib/python3`, but none of my experiments with the PATH variable worked. It looked to me like there is no Python installed on the image at all.

So after a long and painful course of action I installed Python on the image.
For this we need to change the Dockerfile for the namenode and then change the docker-compose file.

Now we have Python in the container,
but we need to copy the files again.


In [None]:
!ls


20417.txt.utf-8  4300-0.txt  5000-8.txt  mapper.py  reducer.py


In [None]:
!docker cp . namenode:media/example2

And yes, mapper.py now works in the container.


```
hadoop dfs -copyFromLocal /media/example2 /user/gutenberg
```
```
WARNING: Use of this script to execute dfs is deprecated.
WARNING: Attempting to execute replacement "hdfs dfs" instead.

2020-10-10 03:50:58,131 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-10 03:50:58,240 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-10 03:50:58,269 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-10 03:50:58,304 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-10 03:50:58,332 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
root@241cc6651043:/media/example2# 
```




Okay, it looks like the command above did not entirely work, so in the end I used the following:

```
hdfs dfs -copyFromLocal /media/example2 gutenberg
```

I should learn more about manipulating files in HDFS.
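A likely explanation for why the first copy did not seem to work is how HDFS resolves relative paths. They are resolved against the user's home directory, `/user/<username>`, which is `/user/root` when running as root in the container. So `gutenberg` (and the later `-input gutenberg/*`) refers to `/user/root/gutenberg`, while the first command wrote to `/user/gutenberg`. The hypothetical helper below mimics that resolution. It assumes the default working directory and ignores scheme-qualified paths such as `hdfs://...`.

```python
import posixpath


def resolve_hdfs_path(path, user="root"):
    """Resolve an HDFS path like the `hdfs dfs` shell does by default.

    Assumption: the working directory is the default home, /user/<user>.
    Absolute paths are only normalised.
    """
    if path.startswith("/"):
        return posixpath.normpath(path)
    return posixpath.normpath(posixpath.join("/user", user, path))


print(resolve_hdfs_path("/user/gutenberg"))  # where the first command wrote
print(resolve_hdfs_path("gutenberg"))        # what `gutenberg` refers to
```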

Okay, now we have the mapper code and the data on HDFS; next we copy the streaming jar file into the container.


In [None]:
!ls
%cd ../hadoop_source/
!ls

example1  example2
/media/hossein/HDD/gdrive/Active_projects/my_dfs/hadoop_source
hadoop-3.1.4-src	 hadoop-streaming-2.1.0-beta.jar
hadoop-3.1.4-src.tar.gz  hadoop-streaming-2.1.0-beta.jar.zip


In [None]:
!docker cp ./hadoop-streaming-2.1.0-beta.jar namenode:hadoop-streaming-2.1.0-beta.jar

### Currency example

https://medium.com/@rrfd/your-first-map-reduce-using-hadoop-with-python-and-osx-ca3b6f3dfe78

Let’s look at our mapper.py and make sure to include #!/usr/bin/python at the top of your script. The #! is called a shebang and allows your script to be executed like a standalone executable without typing python in front of it. I’ve included it to ensure our script runs, because sometimes Hadoop can be fussy with executables.

```
#!/usr/bin/python
# The Mapper
import sys

# Set local variables
iteration = 0
currentCountry = None
previousCountry = None
currentFx = None
previousFx = None
percentChange = None
currentKey = None
fxMap = []
# print("Starting mapper.py")
infile = sys.stdin
next(infile, None)  # skip first line (header) of input file
for line in infile:
    line = line.strip()
    line = line.split(',', 2)
    try:
        # Get data from line
        currentCountry = line[1].rstrip()
        if len(line[2]) == 0:
            continue
        currentFx = float(line[2])
        if currentCountry != previousCountry:
            previousCountry = currentCountry
            previousFx = currentFx
            previousLine = line
            continue
        # If country same as previous, add to map
        elif currentCountry == previousCountry:
            percentChange = ((currentFx - previousFx) / previousFx) * 100.00
            percentChange = round(percentChange, 2)
            currentKey = "%s: %6.2f%%" % (currentCountry, percentChange)
            # Set the array with tuple keys
            fxMap.append(tuple([currentKey, 1]))
        # Update Values
        previousCountry = currentCountry
        previousFx = currentFx
        previousLine = line
        # Uncomment if you want to see the output
        # if iteration % 50000 == 0:
        #     print("Current iteration is %d" % iteration)
        # iteration += 1
    # Handle unexpected errors
    except Exception as e:
        template = "An exception of type {0} occurred. Arguments:\n{1!r}"
        message = template.format(type(e).__name__, e.args)
        # %r also works while the values are still None
        print("currentFx: %r previousFx: %r" % (currentFx, previousFx))
        print(message)
        sys.exit(0)
#
# print("mapper.py has completed with %d iterations" % (iteration - 1))
# Show the returned values
for i in sorted(fxMap):
    print("%-20s - %d" % (i[0], i[1]))
```
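The core of this mapper is the percentage change between consecutive rates of the same country. Pulling it out into a pure function makes it easy to check without Hadoop or the CSV file. This is a hypothetical refactoring, not the article's code. It takes already-parsed `(country, rate)` tuples in file order.

```python
def percent_changes(rows):
    """Percent change between consecutive rates of the same country.

    Like the mapper, a change is only computed when two consecutive
    rows share a country, and it is rounded to 2 decimals.
    """
    changes = []
    prev_country, prev_rate = None, None
    for country, rate in rows:
        if country == prev_country:
            change = (rate - prev_rate) / prev_rate * 100.0
            changes.append((country, round(change, 2)))
        prev_country, prev_rate = country, rate
    return changes


rows = [("Canada", 1.25), ("Canada", 1.30), ("Canada", 1.30),
        ("Japan", 110.0), ("Japan", 99.0)]
print(percent_changes(rows))
```

The first row of each country only sets the baseline, which is why five rows yield three changes.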

So it looks like we can

# Running the Hadoop streaming job


## First try of the streaming task

```
hadoop jar hadoop-streaming-2.1.0-beta.jar \
-file /media/example2/mapper.py    -mapper /media/example2/mapper.py \
-file /media/example2/reducer.py   -reducer /media/example2/reducer.py \
-input gutenberg/* -output gutenberg-output
```


Okay, so this job failed miserably, with a very long error message:

```
root@241cc6651043:/# hadoop jar hadoop-streaming-2.1.0-beta.jar \
> -file /media/example2/mapper.py    -mapper /media/example2/mapper.py \
> -file /media/example2/reducer.py   -reducer /media/example2/reducer.py \
> -input gutenberg/* -output gutenberg-output
2020-10-10 17:15:38,928 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/media/example2/mapper.py, /media/example2/reducer.py, /tmp/hadoop-unjar1909972349476209814/] [] /tmp/streamjob6870132652541640349.jar tmpDir=null
2020-10-10 17:15:39,676 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.20.0.7:8032
2020-10-10 17:15:39,817 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.20.0.8:10200
2020-10-10 17:15:39,848 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.20.0.7:8032
2020-10-10 17:15:39,848 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.20.0.8:10200
2020-10-10 17:15:39,993 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1602301440091_0001
2020-10-10 17:15:40,085 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-10 17:15:40,180 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-10 17:15:40,216 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-10 17:15:40,280 INFO mapred.FileInputFormat: Total input files to process : 5
2020-10-10 17:15:40,329 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-10 17:15:40,354 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-10 17:15:40,370 INFO mapreduce.JobSubmitter: number of splits:5
2020-10-10 17:15:40,483 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-10 17:15:40,516 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1602301440091_0001
2020-10-10 17:15:40,516 INFO mapreduce.JobSubmitter: Executing with tokens: []
2020-10-10 17:15:40,648 INFO conf.Configuration: resource-types.xml not found
2020-10-10 17:15:40,648 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2020-10-10 17:15:40,939 INFO impl.YarnClientImpl: Submitted application application_1602301440091_0001
2020-10-10 17:15:40,965 INFO mapreduce.Job: The url to track the job: http://resourcemanager:8088/proxy/application_1602301440091_0001/
2020-10-10 17:15:40,966 INFO mapreduce.Job: Running job: job_1602301440091_0001
2020-10-10 17:15:47,111 INFO mapreduce.Job: Job job_1602301440091_0001 running in uber mode : false
2020-10-10 17:15:47,113 INFO mapreduce.Job:  map 0% reduce 0%
2020-10-10 17:15:51,185 INFO mapreduce.Job: Task Id : attempt_1602301440091_0001_m_000000_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2020-10-10 17:15:52,218 INFO mapreduce.Job: Task Id : attempt_1602301440091_0001_m_000001_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2020-10-10 17:15:53,229 INFO mapreduce.Job: Task Id : attempt_1602301440091_0001_m_000002_0, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2020-10-10 17:15:54,235 INFO mapreduce.Job: Task Id : attempt_1602301440091_0001_m_000000_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2020-10-10 17:15:56,245 INFO mapreduce.Job: Task Id : attempt_1602301440091_0001_m_000001_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2020-10-10 17:15:58,257 INFO mapreduce.Job: Task Id : attempt_1602301440091_0001_m_000002_1, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2020-10-10 17:15:59,267 INFO mapreduce.Job: Task Id : attempt_1602301440091_0001_m_000000_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2020-10-10 17:16:00,278 INFO mapreduce.Job: Task Id : attempt_1602301440091_0001_m_000001_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2020-10-10 17:16:01,287 INFO mapreduce.Job: Task Id : attempt_1602301440091_0001_m_000002_2, Status : FAILED
Error: java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 127
        at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:320)
        at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:533)
        at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:130)
        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
        at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)
        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:465)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
        at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
        at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)

2020-10-10 17:16:03,306 INFO mapreduce.Job:  map 100% reduce 100%
2020-10-10 17:16:03,325 INFO mapreduce.Job: Job job_1602301440091_0001 failed with state FAILED due to: Task failed task_1602301440091_0001_m_000000
Job failed as tasks failed. failedMaps:1 failedReduces:0 killedMaps:0 killedReduces: 0

2020-10-10 17:16:03,408 INFO mapreduce.Job: Counters: 14
        Job Counters 
                Failed map tasks=10
                Killed map tasks=4
                Killed reduce tasks=1
                Launched map tasks=12
                Other local map tasks=9
                Rack-local map tasks=3
                Total time spent by all maps in occupied slots (ms)=83972
                Total time spent by all reduces in occupied slots (ms)=0
                Total time spent by all map tasks (ms)=20993
                Total vcore-milliseconds taken by all map tasks=20993
                Total megabyte-milliseconds taken by all map tasks=85987328
        Map-Reduce Framework
                CPU time spent (ms)=0
                Physical memory (bytes) snapshot=0
                Virtual memory (bytes) snapshot=0
2020-10-10 17:16:03,408 ERROR streaming.StreamJob: Job not Successful!
Streaming Command Failed!

```

## Debugging the failed streaming job

Okay, so I had a few ideas about what could be the cause:
  * something about using `-file` instead of `-files`, as the warning in the log suggests
  * something wrong in the mapper or reducer code
  * something wrong with the streaming jar file, especially since it was a beta version
  * something to look up about error code 127
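On the first point, here is a sketch of the same command using the generic `-files` option instead of the deprecated `-file`. It rests on two assumptions from the Hadoop Streaming docs. First, generic options such as `-files` must come before the streaming options. Second, files shipped with `-files` appear in each task's working directory under their base name, so `-mapper` can refer to `mapper.py` directly. The script still needs its executable bit and a working shebang. The helper is hypothetical and only builds the argument list.

```python
import posixpath
import shlex


def streaming_command(jar, mapper, reducer, input_path, output_path):
    """Return the argv list for a hadoop-streaming job (hypothetical helper)."""
    return [
        "hadoop", "jar", jar,
        # generic option first: ship both scripts to every task
        "-files", "%s,%s" % (mapper, reducer),
        # streaming options: scripts are referenced by their base name
        "-mapper", posixpath.basename(mapper),
        "-reducer", posixpath.basename(reducer),
        "-input", input_path,
        "-output", output_path,
    ]


if __name__ == "__main__":
    cmd = streaming_command(
        "hadoop-streaming-3.1.0.jar",
        "/media/example2/mapper.py",
        "/media/example2/reducer.py",
        "gutenberg/*",
        "gutenberg-output",
    )
    # shlex.join quotes 'gutenberg/*' so the local shell does not expand it
    print(shlex.join(cmd))
```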

Some people said that this error code is usually caused by Hadoop failing to execute the Python code.
The first suggestion was to look at the differences between Python 2 and 3, such as the print statement.
The other one, which I think is more probable, is the path of the Python interpreter.
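Exit code 127 is the status a POSIX shell returns when a command cannot be found. `/usr/bin/env` also exits with 127 when the interpreter named in a `#!/usr/bin/env python3` shebang is missing. So a quick check on each node is whether the interpreter is on the `PATH` at all. A small sketch using only the standard library (the function names are hypothetical):

```python
import shutil
import subprocess


def interpreter_report(names=("python", "python3")):
    """Map each interpreter name to its full path on PATH, or None if missing."""
    return {name: shutil.which(name) for name in names}


def exit_code_of(command):
    """Run a command through /bin/sh and return its exit status."""
    result = subprocess.run(["/bin/sh", "-c", command],
                            stdout=subprocess.DEVNULL,
                            stderr=subprocess.DEVNULL)
    return result.returncode


if __name__ == "__main__":
    print(interpreter_report())
    # A command that does not exist makes the shell exit with 127.
    print(exit_code_of("surely-not-an-installed-command"))
```

Running something like this inside each container (namenode, datanode, nodemanager) shows quickly where the interpreter is missing.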





In [3]:
!ls
%cd ..
!ls

main  README.md
/media/hossein/HDD/gdrive/SkillsLearningTraining
 Battery	    Energy_Markets	  Optimization
'Colab Notebooks'   General_EE_topics	  PES_topics
 courses	    General_good_reads	  Programming
 Data_science	    Hot_research_works	  Skills_and_training
 docker-tutorials   Operations_research   tutorials-python


In [4]:
%cd ../Active_projects/my_dfs/
!ls

/media/hossein/HDD/gdrive/Active_projects/my_dfs
docker-hadoop				     hadoop_source
hadoop-mapreduce-examples-2.7.1-sources.jar  python_hadoop_examples


In [5]:
%cd hadoop_source/
!ls

/media/hossein/HDD/gdrive/Active_projects/my_dfs/hadoop_source
hadoop-3.1.4-src		 hadoop-streaming-2.1.0-beta.jar.zip
hadoop-3.1.4-src.tar.gz		 hadoop-streaming-3.1.0.jar
hadoop-streaming-2.1.0-beta.jar


In [6]:
!docker cp ./hadoop-streaming-3.1.0.jar namenode:hadoop-streaming-3.1.0.jar

In [15]:
!docker cp hadoop_source/hadoop-streaming-2.1.0-beta.jar namenode:hadoop-streaming-2.1.0-beta.jar


Okay, so I thought I needed to install Python on all worker nodes,
so I am running it again.

In [14]:
!ls

docker-hadoop				     hadoop_source
hadoop-mapreduce-examples-2.7.1-sources.jar  python_hadoop_examples


In [10]:
%cd ..
!ls

/media/hossein/HDD/gdrive/Active_projects/my_dfs
docker-hadoop				     hadoop_source
hadoop-mapreduce-examples-2.7.1-sources.jar  python_hadoop_examples


In [16]:
!docker cp ./python_hadoop_examples/example2/ namenode:media/example2

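It actually still failed with Python on both the namenode and the datanodes.
So this time I installed Python on all the nodes and repeated all of the above again.
This makes sense: in YARN, map and reduce tasks run inside containers launched by the NodeManager, so the interpreter has to exist on the `nodemanager1` container, not just on the namenode where the job is submitted.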


### And Finally!


So yes! This time it worked!

Special thanks to

https://github.com/Yelp/mrjob/issues/1564


Of course, the parts about the path to Python and changing the commands did not work for me, but he was right that I needed Python on my whole cluster and not just on the interface node.

These ones were also somewhat related:

https://stackoverflow.com/questions/48916243/python-hadoop-streaming-on-windows-script-not-a-valid-win32-application

https://stackoverflow.com/questions/52904865/running-a-hadoop-streaming-and-mapreduce-job-pipemapred-waitoutputthreads-s


https://stackoverflow.com/questions/43048654/hadoop-python-subprocess-failed-with-code-127


So here is what the output looks like:

```
hadoop jar hadoop-streaming-2.1.0-beta.jar -file /media/example2/mapper.py    -mapper /media/example2/mapper.py -file /media/example2/reducer.py   -reducer /media/example2/reducer.py -input gutenberg/* -output gutenberg-output12
2020-10-11 02:33:05,572 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/media/example2/mapper.py, /media/example2/reducer.py, /tmp/hadoop-unjar4236337157738795401/] [] /tmp/streamjob5367433111899547056.jar tmpDir=null
2020-10-11 02:33:06,246 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.20.0.8:8032
2020-10-11 02:33:06,368 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.20.0.6:10200
2020-10-11 02:33:06,392 INFO client.RMProxy: Connecting to ResourceManager at resourcemanager/172.20.0.8:8032
2020-10-11 02:33:06,392 INFO client.AHSProxy: Connecting to Application History server at historyserver/172.20.0.6:10200
2020-10-11 02:33:06,530 INFO mapreduce.JobResourceUploader: Disabling Erasure Coding for path: /tmp/hadoop-yarn/staging/root/.staging/job_1602383551624_0001
2020-10-11 02:33:06,611 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-11 02:33:06,698 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-11 02:33:06,736 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-11 02:33:06,811 INFO mapred.FileInputFormat: Total input files to process : 10
2020-10-11 02:33:06,855 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-11 02:33:06,882 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-11 02:33:06,898 INFO mapreduce.JobSubmitter: number of splits:10
2020-10-11 02:33:06,987 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2020-10-11 02:33:07,008 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1602383551624_0001
2020-10-11 02:33:07,009 INFO mapreduce.JobSubmitter: Executing with tokens: []
2020-10-11 02:33:07,165 INFO conf.Configuration: resource-types.xml not found
2020-10-11 02:33:07,165 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'.
2020-10-11 02:33:07,450 INFO impl.YarnClientImpl: Submitted application application_1602383551624_0001
2020-10-11 02:33:07,486 INFO mapreduce.Job: The url to track the job: http://resourcemanager:8088/proxy/application_1602383551624_0001/
2020-10-11 02:33:07,488 INFO mapreduce.Job: Running job: job_1602383551624_0001
2020-10-11 02:33:12,560 INFO mapreduce.Job: Job job_1602383551624_0001 running in uber mode : false
2020-10-11 02:33:12,562 INFO mapreduce.Job:  map 0% reduce 0%
2020-10-11 02:33:18,681 INFO mapreduce.Job:  map 10% reduce 0%
2020-10-11 02:33:19,687 INFO mapreduce.Job:  map 20% reduce 0%
2020-10-11 02:33:20,701 INFO mapreduce.Job:  map 30% reduce 0%
2020-10-11 02:33:23,714 INFO mapreduce.Job:  map 40% reduce 0%
2020-10-11 02:33:24,721 INFO mapreduce.Job:  map 50% reduce 0%
2020-10-11 02:33:25,724 INFO mapreduce.Job:  map 60% reduce 0%
2020-10-11 02:33:26,728 INFO mapreduce.Job:  map 70% reduce 0%
2020-10-11 02:33:27,741 INFO mapreduce.Job:  map 80% reduce 0%
2020-10-11 02:33:28,751 INFO mapreduce.Job:  map 90% reduce 0%
2020-10-11 02:33:30,764 INFO mapreduce.Job:  map 100% reduce 0%
2020-10-11 02:33:34,784 INFO mapreduce.Job:  map 100% reduce 100%
2020-10-11 02:33:34,794 INFO mapreduce.Job: Job job_1602383551624_0001 completed successfully
2020-10-11 02:33:34,857 INFO mapreduce.Job: Counters: 55
        File System Counters
                FILE: Number of bytes read=547367
                FILE: Number of bytes written=4176375
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=7384087
                HDFS: Number of bytes written=894164
                HDFS: Number of read operations=35
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
                HDFS: Number of bytes read erasure-coded=0
        Job Counters 
                Killed map tasks=1
                Launched map tasks=10
                Launched reduce tasks=1
                Rack-local map tasks=10
                Total time spent by all maps in occupied slots (ms)=86796
                Total time spent by all reduces in occupied slots (ms)=28816
                Total time spent by all map tasks (ms)=21699
                Total time spent by all reduce tasks (ms)=3602
                Total vcore-milliseconds taken by all map tasks=21699
                Total vcore-milliseconds taken by all reduce tasks=3602
                Total megabyte-milliseconds taken by all map tasks=88879104
                Total megabyte-milliseconds taken by all reduce tasks=29507584
        Map-Reduce Framework
                Map input records=157624
                Map output records=1260602
                Map output bytes=9677088
                Map output materialized bytes=1067856
                Input split bytes=1083
                Combine input records=0
                Combine output records=0
                Reduce input groups=82604
                Reduce shuffle bytes=1067856
                Reduce input records=1260602
                Reduce output records=82604
                Spilled Records=2521204
                Shuffled Maps =10
                Failed Shuffles=0
                Merged Map outputs=10
                GC time elapsed (ms)=493
                CPU time spent (ms)=12810
                Physical memory (bytes) snapshot=3636121600
                Virtual memory (bytes) snapshot=59547975680
                Total committed heap usage (bytes)=4254072832
                Peak Map Physical memory (bytes)=364384256
                Peak Map Virtual memory (bytes)=5112508416
                Peak Reduce Physical memory (bytes)=264335360
                Peak Reduce Virtual memory (bytes)=8454361088
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=7383004
        File Output Format Counters 
                Bytes Written=894164
2020-10-11 02:33:34,857 INFO streaming.StreamJob: Output directory: gutenberg-output12
```
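The counters at the end of the log provide a quick sanity check. No combiner was used (`Combine input records=0`), so every record a mapper emits should reach a reducer. That means `Map output records` and `Reduce input records` should match, and here both are 1260602. Below is a small helper, assuming the log has been saved as text. The function names are illustrative only:

```python
import re

# Counter lines look like "<indent>Name=value"; section titles have no '='.
COUNTER_RE = re.compile(r"^\s+([^=]+?)\s*=(\d+)\s*$")


def parse_counters(log_text):
    """Collect the indented 'Name=value' counter lines of a job log into a dict."""
    counters = {}
    for line in log_text.splitlines():
        match = COUNTER_RE.match(line)
        if match:
            counters[match.group(1)] = int(match.group(2))
    return counters


def shuffle_is_complete(counters):
    """Without a combiner, every map output record should be a reduce input record."""
    return counters["Map output records"] == counters["Reduce input records"]
```

In this run, `Reduce output records` also equals `Reduce input groups` (82604). That means one output line per distinct key, which is what a word-count style reducer would produce.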


The results can be checked with:
```
hdfs dfs -cat gutenberg-output12/part-00000
hdfs dfs -ls gutenberg-output12
```
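Each output line has the form `word<TAB>count`, so the result can also be pulled into Python, for example to list the most frequent words. This sketch assumes it runs somewhere the `hdfs` CLI is available (for example, inside the namenode container). It also assumes the job wrote a single reducer file, `part-00000`, as in the run above (`Launched reduce tasks=1`):

```python
import subprocess


def fetch_output(path="gutenberg-output12/part-00000"):
    """Read one output file through the hdfs CLI."""
    return subprocess.run(
        ["hdfs", "dfs", "-cat", path], check=True, capture_output=True, text=True
    ).stdout


def parse_output(text):
    """Turn 'word<TAB>count' lines into (word, count) tuples, skipping malformed lines."""
    pairs = []
    for line in text.splitlines():
        word, _, count = line.rpartition("\t")
        if word:
            pairs.append((word, int(count)))
    return pairs


def top_words(pairs, n=10):
    """Most frequent words first; ties broken alphabetically."""
    return sorted(pairs, key=lambda wc: (-wc[1], wc[0]))[:n]
```

For example, `top_words(parse_output(fetch_output()))` would return the ten most common tokens.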


### hadoop streaming run options

```
hadoop jar hadoop-*streaming*.jar -files /media/example2/mapper.py    -mapper /media/example2/mapper.py -files /media/example2/reducer.py   -reducer /media/example2/reducer.py -input gutenberg/* -output gutenberg-output
2020-10-10 17:29:32,387 ERROR streaming.StreamJob: Unrecognized option: -files
Usage: $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar [options]
Options:
  -input          <path> DFS input file(s) for the Map step.
  -output         <path> DFS output directory for the Reduce step.
  -mapper         <cmd|JavaClassName> Optional. Command to be run as mapper.
  -combiner       <cmd|JavaClassName> Optional. Command to be run as combiner.
  -reducer        <cmd|JavaClassName> Optional. Command to be run as reducer.
  -file           <file> Optional. File/dir to be shipped in the Job jar file.
                  Deprecated. Use generic option "-files" instead.
  -inputformat    <TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName>
                  Optional. The input format class.
  -outputformat   <TextOutputFormat(default)|JavaClassName>
                  Optional. The output format class.
  -partitioner    <JavaClassName>  Optional. The partitioner class.
  -numReduceTasks <num> Optional. Number of reduce tasks.
  -inputreader    <spec> Optional. Input recordreader spec.
  -cmdenv         <n>=<v> Optional. Pass env.var to streaming commands.
  -mapdebug       <cmd> Optional. To run this script when a map task fails.
  -reducedebug    <cmd> Optional. To run this script when a reduce task fails.
  -io             <identifier> Optional. Format to use for input to and output
                  from mapper/reducer commands
  -lazyOutput     Optional. Lazily create Output.
  -background     Optional. Submit the job and don't wait till it completes.
  -verbose        Optional. Print verbose output.
  -info           Optional. Print detailed usage.
  -help           Optional. Print help message.

Generic options supported are:
-conf <configuration file>        specify an application configuration file
-D <property=value>               define a value for a given property
-fs <file:///|hdfs://namenode:port> specify default filesystem URL to use, overrides 'fs.defaultFS' property from configurations.
-jt <local|resourcemanager:port>  specify a ResourceManager
-files <file1,...>                specify a comma-separated list of files to be copied to the map reduce cluster
-libjars <jar1,...>               specify a comma-separated list of jar files to be included in the classpath
-archives <archive1,...>          specify a comma-separated list of archives to be unarchived on the compute machines

The general command line syntax is:
command [genericOptions] [commandOptions]


For more details about these options:
Use $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar -info

Try -help for more information
Streaming Command Failed!
root@241cc6651043:/# hadoom-streaming.jar -info
bash: hadoom-streaming.jar: command not found
root@241cc6651043:/# hadoop jar hadoop-streaming.jar -info
JAR does not exist or is not a normal file: /hadoop-streaming.jar
root@241cc6651043:/# hadoop jar hadoop-*streaming*.jar -info
Usage: $HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar [options]
Options:
  -input          <path> DFS input file(s) for the Map step.
  -output         <path> DFS output directory for the Reduce step.
  -mapper         <cmd|JavaClassName> Optional. Command to be run as mapper.
  -combiner       <cmd|JavaClassName> Optional. Command to be run as combiner.
  -reducer        <cmd|JavaClassName> Optional. Command to be run as reducer.
  -file           <file> Optional. File/dir to be shipped in the Job jar file.
                  Deprecated. Use generic option "-files" instead.
  -inputformat    <TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName>
                  Optional. The input format class.
  -outputformat   <TextOutputFormat(default)|JavaClassName>
                  Optional. The output format class.
  -partitioner    <JavaClassName>  Optional. The partitioner class.
  -numReduceTasks <num> Optional. Number of reduce tasks.
  -inputreader    <spec> Optional. Input recordreader spec.
  -cmdenv         <n>=<v> Optional. Pass env.var to streaming commands.
  -mapdebug       <cmd> Optional. To run this script when a map task fails.
  -reducedebug    <cmd> Optional. To run this script when a reduce task fails.
  -io             <identifier> Optional. Format to use for input to and output
                  from mapper/reducer commands
  -lazyOutput     Optional. Lazily create Output.
  -background     Optional. Submit the job and don't wait till it completes.
  -verbose        Optional. Print verbose output.
  -info           Optional. Print detailed usage.
  -help           Optional. Print help message.

Generic options supported are:
-conf <configuration file>        specify an application configuration file
-D <property=value>               define a value for a given property
-fs <file:///|hdfs://namenode:port> specify default filesystem URL to use, overrides 'fs.defaultFS' property from configurations.
-jt <local|resourcemanager:port>  specify a ResourceManager
-files <file1,...>                specify a comma-separated list of files to be copied to the map reduce cluster
-libjars <jar1,...>               specify a comma-separated list of jar files to be included in the classpath
-archives <archive1,...>          specify a comma-separated list of archives to be unarchived on the compute machines

The general command line syntax is:
command [genericOptions] [commandOptions]


Usage tips:
In -input: globbing on <path> is supported and can have multiple -input

Default Map input format: a line is a record in UTF-8 the key part ends at first
  TAB, the rest of the line is the value

To pass a Custom input format:
  -inputformat package.MyInputFormat

Similarly, to pass a custom output format:
  -outputformat package.MyOutputFormat

The files with extensions .class and .jar/.zip, specified for the -file
  argument[s], end up in "classes" and "lib" directories respectively inside
  the working directory when the mapper and reducer are run. All other files
  specified for the -file argument[s] end up in the working directory when the
  mapper and reducer are run. The location of this working directory is
  unspecified.

To set the number of reduce tasks (num. of output files) as, say 10:
  Use -numReduceTasks 10
To skip the sort/combine/shuffle/sort/reduce step:
  Use -numReduceTasks 0
  Map output then becomes a 'side-effect output' rather than a reduce input.
  This speeds up processing. This also feels more like "in-place" processing
  because the input filename and the map input order are preserved.
  This is equivalent to -reducer NONE

To speed up the last maps:
  -D mapreduce.map.speculative=true
To speed up the last reduces:
  -D mapreduce.reduce.speculative=true
To name the job (appears in the JobTracker Web UI):
  -D mapreduce.job.name='My Job'
To change the local temp directory:
  -D dfs.data.dir=/tmp/dfs
  -D stream.tmpdir=/tmp/streaming
Additional local temp directories with -jt local:
  -D mapreduce.cluster.local.dir=/tmp/local
  -D mapreduce.jobtracker.system.dir=/tmp/system
  -D mapreduce.cluster.temp.dir=/tmp/temp
To treat tasks with non-zero exit status as SUCCEDED:
  -D stream.non.zero.exit.is.failure=false
Use a custom hadoop streaming build along with standard hadoop install:
  $HADOOP_PREFIX/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\
    [...] -D stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar
For more details about jobconf parameters see:
  http://wiki.apache.org/hadoop/JobConfFile
To set an environement variable in a streaming command:
   -cmdenv EXAMPLE_DIR=/home/example/dictionaries/

Shortcut:
   setenv HSTREAMING "$HADOOP_PREFIX/bin/hadoop jar hadoop-streaming.jar"

Example: $HSTREAMING -mapper "/usr/local/bin/perl5 filter.pl"
           -file /local/filter.pl -input "/logs/0604*/*" [...]
  Ships a script, invokes the non-shipped perl interpreter. Shipped files go to
  the working directory so filter.pl is found by perl. Input files are all the
  daily logs for days in month 2006-04
root@241cc6651043:/# 
```
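The `Unrecognized option: -files` error comes from argument order, not from the option itself. `-files` is a generic option, and the usage text above says generic options come before the command options (`command [genericOptions] [commandOptions]`). It also takes one comma-separated list, so it should not be repeated once per script. The sketch below builds the argument list in that order. The helper name is hypothetical. Referring to the scripts by base name relies on `-files` linking each shipped file into the task's working directory:

```python
import os


def streaming_args(jar, mapper, reducer, input_path, output_path, generic_extra=()):
    """Build a hadoop-streaming argv with generic options before streaming ones.

    -files is a generic option: it takes one comma-separated list and must
    precede -mapper/-reducer/-input/-output, or StreamJob rejects it.
    """
    generic = ["-files", ",".join([mapper, reducer]), *generic_extra]
    streaming = [
        # Shipped files appear in the task's working directory under
        # their base names, so the scripts are referenced that way.
        "-mapper", os.path.basename(mapper),
        "-reducer", os.path.basename(reducer),
        "-input", input_path,
        "-output", output_path,
    ]
    return ["hadoop", "jar", jar, *generic, *streaming]
```

With the paths from this notebook, the shell equivalent would read `hadoop jar hadoop-*streaming*.jar -files /media/example2/mapper.py,/media/example2/reducer.py -mapper mapper.py -reducer reducer.py -input gutenberg/* -output gutenberg-output`. Any `-D property=value` settings from the usage tips would also go into `generic_extra`, ahead of the streaming options.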