The task is to compute, for every weather station and every calendar day (month and day), the average of the daily maximum temperatures over the ten years 1901–1910 of the given weather dataset.

Start the Hadoop cluster

In [None]:
%%bash
# Fail fast with a clear message if HADOOP_HOME is unset, instead of trying
# to run /sbin/start-dfs.sh; quote the paths so install dirs with spaces work.
: "${HADOOP_HOME:?HADOOP_HOME must point to the Hadoop installation}"
# Start HDFS (namenode, datanodes, secondary namenode) first, then YARN
# (resourcemanager, nodemanagers).
"$HADOOP_HOME/sbin/start-dfs.sh"
"$HADOOP_HOME/sbin/start-yarn.sh"

Starting namenodes on [localhost]
Starting datanodes
Starting secondary namenodes [p-aeed57c6-9140-472c-9e00-6a465bcdfb51]
Starting resourcemanager
Starting nodemanagers


Upload the weather data set to HDFS

In [None]:
from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host='localhost', port=9000)
try:
    # Copy 1901.gz ... 1910.gz from the local working directory to the same
    # relative path in HDFS, i.e. the user's HDFS home directory.
    for year in range(1901, 1911):
        filename = "%d.gz" % year
        hdfs.put(filename, filename)
finally:
    # Close the connection even if an upload fails part-way.
    hdfs.disconnect()

Check that the data set arrived at its final location

In [None]:
from hdfs3 import HDFileSystem
hdfs = HDFileSystem(host='localhost', port=9000)
try:
    hdfs_files = hdfs.ls(".")
finally:
    # Close the connection even if the listing fails.
    hdfs.disconnect()

# Fail loudly if any of the ten yearly files is missing, rather than relying
# on someone reading the listing.
expected_files = {"%d.gz" % year for year in range(1901, 1911)}
missing_files = expected_files - {path.rsplit("/", 1)[-1] for path in hdfs_files}
assert not missing_files, "missing in HDFS: %s" % sorted(missing_files)
hdfs_files

['/user/root/1901.gz', '/user/root/1902.gz', '/user/root/1903.gz', '/user/root/1904.gz', '/user/root/1905.gz', '/user/root/1906.gz', '/user/root/1907.gz', '/user/root/1908.gz', '/user/root/1909.gz', '/user/root/1910.gz']


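Implementation of the analysis as a two-step MapReduce job with mrjob: the first step finds each station's maximum temperature for every date, the second averages these daily maxima per station and calendar day across all years.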

In [None]:
%%file average_max_daily_temperatures_mrjob.py

from mrjob.job import MRJob
from mrjob.step import MRStep

import re
import sys

class AverageMaxDailyTemperatures(MRJob):
   """Multi-year average of daily maximum temperatures per station and calendar day.

   Step 1 reduces the raw fixed-width weather records (presumably the NCDC
   format) to one maximum temperature per station and date. Step 2 re-keys
   those daily maxima by station and month-day and averages them over every
   year present in the input.
   """

   # A reading is kept only if its one-character quality code is one of these.
   _VALID_QUALITY = re.compile("[01459]")
   # Value of the temperature field that marks a missing reading.
   _MISSING_TEMP = "+9999"

   def mapper_max_daily_station_date(self, key, value):
      """Emit (station + yyyymmdd, raw temperature) for every valid reading.

      ``key`` is unused; ``value`` is one raw input line. The 11-character
      station id is concatenated with the 8-character date so step 2 can
      slice them apart again. The temperature stays an integer here and is
      only scaled in the final reducer.
      """
      station = value[4:15]
      date = value[15:23]
      temp = value[87:92]
      quality = value[92:93]
      # A truncated line yields an empty quality string, which never matches.
      if temp != self._MISSING_TEMP and self._VALID_QUALITY.match(quality):
         yield station + date, int(temp)

   def reducer_max_daily_station_date(self, key, values):
      """Keep the highest temperature seen for one station on one date.

      Also used as the combiner: max is associative and commutative, so
      applying it to partial groups does not change the final result.
      """
      yield key, max(values)

   def mapper_average_daily_station_temp(self, key, value):
      """Re-key one daily maximum by station and month-day, dropping the year.

      ``key`` is the station+yyyymmdd string produced by step 1 and ``value``
      that day's maximum. The emitted key has the form "<station>-<MMDD>".
      """
      station, date = key[0:11], key[11:19]
      yield "%s-%s" % (station, date[4:8]), value

   def reducer_average_daily_station_temp(self, key, values):
      """Average the daily maxima of one station and month-day over all years.

      The result is divided by 10, assuming the NCDC convention of storing
      temperatures in tenths of a degree Celsius. The number of contributing
      years varies per key: "-0229" only receives leap-year values, and a date
      with no valid reading in some year averages over fewer years.
      """
      temp_sum = 0
      count = 0
      for temp in values:
         temp_sum += temp
         count += 1
      # The integer sum is exact, so one division by the combined denominator
      # gives the correctly rounded mean (4.22 rather than 4.220000000000001,
      # which dividing by 10.0 and then by count separately produces).
      yield key, temp_sum / (10.0 * count)

   def steps(self):
      """Chain the per-day maximum step and the multi-year averaging step."""
      max_per_station_day = MRStep(
         mapper=self.mapper_max_daily_station_date,
         # max() is safe as a combiner and shrinks the data shuffled to the
         # step-1 reducers.
         combiner=self.reducer_max_daily_station_date,
         reducer=self.reducer_max_daily_station_date,
      )
      average_over_years = MRStep(
         mapper=self.mapper_average_daily_station_temp,
         reducer=self.reducer_average_daily_station_temp,
      )
      return [max_per_station_day, average_over_years]


if __name__ == '__main__':
   # Hand control to mrjob, which parses command-line options such as
   # `-r hadoop` and runs the steps returned by steps().
   AverageMaxDailyTemperatures.run()

Writing average_max_daily_temperatures_mrjob.py


Run the MapReduce analysis job on the Hadoop cluster

In [None]:
!python average_max_daily_temperatures_mrjob.py -r hadoop hdfs://localhost:9000/user/root/19*.gz

"02982099999-0411"	3.44
"02982099999-0412"	4.220000000000001
"02982099999-0413"	4.5600000000000005
"02982099999-0414"	5.76
"02982099999-0415"	4.88
"02982099999-0416"	5.32
"02982099999-0417"	4.220000000000001
"02982099999-0418"	4.66
"02982099999-0419"	3.78
"02982099999-0420"	2.7800000000000002
"02982099999-0421"	3.22
"02982099999-0422"	3.54
"02982099999-0423"	3.7600000000000002
"02982099999-0424"	4.220000000000001
"02982099999-0425"	5.0200000000000005
"02982099999-0426"	5.34
"02982099999-0427"	6.34
"02982099999-0428"	4.88
"02982099999-0429"	5.5600000000000005
"02982099999-0430"	8.08
"02982099999-0501"	6.56
"02982099999-0502"	7.44
"02982099999-0503"	5.9
"02982099999-0504"	8.120000000000001
"02982099999-0505"	9.78
"02982099999-0506"	10.66
"02982099999-0507"	8.440000000000001
"02982099999-0508"	7.540000000000001
"02982099999-0509"	9.32
"02982099999-0510"	9.9
"02982099999-0511"	10.2
"02982099999-0512"	8.879999999999999
"02982099999-0513"	10.32
"02982099999-0514"	8.24
"02982099999-0515"	9.78

Shut down the Hadoop cluster

In [None]:
%%bash
# Fail fast with a clear message if HADOOP_HOME is unset; quote the paths so
# install dirs with spaces work.
: "${HADOOP_HOME:?HADOOP_HOME must point to the Hadoop installation}"
# Stop in the reverse order of startup: YARN first, then HDFS.
"$HADOOP_HOME/sbin/stop-yarn.sh"
"$HADOOP_HOME/sbin/stop-dfs.sh"

Stopping nodemanagers
localhost: ERROR: Unable to kill 865
Stopping resourcemanager
Stopping namenodes on [localhost]
localhost: ERROR: Unable to kill 265
Stopping datanodes
localhost: ERROR: Unable to kill 361
Stopping secondary namenodes [p-aeed57c6-9140-472c-9e00-6a465bcdfb51]
p-aeed57c6-9140-472c-9e00-6a465bcdfb51: ERROR: Unable to kill 521
ERROR: Unable to kill 776


<a style='text-decoration:none;line-height:16px;display:flex;color:#5B5B62;padding:10px;justify-content:end;' href='https://deepnote.com?utm_source=created-in-deepnote-cell&amp;projectId=aeed57c6-9140-472c-9e00-6a465bcdfb51' target="_blank">
Created in <span style='font-weight:600;margin-left:4px;'>Deepnote</span></a>