# Q1) Find the descriptive statistics for temperature of each day of a given month for the year 2007.

## 1.1 Introduction

I chose to find the descriptive statistics for the NCDC weather data for April 2007. I created a separate reducer for each statistic.


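**It is helpful to consider the following formulae for variance and correlation:** $variance = \frac{1}{N}\left(\sum_{i=1}^{N} x_i^2 - N\overline{x}^2\right)$ and $correlation(x, y) = \frac{\frac{1}{N}\sum_{i=1}^{N} x_i y_i - \overline{x}\,\overline{y}}{\sigma_x \sigma_y}$, where $\sigma_x = \sqrt{variance(x)}$ and $\sigma_y = \sqrt{variance(y)}$.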
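As a short check of the variance formula, it can be derived from the definition of the population variance by expanding the square and using $\sum_{i=1}^{N} x_i = N\overline{x}$:

$$
\frac{1}{N}\sum_{i=1}^{N}(x_i-\overline{x})^2
= \frac{1}{N}\left(\sum_{i=1}^{N}x_i^2 - 2\overline{x}\sum_{i=1}^{N}x_i + N\overline{x}^2\right)
= \frac{1}{N}\left(\sum_{i=1}^{N}x_i^2 - N\overline{x}^2\right)
$$

This form only needs the count, the sum, and the sum of squares, which is why it suits a reducer that sees values one at a time.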


## 1.2 Pseudocode

Open the file "200704hourly.txt" in read mode.
Skip the header row.
Define a mapper function for each task:

1. Iterate through each line in the file.
2. Split the line by commas to extract the date and the parameters required by the task.
3. Convert the extracted values to floats and yield the date together with those values.
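Since the mappers differ only in which column they read, the three steps above could be sketched as one small factory. This is an illustrative sketch, not the code used in the cells below: `make_mapper`, `value_col`, and `date_col` are hypothetical names, and the sample row uses a made-up four-column layout.

```python
def make_mapper(value_col, date_col=1):
    """Build a mapper that yields (date, float_value) for one CSV column."""
    def mapper(line):
        fields = line.strip().split(',')
        try:
            yield fields[date_col], float(fields[value_col])
        except (ValueError, IndexError):
            # Skip rows whose value is non-numeric or whose column is missing.
            return
    return mapper


# Made-up rows in a hypothetical layout: station, date, time, value
good_row = "03011,20070401,0050,12\n"
bad_row = "03011,20070401,0050,-\n"

mapper = make_mapper(value_col=3)
print(list(mapper(good_row)))  # one (date, value) pair
print(list(mapper(bad_row)))   # empty list: the '-' placeholder is skipped
```

Catching `IndexError` as well as `ValueError` is a choice made for this sketch so that short or malformed rows are skipped rather than stopping the run.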


Define a reducer function for Task 1:
a. Accept the date and a generator of wind speeds.
b. Convert the generator to a list for easier processing.
c. If wind speeds are available:
i. Find the maximum and minimum wind speeds.
ii. Calculate the difference between the maximum and minimum wind speeds.
iii. Yield the date and the difference.
d. If no wind speed data is available, yield the date and None.
Initialize an empty dictionary task1_output.
Iterate through each line in the file:
a. For each date and wind speed yielded by the mapper function:
i. Append the wind speed to the list corresponding to the date in task1_output.
Iterate through each date and wind speed list in task1_output:
a. For each date and difference yielded by the reducer function:
i. Print the date and difference for Task 1.
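The map, group, and reduce steps listed above have the same shape for every task, so they could be factored into one driver. The sketch below is an assumption about how that might look: `run_job`, `demo_mapper`, and `range_reducer` are hypothetical names, and the input rows are made up.

```python
from collections import defaultdict


def run_job(lines, mapper, reducer):
    """Map every line, group mapped values by key, then reduce each group."""
    groups = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            groups[key].append(value)

    results = {}
    for key, values in groups.items():
        for out_key, out_value in reducer(key, values):
            results[out_key] = out_value
    return results


def demo_mapper(line):
    # Hypothetical two-column input: "date,value"
    date, value = line.strip().split(',')
    yield date, float(value)


def range_reducer(date, values):
    # Same idea as the Task 1 reducer: maximum minus minimum
    yield date, max(values) - min(values)


demo_lines = ["20070401,3", "20070401,10", "20070402,5"]  # made-up rows
print(run_job(demo_lines, demo_mapper, range_reducer))
# {'20070401': 7.0, '20070402': 0.0}
```

With a driver like this, each task would only need to supply its own mapper and reducer.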

Define a reducer function for Task 2:
a. Accept the date and a generator of relative humidities.
b. If relative humidities are available:
i. Find the minimum relative humidity.
ii. Yield the date and the minimum relative humidity.
c. If no relative humidity data is available, yield the date and None.
Initialize an empty dictionary task2_output.
Reset the file pointer to the beginning of the file.
Skip the header row again.
Iterate through each line in the file:
a. For each date and relative humidity yielded by the mapper function:
i. Append the relative humidity to the list corresponding to the date in task2_output.
Iterate through each date and relative humidity list in task2_output:
a. For each date and minimum relative humidity yielded by the reducer function:
i. Print the date and minimum relative humidity for Task 2.


Define a reducer function for Task 3:
a. Accept the date and a generator of dew point temperatures.
b. If dew point temperatures are available:
i. Calculate the mean and variance of the dew point temperatures.
ii. Yield the date, mean, and variance.
c. If no dew point temperature data is available, yield the date and None for both mean and variance.
Initialize an empty dictionary task3_output.
Reset the file pointer to the beginning of the file.
Skip the header row again.
Iterate through each line in the file:
a. For each date and dew point temperature yielded by the mapper function:
i. Append the dew point temperature to the list corresponding to the date in task3_output.
Iterate through each date and dew point temperature list in task3_output:
a. For each date and tuple of mean and variance yielded by the reducer function:
i. Print the date, mean, and variance for Task 3.

Define a reducer function for Task 4:
a. Accept the date and a generator of tuples containing relative humidity, wind speed, and dry bulb temperature.
b. Convert the generator to a list.
c. If data is available:
i. Calculate various statistics including means, sums, covariances, standard deviations, and correlations.
ii. Yield the date along with the correlation matrix among relative humidity, wind speed, and dry bulb temperature.
d. If no data is available, yield the date with None values for all correlations.
Initialize an empty dictionary task4_output.
Reset the file pointer to the beginning of the file.
Skip the header row again.
Iterate through each line in the file:
a. For each date and tuple of relative humidity, wind speed, and dry bulb temperature yielded by the mapper function:
i. Append the tuple to the list corresponding to the date in task4_output.
Iterate through each date and list of tuples in task4_output:
a. For each date and tuple of correlations yielded by the reducer function:
i. Print the date along with the correlation matrix among relative humidity, wind speed, and dry bulb temperature for Task 4.
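Step c.i of the Task 4 reducer names the statistics without spelling them out. A minimal sketch of the pairwise calculation, using only sums and the sum-of-squares form of the variance, might look like this. The function name `pearson` and the choice to return `None` when one variable is constant are assumptions made here; a plain division would raise `ZeroDivisionError` in that case.

```python
import math


def pearson(xs, ys):
    """Population Pearson correlation of two equal-length sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n

    # Covariance and variances from raw sums: E[xy] - E[x]E[y], E[x^2] - E[x]^2
    cov = sum(x * y for x, y in zip(xs, ys)) / n - mean_x * mean_y
    var_x = sum(x * x for x in xs) / n - mean_x ** 2
    var_y = sum(y * y for y in ys) / n - mean_y ** 2

    if var_x <= 0 or var_y <= 0:
        return None  # correlation is undefined when a variable is constant
    return cov / math.sqrt(var_x * var_y)


print(pearson([1, 2, 3, 4], [2, 4, 6, 8]))  # perfectly linear, made-up data
print(pearson([1, 2, 3], [3, 3, 3]))        # constant second variable
```

Calling `pearson` once per pair (humidity and wind speed, humidity and dry bulb temperature, wind speed and dry bulb temperature) gives the three off-diagonal entries of the matrix; the diagonal entries are 1 by definition.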

## 1.3 The difference between the maximum and the minimum “Wind Speed” from all the weather stations for each day in the month

In [7]:
# Open the file
with open("200704hourly.txt", "r") as file:
    # Skip the header row
    next(file)

    # Mapper function: Difference between maximum and minimum "Wind Speed"
    def mapper_task1(line):
        data = line.strip().split(',')
        date = data[1]
        try:
            wind_speed = float(data[12])  # Wind Speed (kt)
            yield date, wind_speed
        except ValueError:
            pass  # Skip non-numeric values

    # Reducer function 
    def reducer_task1(date, wind_speeds):
        wind_speeds = list(wind_speeds)  
        if wind_speeds:  # Check if there is data available
            max_wind_speed = max(wind_speeds)
            min_wind_speed = min(wind_speeds)
            difference = max_wind_speed - min_wind_speed
            yield date, difference
        else:
            yield date, None  

    # Group the wind speeds by date (shuffle step)
    task1_output = {}
    for line in file:
        for date, wind_speed in mapper_task1(line):
            task1_output.setdefault(date, []).append(wind_speed)

    # Reduce each date's wind speeds to the difference between max and min
    for date, wind_speeds in task1_output.items():
        for out_date, diff in reducer_task1(date, wind_speeds):
            print(f"Task 1 - Date: {out_date}, Difference: {diff}")

Task 1 - Date: 20070401, Difference: 43.0
Task 1 - Date: 20070402, Difference: 56.0
Task 1 - Date: 20070403, Difference: 40.0
Task 1 - Date: 20070404, Difference: 96.0
Task 1 - Date: 20070405, Difference: 42.0
Task 1 - Date: 20070406, Difference: 41.0
Task 1 - Date: 20070407, Difference: 43.0
Task 1 - Date: 20070408, Difference: 64.0
Task 1 - Date: 20070409, Difference: 71.0
Task 1 - Date: 20070410, Difference: 57.0
Task 1 - Date: 20070411, Difference: 50.0
Task 1 - Date: 20070412, Difference: 60.0
Task 1 - Date: 20070413, Difference: 67.0
Task 1 - Date: 20070414, Difference: 90.0
Task 1 - Date: 20070415, Difference: 73.0
Task 1 - Date: 20070416, Difference: 97.0
Task 1 - Date: 20070417, Difference: 59.0
Task 1 - Date: 20070418, Difference: 71.0
Task 1 - Date: 20070419, Difference: 65.0
Task 1 - Date: 20070420, Difference: 36.0
Task 1 - Date: 20070421, Difference: 96.0
Task 1 - Date: 20070422, Difference: 60.0
Task 1 - Date: 20070423, Difference: 81.0
Task 1 - Date: 20070424, Differenc

## 1.4 The daily minimum “Relative Humidity” from all the weather stations

In [59]:
with open("200704hourly.txt", "r") as file:
    next(file)
    # Mapper function: Daily minimum "Relative Humidity"
    def mapper_task2(line):
        data = line.strip().split(',')
        date = data[1]
        try:
            relative_humidity = float(data[11])  
            yield date, relative_humidity
        except ValueError:
            pass 

    # Reducer function: Daily minimum "Relative Humidity"
    def reducer_task2(date, relative_humidities):
        if relative_humidities:  
            min_relative_humidity = min(relative_humidities)
            yield date, min_relative_humidity
        else:
            yield date, None  

    # Group the relative humidities by date (shuffle step)
    task2_output = {}
    file.seek(0)  # Rewind to the start of the file
    next(file)    # Skip the header row again
    for line in file:
        for date, relative_humidity in mapper_task2(line):
            task2_output.setdefault(date, []).append(relative_humidity)

    # Reduce each date's relative humidities to their minimum
    for date, relative_humidities in task2_output.items():
        for out_date, min_rh in reducer_task2(date, relative_humidities):
            print(f"Task 2 - Date: {out_date}, Min Relative Humidity: {min_rh}")

Task 2 - Date: 20070401, Min Relative Humidity: 4.0
Task 2 - Date: 20070402, Min Relative Humidity: 5.0
Task 2 - Date: 20070403, Min Relative Humidity: 4.0
Task 2 - Date: 20070404, Min Relative Humidity: 3.0
Task 2 - Date: 20070405, Min Relative Humidity: 2.0
Task 2 - Date: 20070406, Min Relative Humidity: 4.0
Task 2 - Date: 20070407, Min Relative Humidity: 6.0
Task 2 - Date: 20070408, Min Relative Humidity: 2.0
Task 2 - Date: 20070409, Min Relative Humidity: 4.0
Task 2 - Date: 20070410, Min Relative Humidity: 4.0
Task 2 - Date: 20070411, Min Relative Humidity: 6.0
Task 2 - Date: 20070412, Min Relative Humidity: 4.0
Task 2 - Date: 20070413, Min Relative Humidity: 3.0
Task 2 - Date: 20070414, Min Relative Humidity: 3.0
Task 2 - Date: 20070415, Min Relative Humidity: 6.0
Task 2 - Date: 20070416, Min Relative Humidity: 2.0
Task 2 - Date: 20070417, Min Relative Humidity: 3.0
Task 2 - Date: 20070418, Min Relative Humidity: 3.0
Task 2 - Date: 20070419, Min Relative Humidity: 3.0
Task 2 - Dat

## 1.5 The daily mean and variance of “Dew Point Temp” from all the weather stations

In [60]:
with open("200704hourly.txt", "r") as file:
    next(file)
    def mapper_task3(line):
        data = line.strip().split(',')
        date = data[1]
        try:
            dew_point_temp = float(data[10]) if data[10] != '-' else None  
            if dew_point_temp is not None:
                yield date, dew_point_temp
        except ValueError:
            pass  

    def reducer_task3(date, dew_point_temps):
        dew_point_temps = list(dew_point_temps)
        if dew_point_temps:
            mean = sum(dew_point_temps) / len(dew_point_temps)
            variance = sum((temp - mean) ** 2 for temp in dew_point_temps) / len(dew_point_temps)
            yield date, (mean, variance)
        else:
            yield date, (None, None)

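    # Group the dew point temperatures by date (shuffle step)
    task3_output = {}
    file.seek(0)  # Rewind to the start of the file
    next(file)    # Skip the header row again
    for line in file:
        for date, dew_point_temp in mapper_task3(line):
            task3_output.setdefault(date, []).append(dew_point_temp)

    # Reduce each date's dew point temperatures to (mean, variance)
    for date, dew_point_temps in task3_output.items():
        for out_date, (mean, variance) in reducer_task3(date, dew_point_temps):
            print(f"Task 3 - Date: {out_date}, Mean: {mean}, Variance: {variance}")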

Task 3 - Date: 20070401, Mean: 47.517353741293235, Variance: 155.78645410161667
Task 3 - Date: 20070402, Mean: 48.48740033754292, Variance: 185.97253228033978
Task 3 - Date: 20070403, Mean: 46.828879813302215, Variance: 250.31295825510009
Task 3 - Date: 20070404, Mean: 39.9447063616786, Variance: 239.78967877560063
Task 3 - Date: 20070405, Mean: 35.54001397868251, Variance: 186.9037661131484
Task 3 - Date: 20070406, Mean: 33.5017155152361, Variance: 189.8661287155327
Task 3 - Date: 20070407, Mean: 31.498370861697794, Variance: 159.39487428778307
Task 3 - Date: 20070408, Mean: 33.531195967529236, Variance: 133.63729865171774
Task 3 - Date: 20070409, Mean: 36.96175517472922, Variance: 131.67214256499855
Task 3 - Date: 20070410, Mean: 39.565108708289735, Variance: 157.13873841685358
Task 3 - Date: 20070411, Mean: 41.23881812365497, Variance: 174.30171655815
Task 3 - Date: 20070412, Mean: 40.90214804063861, Variance: 144.423197127576
Task 3 - Date: 20070413, Mean: 41.01415985811066, Varian
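The Task 3 reducer needs the full list of readings for a date before it can compute anything. If the data were split across several workers, one alternative would be to keep only the count, sum, and sum of squares per date, since these can be merged before the final step. The sketch below illustrates that idea with made-up values; `partial_stats`, `merge_stats`, and `finalize_stats` are hypothetical names and are not part of the cell above.

```python
def partial_stats(values):
    """Summarise a chunk of readings as (count, sum, sum of squares)."""
    count, total, total_sq = 0, 0.0, 0.0
    for x in values:
        count += 1
        total += x
        total_sq += x * x
    return count, total, total_sq


def merge_stats(a, b):
    """Combine two partial summaries by adding them component-wise."""
    return tuple(x + y for x, y in zip(a, b))


def finalize_stats(stats):
    """Turn a merged summary into (mean, population variance)."""
    count, total, total_sq = stats
    mean = total / count
    variance = (total_sq - count * mean * mean) / count
    return mean, variance


# Made-up readings for one date, split across two hypothetical workers
chunk_a = [2.0, 4.0, 4.0, 4.0]
chunk_b = [5.0, 5.0, 7.0, 9.0]
merged = merge_stats(partial_stats(chunk_a), partial_stats(chunk_b))
print(finalize_stats(merged))  # (5.0, 4.0)
```

The sum-of-squares form can lose precision when the mean is large relative to the spread, so the two-pass calculation used in the cell above remains the safer choice when all values fit in memory.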

## 1.6 The correlation matrix that describes the monthly correlation among “Relative Humidity”, “Wind Speed” and “Dry Bulb Temp” from all the weather stations

In [71]:
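with open("200704hourly.txt", "r") as file:
    # Skip the header row
    next(file)

    # NOTE: mapper_task4 is called below but is not defined in this cell.
    # The reducer expects it to yield
    # (date, (relative_humidity, wind_speed, dry_bulb_temp)) for each line.
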
    def reducer_task4(date, data):
        data = list(data)
        if data:
            N = len(data)
            sum_rh = sum(data_point[0] for data_point in data)
            sum_ws = sum(data_point[1] for data_point in data)
            sum_dbt = sum(data_point[2] for data_point in data)
            sum_rh_ws = sum(data_point[0] * data_point[1] for data_point in data)
            sum_rh_dbt = sum(data_point[0] * data_point[2] for data_point in data)
            sum_ws_dbt = sum(data_point[1] * data_point[2] for data_point in data)

            mean_rh = sum_rh / N
            mean_ws = sum_ws / N
            mean_dbt = sum_dbt / N

            cov_rh_ws = (sum_rh_ws / N) - (mean_rh * mean_ws)
            cov_rh_dbt = (sum_rh_dbt / N) - (mean_rh * mean_dbt)
            cov_ws_dbt = (sum_ws_dbt / N) - (mean_ws * mean_dbt)

            std_rh = (sum((data_point[0] - mean_rh) ** 2 for data_point in data) / N) ** 0.5
            std_ws = (sum((data_point[1] - mean_ws) ** 2 for data_point in data) / N) ** 0.5
            std_dbt = (sum((data_point[2] - mean_dbt) ** 2 for data_point in data) / N) ** 0.5

            correlation_rh_ws = cov_rh_ws / (std_rh * std_ws)
            correlation_rh_dbt = cov_rh_dbt / (std_rh * std_dbt)
            correlation_ws_dbt = cov_ws_dbt / (std_ws * std_dbt)

            yield date, (correlation_rh_ws, correlation_rh_dbt, correlation_ws_dbt)
        else:
            yield date, (None, None, None)


    # Group the (humidity, wind speed, dry bulb temperature) tuples by date
    task4_output = {}
    file.seek(0)  # Rewind to the start of the file
    next(file)    # Skip the header row again
    for line in file:
        for date, values in mapper_task4(line):
            task4_output.setdefault(date, []).append(values)

    # Reduce each date's tuples to the three pairwise correlations
    for date, values_list in task4_output.items():
        for out_date, (correlation_rh_ws, correlation_rh_dbt, correlation_ws_dbt) in reducer_task4(date, values_list):
            print(f"Task 4 - Date: {out_date}, Correlation RH-WS: {correlation_rh_ws}, Correlation RH-DBT: {correlation_rh_dbt}, Correlation WS-DBT: {correlation_ws_dbt}")




Task 4 - Date: 20070401, Correlation RH-WS: -0.12347638956572307, Correlation RH-DBT: -0.3697914963530733, Correlation WS-DBT: 0.06968746993213817
Task 4 - Date: 20070402, Correlation RH-WS: -0.2835748625639972, Correlation RH-DBT: -0.453464116430742, Correlation WS-DBT: 0.12368325919849521
Task 4 - Date: 20070403, Correlation RH-WS: -0.02770684273102155, Correlation RH-DBT: -0.38968490123112104, Correlation WS-DBT: -0.13651391072917887
Task 4 - Date: 20070404, Correlation RH-WS: 0.012181750768729772, Correlation RH-DBT: -0.38998659801697144, Correlation WS-DBT: -0.3638121049576677
Task 4 - Date: 20070405, Correlation RH-WS: -0.04189525010873503, Correlation RH-DBT: -0.4668967694820371, Correlation WS-DBT: -0.3550708561275524
Task 4 - Date: 20070406, Correlation RH-WS: -0.11314422403946438, Correlation RH-DBT: -0.375116707401573, Correlation WS-DBT: -0.3043406645994554
Task 4 - Date: 20070407, Correlation RH-WS: -0.13871202830692564, Correlation RH-DBT: -0.30098248560062624, Correlatio
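The cell for this task calls `mapper_task4`, whose definition is not shown. A minimal sketch of what such a mapper might look like follows. The humidity and wind speed indices are the ones read in the earlier cells; `DRY_BULB_COL = 8` is an assumption that should be checked against the header row of `200704hourly.txt`, and the sample row is made up.

```python
RH_COL = 11        # index read for relative humidity in the Task 2 cell
WS_COL = 12        # index read for wind speed in the Task 1 cell
DRY_BULB_COL = 8   # ASSUMPTION: not stated in the source, verify in the header


def mapper_task4(line):
    """Yield (date, (relative_humidity, wind_speed, dry_bulb_temp))."""
    fields = line.strip().split(',')
    try:
        values = (
            float(fields[RH_COL]),
            float(fields[WS_COL]),
            float(fields[DRY_BULB_COL]),
        )
    except (ValueError, IndexError):
        return  # Skip rows where any of the three readings is unusable
    yield fields[1], values


# Made-up row with 13 comma-separated fields
sample_row = "03011,20070401,0050,AO2,-,CLR,10SM,-,54,41,47,62,5\n"
print(list(mapper_task4(sample_row)))
# [('20070401', (62.0, 5.0, 54.0))]
```

A row is dropped if any one of the three readings is missing, so every tuple passed to the reducer is complete. Yielding `fields[1][:6]` (the `YYYYMM` prefix) as the key instead of the full date would send the whole month to a single reducer call, which is closer to the monthly matrix named in the heading.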
