## Q1. Descriptive Statistics: Weather Data (NCDC)

#### Introduction
This big data coursework summarizes hourly weather data from the National Climatic Data Center (NCDC) for July 2007 using the distributed MapReduce framework and the Hadoop Distributed File System (HDFS).

#### MapReduce methodology
MapReduce is a programming model for the parallel processing of large datasets. It is built on two functions: the map function, which processes input records and emits key-value pairs, and the reduce function, which aggregates all values emitted for the same key. The model has several limitations:

- Limited flexibility: it is batch-oriented, so it is not well suited to real-time or interactive data processing.
- Data locality: when a task cannot run on the node that stores its data, or when map output is shuffled to the reducers, the data movement is slow and costly in network bandwidth and storage.
- Data storage: HDFS is optimized for high-throughput sequential reads of large files, so it performs poorly for low-latency random access and for many small files.
- Resource management: it can be difficult to optimize resource usage and troubleshoot issues.
- Scalability: adding more nodes to a cluster can be a complex process.
- Complexity: it requires a high level of expertise to implement and maintain.
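
The two-function model described above can be illustrated without a cluster. The following is a minimal single-process sketch, not Hadoop code: the names `map_fn`, `reduce_fn`, and `run_mapreduce`, the choice of `max` as the aggregate, and the sample records are all hypothetical and chosen only for illustration.

```python
from collections import defaultdict

def map_fn(record):
    """Emit one (day, temperature) pair per input record."""
    day, temperature = record.split(',')
    yield day, int(temperature)

def reduce_fn(day, temperatures):
    """Aggregate all temperatures that share the same day key."""
    return day, max(temperatures)

def run_mapreduce(records):
    groups = defaultdict(list)
    # Map phase: every record is processed independently
    for record in records:
        for key, value in map_fn(record):
            # Shuffle phase: values are grouped by key
            groups[key].append(value)
    # Reduce phase: each key is aggregated on its own
    return [reduce_fn(key, values) for key, values in sorted(groups.items())]

# Hypothetical sample records in "day,temperature" form
sample_records = ['20070701,57', '20070702,61', '20070701,72']
result = run_mapreduce(sample_records)
print(result)
```

In a real cluster the map calls run on different nodes and the grouping step is the network shuffle, which is where the data-movement cost mentioned in the list arises.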

#### The pseudocode (Mapper & Reducer)

**Mapper**

**Reducer**

#### Explanation of the methodology

Before the code starts, the lena HDFS and the Apache MapReduce framework are used to handle the big data through a mapper and a reducer.

The code uses Python and the pandas library to explore the weather dataset. It first imports the necessary libraries: pandas, numpy, and functools. The data is then loaded from the file '200707hourly.txt' with the pandas read_csv function, which takes the file path as an argument. The usecols parameter restricts the DataFrame to columns 0, 1, 2, and 8.

Once the data is loaded, the columns of the DataFrame are cleaned by removing spaces from the column names. This is done by using the str.replace() method on the columns attribute of the DataFrame, which replaces all spaces with nothing. The shape of the DataFrame is then printed to show the number of rows and columns.
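
As a small illustration of the column-name cleaning step, the sketch below applies the same `str.replace` call to a hypothetical two-row frame. The header `'Dry Bulb Temp'` and the sample values are assumptions made for the example, not taken from the raw file.

```python
import pandas as pd

# Hypothetical frame whose headers contain spaces
sample_df = pd.DataFrame({'Wban Number': [3011, 63846],
                          'Dry Bulb Temp': ['57', '-']})

# Remove every space from the column names
sample_df.columns = sample_df.columns.str.replace(' ', '')

print(list(sample_df.columns))
print(sample_df.shape)
```

After this step the columns can be reached with attribute access such as `sample_df.DryBulbTemp`, which is how the later cells refer to them.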

The next step is to sort the values in the DataFrame by the YearMonthDay column and reset the index. This is done using the sort_values() and reset_index() methods, respectively. The sort_values() method takes the by parameter, which is set to 'YearMonthDay', to specify which column to sort by. The reset_index() method is used to reset the index of the DataFrame so that it starts at 0 again.

After that, the code performs some data cleaning steps. It first creates a copy of the original DataFrame called weather_df_clean, and then removes any blank temperature values. This is done by using the str.strip() method on the DryBulbTemp column, which removes any leading or trailing whitespace, and then replacing any remaining blank values with '-'.

The next step is to replace the '-' values with NaN. This is done using the replace() method on the DataFrame, passing '-' as the first argument and NumPy's NaN value as the second. The isna() method is then used to filter the DataFrame for the rows that have NaN in the DryBulbTemp column.

The code then removes the rows with null values using the dropna() method, which drops any row that contains at least one NaN value. After that, the code casts the YearMonthDay column to a string and the DryBulbTemp column to an integer using the astype() method on each column.
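
The cleaning steps described above (strip, placeholder replacement, NaN removal, type casting) can be sketched on a small frame. The sample values below are hypothetical and only mimic the kinds of entries seen in the data: a '-' placeholder, a blank string, and padded numbers.

```python
import numpy as np
import pandas as pd

# Hypothetical sample with a '-' placeholder, a blank string and padded values
sample_df = pd.DataFrame({
    'YearMonthDay': [20070701, 20070701, 20070702, 20070702],
    'DryBulbTemp': [' 57', '-', '  ', '61 '],
})

# Strip whitespace, then mark empty strings with the '-' placeholder
sample_df['DryBulbTemp'] = sample_df['DryBulbTemp'].str.strip().replace('', '-')

# Turn every '-' placeholder into NaN and drop those rows
sample_clean = sample_df.replace('-', np.nan).dropna()

# Cast the remaining values to their final types
sample_clean = sample_clean.astype({'YearMonthDay': 'str', 'DryBulbTemp': 'int'})

print(sample_clean)
```

Only the two rows with a usable temperature survive, which mirrors how the full dataset shrinks after `dropna()`.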

The code then calculates some summary statistics of the data using the describe() method on the DataFrame. This method calculates various statistics such as the count, mean, standard deviation, minimum, and quartiles of each column. The shape of the cleaned data is also printed.

After that, the code queries the cleaned data for the minimum temperature on July 1st, 2007. It then groups the data by date using the groupby() method on the DataFrame, with the 'YearMonthDay' column as the grouping key.

Finally, the code prints the min, max, mean, median, and variance of the DryBulbTemp column for each day. This is done by passing the names of these statistics to the agg() method on the grouped data.
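
A minimal sketch of the grouping and aggregation step on hypothetical values is given here. Note that the pandas `var` aggregate computes the sample variance (divisor n - 1) by default, which matters when comparing it with a hand-written reducer.

```python
import pandas as pd

# Hypothetical cleaned data: three readings on one day, two on the next
sample_df = pd.DataFrame({
    'YearMonthDay': ['20070701', '20070701', '20070701', '20070702', '20070702'],
    'DryBulbTemp': [50, 60, 70, 80, 90],
})

# One row per day with the five summary statistics
daily_stats = (
    sample_df.groupby('YearMonthDay')['DryBulbTemp']
    .agg(['min', 'max', 'mean', 'median', 'var'])
)

print(daily_stats)
```

For the first day the sample variance is (100 + 0 + 100) / 2 = 100, whereas the population variance would be 200 / 3.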

Overall, this code performs several data cleaning, manipulation, and analysis tasks on the weather dataset. It loads data from a file, selects certain columns, sorts the data, replaces and removes missing values, groups the data by date, and calculates statistics such as min, max, mean, median, and variance.

In [1]:
# Import relevant libraries
import pandas as pd
import numpy as np
from functools import reduce

In [2]:
# Load data
data_path = './dsm010q1/200707hourly.txt'

In [4]:
# Select columns 0, 1, 2, 8 for DataFrame
data_cols = [0, 1, 2, 8]
weather_df = pd.read_csv(data_path, usecols=data_cols)

In [5]:
# Remove spaces from the column names
weather_df.columns = weather_df.columns.str.replace(' ', '')

weather_df.shape

(785806, 4)

In [6]:
# Sort values in the DataFrame
weather_df.sort_values(by='YearMonthDay', inplace=True)
weather_df.reset_index(drop=True, inplace=True)
weather_df.head(10)

Unnamed: 0,WbanNumber,YearMonthDay,Time,DryBulbTemp
0,3011,20070701,50,57
1,63846,20070701,159,-
2,63846,20070701,240,-
3,63846,20070701,359,-
4,63846,20070701,439,-
5,63846,20070701,540,-
6,63846,20070701,641,-
7,63846,20070701,59,-
8,63846,20070701,740,-
9,63846,20070701,943,-


In [7]:
# Clean the data 
weather_df_clean = weather_df.copy()

# Strip whitespace and mark blank temperature values with '-'
weather_df_clean.DryBulbTemp = weather_df_clean.DryBulbTemp.str.strip().replace('', '-')

# Replace the '-' placeholders with NaN (the np.NaN alias was removed in NumPy 2.0)
weather_df_clean = weather_df_clean.replace('-', np.nan)
weather_df_clean[weather_df_clean.DryBulbTemp.isna()]

Unnamed: 0,WbanNumber,YearMonthDay,Time,DryBulbTemp
1,63846,20070701,159,
2,63846,20070701,240,
3,63846,20070701,359,
4,63846,20070701,439,
5,63846,20070701,540,
...,...,...,...,...
785799,26546,20070719,249,
785800,26546,20070719,47,
785801,41414,20070719,1344,
785802,41414,20070719,1325,


In [8]:
# Remove null values
weather_df_clean = weather_df_clean.dropna()
weather_df_clean.YearMonthDay = weather_df_clean.YearMonthDay.astype('str')
weather_df_clean.DryBulbTemp = weather_df_clean.DryBulbTemp.astype('int')
weather_df_clean.shape

(609806, 4)

In [9]:
# Describe the data
weather_df_clean.describe()

Unnamed: 0,WbanNumber,Time,DryBulbTemp
count,609806.0,609806.0,609806.0
mean,37239.640819,1197.046356,73.879857
std,33913.47708,688.94317,11.203061
min,3011.0,10.0,3.0
25%,12897.0,556.0,66.0
50%,23187.0,1156.0,74.0
75%,54831.0,1756.0,81.0
max,94999.0,2359.0,133.0


In [10]:
# Minimum temperature on 1 July 2007 (the dataset only covers July 2007)
weather_df_clean.query('YearMonthDay == "20070701"').DryBulbTemp.min()

# Group the data by date and compute min, max, mean, median and var
weather_df_grouped = weather_df_clean[['YearMonthDay', 'DryBulbTemp']].groupby(by='YearMonthDay')
weather_df_grouped.agg(['min', 'max', 'mean', 'median', 'var']).reset_index()

Unnamed: 0_level_0,YearMonthDay,DryBulbTemp,DryBulbTemp,DryBulbTemp,DryBulbTemp,DryBulbTemp
Unnamed: 0_level_1,Unnamed: 1_level_1,min,max,mean,median,var
0,20070701,32,115,70.718628,72.0,136.160669
1,20070702,32,115,70.939578,72.0,130.522957
2,20070703,33,115,72.774852,73.0,116.107109
3,20070704,29,120,73.916241,74.0,113.224549
4,20070705,33,120,74.60091,74.0,117.130201
5,20070706,38,133,75.045506,75.0,125.278083
6,20070707,32,116,75.892737,77.0,132.609187
7,20070708,36,111,76.51774,77.0,124.487372
8,20070709,36,110,75.992811,77.0,120.985716
9,20070710,35,115,74.959891,75.0,118.159122


#### Mapper

In [11]:
from typing import List, Optional

class Mapper():

    def map_input(self, input_stream) -> List[str]:
        """Map every line of the stream, print each result and return them all."""
        output = []

        for item in input_stream:
            mapped_item = self.map(item)
            if mapped_item:
                print(mapped_item)
                # Collect the pair so the returned list is not empty
                output.append(mapped_item)

        return output

    def map(self, item:str) -> Optional[str]:
        """Return 'day,temperature' for a valid line, or None to skip it."""
        # Disregard the file headers
        if item.startswith('Wban Number'):
            return None

        # Disregard the empty lines
        if not item.strip():
            return None

        # Tokenize the input line
        tokens = item.split(',')

        # Get the day's value
        day_value = tokens[1].strip()

        # Get the temperature
        temperature_value = tokens[8].strip()

        # Do not process missing temperature values ('-' or blank)
        if temperature_value in ('-', ''):
            return None

        temperature_value = int(temperature_value)

        # Return the key and value as a comma-separated string
        return '%s,%s' % (day_value, temperature_value)

In [12]:
# Input the text file

with open('./dsm010q1/resultcw.txt') as input_file:
    mapper = Mapper()
    mapper.map_input(input_file)

20070401,32
20070401,32
20070401,32
20070403,34
20070401,34
20070401,32
20070402,34
20070401,37
20070401,41
20070401,45
20070401,50
20070401,52
20070402,55
20070403,-54
20070402,54
20070402,54
20070402,52
20070402,50
20070402,46
20070402,45


In [13]:
# Data from resultcw text file

resultcw_path = './dsm010q1/resultcw.txt'
with open(resultcw_path) as input_file:
    mapper = Mapper()
    output = list(map(mapper.map, input_file))

output[:20]

[None,
 '20070401,32',
 '20070401,32',
 '20070401,32',
 '20070403,34',
 '20070401,34',
 '20070401,32',
 '20070402,34',
 '20070401,37',
 '20070401,41',
 '20070401,45',
 None,
 '20070401,50',
 '20070401,52',
 '20070402,55',
 '20070403,-54',
 '20070402,54',
 '20070402,54',
 '20070402,52',
 '20070402,50']

#### Reducer

In [14]:
class ReducerValues():
    
    def __init__(self, day:str) -> None:
        self.values = []
        self.day = day
        self.max = None
        self.min = None
        self.sum = 0
        self.squared_sum = 0
        self.n = 0

    def add_value(self, value:int) -> None:
       
        # Add the value to the collection
        self.values.append(value)

        # Update the min and max values
        if self.max is None:
            self.min = value
            self.max = value
            
        else:
            if value > self.max: self.max = value
            if value < self.min: self.min = value

        # Update the "running" values
        self.n += 1
        self.sum += value
        self.squared_sum += value * value

    def get_median(self) -> float:
        """Return the median; this needs every value of the day held in memory."""
        self.values.sort()
        middle = self.n // 2

        if self.n % 2 == 0:
            return (self.values[middle - 1] + self.values[middle]) / 2.0
        else:
            return self.values[middle]

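    def get_variance(self, population:bool=True) -> float:
        """Return the population variance, or the sample variance if population is False."""
        mean = self.sum / float(self.n)

        # Sum of squared deviations: sum(x^2) - n * mean^2
        sum_sq_dev = self.squared_sum - self.n * mean*mean

        if population:
            return 1.0 / self.n * sum_sq_dev
        # The sample variance is undefined for a single value
        if self.n < 2:
            return float('nan')
        return 1.0 / (self.n - 1) * sum_sq_dev
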

    def print_output(self, population:bool=True) -> None:
       
        # Calculate the mean
        mean = self.sum / float(self.n)

        # Print the output
        print('%s,"%d,%d,%.6f,%.1f,%.6f"' % (
            self.day, 
            self.max, 
            self.min,
            mean,
            self.get_median(),
            self.get_variance(population)))

class Reducer():

    def reduce_input(self, input_stream, population:bool=True) -> None:
        """Print one summary line per day; the input must be sorted by day."""
        current_day = ReducerValues(None)

        for item in input_stream:
            if item:
                # Get the day and temperature values
                day_value, temperature_value = item.split(',')
                temperature_value = int(temperature_value)

                if current_day.day == day_value:
                    current_day.add_value(temperature_value)
                else:
                    # If the current day exists, show the output
                    if current_day.day:
                        current_day.print_output(population)

                    # The current day has changed, create the new day
                    current_day = ReducerValues(day_value)
                    current_day.add_value(temperature_value)

        # Print the last day; checking current_day.day also covers empty input,
        # where day_value would never have been assigned
        if current_day.day:
            current_day.print_output(population)
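
The variance in `ReducerValues` is computed from running totals only (count, sum, and sum of squares), using the identity sum((x - mean)^2) = sum(x^2) - n * mean^2. The sketch below checks that identity against Python's `statistics` module. The function name `running_variance` and the sample temperatures are hypothetical and are not part of the coursework code.

```python
import math
import statistics

def running_variance(values, population=True):
    """Variance from the count, the sum and the sum of squares only."""
    n = len(values)
    total = sum(values)
    squared_total = sum(v * v for v in values)
    mean = total / n
    # sum((x - mean)^2) == sum(x^2) - n * mean^2
    sum_sq_dev = squared_total - n * mean * mean
    return sum_sq_dev / (n if population else n - 1)

# Hypothetical temperatures for a single day
sample_temps = [57, 61, 72, 68, 75]

# Both comparisons should print True
print(math.isclose(running_variance(sample_temps, population=True),
                   statistics.pvariance(sample_temps)))
print(math.isclose(running_variance(sample_temps, population=False),
                   statistics.variance(sample_temps)))
```

Calling the reducer with `population=False` selects the n - 1 divisor, which is the same convention pandas uses for `var` by default. The median, by contrast, cannot be derived from running totals, which is why `ReducerValues` also keeps the full list of values.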

In [15]:
with open(data_path) as input_file:
    mapper = Mapper()
    reducer = Reducer()
    
    # Perform the mapping task
    mapper_output = map(mapper.map, input_file)

    # Remove the null values and sort the mapped output
    mapper_output = list(filter(None, mapper_output))
    mapper_output.sort()
    
    # Perform the reduce function
    reducer.reduce_input(mapper_output, population=False)

20070701,"115,32,70.718628,72.0,136.160669"
20070702,"115,32,70.939578,72.0,130.522957"
20070703,"115,33,72.774852,73.0,116.107109"
20070704,"120,29,73.916241,74.0,113.224549"
20070705,"120,33,74.600910,74.0,117.130201"
20070706,"133,38,75.045506,75.0,125.278083"
20070707,"116,32,75.892737,77.0,132.609187"
20070708,"111,36,76.517740,77.0,124.487372"
20070709,"110,36,75.992811,77.0,120.985716"
20070710,"115,35,74.959891,75.0,118.159122"
20070711,"115,33,72.972403,73.0,119.794278"
20070712,"111,3,72.124993,73.0,127.922534"
20070713,"111,35,71.838316,72.0,127.548649"
20070714,"110,40,73.139704,73.0,120.412153"
20070715,"111,36,73.859110,73.0,124.772278"
20070716,"114,38,74.339734,74.0,125.147510"
20070717,"114,39,74.879950,75.0,116.844285"
20070718,"113,39,75.746743,75.0,107.504686"
20070719,"90,74,80.974026,81.0,16.709843"


In [17]:
# List of files
import os
current_path = './dsm010q1/'
for file in os.listdir(current_path):
    # Skip entries whose names contain 'checkpoints' or 'Store'
    if 'checkpoints' in file or 'Store' in file:
        continue
    print(file)

200707hourly.txt
.part-00000.txt.crc
resultcw.txt
mappercw1.py
reducercw1.py

