The task is to compute, for every weather station in the given weather dataset and every day of the year, the ten-year average of the daily maximum temperatures.
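Before distributing the work, it helps to pin down the statistic itself. The following plain-Python sketch computes it in memory under simplifying assumptions. Readings arrive as already-parsed `(station, year, month, day, temp)` tuples, and temperatures are in tenths of a degree Celsius. The function name `ten_year_mean_of_daily_max` and the sample readings are hypothetical illustrations, not part of the dataset.

```python
from collections import defaultdict
from statistics import mean


def ten_year_mean_of_daily_max(readings):
    """readings: iterable of (station, year, month, day, temp) tuples.

    Stage 1: reduce all readings of one station on one date to their maximum.
    Stage 2: average those daily maxima across years per (station, month, day).
    """
    daily_max = {}
    for station, year, month, day, temp in readings:
        key = (station, year, month, day)
        daily_max[key] = max(temp, daily_max.get(key, temp))

    # Drop the year so the same calendar day of every year is grouped together.
    by_calendar_day = defaultdict(list)
    for (station, _year, month, day), temp in daily_max.items():
        by_calendar_day[(station, month, day)].append(temp)

    return {key: mean(temps) for key, temps in by_calendar_day.items()}


# Hypothetical sample readings (station ID and values are made up).
sample = [
    ("029070", "1901", "01", "01", -78),
    ("029070", "1901", "01", "01", -72),
    ("029070", "1902", "01", "01", -50),
    ("029070", "1902", "01", "01", -61),
]
result = ten_year_mean_of_daily_max(sample)
print(result)  # {('029070', '01', '01'): -61}
```

The daily maxima are -72 (1901) and -50 (1902), so their mean for 1 January is -61. The MapReduce job below performs the same two stages as two chained steps.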

Start the Hadoop cluster

In [1]:
%%bash
$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Starting namenodes on [localhost]
Starting datanodes
Starting secondary namenodes [p-16a0bd9b-db45-47f9-9da1-1597770ccaed]
Starting resourcemanager
Starting nodemanagers


Upload the weather data set to HDFS

In [2]:
from hdfs3 import HDFileSystem

# Connect to the NameNode started above and copy each yearly archive
# into the user's HDFS home directory.
hdfs = HDFileSystem(host='localhost', port=9000)
for year in range(1901, 1905):
    hdfs.put(f"{year}.gz", f"{year}.gz")

Check that the data set arrived at its final location

In [3]:
hdfs.ls(".")

['/user/jovyan/1901.gz',
 '/user/jovyan/1902.gz',
 '/user/jovyan/1903.gz',
 '/user/jovyan/1904.gz']

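Implementation of a two-step MapReduce job with mrjob: the first step finds the maximum temperature of each station on each day, and the second averages these daily maxima over all years for each station and calendar day.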

In [14]:
%%file average_max_daily_temperatures_mrjob.py

from statistics import mean

from mrjob.job import MRJob
from mrjob.step import MRStep

# Missing air-temperature readings are encoded as +9999 in the NCDC records.
MISSING = "+9999"


class MRMeanMaxTemp(MRJob):

    def mapper_get_temperatures(self, _, line):
        # Key each reading by station-year-month-day using the fixed-width
        # NCDC layout: USAF station ID at [4:10], date YYYYMMDD at [15:23],
        # air temperature (tenths of a degree Celsius) at [87:92].
        date = "-".join((line[4:10], line[15:19], line[19:21], line[21:23]))
        temp = line[87:92]
        if temp != MISSING:
            yield date, int(temp)

    def reducer_max_temperature(self, date, temperatures):
        # Maximum temperature of one station on one day.
        yield date, max(temperatures)

    def mapper_get_new_dates(self, date, max_temp):
        # Drop the year so that the daily maxima of the same calendar day
        # from every year are sent to the same reducer.
        station, _year, month, day = date.split("-")
        yield f"{station}-{month}-{day}", max_temp

    def reducer_find_mean_temperatures(self, key, max_temps):
        # Average the daily maxima over all years, converted to degrees Celsius.
        yield key, mean(list(max_temps)) / 10

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_temperatures,
                   reducer=self.reducer_max_temperature),
            MRStep(mapper=self.mapper_get_new_dates,
                   reducer=self.reducer_find_mean_temperatures),
        ]


if __name__ == '__main__':
    MRMeanMaxTemp.run()

Overwriting average_max_daily_temperatures_mrjob.py
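The first mapper relies on the fixed-width layout of the NCDC records. The sketch below isolates that parsing so the slice offsets can be checked on their own, without Hadoop. The helper names `parse_record` and `synthetic_record` are hypothetical, and the test record is synthetic: only the station, date and temperature fields are filled in, everything else is padding.

```python
# Field offsets (0-based, end-exclusive) in an NCDC fixed-width record.
STATION = slice(4, 10)   # USAF weather station identifier
YEAR = slice(15, 19)
MONTH = slice(19, 21)
DAY = slice(21, 23)
TEMP = slice(87, 92)     # signed air temperature, tenths of a degree Celsius
MISSING = "+9999"


def parse_record(line):
    """Return (station-year-month-day, temp) or None if the reading is missing."""
    temp = line[TEMP]
    if temp == MISSING:
        return None
    key = "-".join((line[STATION], line[YEAR], line[MONTH], line[DAY]))
    return key, int(temp)


def synthetic_record(station, date, temp):
    """Build a hypothetical 93-character record with only the fields above set."""
    chars = ["0"] * 93
    chars[4:10] = station   # 6 characters
    chars[15:23] = date     # YYYYMMDD, 8 characters
    chars[87:92] = temp     # sign plus 4 digits
    return "".join(chars)


line = synthetic_record("029070", "19010101", "-0078")
print(parse_record(line))  # ('029070-1901-01-01', -78)
print(parse_record(synthetic_record("029070", "19010101", "+9999")))  # None
```

Python's `int()` accepts the leading `+` or `-` sign directly, so no extra handling of the sign character is needed.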


Run the MapReduce analysis job on the cluster

In [15]:
!python average_max_daily_temperatures_mrjob.py -r hadoop hdfs://localhost:9000/user/jovyan/19*.gz

No configs found; falling back on auto-configuration
No configs specified for hadoop runner
Looking for hadoop binary in /home/jovyan/hadoop-3.3.0/bin...
Found hadoop binary: /home/jovyan/hadoop-3.3.0/bin/hadoop
Using Hadoop version 3.3.0
Looking for Hadoop streaming jar in /home/jovyan/hadoop-3.3.0...
Found Hadoop streaming jar: /home/jovyan/hadoop-3.3.0/share/hadoop/tools/lib/hadoop-streaming-3.3.0.jar
Creating temp directory /tmp/average_max_daily_temperatures_mrjob.jovyan.20201109.112716.566487
uploading working dir files to hdfs:///user/jovyan/tmp/mrjob/average_max_daily_temperatures_mrjob.jovyan.20201109.112716.566487/files/wd...
Copying other local files to hdfs:///user/jovyan/tmp/mrjob/average_max_daily_temperatures_mrjob.jovyan.20201109.112716.566487/files/
Running step 1 of 2...
  packageJobJar: [/tmp/hadoop-unjar16864624525493226709/] [] /tmp/streamjob13674769702331051782.jar tmpDir=null
  Connecting to ResourceManager at /0.0.0.0:8032
  Connecting to ResourceManager at /0.0
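Iterating on the cluster is slow, so the logic of the two steps can also be checked in memory (mrjob's own `-r inline` runner is another option). The sketch below is a simplified stand-in for Hadoop's map, shuffle-and-sort and reduce phases, not mrjob's runner. It assumes the input is already parsed into `(station-year-month-day, temp)` pairs, so step 1 uses an identity mapper; the helper names and sample pairs are hypothetical.

```python
from itertools import groupby
from operator import itemgetter
from statistics import mean


def run_step(pairs, mapper, reducer):
    """Simulate one MapReduce step: map every pair, sort by key, reduce each group."""
    mapped = [out for key, value in pairs for out in mapper(key, value)]
    mapped.sort(key=itemgetter(0))  # stands in for the shuffle-and-sort phase
    result = []
    for key, group in groupby(mapped, key=itemgetter(0)):
        result.extend(reducer(key, (value for _, value in group)))
    return result


def identity_mapper(key, value):
    yield key, value


def max_reducer(date, temps):
    yield date, max(temps)


def drop_year_mapper(date, max_temp):
    station, _year, month, day = date.split("-")
    yield f"{station}-{month}-{day}", max_temp


def mean_reducer(key, temps):
    yield key, mean(list(temps))


# Hypothetical parsed readings in tenths of a degree Celsius.
readings = [
    ("029070-1901-01-01", -78),
    ("029070-1901-01-01", -72),
    ("029070-1902-01-01", -50),
    ("029070-1902-01-01", -61),
    ("029070-1902-01-02", -44),
]
step1 = run_step(readings, identity_mapper, max_reducer)
step2 = run_step(step1, drop_year_mapper, mean_reducer)
print(step2)  # [('029070-01-01', -61), ('029070-01-02', -44)]
```

Chaining the output of one `run_step` call into the next mirrors how the two `MRStep` objects are chained in `steps()`: the second mapper sees the reducer output of the first step.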

Shut down the Hadoop cluster

In [None]:
%%bash
$HADOOP_HOME/sbin/stop-yarn.sh
$HADOOP_HOME/sbin/stop-dfs.sh